| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import inspect
- import logging
- import socket
- from typing import TYPE_CHECKING, Any, Callable
- from airflow.configuration import conf
- from airflow.typing_compat import Protocol
# Module-level logger, named after this module.
log: logging.Logger = logging.getLogger(__name__)
def gen_context(trace_id, span_id):
    """
    Build a span context for the given trace id and span id.

    The work is delegated to ``airflow.traces.otel_tracer.gen_context``.

    :param trace_id: forwarded unchanged to the OTel helper.
    :param span_id: forwarded unchanged to the OTel helper.
    :return: whatever the OTel helper returns.
    """
    # Imported at call time rather than at module import time
    # (presumably so this module loads without the OTel stack -- confirm).
    from airflow.traces import otel_tracer

    return otel_tracer.gen_context(trace_id, span_id)
def gen_links_from_kv_list(list):
    """
    Build span links from a kv list of {trace_id:int, span_id:int}.

    The work is delegated to the function of the same name in
    ``airflow.traces.otel_tracer``.

    :param list: the kv list, forwarded unchanged. The name shadows the
        builtin but is kept because it is part of the call signature.
    :return: whatever the OTel helper returns.
    """
    # Imported at call time rather than at module import time.
    from airflow.traces import otel_tracer

    return otel_tracer.gen_links_from_kv_list(list)
def span(func):
    """
    Decorate a function so that every call runs inside a trace span.

    The span is named after ``func.__name__``. Its component is everything
    before the last dot of ``func.__qualname__`` when the qualified name is
    dotted (e.g. the owning class of a method), and ``func.__module__``
    otherwise.

    :param func: the callable to wrap.
    :return: a wrapper that opens the span through ``Trace.start_span`` and
        returns whatever ``func`` returns. The wrapper keeps the metadata of
        ``func`` (``__name__``, ``__doc__``, ``__wrapped__``, ...).
    """
    import functools

    func_name = func.__name__
    qual_name = func.__qualname__
    component = qual_name.rsplit(".", 1)[0] if "." in qual_name else func.__module__
    # These values never change after decoration, so they are computed once
    # here instead of on every call.
    takes_arguments = bool(inspect.signature(func).parameters)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with Trace.start_span(span_name=func_name, component=component):
            if takes_arguments:
                return func(*args, **kwargs)
            # A callable declared without parameters is invoked bare; any
            # arguments handed to the wrapper are intentionally dropped.
            return func()

    return wrapper
class EmptyContext:
    """Fallback span context used when no Tracer is configured."""

    def __init__(self) -> None:
        # Fixed placeholder trace id.
        self.trace_id = 1
class EmptySpan:
    """
    Fallback span used when no Tracer is configured.

    Every operation is a no-op, so the object can be used as a context
    manager or as a decorator without any tracing taking place.
    """

    def __enter__(self):
        """Enter the context and hand back this span."""
        return self

    def __exit__(self, *args, **kwargs):
        """Leave the context; nothing to clean up and nothing is suppressed."""

    def __call__(self, obj):
        """Return ``obj`` untouched (decorator usage)."""
        return obj

    def get_span_context(self):
        """Get span context; always the module-level ``EMPTY_CTX``."""
        return EMPTY_CTX

    def set_attribute(self, key, value) -> None:
        """Set an attribute to the span (ignored)."""

    def set_attributes(self, attributes) -> None:
        """Set multiple attributes at once (ignored)."""

    def is_recording(self):
        """Report whether the span is recording; always ``False``."""
        return False

    def add_event(
        self,
        name: str,
        attributes: Any | None = None,
        timestamp: int | None = None,
    ) -> None:
        """Add event to span (ignored)."""

    def add_link(
        self,
        context: Any,
        attributes: Any | None = None,
    ) -> None:
        """Add link to the span (ignored)."""

    def end(self, end_time=None, *args, **kwargs) -> None:
        """End the span (ignored)."""
