tracer.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import inspect
  20. import logging
  21. import socket
  22. from typing import TYPE_CHECKING, Any, Callable
  23. from airflow.configuration import conf
  24. from airflow.typing_compat import Protocol
  25. log = logging.getLogger(__name__)
  26. def gen_context(trace_id, span_id):
  27. """Generate span context from trace_id and span_id."""
  28. from airflow.traces.otel_tracer import gen_context as otel_gen_context
  29. return otel_gen_context(trace_id, span_id)
  30. def gen_links_from_kv_list(list):
  31. """Generate links from kv list of {trace_id:int, span_id:int}."""
  32. from airflow.traces.otel_tracer import gen_links_from_kv_list
  33. return gen_links_from_kv_list(list)
  34. def span(func):
  35. """Decorate a function with span."""
  36. def wrapper(*args, **kwargs):
  37. func_name = func.__name__
  38. qual_name = func.__qualname__
  39. module_name = func.__module__
  40. if "." in qual_name:
  41. component = f"{qual_name.rsplit('.', 1)[0]}"
  42. else:
  43. component = module_name
  44. with Trace.start_span(span_name=func_name, component=component):
  45. if len(inspect.signature(func).parameters) > 0:
  46. return func(*args, **kwargs)
  47. else:
  48. return func()
  49. return wrapper
  50. class EmptyContext:
  51. """If no Tracer is configured, EmptyContext is used as a fallback."""
  52. def __init__(self):
  53. self.trace_id = 1
  54. class EmptySpan:
  55. """If no Tracer is configured, EmptySpan is used as a fallback."""
  56. def __enter__(self):
  57. """Enter."""
  58. return self
  59. def __exit__(self, *args, **kwargs):
  60. """Exit."""
  61. pass
  62. def __call__(self, obj):
  63. """Call."""
  64. return obj
  65. def get_span_context(self):
  66. """Get span context."""
  67. return EMPTY_CTX
  68. def set_attribute(self, key, value) -> None:
  69. """Set an attribute to the span."""
  70. pass
  71. def set_attributes(self, attributes) -> None:
  72. """Set multiple attributes at once."""
  73. pass
  74. def is_recording(self):
  75. return False
  76. def add_event(
  77. self,
  78. name: str,
  79. attributes: Any | None = None,
  80. timestamp: int | None = None,
  81. ) -> None:
  82. """Add event to span."""
  83. pass
  84. def add_link(
  85. self,
  86. context: Any,
  87. attributes: Any | None = None,
  88. ) -> None:
  89. """Add link to the span."""
  90. pass
  91. def end(self, end_time=None, *args, **kwargs) -> None:
  92. """End."""
  93. pass
# Shared module-level fallback objects: ``EmptyTrace`` methods return ``EMPTY_SPAN`` and
# ``EmptySpan.get_span_context`` returns ``EMPTY_CTX``.
EMPTY_SPAN = EmptySpan()
EMPTY_CTX = EmptyContext()
  96. class Tracer(Protocol):
  97. """This class is only used for TypeChecking (for IDEs, mypy, etc)."""
  98. instance: Tracer | EmptyTrace | None = None
  99. @classmethod
  100. def get_tracer(cls, component):
  101. """Get a tracer."""
  102. raise NotImplementedError()
  103. @classmethod
  104. def start_span(
  105. cls,
  106. span_name: str,
  107. component: str | None = None,
  108. parent_sc=None,
  109. span_id=None,
  110. links=None,
  111. start_time=None,
  112. ):
  113. """Start a span."""
  114. raise NotImplementedError()
  115. @classmethod
  116. def use_span(cls, span):
  117. """Use a span as current."""
  118. raise NotImplementedError()
  119. @classmethod
  120. def get_current_span(self):
  121. raise NotImplementedError()
  122. @classmethod
  123. def start_span_from_dagrun(
  124. cls,
  125. dagrun,
  126. span_name=None,
  127. service_name=None,
  128. component=None,
  129. links=None,
  130. ):
  131. """Start a span from dagrun."""
  132. raise NotImplementedError()
  133. @classmethod
  134. def start_span_from_taskinstance(
  135. cls,
  136. ti,
  137. span_name=None,
  138. component=None,
  139. child=False,
  140. links=None,
  141. ):
  142. """Start a span from taskinstance."""
  143. raise NotImplementedError()
  144. class EmptyTrace:
  145. """If no Tracer is configured, EmptyTracer is used as a fallback."""
  146. @classmethod
  147. def get_tracer(
  148. cls,
  149. component: str,
  150. trace_id: int | None = None,
  151. span_id: int | None = None,
  152. ):
  153. """Get a tracer using provided node id and trace id."""
  154. return cls
  155. @classmethod
  156. def start_span(
  157. cls,
  158. span_name: str,
  159. component: str | None = None,
  160. parent_sc=None,
  161. span_id=None,
  162. links=None,
  163. start_time=None,
  164. ) -> EmptySpan:
  165. """Start a span."""
  166. return EMPTY_SPAN
  167. @classmethod
  168. def use_span(cls, span) -> EmptySpan:
  169. """Use a span as current."""
  170. return EMPTY_SPAN
  171. @classmethod
  172. def get_current_span(self) -> EmptySpan:
  173. """Get the current span."""
  174. return EMPTY_SPAN
  175. @classmethod
  176. def start_span_from_dagrun(
  177. cls,
  178. dagrun,
  179. span_name=None,
  180. service_name=None,
  181. component=None,
  182. links=None,
  183. ) -> EmptySpan:
  184. """Start a span from dagrun."""
  185. return EMPTY_SPAN
  186. @classmethod
  187. def start_span_from_taskinstance(
  188. cls,
  189. ti,
  190. span_name=None,
  191. component=None,
  192. child=False,
  193. links=None,
  194. ) -> EmptySpan:
  195. """Start a span from taskinstance."""
  196. return EMPTY_SPAN
  197. class _Trace(type):
  198. factory: Callable
  199. instance: Tracer | EmptyTrace | None = None
  200. def __getattr__(cls, name: str) -> str:
  201. if not cls.instance:
  202. try:
  203. cls.instance = cls.factory()
  204. except (socket.gaierror, ImportError) as e:
  205. log.error("Could not configure Trace: %s, using EmptyTrace instead.", e)
  206. cls.instance = EmptyTrace()
  207. return getattr(cls.instance, name)
  208. def __init__(cls, *args, **kwargs) -> None:
  209. super().__init__(cls)
  210. if not hasattr(cls.__class__, "factory"):
  211. if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"):
  212. from airflow.traces import otel_tracer
  213. cls.__class__.factory = otel_tracer.get_otel_tracer
  214. else:
  215. cls.__class__.factory = EmptyTrace
  216. @classmethod
  217. def get_constant_tags(cls) -> str | None:
  218. """Get constant tags to add to all traces."""
  219. tags_in_string = conf.get("traces", "tags", fallback=None)
  220. if not tags_in_string:
  221. return None
  222. return tags_in_string
# Type checkers are told that ``Trace`` has the shape of ``EmptyTrace``; at runtime it is
# a class created through the ``_Trace`` metaclass.
if TYPE_CHECKING:
    Trace: EmptyTrace
else:

    class Trace(metaclass=_Trace):
        """Empty class for Trace - we use metaclass to inject the right one."""