utils.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import logging
  20. from typing import TYPE_CHECKING
  21. from airflow.traces import NO_TRACE_ID
  22. from airflow.utils.hashlib_wrapper import md5
  23. if TYPE_CHECKING:
  24. from airflow.models import DagRun, TaskInstance
  25. from airflow.models.taskinstancekey import TaskInstanceKey
# Start offsets used by ``_gen_id`` to slice the hash's hex digest
# (``hexdigest()[offset:]``): 0 keeps the whole digest, 16 drops its first
# 16 characters.
TRACE_ID = 0
SPAN_ID = 16
# Module-level logger named after this module.
log = logging.getLogger(__name__)
  29. def _gen_id(seeds: list[str], as_int: bool = False, type: int = TRACE_ID) -> str | int:
  30. seed_str = "_".join(seeds).encode("utf-8")
  31. hash_hex = md5(seed_str).hexdigest()[type:]
  32. return int(hash_hex, 16) if as_int else hash_hex
  33. def gen_trace_id(dag_run: DagRun, as_int: bool = False) -> str | int:
  34. if dag_run.start_date is None:
  35. return NO_TRACE_ID
  36. """Generate trace id from DagRun."""
  37. return _gen_id(
  38. [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
  39. as_int,
  40. )
  41. def gen_span_id_from_ti_key(ti_key: TaskInstanceKey, as_int: bool = False) -> str | int:
  42. """Generate span id from TI key."""
  43. return _gen_id(
  44. [ti_key.dag_id, str(ti_key.run_id), ti_key.task_id, str(ti_key.try_number)],
  45. as_int,
  46. SPAN_ID,
  47. )
  48. def gen_dag_span_id(dag_run: DagRun, as_int: bool = False) -> str | int:
  49. """Generate dag's root span id using dag_run."""
  50. if dag_run.start_date is None:
  51. return NO_TRACE_ID
  52. return _gen_id(
  53. [dag_run.dag_id, str(dag_run.run_id), str(dag_run.start_date.timestamp())],
  54. as_int,
  55. SPAN_ID,
  56. )
  57. def gen_span_id(ti: TaskInstance, as_int: bool = False) -> str | int:
  58. """Generate span id from the task instance."""
  59. dag_run = ti.dag_run
  60. return _gen_id(
  61. [dag_run.dag_id, dag_run.run_id, ti.task_id, str(ti.try_number)],
  62. as_int,
  63. SPAN_ID,
  64. )
  65. def parse_traceparent(traceparent_str: str | None = None) -> dict:
  66. """Parse traceparent string: 00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01."""
  67. if traceparent_str is None:
  68. return {}
  69. tokens = traceparent_str.split("-")
  70. if len(tokens) != 4:
  71. raise ValueError("The traceparent string does not have the correct format.")
  72. return {"version": tokens[0], "trace_id": tokens[1], "parent_id": tokens[2], "flags": tokens[3]}
  73. def parse_tracestate(tracestate_str: str | None = None) -> dict:
  74. """Parse tracestate string: rojo=00f067aa0ba902b7,congo=t61rcWkgMzE."""
  75. if tracestate_str is None or len(tracestate_str) == 0:
  76. return {}
  77. tokens = tracestate_str.split(",")
  78. result = {}
  79. for pair in tokens:
  80. if "=" in pair:
  81. key, value = pair.split("=")
  82. result[key.strip()] = value.strip()
  83. return result
  84. def is_valid_trace_id(trace_id: str) -> bool:
  85. """Check whether trace id is valid."""
  86. return trace_id is not None and len(trace_id) == 34 and int(trace_id, 16) != 0
  87. def is_valid_span_id(span_id: str) -> bool:
  88. """Check whether span id is valid."""
  89. return span_id is not None and len(span_id) == 18 and int(span_id, 16) != 0