# Shared fallback objects: EMPTY_SPAN is returned by every EmptyTrace method
# that yields a span, and EMPTY_CTX by EmptySpan.get_span_context().
EMPTY_CTX = EmptyContext()
EMPTY_SPAN = EmptySpan()
class Tracer(Protocol):
    """
    Interface description of a tracer.

    This class is only used for TypeChecking (for IDEs, mypy, etc); every
    method simply raises ``NotImplementedError``.
    """

    instance: Tracer | EmptyTrace | None = None

    @classmethod
    def get_tracer(cls, component):
        """Get a tracer."""
        raise NotImplementedError()

    @classmethod
    def start_span(
        cls,
        span_name: str,
        component: str | None = None,
        parent_sc=None,
        span_id=None,
        links=None,
        start_time=None,
    ):
        """Start a span."""
        raise NotImplementedError()

    @classmethod
    def use_span(cls, span):
        """Use a span as current."""
        raise NotImplementedError()

    @classmethod
    def get_current_span(cls):
        """Get the current span."""
        raise NotImplementedError()

    @classmethod
    def start_span_from_dagrun(
        cls,
        dagrun,
        span_name=None,
        service_name=None,
        component=None,
        links=None,
    ):
        """Start a span from dagrun."""
        raise NotImplementedError()

    @classmethod
    def start_span_from_taskinstance(
        cls,
        ti,
        span_name=None,
        component=None,
        child=False,
        links=None,
    ):
        """Start a span from taskinstance."""
        raise NotImplementedError()
class EmptyTrace:
    """
    If no Tracer is configured, EmptyTrace is used as a fallback.

    All methods are classmethods that ignore their arguments; the
    span-producing ones return the shared ``EMPTY_SPAN``.
    """

    @classmethod
    def get_tracer(
        cls,
        component: str,
        trace_id: int | None = None,
        span_id: int | None = None,
    ):
        """Get a tracer; the arguments are ignored and this class itself is returned."""
        return cls

    @classmethod
    def start_span(
        cls,
        span_name: str,
        component: str | None = None,
        parent_sc=None,
        span_id=None,
        links=None,
        start_time=None,
    ) -> EmptySpan:
        """Start a span."""
        return EMPTY_SPAN

    @classmethod
    def use_span(cls, span) -> EmptySpan:
        """Use a span as current."""
        return EMPTY_SPAN

    @classmethod
    def get_current_span(cls) -> EmptySpan:
        """Get the current span."""
        return EMPTY_SPAN

    @classmethod
    def start_span_from_dagrun(
        cls,
        dagrun,
        span_name=None,
        service_name=None,
        component=None,
        links=None,
    ) -> EmptySpan:
        """Start a span from dagrun."""
        return EMPTY_SPAN

    @classmethod
    def start_span_from_taskinstance(
        cls,
        ti,
        span_name=None,
        component=None,
        child=False,
        links=None,
    ) -> EmptySpan:
        """Start a span from taskinstance."""
        return EMPTY_SPAN
- class _Trace(type):
- factory: Callable
- instance: Tracer | EmptyTrace | None = None
- def __getattr__(cls, name: str) -> str:
- if not cls.instance:
- try:
- cls.instance = cls.factory()
- except (socket.gaierror, ImportError) as e:
- log.error("Could not configure Trace: %s, using EmptyTrace instead.", e)
- cls.instance = EmptyTrace()
- return getattr(cls.instance, name)
- def __init__(cls, *args, **kwargs) -> None:
- super().__init__(cls)
- if not hasattr(cls.__class__, "factory"):
- if conf.has_option("traces", "otel_on") and conf.getboolean("traces", "otel_on"):
- from airflow.traces import otel_tracer
- cls.__class__.factory = otel_tracer.get_otel_tracer
- else:
- cls.__class__.factory = EmptyTrace
- @classmethod
- def get_constant_tags(cls) -> str | None:
- """Get constant tags to add to all traces."""
- tags_in_string = conf.get("traces", "tags", fallback=None)
- if not tags_in_string:
- return None
- return tags_in_string
if TYPE_CHECKING:
    # Static analysers see Trace as an EmptyTrace.
    Trace: EmptyTrace
else:
    # At runtime Trace is an empty class; the _Trace metaclass answers
    # attribute lookups on it.
    class Trace(metaclass=_Trace):
        """Empty class for Trace - the metaclass injects the right tracer."""
|