dag_run.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. from datetime import datetime
  19. from typing import TYPE_CHECKING, Iterable, List, Optional
  20. from airflow.models.dagrun import DagRun
  21. from airflow.serialization.pydantic.dag import PydanticDag
  22. from airflow.serialization.pydantic.dataset import DatasetEventPydantic
  23. from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict, is_pydantic_2_installed
  24. if TYPE_CHECKING:
  25. from sqlalchemy.orm import Session
  26. from airflow.jobs.scheduler_job_runner import TI
  27. from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
  28. from airflow.utils.state import TaskInstanceState
  29. class DagRunPydantic(BaseModelPydantic):
  30. """Serializable representation of the DagRun ORM SqlAlchemyModel used by internal API."""
  31. id: int
  32. dag_id: str
  33. queued_at: Optional[datetime]
  34. execution_date: datetime
  35. start_date: Optional[datetime]
  36. end_date: Optional[datetime]
  37. state: str
  38. run_id: str
  39. creating_job_id: Optional[int]
  40. external_trigger: bool
  41. run_type: str
  42. conf: dict
  43. data_interval_start: Optional[datetime]
  44. data_interval_end: Optional[datetime]
  45. last_scheduling_decision: Optional[datetime]
  46. dag_hash: Optional[str]
  47. updated_at: Optional[datetime]
  48. dag: Optional[PydanticDag]
  49. consumed_dataset_events: List[DatasetEventPydantic] # noqa: UP006
  50. log_template_id: Optional[int]
  51. model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)
  52. @property
  53. def logical_date(self) -> datetime:
  54. return self.execution_date
  55. def get_task_instances(
  56. self,
  57. state: Iterable[TaskInstanceState | None] | None = None,
  58. session=None,
  59. ) -> list[TI]:
  60. """
  61. Return the task instances for this dag run.
  62. Redirect to DagRun.fetch_task_instances method.
  63. Keep this method because it is widely used across the code.
  64. """
  65. task_ids = DagRun._get_partial_task_ids(self.dag)
  66. return DagRun.fetch_task_instances(
  67. dag_id=self.dag_id,
  68. run_id=self.run_id,
  69. task_ids=task_ids,
  70. state=state,
  71. session=session,
  72. )
  73. def get_task_instance(
  74. self,
  75. task_id: str,
  76. session: Session,
  77. *,
  78. map_index: int = -1,
  79. ) -> TI | TaskInstancePydantic | None:
  80. """
  81. Return the task instance specified by task_id for this dag run.
  82. :param task_id: the task id
  83. :param session: Sqlalchemy ORM Session
  84. """
  85. from airflow.models.dagrun import DagRun
  86. return DagRun.fetch_task_instance(
  87. dag_id=self.dag_id,
  88. dag_run_id=self.run_id,
  89. task_id=task_id,
  90. session=session,
  91. map_index=map_index,
  92. )
  93. def get_log_template(self, session: Session):
  94. from airflow.models.dagrun import DagRun
  95. return DagRun._get_log_template(log_template_id=self.log_template_id)
  96. if is_pydantic_2_installed():
  97. DagRunPydantic.model_rebuild()