| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307 | ## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements.  See the NOTICE file# distributed with this work for additional information# regarding copyright ownership.  The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License.  You may obtain a copy of the License at##   http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied.  See the License for the# specific language governing permissions and limitations# under the License.from __future__ import annotationsimport loggingimport randomfrom opentelemetry import tracefrom opentelemetry.context import create_keyfrom opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporterfrom opentelemetry.sdk.resources import HOST_NAME, SERVICE_NAME, Resourcefrom opentelemetry.sdk.trace import Span, Tracer as OpenTelemetryTracer, TracerProviderfrom opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporterfrom opentelemetry.sdk.trace.id_generator import IdGeneratorfrom opentelemetry.trace import Link, NonRecordingSpan, SpanContext, TraceFlags, Tracerfrom opentelemetry.trace.span import INVALID_SPAN_ID, INVALID_TRACE_IDfrom airflow.configuration import conffrom airflow.traces import (    TRACEPARENT,    TRACESTATE,)from airflow.traces.utils import (    gen_dag_span_id,    gen_span_id,    gen_trace_id,    parse_traceparent,    parse_tracestate,)from airflow.utils.dates import datetime_to_nanofrom airflow.utils.net import get_hostnamelog = logging.getLogger(__name__)_NEXT_ID = create_key("next_id")class OtelTrace:    """    Handle all tracing requirements such as getting the tracer, and starting a new span.    When OTEL is enabled, the Trace class will be replaced by this class.    """    def __init__(self, span_exporter: ConsoleSpanExporter | OTLPSpanExporter, tag_string: str | None = None):        self.span_exporter = span_exporter        self.span_processor = BatchSpanProcessor(self.span_exporter)        self.tag_string = tag_string        self.otel_service = conf.get("traces", "otel_service")    def get_tracer(        self, component: str, trace_id: int | None = None, span_id: int | None = None    ) -> OpenTelemetryTracer | Tracer:        """Tracer that will use special AirflowOtelIdGenerator to control producing certain span and trace id."""        resource = Resource(attributes={HOST_NAME: get_hostname(), SERVICE_NAME: self.otel_service})        if trace_id or span_id:            # in case where trace_id or span_id was given            tracer_provider = TracerProvider(                resource=resource, id_generator=AirflowOtelIdGenerator(span_id=span_id, trace_id=trace_id)            )        else:            tracer_provider = TracerProvider(resource=resource)        tracer_provider.add_span_processor(self.span_processor)        tracer = tracer_provider.get_tracer(component)        """        Tracer will produce a single ID value if value is provided. Note that this is one-time only, so any        subsequent call will produce the normal random ids.        """        return tracer    def get_current_span(self):        return trace.get_current_span()    def use_span(self, span: Span):        return trace.use_span(span=span)    def start_span(        self,        span_name: str,        component: str | None = None,        parent_sc: SpanContext | None = None,        span_id=None,        links=None,        start_time=None,    ):        """Start a span; if service_name is not given, otel_service is used."""        if component is None:            component = self.otel_service        trace_id = self.get_current_span().get_span_context().trace_id        tracer = self.get_tracer(component=component, trace_id=trace_id, span_id=span_id)        attributes = parse_tracestate(self.tag_string) if self.tag_string else {}        if links is not None:            _links = gen_links_from_kv_list(links)        else:            _links = []        if start_time is not None:            start_time = datetime_to_nano(start_time)        if parent_sc is not None:            ctx = trace.set_span_in_context(NonRecordingSpan(parent_sc))            span = tracer.start_as_current_span(                span_name, context=ctx, attributes=attributes, links=_links, start_time=start_time            )        else:            span = tracer.start_as_current_span(                span_name, attributes=attributes, links=_links, start_time=start_time            )        return span    def start_span_from_dagrun(        self, dagrun, span_name: str | None = None, component: str = "dagrun", links=None    ):        """Produce a span from dag run."""        # check if dagrun has configs        conf = dagrun.conf        trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))        span_id = int(gen_dag_span_id(dag_run=dagrun, as_int=True))        tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)        tag_string = self.tag_string if self.tag_string else ""        tag_string = tag_string + ("," + conf.get(TRACESTATE) if (conf and conf.get(TRACESTATE)) else "")        if span_name is None:            span_name = dagrun.dag_id        _links = gen_links_from_kv_list(links) if links else []        _links.append(            Link(                context=trace.get_current_span().get_span_context(),                attributes={"meta.annotation_type": "link", "from": "parenttrace"},            )        )        if conf and conf.get(TRACEPARENT):            # add the trace parent as the link            _links.append(gen_link_from_traceparent(conf.get(TRACEPARENT)))        span_ctx = SpanContext(            trace_id=INVALID_TRACE_ID, span_id=INVALID_SPAN_ID, is_remote=True, trace_flags=TraceFlags(0x01)        )        ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))        span = tracer.start_as_current_span(            name=span_name,            context=ctx,            links=_links,            start_time=datetime_to_nano(dagrun.queued_at),            attributes=parse_tracestate(tag_string),        )        return span    def start_span_from_taskinstance(        self,        ti,        span_name: str | None = None,        component: str = "taskinstance",        child: bool = False,        links=None,    ):        """        Create and start span from given task instance.        Essentially the span represents the ti itself if child == True, it will create a 'child' span under the given span.        """        dagrun = ti.dag_run        trace_id = int(gen_trace_id(dag_run=dagrun, as_int=True))        span_id = int(gen_span_id(ti=ti, as_int=True))        if span_name is None:            span_name = ti.task_id        parent_id = span_id if child else int(gen_dag_span_id(dag_run=dagrun, as_int=True))        span_ctx = SpanContext(            trace_id=trace_id, span_id=parent_id, is_remote=True, trace_flags=TraceFlags(0x01)        )        _links = gen_links_from_kv_list(links) if links else []        _links.append(            Link(                context=SpanContext(                    trace_id=trace.get_current_span().get_span_context().trace_id,                    span_id=span_id,                    is_remote=True,                    trace_flags=TraceFlags(0x01),                ),                attributes={"meta.annotation_type": "link", "from": "parenttrace"},            )        )        if child is False:            tracer = self.get_tracer(component=component, span_id=span_id, trace_id=trace_id)        else:            tracer = self.get_tracer(component=component)        ctx = trace.set_span_in_context(NonRecordingSpan(span_ctx))        span = tracer.start_as_current_span(            name=span_name, context=ctx, start_time=datetime_to_nano(ti.queued_dttm), links=_links        )        return spandef gen_context(trace_id: int, span_id: int):    """Generate a remote span context for given trace and span id."""    span_ctx = SpanContext(trace_id=trace_id, span_id=span_id, is_remote=True, trace_flags=TraceFlags(0x01))    return span_ctxdef gen_links_from_kv_list(kv_list):    """Convert list of kv dic of trace_id and span_id and generate list of SpanContext."""    result = []    for a in kv_list:        trace_id = a["trace_id"]  # string of hexa        span_id = a["span_id"]  # string of hexa        span_ctx = gen_context(trace_id, span_id)        a_link = Link(            context=span_ctx,            attributes={"meta.annotation_type": "link"},        )        result.append(a_link)    return resultdef gen_link_from_traceparent(traceparent: str):    """Generate Link object from provided traceparent string."""    if traceparent is None:        return None    trace_ctx = parse_traceparent(traceparent)    trace_id = trace_ctx["trace_id"]    span_id = trace_ctx["parent_id"]    span_ctx = gen_context(int(trace_id, 16), int(span_id, 16))    return Link(context=span_ctx, attributes={"meta.annotation_type": "link", "from": "traceparent"})def get_otel_tracer(cls) -> OtelTrace:    """Get OTEL tracer from airflow configuration."""    host = conf.get("traces", "otel_host")    port = conf.getint("traces", "otel_port")    debug = conf.getboolean("traces", "otel_debugging_on")    ssl_active = conf.getboolean("traces", "otel_ssl_active")    tag_string = cls.get_constant_tags()    if debug is True:        log.info("[ConsoleSpanExporter] is being used")        return OtelTrace(span_exporter=ConsoleSpanExporter(), tag_string=tag_string)    else:        protocol = "https" if ssl_active else "http"        endpoint = f"{protocol}://{host}:{port}/v1/traces"        log.info("[OTLPSpanExporter] Connecting to OpenTelemetry Collector at %s", endpoint)        return OtelTrace(            span_exporter=OTLPSpanExporter(endpoint=endpoint, headers={"Content-Type": "application/json"}),            tag_string=tag_string,        )class AirflowOtelIdGenerator(IdGenerator):    """    ID Generator for span id and trace id.    The specific purpose of this ID generator is to generate a given span_id when the    generate_span_id is called for the FIRST time. Any subsequent calls to the generate_span_id()    will then fall back into producing random ones. As for the trace_id, the class is designed    to produce the provided trace id (and not anything random)    """    def __init__(self, span_id=None, trace_id=None):        super().__init__()        self.span_id = span_id        self.trace_id = trace_id    def generate_span_id(self) -> int:        if self.span_id is not None:            id = self.span_id            self.span_id = None            return id        else:            new_id = random.getrandbits(64)            return new_id    def generate_trace_id(self) -> int:        if self.trace_id is not None:            id = self.trace_id            return id        else:            new_id = random.getrandbits(128)            return new_id
 |