datadog_logger.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import datetime
  19. import logging
  20. from typing import TYPE_CHECKING
  21. from airflow.configuration import conf
  22. from airflow.metrics.protocols import Timer
  23. from airflow.metrics.validators import (
  24. AllowListValidator,
  25. BlockListValidator,
  26. get_validator,
  27. validate_stat,
  28. )
  29. if TYPE_CHECKING:
  30. from datadog import DogStatsd
  31. from airflow.metrics.protocols import DeltaType, TimerProtocol
  32. from airflow.metrics.validators import (
  33. ListValidator,
  34. )
  35. log = logging.getLogger(__name__)
  36. class SafeDogStatsdLogger:
  37. """DogStatsd Logger."""
  38. def __init__(
  39. self,
  40. dogstatsd_client: DogStatsd,
  41. metrics_validator: ListValidator = AllowListValidator(),
  42. metrics_tags: bool = False,
  43. metric_tags_validator: ListValidator = AllowListValidator(),
  44. ) -> None:
  45. self.dogstatsd = dogstatsd_client
  46. self.metrics_validator = metrics_validator
  47. self.metrics_tags = metrics_tags
  48. self.metric_tags_validator = metric_tags_validator
  49. @validate_stat
  50. def incr(
  51. self,
  52. stat: str,
  53. count: int = 1,
  54. rate: float = 1,
  55. *,
  56. tags: dict[str, str] | None = None,
  57. ) -> None:
  58. """Increment stat."""
  59. if self.metrics_tags and isinstance(tags, dict):
  60. tags_list = [
  61. f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
  62. ]
  63. else:
  64. tags_list = []
  65. if self.metrics_validator.test(stat):
  66. return self.dogstatsd.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
  67. return None
  68. @validate_stat
  69. def decr(
  70. self,
  71. stat: str,
  72. count: int = 1,
  73. rate: float = 1,
  74. *,
  75. tags: dict[str, str] | None = None,
  76. ) -> None:
  77. """Decrement stat."""
  78. if self.metrics_tags and isinstance(tags, dict):
  79. tags_list = [
  80. f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
  81. ]
  82. else:
  83. tags_list = []
  84. if self.metrics_validator.test(stat):
  85. return self.dogstatsd.decrement(metric=stat, value=count, tags=tags_list, sample_rate=rate)
  86. return None
  87. @validate_stat
  88. def gauge(
  89. self,
  90. stat: str,
  91. value: int | float,
  92. rate: float = 1,
  93. delta: bool = False,
  94. *,
  95. tags: dict[str, str] | None = None,
  96. ) -> None:
  97. """Gauge stat."""
  98. if self.metrics_tags and isinstance(tags, dict):
  99. tags_list = [
  100. f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
  101. ]
  102. else:
  103. tags_list = []
  104. if self.metrics_validator.test(stat):
  105. return self.dogstatsd.gauge(metric=stat, value=value, tags=tags_list, sample_rate=rate)
  106. return None
  107. @validate_stat
  108. def timing(
  109. self,
  110. stat: str,
  111. dt: DeltaType,
  112. *,
  113. tags: dict[str, str] | None = None,
  114. ) -> None:
  115. """Stats timing."""
  116. if self.metrics_tags and isinstance(tags, dict):
  117. tags_list = [
  118. f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
  119. ]
  120. else:
  121. tags_list = []
  122. if self.metrics_validator.test(stat):
  123. if isinstance(dt, datetime.timedelta):
  124. dt = dt.total_seconds()
  125. return self.dogstatsd.timing(metric=stat, value=dt, tags=tags_list)
  126. return None
  127. @validate_stat
  128. def timer(
  129. self,
  130. stat: str | None = None,
  131. tags: dict[str, str] | None = None,
  132. **kwargs,
  133. ) -> TimerProtocol:
  134. """Timer metric that can be cancelled."""
  135. if self.metrics_tags and isinstance(tags, dict):
  136. tags_list = [
  137. f"{key}:{value}" for key, value in tags.items() if self.metric_tags_validator.test(key)
  138. ]
  139. else:
  140. tags_list = []
  141. if stat and self.metrics_validator.test(stat):
  142. return Timer(self.dogstatsd.timed(stat, tags=tags_list, **kwargs))
  143. return Timer()
  144. def get_dogstatsd_logger(cls) -> SafeDogStatsdLogger:
  145. """Get DataDog StatsD logger."""
  146. from datadog import DogStatsd
  147. dogstatsd = DogStatsd(
  148. host=conf.get("metrics", "statsd_host"),
  149. port=conf.getint("metrics", "statsd_port"),
  150. namespace=conf.get("metrics", "statsd_prefix"),
  151. constant_tags=cls.get_constant_tags(),
  152. )
  153. datadog_metrics_tags = conf.getboolean("metrics", "statsd_datadog_metrics_tags", fallback=True)
  154. metric_tags_validator = BlockListValidator(conf.get("metrics", "statsd_disabled_tags", fallback=None))
  155. return SafeDogStatsdLogger(dogstatsd, get_validator(), datadog_metrics_tags, metric_tags_validator)