otel_logger.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import datetime
  19. import logging
  20. import random
  21. import warnings
  22. from functools import partial
  23. from typing import TYPE_CHECKING, Callable, Iterable, Union
  24. from opentelemetry import metrics
  25. from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
  26. from opentelemetry.metrics import Observation
  27. from opentelemetry.sdk.metrics import MeterProvider
  28. from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader
  29. from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
  30. from airflow.configuration import conf
  31. from airflow.metrics.protocols import Timer
  32. from airflow.metrics.validators import (
  33. OTEL_NAME_MAX_LENGTH,
  34. AllowListValidator,
  35. ListValidator,
  36. get_validator,
  37. stat_name_otel_handler,
  38. )
  39. from airflow.utils.net import get_hostname
  40. if TYPE_CHECKING:
  41. from opentelemetry.metrics import Instrument
  42. from opentelemetry.util.types import Attributes
  43. from airflow.metrics.protocols import DeltaType, TimerProtocol
  44. log = logging.getLogger(__name__)
  45. GaugeValues = Union[int, float]
  46. DEFAULT_GAUGE_VALUE = 0.0
  47. # "airflow.dag_processing.processes" is currently the only UDC used in Airflow. If more are added,
  48. # we should add a better system for this.
  49. #
  50. # Generally in OTel a Counter is monotonic (can only go up) and there is an UpDownCounter which,
  51. # as you can guess, is non-monotonic; it can go up or down. The choice here is to either drop
  52. # this one metric and implement the rest as monotonic Counters, implement all counters as
  53. # UpDownCounters, or add a bit of logic to do it intelligently. The catch is that the Collector
  54. # which transmits these metrics to the upstream dashboard tools (Prometheus, Grafana, etc.) assigns
  55. # the type of Gauge to any UDC instead of Counter. Adding this logic feels like the best compromise
  56. # where normal Counters still get typed correctly, and we don't lose an existing metric.
  57. # See:
  58. # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#counter-creation
  59. # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#updowncounter
  60. UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"}
  61. DEFAULT_METRIC_NAME_PREFIX = "airflow"
  62. # Delimiter is placed between the universal metric prefix and the unique metric name.
  63. DEFAULT_METRIC_NAME_DELIMITER = "."
  64. def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
  65. """Assembles the prefix, delimiter, and name and returns it as a string."""
  66. return f"{prefix}{DEFAULT_METRIC_NAME_DELIMITER}{name}"
  67. def _is_up_down_counter(name):
  68. return name in UP_DOWN_COUNTERS
  69. def _generate_key_name(name: str, attributes: Attributes = None):
  70. if attributes:
  71. key = name
  72. for item in attributes.items():
  73. key += f"_{item[0]}_{item[1]}"
  74. else:
  75. key = name
  76. return key
  77. def name_is_otel_safe(prefix: str, name: str) -> bool:
  78. """
  79. Return True if the provided name and prefix would result in a name that meets the OpenTelemetry standard.
  80. Legal names are defined here:
  81. https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax
  82. """
  83. return bool(stat_name_otel_handler(prefix, name, max_length=OTEL_NAME_MAX_LENGTH))
  84. def _type_as_str(obj: Instrument) -> str:
  85. """
  86. Given an OpenTelemetry Instrument, returns the type of the instrument as a string.
  87. :param obj: An OTel Instrument or subclass
  88. :returns: The type() of the Instrument without all the nested class info
  89. """
  90. # type().__name__ will return something like: '_Counter',
  91. # this drops the leading underscore for cleaner logging.
  92. return type(obj).__name__[1:]
  93. def _get_otel_safe_name(name: str) -> str:
  94. """
  95. Verify that the provided name does not exceed OpenTelemetry's maximum length for metric names.
  96. :param name: The original metric name
  97. :returns: The name, truncated to an OTel-acceptable length if required.
  98. """
  99. otel_safe_name = name[:OTEL_NAME_MAX_LENGTH]
  100. if name != otel_safe_name:
  101. warnings.warn(
  102. f"Metric name `{name}` exceeds OpenTelemetry's name length limit of "
  103. f"{OTEL_NAME_MAX_LENGTH} characters and will be truncated to `{otel_safe_name}`.",
  104. category=UserWarning,
  105. stacklevel=2,
  106. )
  107. return otel_safe_name
  108. def _skip_due_to_rate(rate: float) -> bool:
  109. if rate < 0:
  110. raise ValueError("rate must be a positive value.")
  111. return rate < 1 and random.random() > rate
  112. class _OtelTimer(Timer):
  113. """
  114. An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
  115. OpenTelemetry does not have a native timer, we will store the values as a Gauge.
  116. :param name: The name of the timer.
  117. :param tags: Tags to append to the timer.
  118. """
  119. def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attributes):
  120. super().__init__()
  121. self.otel_logger = otel_logger
  122. self.name = name
  123. self.tags = tags
  124. def stop(self, send: bool = True) -> None:
  125. super().stop(send)
  126. if self.name and send:
  127. self.otel_logger.metrics_map.set_gauge_value(
  128. full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
  129. )
  130. class SafeOtelLogger:
  131. """Otel Logger."""
  132. def __init__(
  133. self,
  134. otel_provider,
  135. prefix: str = DEFAULT_METRIC_NAME_PREFIX,
  136. metrics_validator: ListValidator = AllowListValidator(),
  137. ):
  138. self.otel: Callable = otel_provider
  139. self.prefix: str = prefix
  140. self.metrics_validator = metrics_validator
  141. self.meter = otel_provider.get_meter(__name__)
  142. self.metrics_map = MetricsMap(self.meter)
  143. def incr(
  144. self,
  145. stat: str,
  146. count: int = 1,
  147. rate: float = 1,
  148. tags: Attributes = None,
  149. ):
  150. """
  151. Increment stat by count.
  152. :param stat: The name of the stat to increment.
  153. :param count: A positive integer to add to the current value of stat.
  154. :param rate: value between 0 and 1 that represents the sample rate at
  155. which the metric is going to be emitted.
  156. :param tags: Tags to append to the stat.
  157. """
  158. if _skip_due_to_rate(rate):
  159. return
  160. if count < 0:
  161. raise ValueError("count must be a positive value.")
  162. if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
  163. counter = self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat), attributes=tags)
  164. counter.add(count, attributes=tags)
  165. return counter
  166. def decr(
  167. self,
  168. stat: str,
  169. count: int = 1,
  170. rate: float = 1,
  171. tags: Attributes = None,
  172. ):
  173. """
  174. Decrement stat by count.
  175. :param stat: The name of the stat to decrement.
  176. :param count: A positive integer to subtract from current value of stat.
  177. :param rate: value between 0 and 1 that represents the sample rate at
  178. which the metric is going to be emitted.
  179. :param tags: Tags to append to the stat.
  180. """
  181. if _skip_due_to_rate(rate):
  182. return
  183. if count < 0:
  184. raise ValueError("count must be a positive value.")
  185. if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
  186. counter = self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat))
  187. counter.add(-count, attributes=tags)
  188. return counter
  189. def gauge(
  190. self,
  191. stat: str,
  192. value: int | float,
  193. rate: float = 1,
  194. delta: bool = False,
  195. *,
  196. tags: Attributes = None,
  197. back_compat_name: str = "",
  198. ) -> None:
  199. """
  200. Record a new value for a Gauge.
  201. :param stat: The name of the stat to update.
  202. :param value: The new value of stat, either a float or an int.
  203. :param rate: value between 0 and 1 that represents the sample rate at
  204. which the metric is going to be emitted.
  205. :param delta: If true, the provided value will be added to the previous value.
  206. If False the new value will override the previous.
  207. :param tags: Tags to append to the stat.
  208. :param back_compat_name: If an alternative name is provided, the
  209. stat will be emitted using both names if possible.
  210. """
  211. if _skip_due_to_rate(rate):
  212. return
  213. if back_compat_name and self.metrics_validator.test(back_compat_name):
  214. self.metrics_map.set_gauge_value(
  215. full_name(prefix=self.prefix, name=back_compat_name), value, delta, tags
  216. )
  217. if self.metrics_validator.test(stat):
  218. self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), value, delta, tags)
  219. def timing(
  220. self,
  221. stat: str,
  222. dt: DeltaType,
  223. *,
  224. tags: Attributes = None,
  225. ) -> None:
  226. """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
  227. if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
  228. if isinstance(dt, datetime.timedelta):
  229. dt = dt.total_seconds()
  230. self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)
  231. def timer(
  232. self,
  233. stat: str | None = None,
  234. *args,
  235. tags: Attributes = None,
  236. **kwargs,
  237. ) -> TimerProtocol:
  238. """Timer context manager returns the duration and can be cancelled."""
  239. return _OtelTimer(self, stat, tags)
  240. class MetricsMap:
  241. """Stores Otel Instruments."""
  242. def __init__(self, meter):
  243. self.meter = meter
  244. self.map = {}
  245. def clear(self) -> None:
  246. self.map.clear()
  247. def _create_counter(self, name):
  248. """Create a new counter or up_down_counter for the provided name."""
  249. otel_safe_name = _get_otel_safe_name(name)
  250. if _is_up_down_counter(name):
  251. counter = self.meter.create_up_down_counter(name=otel_safe_name)
  252. else:
  253. counter = self.meter.create_counter(name=otel_safe_name)
  254. log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(counter))
  255. return counter
  256. def get_counter(self, name: str, attributes: Attributes = None):
  257. """
  258. Return the counter; creates a new one if it did not exist.
  259. :param name: The name of the counter to fetch or create.
  260. :param attributes: Counter attributes, used to generate a unique key to store the counter.
  261. """
  262. key = _generate_key_name(name, attributes)
  263. if key not in self.map:
  264. self.map[key] = self._create_counter(name)
  265. return self.map[key]
  266. def del_counter(self, name: str, attributes: Attributes = None) -> None:
  267. """
  268. Delete a counter.
  269. :param name: The name of the counter to delete.
  270. :param attributes: Counter attributes which were used to generate a unique key to store the counter.
  271. """
  272. key = _generate_key_name(name, attributes)
  273. if key in self.map.keys():
  274. del self.map[key]
  275. def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Attributes):
  276. """
  277. Override the last reading for a Gauge with a new value.
  278. :param name: The name of the gauge to record.
  279. :param value: The new reading to record.
  280. :param delta: If True, value is added to the previous reading, else it overrides.
  281. :param tags: Gauge attributes which were used to generate a unique key to store the counter.
  282. :returns: None
  283. """
  284. key: str = _generate_key_name(name, tags)
  285. new_value = value or DEFAULT_GAUGE_VALUE
  286. old_value = self.poke_gauge(name, tags)
  287. if delta:
  288. new_value += old_value
  289. # If delta is true, add the new value to the last reading otherwise overwrite it.
  290. self.map[key] = Observation(new_value, tags)
  291. def _create_gauge(self, name: str, attributes: Attributes = None):
  292. """
  293. Create a new Observable Gauge with the provided name and the default value.
  294. :param name: The name of the gauge to fetch or create.
  295. :param attributes: Gauge attributes, used to generate a unique key to store the gauge.
  296. """
  297. otel_safe_name = _get_otel_safe_name(name)
  298. key = _generate_key_name(name, attributes)
  299. gauge = self.meter.create_observable_gauge(
  300. name=otel_safe_name,
  301. callbacks=[partial(self.read_gauge, _generate_key_name(name, attributes))],
  302. )
  303. self.map[key] = Observation(DEFAULT_GAUGE_VALUE, attributes)
  304. return gauge
  305. def read_gauge(self, key: str, *args) -> Iterable[Observation]:
  306. """Return the Observation for the provided key; callback for the Observable Gauges."""
  307. yield self.map[key]
  308. def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues:
  309. """
  310. Return the value of the gauge; creates a new one with the default value if it did not exist.
  311. :param name: The name of the gauge to fetch or create.
  312. :param attributes: Gauge attributes, used to generate a unique key to store the gauge.
  313. :returns: The integer or float value last recorded for the provided Gauge name.
  314. """
  315. key = _generate_key_name(name, attributes)
  316. if key not in self.map:
  317. self._create_gauge(name, attributes)
  318. return self.map[key].value
  319. def get_otel_logger(cls) -> SafeOtelLogger:
  320. host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector"
  321. port = conf.getint("metrics", "otel_port") # ex: 4318
  322. prefix = conf.get("metrics", "otel_prefix") # ex: "airflow"
  323. ssl_active = conf.getboolean("metrics", "otel_ssl_active")
  324. # PeriodicExportingMetricReader will default to an interval of 60000 millis.
  325. interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000
  326. debug = conf.getboolean("metrics", "otel_debugging_on")
  327. service_name = conf.get("metrics", "otel_service")
  328. resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name})
  329. protocol = "https" if ssl_active else "http"
  330. endpoint = f"{protocol}://{host}:{port}/v1/metrics"
  331. log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint)
  332. readers = [
  333. PeriodicExportingMetricReader(
  334. OTLPMetricExporter(
  335. endpoint=endpoint,
  336. headers={"Content-Type": "application/json"},
  337. ),
  338. export_interval_millis=interval,
  339. )
  340. ]
  341. if debug:
  342. export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter())
  343. readers.append(export_to_console)
  344. metrics.set_meter_provider(
  345. MeterProvider(
  346. resource=resource,
  347. metric_readers=readers,
  348. shutdown_on_exit=False,
  349. ),
  350. )
  351. return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator())