validators.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. # Only characters in the character set are considered valid
  18. # for the stat_name if stat_name_default_handler is used.
  19. from __future__ import annotations
  20. import abc
  21. import logging
  22. import string
  23. import warnings
  24. from functools import partial, wraps
  25. from typing import Callable, Iterable, Pattern, cast
  26. import re2
  27. from airflow.configuration import conf
  28. from airflow.exceptions import InvalidStatsNameException, RemovedInAirflow3Warning
  29. log = logging.getLogger(__name__)
  class MetricNameLengthExemptionWarning(Warning):
      """
      Warning category used for the metric name length exemption notice.

      A dedicated category, rather than a generic one, makes it easy for
      tests to assert that this specific notice was emitted.
      """
  # When stat_name_default_handler is used with its default arguments, a
  # stat_name is valid only if every character belongs to this set.
  ALLOWED_CHARACTERS = frozenset(f"{string.ascii_letters}{string.digits}_.-/")
  # The following set contains existing metrics whose names are too long for
  # OpenTelemetry and should be deprecated over time. This is implemented to
  # ensure that any new metrics we introduce have names which meet the OTel
  # standard while also allowing us time to deprecate the old names.
  # NOTE: No new names should be added to this list. This list should
  # only ever shorten over time as we deprecate these names.
  #
  # Every literal dot is escaped so that "." only ever matches a real dot;
  # an unescaped dot would let unrelated names slip into the exemption.
  BACK_COMPAT_METRIC_NAME_PATTERNS: set[str] = {
      r"^(?P<job_name>.*)_start$",
      r"^(?P<job_name>.*)_end$",
      r"^(?P<job_name>.*)_heartbeat_failure$",
      r"^local_task_job\.task_exit\.(?P<job_id>.*)\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<return_code>.*)$",
      r"^operator_failures_(?P<operator_name>.*)$",
      r"^operator_successes_(?P<operator_name>.*)$",
      r"^ti\.start\.(?P<dag_id>.*)\.(?P<task_id>.*)$",
      r"^ti\.finish\.(?P<dag_id>.*)\.(?P<task_id>.*)\.(?P<state>.*)$",
      r"^task_removed_from_dag\.(?P<dag_id>.*)$",
      r"^task_restored_to_dag\.(?P<dag_id>.*)$",
      r"^task_instance_created_(?P<operator_name>.*)$",
      r"^dag_processing\.last_run\.seconds_ago\.(?P<dag_file>.*)$",
      r"^pool\.open_slots\.(?P<pool_name>.*)$",
      r"^pool\.queued_slots\.(?P<pool_name>.*)$",
      r"^pool\.running_slots\.(?P<pool_name>.*)$",
      r"^pool\.deferred_slots\.(?P<pool_name>.*)$",
      r"^pool\.starving_tasks\.(?P<pool_name>.*)$",
      r"^dagrun\.dependency-check\.(?P<dag_id>.*)$",
      r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.duration$",
      r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.queued_duration$",
      r"^dag\.(?P<dag_id>.*)\.(?P<task_id>.*)\.scheduled_duration$",
      r"^dag_processing\.last_duration\.(?P<dag_file>.*)$",
      r"^dagrun\.duration\.success\.(?P<dag_id>.*)$",
      r"^dagrun\.duration\.failed\.(?P<dag_id>.*)$",
      r"^dagrun\.schedule_delay\.(?P<dag_id>.*)$",
      r"^dagrun\.(?P<dag_id>.*)\.first_task_scheduling_delay$",
  }
  # Compiled form of each back-compat pattern, built once when the module loads.
  BACK_COMPAT_METRIC_NAMES: set[Pattern[str]] = set(map(re2.compile, BACK_COMPAT_METRIC_NAME_PATTERNS))
  # Default upper bound on the length of a combined prefix + metric name for OTel.
  OTEL_NAME_MAX_LENGTH = 63
  # List type used by get_validator when neither an allow nor a block list is set.
  DEFAULT_VALIDATOR_TYPE = "allow"
  def get_validator() -> ListValidator:
      """
      Build the metric-name validator selected by the ``[metrics]`` config section.

      ``metrics_use_pattern_match`` chooses between the pattern-matching and the
      basic (prefix) validator classes; choosing the basic ones emits a
      RemovedInAirflow3Warning. The allow list takes precedence: when both lists
      are set the block list is ignored and a warning is logged. When neither is
      set, ``DEFAULT_VALIDATOR_TYPE`` decides which class is used.

      :returns: A validator instance constructed with the selected list value.
      """
      allow_list = conf.get("metrics", "metrics_allow_list", fallback=None)
      block_list = conf.get("metrics", "metrics_block_list", fallback=None)
      configured_lists = {"allow": allow_list, "block": block_list}

      if conf.getboolean("metrics", "metrics_use_pattern_match", fallback=False):
          implementations = {"allow": PatternAllowListValidator, "block": PatternBlockListValidator}
      else:
          implementations = {"allow": AllowListValidator, "block": BlockListValidator}
          warnings.warn(
              "The basic metric validator will be deprecated in the future in favor of pattern-matching. "
              "You can try this now by setting config option metrics_use_pattern_match to True.",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )

      if allow_list and block_list:
          log.warning(
              "Ignoring metrics_block_list as both metrics_allow_list and metrics_block_list have been set."
          )

      if allow_list:
          selected = "allow"
      elif block_list:
          selected = "block"
      else:
          selected = DEFAULT_VALIDATOR_TYPE

      validator_cls = implementations[selected]
      return validator_cls(configured_lists[selected])
  def validate_stat(fn: Callable) -> Callable:
      """
      Decorate a stat-emitting method so that its stat name is validated first.

      A stat name that is not ``None`` is passed through the currently
      configured stat name handler before ``fn`` is called. If
      InvalidStatsNameException is raised, it is logged and ``None`` is
      returned instead of emitting the stat.
      """

      @wraps(fn)
      def wrapper(self, stat: str | None = None, *args, **kwargs) -> Callable | None:
          try:
              if stat is not None:
                  # Look the handler up on every call so config changes are honoured.
                  stat = get_current_handler_stat_name_func()(stat)
              result = fn(self, stat, *args, **kwargs)
          except InvalidStatsNameException:
              log.exception("Invalid stat name: %s.", stat)
              result = None
          return result

      return cast(Callable, wrapper)
  def stat_name_otel_handler(
      stat_prefix: str,
      stat_name: str,
      max_length: int = OTEL_NAME_MAX_LENGTH,
  ) -> str:
      """
      Verify that a proposed prefix and name combination will meet OpenTelemetry naming standards.

      See: https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax

      :param stat_prefix: The proposed prefix applied to all metric names.
      :param stat_name: The proposed name.
      :param max_length: The max length of the combined prefix and name; defaults to the max length
          as defined in the OpenTelemetry standard, but can be overridden.
      :returns: The approved combined name, ``"<stat_prefix>.<stat_name>"``.
      :raises InvalidStatsNameException: If either argument is not a string, if the combined name is
          longer than ``max_length`` without matching a back-compat exemption, or if the combined
          name is rejected by ``stat_name_default_handler``.
      """
      # Enforce that the values can not be None and must be valid strings
      # before they are formatted. Without this check they would be cast to
      # a string and pass when they should not, potentially resulting in
      # metrics named "airflow.None", "airflow.42", or "None.42" for example.
      if not (isinstance(stat_name, str) and isinstance(stat_prefix, str)):
          raise InvalidStatsNameException("Stat name and prefix must both be strings.")

      proposed_stat_name: str = f"{stat_prefix}.{stat_name}"
      name_length_exemption: bool = False
      matched_exemption: str = ""

      # Compare against the caller-supplied limit (not the module constant) so
      # that an overridden ``max_length`` is honoured.
      if len(proposed_stat_name) > max_length:
          # If the name is in the exceptions list, do not fail it for being too long.
          # It may still be deemed invalid for other reasons below.
          for exemption in BACK_COMPAT_METRIC_NAMES:
              if re2.match(exemption, stat_name):
                  # There is a back-compat exception for this name; proceed
                  name_length_exemption = True
                  matched_exemption = exemption.pattern
                  break
          else:
              # Loop finished without a match: too long and not exempt.
              raise InvalidStatsNameException(
                  f"Invalid stat name: {proposed_stat_name}. Please see "
                  f"https://opentelemetry.io/docs/reference/specification/metrics/api/#instrument-name-syntax"
              )

      # `stat_name_default_handler` throws InvalidStatsNameException if the
      # provided value is not valid or returns the value if it is. We don't
      # need the return value but will make use of the validation checks. If
      # no exception is thrown, then the proposed name meets OTel requirements.
      # Exempt names get a relaxed length limit so only the other checks apply.
      stat_name_default_handler(proposed_stat_name, max_length=999 if name_length_exemption else max_length)

      # This warning is down here instead of up above because the exemption only
      # applies to the length and a name may still be invalid for other reasons.
      if name_length_exemption:
          warnings.warn(
              f"Stat name {stat_name} matches exemption {matched_exemption} and "
              f"will be truncated to {proposed_stat_name[:OTEL_NAME_MAX_LENGTH]}. "
              f"This stat name will be deprecated in the future and replaced with "
              f"a shorter name combined with Attributes/Tags.",
              MetricNameLengthExemptionWarning,
              stacklevel=2,
          )

      return proposed_stat_name
  def stat_name_default_handler(
      stat_name: str, max_length: int = 250, allowed_chars: Iterable[str] = ALLOWED_CHARACTERS
  ) -> str:
      """
      Validate the metric stat name and return it unchanged when it is valid.

      :param stat_name: The metric name to validate.
      :param max_length: The maximum number of characters the name may have.
      :param allowed_chars: The characters a name may be composed of. Any
          iterable is accepted, including a one-shot iterator.
      :returns: ``stat_name`` itself; no transformation is applied.
      :raises InvalidStatsNameException: If the name is not a string, is longer
          than ``max_length``, or contains a character outside ``allowed_chars``.
      """
      if not isinstance(stat_name, str):
          raise InvalidStatsNameException("The stat_name has to be a string")

      if len(stat_name) > max_length:
          raise InvalidStatsNameException(
              f"The stat_name ({stat_name}) has to be less than {max_length} characters."
          )

      # Membership is tested once per character, so materialise the iterable
      # first: a one-shot iterator would otherwise be consumed by the first
      # lookups and later characters would be rejected incorrectly. Sets are
      # used as-is to avoid copying on every call.
      if isinstance(allowed_chars, (set, frozenset)):
          permitted = allowed_chars
      else:
          permitted = frozenset(allowed_chars)

      if any(c not in permitted for c in stat_name):
          raise InvalidStatsNameException(
              f"The stat name ({stat_name}) has to be composed of ASCII "
              f"alphabets, numbers, or the underscore, dot, or dash characters."
          )

      return stat_name
  def get_current_handler_stat_name_func() -> Callable[[str], str]:
      """
      Get Stat Name Handler from airflow.cfg.

      A handler configured through ``[metrics] stat_name_handler`` is returned
      as-is. Otherwise ``stat_name_default_handler`` is returned; when
      ``[metrics] statsd_influxdb_enabled`` is true it is wrapped so that the
      ``,`` and ``=`` characters are also accepted.

      :returns: A callable that validates a stat name and returns it.
      """
      handler = conf.getimport("metrics", "stat_name_handler")
      if handler is not None:
          return handler

      # Parse the option as a boolean: ``conf.get`` returns the raw string, and
      # a configured "False" would be truthy and wrongly enable the extra chars.
      if conf.getboolean("metrics", "statsd_influxdb_enabled", fallback=False):
          return partial(stat_name_default_handler, allowed_chars={*ALLOWED_CHARACTERS, ",", "="})

      return stat_name_default_handler
  class ListValidator(metaclass=abc.ABCMeta):
      """
      Abstract base class for validators driven by a comma-separated list.

      Concrete subclasses must override ``test``. Any class exposing a callable
      ``test`` attribute is also recognised as a virtual subclass.
      """

      def __init__(self, validate_list: str | None = None) -> None:
          """
          Parse the comma-separated configuration value.

          :param validate_list: Comma-separated entries, or ``None`` / empty when
              no list is configured.
          """
          # Entries are stripped and lower-cased. Empty entries (from a trailing
          # or doubled comma) are dropped: an empty string is a prefix of, and a
          # pattern match for, every name, so keeping it would match everything.
          # NOTE(review): lower-casing is also applied to entries later used as
          # regular expressions, which changes escapes such as "\D" into "\d".
          entries = tuple(
              entry for entry in (item.strip().lower() for item in (validate_list or "").split(",")) if entry
          )
          # ``None`` means "no list configured"; subclasses treat it as allow-all.
          self.validate_list: tuple[str, ...] | None = entries or None

      @classmethod
      def __subclasshook__(cls, subclass: type) -> bool:
          """Treat any class with a callable ``test`` attribute as a subclass."""
          # NotImplemented tells ABCMeta to fall back to the regular checks.
          return True if callable(getattr(subclass, "test", None)) else NotImplemented

      @abc.abstractmethod
      def test(self, name: str) -> bool:
          """Test if name is allowed."""
          raise NotImplementedError

      def _has_pattern_match(self, name: str) -> bool:
          """Return True if any configured entry, used as a regex, matches within ``name``."""
          # Normalise once instead of on every loop iteration.
          normalized = name.strip().lower()
          return any(re2.findall(entry, normalized) for entry in self.validate_list or ())
  class AllowListValidator(ListValidator):
      """Accept only names that start with one of the allowed prefixes."""

      def test(self, name: str) -> bool:
          """Return True if ``name`` is allowed."""
          prefixes = self.validate_list
          if prefixes is None:
              # No allow list configured: every metric is allowed.
              return True
          normalized = name.strip().lower()
          return normalized.startswith(prefixes)
  class PatternAllowListValidator(ListValidator):
      """Accept names in which any of the provided strings matches anywhere."""

      def test(self, name: str) -> bool:
          """Return True if ``name`` is allowed."""
          if self.validate_list is None:
              # No allow list configured: every metric is allowed.
              return True
          matched = super()._has_pattern_match(name)
          return matched
  class BlockListValidator(ListValidator):
      """Accept only names that do not start with one of the blocked prefixes."""

      def test(self, name: str) -> bool:
          """Return True if ``name`` is allowed."""
          blocked_prefixes = self.validate_list
          if blocked_prefixes is None:
              # No block list configured: every metric is allowed.
              return True
          normalized = name.strip().lower()
          return not normalized.startswith(blocked_prefixes)
  class PatternBlockListValidator(ListValidator):
      """Accept only names in which none of the blocked strings matches."""

      def test(self, name: str) -> bool:
          """Return True if ``name`` is allowed."""
          if self.validate_list is None:
              # No block list configured: every metric is allowed.
              return True
          matched = super()._has_pattern_match(name)
          return not matched