123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import datetime
- import logging
- from typing import TYPE_CHECKING
- from airflow.configuration import conf
- from airflow.metrics.protocols import Timer
- from airflow.metrics.validators import (
- AllowListValidator,
- BlockListValidator,
- get_validator,
- validate_stat,
- )
- if TYPE_CHECKING:
- from datadog import DogStatsd
- from airflow.metrics.protocols import DeltaType, TimerProtocol
- from airflow.metrics.validators import (
- ListValidator,
- )
- log = logging.getLogger(__name__)
class SafeDogStatsdLogger:
    """
    DogStatsd Logger.

    Wraps a DogStatsd client and forwards a metric to it only when the metric
    name passes ``metrics_validator``. Tags are forwarded only when
    ``metrics_tags`` is enabled, and only for keys that pass
    ``metric_tags_validator``.

    :param dogstatsd_client: client object that accepted metrics are forwarded to.
    :param metrics_validator: validator whose ``test(stat)`` decides whether a metric is sent.
    :param metrics_tags: when false, tags passed to the metric methods are dropped.
    :param metric_tags_validator: validator whose ``test(key)`` decides whether a tag is kept.
    """

    # NOTE: the default validators below are created once, when the signature is
    # evaluated, and are shared by every instance that relies on the defaults.
    def __init__(
        self,
        dogstatsd_client: DogStatsd,
        metrics_validator: ListValidator = AllowListValidator(),
        metrics_tags: bool = False,
        metric_tags_validator: ListValidator = AllowListValidator(),
    ) -> None:
        self.dogstatsd = dogstatsd_client
        self.metrics_validator = metrics_validator
        self.metrics_tags = metrics_tags
        self.metric_tags_validator = metric_tags_validator

    def _format_tags(self, tags: dict[str, str] | None) -> list[str]:
        """
        Build the list of ``key:value`` tag strings to send with a metric.

        :param tags: tags supplied by the caller; anything that is not a dict is ignored.
        :return: formatted tags whose key passes ``metric_tags_validator``, or an
            empty list when tagging is disabled or no dict was supplied.
        """
        if not (self.metrics_tags and isinstance(tags, dict)):
            return []
        return [f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)]

    @validate_stat
    def incr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Increment stat.

        :param stat: metric name.
        :param count: amount to increment by.
        :param rate: sample rate passed to the client.
        :param tags: optional tags, filtered before being sent.
        :return: whatever the client's ``increment`` returns, or ``None`` when the
            metric name is rejected by ``metrics_validator``.
        """
        # Tags are built before the name check, as in the original implementation.
        tags_list = self._format_tags(tags)
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)

    @validate_stat
    def decr(
        self,
        stat: str,
        count: int = 1,
        rate: float = 1,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Decrement stat.

        :param stat: metric name.
        :param count: amount to decrement by.
        :param rate: sample rate passed to the client.
        :param tags: optional tags, filtered before being sent.
        :return: whatever the client's ``decrement`` returns, or ``None`` when the
            metric name is rejected by ``metrics_validator``.
        """
        tags_list = self._format_tags(tags)
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)

    @validate_stat
    def gauge(
        self,
        stat: str,
        value: int | float,
        rate: float = 1,
        delta: bool = False,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Gauge stat.

        :param stat: metric name.
        :param value: gauge value.
        :param rate: sample rate passed to the client.
        :param delta: accepted in the signature but not forwarded to the client.
        :param tags: optional tags, filtered before being sent.
        :return: whatever the client's ``gauge`` returns, or ``None`` when the
            metric name is rejected by ``metrics_validator``.
        """
        tags_list = self._format_tags(tags)
        if not self.metrics_validator.test(stat):
            return None
        return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)

    @validate_stat
    def timing(
        self,
        stat: str,
        dt: DeltaType,
        *,
        tags: dict[str, str] | None = None,
    ) -> None:
        """
        Stats timing.

        :param stat: metric name.
        :param dt: duration; a ``datetime.timedelta`` is converted with
            ``total_seconds()``, any other value is forwarded unchanged.
        :param tags: optional tags, filtered before being sent.
        :return: whatever the client's ``timing`` returns, or ``None`` when the
            metric name is rejected by ``metrics_validator``.
        """
        tags_list = self._format_tags(tags)
        if not self.metrics_validator.test(stat):
            return None
        if isinstance(dt, datetime.timedelta):
            dt = dt.total_seconds()
        return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)

    @validate_stat
    def timer(
        self,
        stat: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs,
    ) -> TimerProtocol:
        """
        Timer metric that can be cancelled.

        :param stat: metric name; when empty or rejected by ``metrics_validator``
            a ``Timer`` with no underlying client timer is returned.
        :param tags: optional tags, filtered before being sent.
        :param kwargs: extra keyword arguments forwarded to the client's ``timed``.
        :return: a ``Timer`` wrapping the client's ``timed(...)`` result, or a bare ``Timer()``.
        """
        tags_list = self._format_tags(tags)
        if stat and self.metrics_validator.test(stat):
            return Timer(self.dogstatsd.timed(stat, tags=tags_list, **kwargs))
        return Timer()
def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger:
    """
    Get DataDog StatsD logger.

    Builds a ``DogStatsd`` client from the ``[metrics]`` configuration section
    and wraps it in a ``SafeDogStatsdLogger``.

    :param cls: object providing ``get_constant_tags()``, whose result is passed
        to the client as ``constant_tags``.
    :return: logger wrapping the configured client.
    """
    # Function-local import: at module level datadog is only imported for type checking.
    from datadog import DogStatsd

    client_options = {
        "host": conf.get("metrics", "statsd_host"),
        "port": conf.getint("metrics", "statsd_port"),
        "namespace": conf.get("metrics", "statsd_prefix"),
        "constant_tags": cls.get_constant_tags(),
    }
    client = DogStatsd(**client_options)

    # Tagging is on unless the option is explicitly configured otherwise.
    tags_enabled = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)
    disabled_tags = conf.get("metrics", "statsd_disabled_tags", fallback=None)
    tag_validator = BlockListValidator(disabled_tags)
    stat_validator = get_validator()
    return SafeDogStatsdLogger(client, stat_validator, tags_enabled, tag_validator)
|