| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import logging
- from typing import TYPE_CHECKING
- from airflow.traces import NO_TRACE_ID
- from airflow.utils.hashlib_wrapper import md5
- if TYPE_CHECKING:
- from airflow.models import DagRun, TaskInstance
- from airflow.models.taskinstancekey import TaskInstanceKey
# Start offsets into the MD5 hex digest used by ``_gen_id``: a trace id keeps the
# digest from offset 0 (all of it), a span id keeps it from offset 16 onwards.
TRACE_ID = 0
SPAN_ID = 16

log = logging.getLogger(__name__)


def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
    """
    Derive a deterministic hexadecimal id from a list of seed strings.

    The seeds are joined with ``_``, UTF-8 encoded and hashed with ``md5``; the hex
    digest is then sliced from offset ``type`` to its end.

    :param seeds: strings identifying the entity the id is generated for.
    :param as_int: when true, return the id parsed as a base-16 integer.
    :param type: start offset into the hex digest (``TRACE_ID`` or ``SPAN_ID``).
    :return: the id as a hex string, or as an ``int`` when ``as_int`` is true.
    """
    digest = md5("_".join(seeds).encode("utf-8")).hexdigest()
    id_hex = digest[type:]
    if as_int:
        return int(id_hex, 16)
    return id_hex
def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
    """
    Generate trace id from DagRun.

    The id is built by ``_gen_id`` from the run's dag id, run id and start-date
    timestamp.

    :param dag_run: the DagRun to generate the trace id for.
    :param as_int: forwarded to ``_gen_id``; when true the id is returned as an integer.
    :return: ``NO_TRACE_ID`` when the run has no start date, otherwise the generated id.
    """
    # Without a start date there is no timestamp seed, so no real id can be built.
    if dag_run.start_date is None:
        return NO_TRACE_ID

    return _gen_id(
        [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
        as_int,
    )
def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
    """
    Build a span id from the fields of a task instance key.

    The seeds are the key's dag id, run id, task id and try number.

    :param ti_key: the task instance key to derive the span id from.
    :param as_int: forwarded to ``_gen_id``; when true the id is returned as an integer.
    :return: the span id produced by ``_gen_id`` with the ``SPAN_ID`` offset.
    """
    seeds = [
        ti_key.dag_id,
        str(ti_key.run_id),
        ti_key.task_id,
        str(ti_key.try_number),
    ]
    return _gen_id(seeds, as_int=as_int, type=SPAN_ID)
def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
    """
    Build the root span id of a dag from its DagRun.

    The seeds are the run's dag id, run id and start-date timestamp.

    :param dag_run: the DagRun to derive the span id from.
    :param as_int: forwarded to ``_gen_id``; when true the id is returned as an integer.
    :return: ``NO_TRACE_ID`` when the run has no start date, otherwise the id
        produced by ``_gen_id`` with the ``SPAN_ID`` offset.
    """
    # A run that has not started has no timestamp to seed the id with.
    if dag_run.start_date is None:
        return NO_TRACE_ID

    seeds = [
        dag_run.dag_id,
        str(dag_run.run_id),
        str(dag_run.start_date.timestamp()),
    ]
    return _gen_id(seeds, as_int=as_int, type=SPAN_ID)
def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
    """
    Generate span id from the task instance.

    The seeds are the dag id and run id of the task instance's DagRun, plus the
    task id and try number of the task instance itself.

    :param ti: the task instance to derive the span id from.
    :param as_int: forwarded to ``_gen_id``; when true the id is returned as an integer.
    :return: the span id produced by ``_gen_id`` with the ``SPAN_ID`` offset.
    """
    dag_run = ti.dag_run
    return _gen_id(
        # run_id is stringified like in the other id generators of this module, so
        # a non-str run id cannot make the "_".join inside _gen_id fail.
        [dag_run.dag_id, str(dag_run.run_id), ti.task_id, str(ti.try_number)],
        as_int,
        SPAN_ID,
    )
def parse_traceparent(traceparent_str: str | None = None) -> dict:
    """
    Split a traceparent string into its four dash-separated fields.

    Example input: ``00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01``.

    :param traceparent_str: the traceparent string, or ``None``.
    :return: an empty dict for ``None``; otherwise a dict with the keys
        ``version``, ``trace_id``, ``parent_id`` and ``flags``.
    :raises ValueError: if the string does not have exactly four dash-separated parts.
    """
    if traceparent_str is None:
        return {}

    field_names = ("version", "trace_id", "parent_id", "flags")
    parts = traceparent_str.split("-")
    if len(parts) != len(field_names):
        raise ValueError("The traceparent string does not have the correct format.")
    return dict(zip(field_names, parts))
def parse_tracestate(tracestate_str: str | None = None) -> dict:
    """
    Parse a tracestate string into a dict of its key/value members.

    Example input: ``rojo=00f067aa0ba902b7,congo=t61rcWkgMzE``.

    Members are separated by commas. Keys and values are stripped of surrounding
    whitespace, members without an ``=`` are skipped, and a later duplicate key
    overwrites an earlier one.

    :param tracestate_str: the tracestate string, ``None`` or empty.
    :return: a dict mapping each member key to its value; empty for ``None`` or ``""``.
    """
    if not tracestate_str:
        return {}

    result = {}
    for pair in tracestate_str.split(","):
        # Split on the first "=" only: a value that itself contains "=" (for
        # example base64 padding) must not blow up the key/value unpacking.
        key, separator, value = pair.partition("=")
        if separator:
            result[key.strip()] = value.strip()
    return result
def is_valid_trace_id(trace_id: str | None) -> bool:
    """
    Check whether trace id is valid.

    A trace id is accepted when it is exactly 34 characters long and parses as a
    non-zero base-16 integer. Anything else, including ``None`` and strings that
    are not hexadecimal, is rejected.

    :param trace_id: the candidate trace id string, or ``None``.
    :return: ``True`` if the trace id is valid, ``False`` otherwise.
    """
    # NOTE(review): 34 matches a "0x" prefix plus 32 hex digits (int(..., 16)
    # accepts that prefix) -- confirm this is the format callers pass in.
    if trace_id is None or len(trace_id) != 34:
        return False
    try:
        return int(trace_id, 16) != 0
    except ValueError:
        # Not a hexadecimal number: report it as invalid instead of raising.
        return False
def is_valid_span_id(span_id: str | None) -> bool:
    """
    Check whether span id is valid.

    A span id is accepted when it is exactly 18 characters long and parses as a
    non-zero base-16 integer. Anything else, including ``None`` and strings that
    are not hexadecimal, is rejected.

    :param span_id: the candidate span id string, or ``None``.
    :return: ``True`` if the span id is valid, ``False`` otherwise.
    """
    # NOTE(review): 18 matches a "0x" prefix plus 16 hex digits (int(..., 16)
    # accepts that prefix) -- confirm this is the format callers pass in.
    if span_id is None or len(span_id) != 18:
        return False
    try:
        return int(span_id, 16) != 0
    except ValueError:
        # Not a hexadecimal number: report it as invalid instead of raising.
        return False
|