otel_tracer.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import logging
  20. import random
  21. from opentelemetry import trace
  22. from opentelemetry.context import create_key
  23. from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
  24. from opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resource
  25. from opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProvider
  26. from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
  27. from opentelemetry.sdk.trace.id_generator import IdGenerator
  28. from opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracer
  29. from opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_ID
  30. from airflow.configuration import conf
  31. from airflow.traces import (
  32. TRACEPARENT,
  33. TRACESTATE,
  34. )
  35. from airflow.traces.utils import (
  36. gen_dag_span_id,
  37. gen_span_id,
  38. gen_trace_id,
  39. parse_traceparent,
  40. parse_tracestate,
  41. )
  42. from airflow.utils.dates import datetime_to_nano
  43. from airflow.utils.net import get_hostname
# Module-level logger for this tracer module.
log = logging.getLogger(__name__)
# OpenTelemetry context key named "next_id" (created via opentelemetry.context.create_key).
# NOTE(review): not referenced anywhere else in this module; presumably used by other
# modules or kept for compatibility — verify before removing.
_NEXT_ID = create_key("next_id")
  46. class OtelTrace:
  47. """
  48. Handle all tracing requirements such as getting the tracer, and starting a new span.
  49. When OTEL is enabled, the Trace class will be replaced by this class.
  50. """
  51. def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):
  52. self.span_exporter = span_exporter
  53. self.span_processor = BatchSpanProcessor(self.span_exporter)
  54. self.tag_string = tag_string
  55. self.otel_service = conf.get("traces", "otel_service")
  56. def get_tracer(
  57. self, component: str, trace_id: int | None = None, span_id: int | None = None
  58. ) -> OpenTelemetryTracer | Tracer:
  59. """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""
  60. resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})
  61. if trace_id or span_id:
  62. # in case where trace_id or span_id was given
  63. tracer_provider = TracerProvider(
  64. resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)
  65. )
  66. else:
  67. tracer_provider = TracerProvider(resource=resource)
  68. tracer_provider.add_span_processor(self.span_processor)
  69. tracer = tracer_provider.get_tracer(component)
  70. """
  71. Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any
  72. subsequent call will produce the normal random ids.
  73. """
  74. return tracer
  75. def get_current_span(self):
  76. return trace.get_current_span()
  77. def use_span(self, span: Span):
  78. return trace.use_span(span=span)
  79. def start_span(
  80. self,
  81. span_name: str,
  82. component: str | None = None,
  83. parent_sc: SpanContext | None = None,
  84. span_id=None,
  85. links=None,
  86. start_time=None,
  87. ):
  88. """Start a span; if service_name is not given, otel_service is used."""
  89. if component is None:
  90. component = self.otel_service
  91. trace_id = self.get_current_span().get_span_context().trace_id
  92. tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)
  93. attributes = parse_tracestate(self.tag_string) if self.tag_string else {}
  94. if links is not None:
  95. _links = gen_links_from_kv_list(links)
  96. else:
  97. _links = []
  98. if start_time is not None:
  99. start_time = datetime_to_nano(start_time)
  100. if parent_sc is not None:
  101. ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))
  102. span = tracer.start_as_current_span(
  103. span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time
  104. )
  105. else:
  106. span = tracer.start_as_current_span(
  107. span_name, attributes=attributes, links=_links, start_time=start_time
  108. )
  109. return span
  110. def start_span_from_dagrun(
  111. self, dagrun, span_name: str | None = None, component: str = "dagrun", links=None
  112. ):
  113. """Produce a span from dag run."""
  114. # check if dagrun has configs
  115. conf = dagrun.conf
  116. trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
  117. span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))
  118. tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)
  119. tag_string = self.tag_string if self.tag_string else ""
  120. tag_string = tag_string + ("," + conf.get(TRACESTATE) if (conf and conf.get(TRACESTATE)) else "")
  121. if span_name is None:
  122. span_name = dagrun.dag_id
  123. _links = gen_links_from_kv_list(links) if links else []
  124. _links.append(
  125. Link(
  126. context=trace.get_current_span().get_span_context(),
  127. attributes={"meta.annotation_type": "link", "from": "parenttrace"},
  128. )
  129. )
  130. if conf and conf.get(TRACEPARENT):
  131. # add the trace parent as the link
  132. _links.append(gen_link_from_traceparent(conf.get(TRACEPARENT)))
  133. span_ctx = SpanContext(
  134. trace_id=INVALID_TRACE_ID, span_id=INVALID_SPAN_ID, is_remote=True, trace_flags=TraceFlags(0x01)
  135. )
  136. ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
  137. span = tracer.start_as_current_span(
  138. name=span_name,
  139. context=ctx,
  140. links=_links,
  141. start_time=datetime_to_nano(dagrun.queued_at),
  142. attributes=parse_tracestate(tag_string),
  143. )
  144. return span
  145. def start_span_from_taskinstance(
  146. self,
  147. ti,
  148. span_name: str | None = None,
  149. component: str = "taskinstance",
  150. child: bool = False,
  151. links=None,
  152. ):
  153. """
  154. Create and start span from given task instance.
  155. Essentially the span represents the ti itself if child == True, it will create a 'child' span under the given span.
  156. """
  157. dagrun = ti.dag_run
  158. trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))
  159. span_id = int(gen_span_id(ti=ti, as_int=True))
  160. if span_name is None:
  161. span_name = ti.task_id
  162. parent_id = span_id if child else int(gen_dag_span_id(dag_run=dagrun, as_int=True))
  163. span_ctx = SpanContext(
  164. trace_id=trace_id, span_id=parent_id, is_remote=True, trace_flags=TraceFlags(0x01)
  165. )
  166. _links = gen_links_from_kv_list(links) if links else []
  167. _links.append(
  168. Link(
  169. context=SpanContext(
  170. trace_id=trace.get_current_span().get_span_context().trace_id,
  171. span_id=span_id,
  172. is_remote=True,
  173. trace_flags=TraceFlags(0x01),
  174. ),
  175. attributes={"meta.annotation_type": "link", "from": "parenttrace"},
  176. )
  177. )
  178. if child is False:
  179. tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)
  180. else:
  181. tracer = self.get_tracer(component=component)
  182. ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))
  183. span = tracer.start_as_current_span(
  184. name=span_name, context=ctx, start_time=datetime_to_nano(ti.queued_dttm), links=_links
  185. )
  186. return span
  187. def gen_context(trace_id: int, span_id: int):
  188. """Generate a remote span context for given trace and span id."""
  189. span_ctx = SpanContext(trace_id=trace_id, span_id=span_id, is_remote=True, trace_flags=TraceFlags(0x01))
  190. return span_ctx
  191. def gen_links_from_kv_list(kv_list):
  192. """Convert list of kv dic of trace_id and span_id and generate list of SpanContext."""
  193. result = []
  194. for a in kv_list:
  195. trace_id = a["trace_id"] # string of hexa
  196. span_id = a["span_id"] # string of hexa
  197. span_ctx = gen_context(trace_id, span_id)
  198. a_link = Link(
  199. context=span_ctx,
  200. attributes={"meta.annotation_type": "link"},
  201. )
  202. result.append(a_link)
  203. return result
  204. def gen_link_from_traceparent(traceparent: str):
  205. """Generate Link object from provided traceparent string."""
  206. if traceparent is None:
  207. return None
  208. trace_ctx = parse_traceparent(traceparent)
  209. trace_id = trace_ctx["trace_id"]
  210. span_id = trace_ctx["parent_id"]
  211. span_ctx = gen_context(int(trace_id, 16), int(span_id, 16))
  212. return Link(context=span_ctx, attributes={"meta.annotation_type": "link", "from": "traceparent"})
  213. def get_otel_tracer(cls) -> OtelTrace:
  214. """Get OTEL tracer from airflow configuration."""
  215. host = conf.get("traces", "otel_host")
  216. port = conf.getint("traces", "otel_port")
  217. debug = conf.getboolean("traces", "otel_debugging_on")
  218. ssl_active = conf.getboolean("traces", "otel_ssl_active")
  219. tag_string = cls.get_constant_tags()
  220. if debug is True:
  221. log.info("[ConsoleSpanExporter] is being used")
  222. return OtelTrace(span_exporter=ConsoleSpanExporter(), tag_string=tag_string)
  223. else:
  224. protocol = "https" if ssl_active else "http"
  225. endpoint = f"{protocol}://{host}:{port}/v1/traces"
  226. log.info("[OTLPSpanExporter] Connecting to OpenTelemetry Collector at %s", endpoint)
  227. return OtelTrace(
  228. span_exporter=OTLPSpanExporter(endpoint=endpoint, headers={"Content-Type": "application/json"}),
  229. tag_string=tag_string,
  230. )
  231. class AirflowOtelIdGenerator(IdGenerator):
  232. """
  233. ID Generator for span id and trace id.
  234. The specific purpose of this ID generator is to generate a given span_id when the
  235. generate_span_id is called for the FIRST time. Any subsequent calls to the generate_span_id()
  236. will then fall back into producing random ones. As for the trace_id, the class is designed
  237. to produce the provided trace id (and not anything random)
  238. """
  239. def __init__(self, span_id=None, trace_id=None):
  240. super().__init__()
  241. self.span_id = span_id
  242. self.trace_id = trace_id
  243. def generate_span_id(self) -> int:
  244. if self.span_id is not None:
  245. id = self.span_id
  246. self.span_id = None
  247. return id
  248. else:
  249. new_id = random.getrandbits(64)
  250. return new_id
  251. def generate_trace_id(self) -> int:
  252. if self.trace_id is not None:
  253. id = self.trace_id
  254. return id
  255. else:
  256. new_id = random.getrandbits(128)
  257. return new_id