# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import inspect import logging import socket from typing import TYPE_CHECKING, Any, Callable from airflow.configuration import conf from airflow.typing_compat import Protocol log = logging.getLogger(__name__) def gen_context(trace_id, span_id): """Generate span context from trace_id and span_id.""" from airflow.traces.otel_tracer import gen_context as otel_gen_context return otel_gen_context(trace_id, span_id) def gen_links_from_kv_list(list): """Generate links from kv list of {trace_id:int, span_id:int}.""" from airflow.traces.otel_tracer import gen_links_from_kv_list return gen_links_from_kv_list(list) def span(func): """Decorate a function with span.""" def wrapper(*args, **kwargs): func_name = func.__name__ qual_name = func.__qualname__ module_name = func.__module__ if "." in qual_name: component = f"{qual_name.rsplit('.', 1)[0]}" else: component = module_name with Trace.start_span(span_name=func_name, component=component): if len(inspect.signature(func).parameters) > 0: return func(*args, **kwargs) else: return func() return wrapper class EmptyContext: """If no Tracer is configured, EmptyContext is used as a fallback.""" def __init__(self): self.trace_id = 1 class EmptySpan: """If no Tracer is configured, EmptySpan is used as a fallback.""" def __enter__(self): """Enter.""" return self def __exit__(self, *args, **kwargs): """Exit.""" pass def __call__(self, obj): """Call.""" return obj def get_span_context(self): """Get span context.""" return EMPTY_CTX def set_attribute(self, key, value) -> None: """Set an attribute to the span.""" pass def set_attributes(self, attributes) -> None: """Set multiple attributes at once.""" pass def is_recording(self): return False def add_event( self, name: str, attributes: Any | None = None, timestamp: int | None = None, ) -> None: """Add event to span.""" pass def add_link( self, context: Any, attributes: Any | None = None, ) -> None: """Add link to the span.""" pass def end(self, end_time=None, *args, **kwargs) -> None: """End.""" pass EMPTY_SPAN = EmptySpan() EMPTY_CTX = EmptyContext() class Tracer(Protocol): """This class is only used for TypeChecking (for IDEs, mypy, etc).""" instance: Tracer | EmptyTrace | None = None @classmethod def get_tracer(cls, component): """Get a tracer.""" raise NotImplementedError() @classmethod def start_span( cls, span_name: str, component: str | None = None, parent_sc=None, span_id=None, links=None, start_time=None, ): """Start a span.""" raise NotImplementedError() @classmethod def use_span(cls, span): """Use a span as current.""" raise NotImplementedError() @classmethod def get_current_span(self): raise NotImplementedError() @classmethod def start_span_from_dagrun( cls, dagrun, span_name=None, service_name=None, component=None, links=None, ): """Start a span from dagrun.""" raise NotImplementedError() @classmethod def start_span_from_taskinstance( cls, ti, span_name=None, component=None, child=False, links=None, ): """Start a span from taskinstance.""" raise NotImplementedError() class EmptyTrace: """If no Tracer is configured, EmptyTracer is used as a fallback.""" @classmethod def get_tracer( cls, component: str, trace_id: int | None = None, span_id: int | None = None, ): """Get a tracer using provided node id and trace id.""" return cls @classmethod def start_span( cls, span_name: str, component: str | None = None, parent_sc=None, span_id=None, links=None, start_time=None, ) -> EmptySpan: """Start a span.""" return EMPTY_SPAN @classmethod def use_span(cls, span) -> EmptySpan: """Use a span as current.""" return EMPTY_SPAN @classmethod def get_current_span(self) -> EmptySpan: """Get the current span.""" return EMPTY_SPAN @classmethod def start_span_from_dagrun( cls, dagrun, span_name=None, service_name=None, component=None, links=None, ) -> EmptySpan: """Start a span from dagrun.""" return EMPTY_SPAN @classmethod def start_span_from_taskinstance( cls, ti, span_name=None, component=None, child=False, links=None, ) -> EmptySpan: """Start a span from taskinstance.""" return EMPTY_SPAN class _Trace(type): factory: Callable instance: Tracer | EmptyTrace | None = None def __getattr__(cls, name: str) -> str: if not cls.instance: try: cls.instance = cls.factory() except (socket.gaierror, ImportError) as e: log.error("Could not configure Trace: %s, using EmptyTrace instead.", e) cls.instance = EmptyTrace() return getattr(cls.instance, name) def __init__(cls, *args, **kwargs) -> None: super().__init__(cls) if not hasattr(cls.__class__, "factory"): if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"): from airflow.traces import otel_tracer cls.__class__.factory = otel_tracer.get_otel_tracer else: cls.__class__.factory = EmptyTrace @classmethod def get_constant_tags(cls) -> str | None: """Get constant tags to add to all traces.""" tags_in_string = conf.get("traces", "tags", fallback=None) if not tags_in_string: return None return tags_in_string if TYPE_CHECKING: Trace: EmptyTrace else: class Trace(metaclass=_Trace): """Empty class for Trace - we use metaclass to inject the right one."""