sentry.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Sentry Integration."""
  19. from __future__ import annotations
  20. import logging
  21. from functools import wraps
  22. from typing import TYPE_CHECKING
  23. from airflow.configuration import conf
  24. from airflow.executors.executor_loader import ExecutorLoader
  25. from airflow.utils.session import find_session_idx, provide_session
  26. from airflow.utils.state import TaskInstanceState
  27. if TYPE_CHECKING:
  28. from sqlalchemy.orm import Session
  29. from airflow.models.taskinstance import TaskInstance
  30. log = logging.getLogger(__name__)
  31. class DummySentry:
  32. """Blank class for Sentry."""
  33. def add_tagging(self, task_instance):
  34. """Blank function for tagging."""
  35. def add_breadcrumbs(self, task_instance, session: Session | None = None):
  36. """Blank function for breadcrumbs."""
  37. def enrich_errors(self, run):
  38. """Blank function for formatting a TaskInstance._run_raw_task."""
  39. return run
  40. def flush(self):
  41. """Blank function for flushing errors."""
# Module-level singleton. It starts as the no-op implementation and is rebound
# to a ConfiguredSentry instance below when ``[sentry] sentry_on`` is enabled.
Sentry: DummySentry = DummySentry()
  43. if conf.getboolean("sentry", "sentry_on", fallback=False):
  44. import sentry_sdk
  45. from sentry_sdk.integrations.flask import FlaskIntegration
  46. from sentry_sdk.integrations.logging import ignore_logger
  47. class ConfiguredSentry(DummySentry):
  48. """Configure Sentry SDK."""
  49. SCOPE_DAG_RUN_TAGS = frozenset(("data_interval_end", "data_interval_start", "execution_date"))
  50. SCOPE_TASK_INSTANCE_TAGS = frozenset(("task_id", "dag_id", "try_number"))
  51. SCOPE_CRUMBS = frozenset(("task_id", "state", "operator", "duration"))
  52. UNSUPPORTED_SENTRY_OPTIONS = frozenset(
  53. (
  54. "integrations",
  55. "in_app_include",
  56. "in_app_exclude",
  57. "ignore_errors",
  58. "before_breadcrumb",
  59. )
  60. )
  61. def __init__(self):
  62. """Initialize the Sentry SDK."""
  63. ignore_logger("airflow.task")
  64. sentry_flask = FlaskIntegration()
  65. # LoggingIntegration is set by default.
  66. integrations = [sentry_flask]
  67. executor_class, _ = ExecutorLoader.import_default_executor_cls(validate=False)
  68. if executor_class.supports_sentry:
  69. from sentry_sdk.integrations.celery import CeleryIntegration
  70. sentry_celery = CeleryIntegration()
  71. integrations.append(sentry_celery)
  72. dsn = None
  73. sentry_config_opts = conf.getsection("sentry") or {}
  74. if sentry_config_opts:
  75. sentry_config_opts.pop("sentry_on")
  76. old_way_dsn = sentry_config_opts.pop("sentry_dsn", None)
  77. new_way_dsn = sentry_config_opts.pop("dsn", None)
  78. # supported backward compatibility with old way dsn option
  79. dsn = old_way_dsn or new_way_dsn
  80. unsupported_options = self.UNSUPPORTED_SENTRY_OPTIONS.intersection(sentry_config_opts.keys())
  81. if unsupported_options:
  82. log.warning(
  83. "There are unsupported options in [sentry] section: %s",
  84. ", ".join(unsupported_options),
  85. )
  86. sentry_config_opts["before_send"] = conf.getimport("sentry", "before_send", fallback=None)
  87. sentry_config_opts["transport"] = conf.getimport("sentry", "transport", fallback=None)
  88. if dsn:
  89. sentry_sdk.init(dsn=dsn, integrations=integrations, **sentry_config_opts)
  90. else:
  91. # Setting up Sentry using environment variables.
  92. log.debug("Defaulting to SENTRY_DSN in environment.")
  93. sentry_sdk.init(integrations=integrations, **sentry_config_opts)
  94. def add_tagging(self, task_instance):
  95. """Add tagging for a task_instance."""
  96. dag_run = task_instance.dag_run
  97. task = task_instance.task
  98. with sentry_sdk.configure_scope() as scope:
  99. for tag_name in self.SCOPE_TASK_INSTANCE_TAGS:
  100. attribute = getattr(task_instance, tag_name)
  101. scope.set_tag(tag_name, attribute)
  102. for tag_name in self.SCOPE_DAG_RUN_TAGS:
  103. attribute = getattr(dag_run, tag_name)
  104. scope.set_tag(tag_name, attribute)
  105. scope.set_tag("operator", task.__class__.__name__)
  106. @provide_session
  107. def add_breadcrumbs(
  108. self,
  109. task_instance: TaskInstance,
  110. session: Session | None = None,
  111. ) -> None:
  112. """Add breadcrumbs inside of a task_instance."""
  113. if session is None:
  114. return
  115. dr = task_instance.get_dagrun(session)
  116. task_instances = dr.get_task_instances(
  117. state={TaskInstanceState.SUCCESS, TaskInstanceState.FAILED},
  118. session=session,
  119. )
  120. for ti in task_instances:
  121. data = {}
  122. for crumb_tag in self.SCOPE_CRUMBS:
  123. data[crumb_tag] = getattr(ti, crumb_tag)
  124. sentry_sdk.add_breadcrumb(category="completed_tasks", data=data, level="info")
  125. def enrich_errors(self, func):
  126. """
  127. Decorate errors.
  128. Wrap TaskInstance._run_raw_task to support task specific tags and breadcrumbs.
  129. """
  130. session_args_idx = find_session_idx(func)
  131. @wraps(func)
  132. def wrapper(_self, *args, **kwargs):
  133. # Wrapping the _run_raw_task function with push_scope to contain
  134. # tags and breadcrumbs to a specific Task Instance
  135. try:
  136. session = kwargs.get("session", args[session_args_idx])
  137. except IndexError:
  138. session = None
  139. with sentry_sdk.push_scope():
  140. try:
  141. # Is a LocalTaskJob get the task instance
  142. if hasattr(_self, "task_instance"):
  143. task_instance = _self.task_instance
  144. else:
  145. task_instance = _self
  146. self.add_tagging(task_instance)
  147. self.add_breadcrumbs(task_instance, session=session)
  148. return func(_self, *args, **kwargs)
  149. except Exception as e:
  150. sentry_sdk.capture_exception(e)
  151. raise
  152. return wrapper
  153. def flush(self):
  154. sentry_sdk.flush()
  155. Sentry = ConfiguredSentry()