123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import datetime
- import logging
- import random
- import warnings
- from functools import partial
- from typing import TYPE_CHECKING, Callable, Iterable, Union
- from opentelemetry import metrics
- from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
- from opentelemetry.metrics import Observation
- from opentelemetry.sdk.metrics import MeterProvider
- from opentelemetry.sdk.metrics._internal.export import ConsoleMetricExporter, PeriodicExportingMetricReader
- from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
- from airflow.configuration import conf
- from airflow.metrics.protocols import Timer
- from airflow.metrics.validators import (
- OTEL_NAME_MAX_LENGTH,
- AllowListValidator,
- ListValidator,
- get_validator,
- stat_name_otel_handler,
- )
- from airflow.utils.net import get_hostname
- if TYPE_CHECKING:
- from opentelemetry.metrics import Instrument
- from opentelemetry.util.types import Attributes
- from airflow.metrics.protocols import DeltaType, TimerProtocol
- log = logging.getLogger(__name__)
- GaugeValues = Union[int, float]
- DEFAULT_GAUGE_VALUE = 0.0
- # "airflow.dag_processing.processes" is currently the only UDC used in Airflow. If more are added,
- # we should add a better system for this.
- #
- # Generally in OTel a Counter is monotonic (can only go up) and there is an UpDownCounter which,
- # as you can guess, is non-monotonic; it can go up or down. The choice here is to either drop
- # this one metric and implement the rest as monotonic Counters, implement all counters as
- # UpDownCounters, or add a bit of logic to do it intelligently. The catch is that the Collector
- # which transmits these metrics to the upstream dashboard tools (Prometheus, Grafana, etc.) assigns
- # the type of Gauge to any UDC instead of Counter. Adding this logic feels like the best compromise
- # where normal Counters still get typed correctly, and we don't lose an existing metric.
- # See:
- # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#counter-creation
- # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/api.md#updowncounter
- UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"}
- DEFAULT_METRIC_NAME_PREFIX = "airflow"
- # Delimiter is placed between the universal metric prefix and the unique metric name.
- DEFAULT_METRIC_NAME_DELIMITER = "."
- def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
- """Assembles the prefix, delimiter, and name and returns it as a string."""
- return f"{prefix}{DEFAULT_METRIC_NAME_DELIMITER}{name}"
- def _is_up_down_counter(name):
- return name in UP_DOWN_COUNTERS
- def _generate_key_name(name: str, attributes: Attributes = None):
- if attributes:
- key = name
- for item in attributes.items():
- key += f"_{item[0]}_{item[1]}"
- else:
- key = name
- return key
- def name_is_otel_safe(prefix: str, name: str) -> bool:
- """
- Return True if the provided name and prefix would result in a name that meets the OpenTelemetry standard.
- Legal names are defined here:
- https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax
- """
- return bool(stat_name_otel_handler(prefix, name, max_length=OTEL_NAME_MAX_LENGTH))
- def _type_as_str(obj: Instrument) -> str:
- """
- Given an OpenTelemetry Instrument, returns the type of the instrument as a string.
- :param obj: An OTel Instrument or subclass
- :returns: The type() of the Instrument without all the nested class info
- """
- # type().__name__ will return something like: '_Counter',
- # this drops the leading underscore for cleaner logging.
- return type(obj).__name__[1:]
- def _get_otel_safe_name(name: str) -> str:
- """
- Verify that the provided name does not exceed OpenTelemetry's maximum length for metric names.
- :param name: The original metric name
- :returns: The name, truncated to an OTel-acceptable length if required.
- """
- otel_safe_name = name[:OTEL_NAME_MAX_LENGTH]
- if name != otel_safe_name:
- warnings.warn(
- f"Metric name `{name}` exceeds OpenTelemetry's name length limit of "
- f"{OTEL_NAME_MAX_LENGTH} characters and will be truncated to `{otel_safe_name}`.",
- category=UserWarning,
- stacklevel=2,
- )
- return otel_safe_name
- def _skip_due_to_rate(rate: float) -> bool:
- if rate < 0:
- raise ValueError("rate must be a positive value.")
- return rate < 1 and random.random() > rate
- class _OtelTimer(Timer):
- """
- An implementation of Stats.Timer() which records the result in the OTel Metrics Map.
- OpenTelemetry does not have a native timer, we will store the values as a Gauge.
- :param name: The name of the timer.
- :param tags: Tags to append to the timer.
- """
- def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attributes):
- super().__init__()
- self.otel_logger = otel_logger
- self.name = name
- self.tags = tags
- def stop(self, send: bool = True) -> None:
- super().stop(send)
- if self.name and send:
- self.otel_logger.metrics_map.set_gauge_value(
- full_name(prefix=self.otel_logger.prefix, name=self.name), self.duration, False, self.tags
- )
- class SafeOtelLogger:
- """Otel Logger."""
- def __init__(
- self,
- otel_provider,
- prefix: str = DEFAULT_METRIC_NAME_PREFIX,
- metrics_validator: ListValidator = AllowListValidator(),
- ):
- self.otel: Callable = otel_provider
- self.prefix: str = prefix
- self.metrics_validator = metrics_validator
- self.meter = otel_provider.get_meter(__name__)
- self.metrics_map = MetricsMap(self.meter)
- def incr(
- self,
- stat: str,
- count: int = 1,
- rate: float = 1,
- tags: Attributes = None,
- ):
- """
- Increment stat by count.
- :param stat: The name of the stat to increment.
- :param count: A positive integer to add to the current value of stat.
- :param rate: value between 0 and 1 that represents the sample rate at
- which the metric is going to be emitted.
- :param tags: Tags to append to the stat.
- """
- if _skip_due_to_rate(rate):
- return
- if count < 0:
- raise ValueError("count must be a positive value.")
- if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
- counter = self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat), attributes=tags)
- counter.add(count, attributes=tags)
- return counter
- def decr(
- self,
- stat: str,
- count: int = 1,
- rate: float = 1,
- tags: Attributes = None,
- ):
- """
- Decrement stat by count.
- :param stat: The name of the stat to decrement.
- :param count: A positive integer to subtract from current value of stat.
- :param rate: value between 0 and 1 that represents the sample rate at
- which the metric is going to be emitted.
- :param tags: Tags to append to the stat.
- """
- if _skip_due_to_rate(rate):
- return
- if count < 0:
- raise ValueError("count must be a positive value.")
- if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
- counter = self.metrics_map.get_counter(full_name(prefix=self.prefix, name=stat))
- counter.add(-count, attributes=tags)
- return counter
- def gauge(
- self,
- stat: str,
- value: int | float,
- rate: float = 1,
- delta: bool = False,
- *,
- tags: Attributes = None,
- back_compat_name: str = "",
- ) -> None:
- """
- Record a new value for a Gauge.
- :param stat: The name of the stat to update.
- :param value: The new value of stat, either a float or an int.
- :param rate: value between 0 and 1 that represents the sample rate at
- which the metric is going to be emitted.
- :param delta: If true, the provided value will be added to the previous value.
- If False the new value will override the previous.
- :param tags: Tags to append to the stat.
- :param back_compat_name: If an alternative name is provided, the
- stat will be emitted using both names if possible.
- """
- if _skip_due_to_rate(rate):
- return
- if back_compat_name and self.metrics_validator.test(back_compat_name):
- self.metrics_map.set_gauge_value(
- full_name(prefix=self.prefix, name=back_compat_name), value, delta, tags
- )
- if self.metrics_validator.test(stat):
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), value, delta, tags)
- def timing(
- self,
- stat: str,
- dt: DeltaType,
- *,
- tags: Attributes = None,
- ) -> None:
- """OTel does not have a native timer, stored as a Gauge whose value is number of seconds elapsed."""
- if self.metrics_validator.test(stat) and name_is_otel_safe(self.prefix, stat):
- if isinstance(dt, datetime.timedelta):
- dt = dt.total_seconds()
- self.metrics_map.set_gauge_value(full_name(prefix=self.prefix, name=stat), float(dt), False, tags)
- def timer(
- self,
- stat: str | None = None,
- *args,
- tags: Attributes = None,
- **kwargs,
- ) -> TimerProtocol:
- """Timer context manager returns the duration and can be cancelled."""
- return _OtelTimer(self, stat, tags)
- class MetricsMap:
- """Stores Otel Instruments."""
- def __init__(self, meter):
- self.meter = meter
- self.map = {}
- def clear(self) -> None:
- self.map.clear()
- def _create_counter(self, name):
- """Create a new counter or up_down_counter for the provided name."""
- otel_safe_name = _get_otel_safe_name(name)
- if _is_up_down_counter(name):
- counter = self.meter.create_up_down_counter(name=otel_safe_name)
- else:
- counter = self.meter.create_counter(name=otel_safe_name)
- log.debug("Created %s as type: %s", otel_safe_name, _type_as_str(counter))
- return counter
- def get_counter(self, name: str, attributes: Attributes = None):
- """
- Return the counter; creates a new one if it did not exist.
- :param name: The name of the counter to fetch or create.
- :param attributes: Counter attributes, used to generate a unique key to store the counter.
- """
- key = _generate_key_name(name, attributes)
- if key not in self.map:
- self.map[key] = self._create_counter(name)
- return self.map[key]
- def del_counter(self, name: str, attributes: Attributes = None) -> None:
- """
- Delete a counter.
- :param name: The name of the counter to delete.
- :param attributes: Counter attributes which were used to generate a unique key to store the counter.
- """
- key = _generate_key_name(name, attributes)
- if key in self.map.keys():
- del self.map[key]
- def set_gauge_value(self, name: str, value: float | None, delta: bool, tags: Attributes):
- """
- Override the last reading for a Gauge with a new value.
- :param name: The name of the gauge to record.
- :param value: The new reading to record.
- :param delta: If True, value is added to the previous reading, else it overrides.
- :param tags: Gauge attributes which were used to generate a unique key to store the counter.
- :returns: None
- """
- key: str = _generate_key_name(name, tags)
- new_value = value or DEFAULT_GAUGE_VALUE
- old_value = self.poke_gauge(name, tags)
- if delta:
- new_value += old_value
- # If delta is true, add the new value to the last reading otherwise overwrite it.
- self.map[key] = Observation(new_value, tags)
- def _create_gauge(self, name: str, attributes: Attributes = None):
- """
- Create a new Observable Gauge with the provided name and the default value.
- :param name: The name of the gauge to fetch or create.
- :param attributes: Gauge attributes, used to generate a unique key to store the gauge.
- """
- otel_safe_name = _get_otel_safe_name(name)
- key = _generate_key_name(name, attributes)
- gauge = self.meter.create_observable_gauge(
- name=otel_safe_name,
- callbacks=[partial(self.read_gauge, _generate_key_name(name, attributes))],
- )
- self.map[key] = Observation(DEFAULT_GAUGE_VALUE, attributes)
- return gauge
- def read_gauge(self, key: str, *args) -> Iterable[Observation]:
- """Return the Observation for the provided key; callback for the Observable Gauges."""
- yield self.map[key]
- def poke_gauge(self, name: str, attributes: Attributes = None) -> GaugeValues:
- """
- Return the value of the gauge; creates a new one with the default value if it did not exist.
- :param name: The name of the gauge to fetch or create.
- :param attributes: Gauge attributes, used to generate a unique key to store the gauge.
- :returns: The integer or float value last recorded for the provided Gauge name.
- """
- key = _generate_key_name(name, attributes)
- if key not in self.map:
- self._create_gauge(name, attributes)
- return self.map[key].value
- def get_otel_logger(cls) -> SafeOtelLogger:
- host = conf.get("metrics", "otel_host") # ex: "breeze-otel-collector"
- port = conf.getint("metrics", "otel_port") # ex: 4318
- prefix = conf.get("metrics", "otel_prefix") # ex: "airflow"
- ssl_active = conf.getboolean("metrics", "otel_ssl_active")
- # PeriodicExportingMetricReader will default to an interval of 60000 millis.
- interval = conf.getint("metrics", "otel_interval_milliseconds", fallback=None) # ex: 30000
- debug = conf.getboolean("metrics", "otel_debugging_on")
- service_name = conf.get("metrics", "otel_service")
- resource = Resource.create(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: service_name})
- protocol = "https" if ssl_active else "http"
- endpoint = f"{protocol}://{host}:{port}/v1/metrics"
- log.info("[Metric Exporter] Connecting to OpenTelemetry Collector at %s", endpoint)
- readers = [
- PeriodicExportingMetricReader(
- OTLPMetricExporter(
- endpoint=endpoint,
- headers={"Content-Type": "application/json"},
- ),
- export_interval_millis=interval,
- )
- ]
- if debug:
- export_to_console = PeriodicExportingMetricReader(ConsoleMetricExporter())
- readers.append(export_to_console)
- metrics.set_meter_provider(
- MeterProvider(
- resource=resource,
- metric_readers=readers,
- shutdown_on_exit=False,
- ),
- )
- return SafeOtelLogger(metrics.get_meter_provider(), prefix, get_validator())
|