dag.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

import pathlib
from datetime import datetime, timedelta
from typing import Any, List, Optional

from dateutil import relativedelta
from typing_extensions import Annotated

from airflow import DAG, settings
from airflow.configuration import conf as airflow_conf
from airflow.utils.pydantic import (
    BaseModel as BaseModelPydantic,
    ConfigDict,
    PlainSerializer,
    PlainValidator,
    ValidationInfo,
)
from airflow.utils.sqlalchemy import Interval
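
# Pydantic-backed, serializable representations of the DAG-related ORM models
# (DagModel, DagTag, DagOwnerAttributes), used by the Airflow internal API.
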
def serialize_interval(value: Interval) -> Interval:
    # Reuse the SQLAlchemy Interval type decorator so the value is serialized in the
    # same form it would take when bound to the database.
    interval = Interval()
    return interval.process_bind_param(value, None)


def validate_interval(value: Interval | Any, _info: ValidationInfo) -> Any:
    if (
        isinstance(value, Interval)
        or isinstance(value, timedelta)
        or isinstance(value, relativedelta.relativedelta)
    ):
        return value
    interval = Interval()
    try:
        return interval.process_result_value(value, None)
    except ValueError as e:
        # Interval may be provided in string format (cron),
        # so it must be returned as a valid value.
        if isinstance(value, str):
            return value
        raise e


PydanticInterval = Annotated[
    Interval,
    PlainValidator(validate_interval),
    PlainSerializer(serialize_interval, return_type=Interval),
]
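
# Illustrative behavior of validate_interval (comment only, not executed here):
#   validate_interval(timedelta(days=1), None)  ->  timedelta(days=1)   (native intervals pass through)
#   validate_interval("0 0 * * *", None)        ->  "0 0 * * *"         (a cron string is kept as-is)
# Anything else is handed to Interval.process_result_value for decoding.
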
def serialize_operator(x: DAG) -> dict:
    from airflow.serialization.serialized_objects import SerializedDAG

    return SerializedDAG.serialize_dag(x)


def validate_operator(x: DAG | dict[str, Any], _info: ValidationInfo) -> Any:
    from airflow.serialization.serialized_objects import SerializedDAG

    if isinstance(x, DAG):
        return x
    return SerializedDAG.deserialize_dag(x)


PydanticDag = Annotated[
    DAG,
    PlainValidator(validate_operator),
    PlainSerializer(serialize_operator, return_type=dict),
]
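
# PydanticDag round-trips a DAG through its SerializedDAG dict form: serialization
# calls SerializedDAG.serialize_dag, and validation accepts either a live DAG
# instance or such a dict, rebuilding the latter with SerializedDAG.deserialize_dag.
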
class DagOwnerAttributesPydantic(BaseModelPydantic):
    """Serializable representation of the DagOwnerAttributes ORM SqlAlchemyModel used by internal API."""

    owner: str
    link: str

    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)


class DagTagPydantic(BaseModelPydantic):
    """Serializable representation of the DagTag ORM SqlAlchemyModel used by internal API."""

    name: str
    dag_id: str

    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
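
# Both models above (and DagModelPydantic below) set from_attributes=True, so they
# can be constructed directly from the corresponding SQLAlchemy ORM instances.
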
class DagModelPydantic(BaseModelPydantic):
    """Serializable representation of the DagModel ORM SqlAlchemyModel used by internal API."""

    dag_id: str
    root_dag_id: Optional[str]
    # Default is read from [core] dags_are_paused_at_creation when the module is
    # imported; is_paused falls back to that same value.
    is_paused_at_creation: bool = airflow_conf.getboolean("core", "dags_are_paused_at_creation")
    is_paused: bool = is_paused_at_creation
    is_subdag: Optional[bool] = False
    is_active: Optional[bool] = False
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    processor_subdir: Optional[str]
    owners: Optional[str]
    description: Optional[str]
    default_view: Optional[str]
    schedule_interval: Optional[PydanticInterval]
    timetable_description: Optional[str]
    tags: List[DagTagPydantic]  # noqa: UP006
    dag_owner_links: List[DagOwnerAttributesPydantic]  # noqa: UP006
    parent_dag: Optional[PydanticDag]
    max_active_tasks: int
    max_active_runs: Optional[int]
    max_consecutive_failed_dag_runs: Optional[int]
    has_task_concurrency_limits: bool
    has_import_errors: Optional[bool] = False

    _processor_dags_folder: Optional[str] = None

    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    @property
    def relative_fileloc(self) -> pathlib.Path:
        """File location of the importable dag 'file' relative to the configured DAGs folder."""
        path = pathlib.Path(self.fileloc)
        try:
            rel_path = path.relative_to(self._processor_dags_folder or settings.DAGS_FOLDER)
            if rel_path == pathlib.Path("."):
                return path
            else:
                return rel_path
        except ValueError:
            # Not relative to DAGS_FOLDER.
            return path
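
# Illustrative usage (comment only; assumes `orm_dag` is a DagModel ORM row and that
# model_validate, the pydantic v2 constructor enabled by from_attributes=True, is
# available through airflow.utils.pydantic):
#
#   dag_model = DagModelPydantic.model_validate(orm_dag)
#   dag_model.relative_fileloc  # relative to DAGS_FOLDER (or the processor's DAGs
#                               # folder when set); the absolute path is returned if
#                               # the file lives outside that folder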