# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from datetime import datetime
from typing import TYPE_CHECKING, Iterable, List, Optional

from airflow.models.dagrun import DagRun
from airflow.serialization.pydantic.dag import PydanticDag
from airflow.serialization.pydantic.dataset import DatasetEventPydantic
from airflow.utils.pydantic import BaseModel as BaseModelPydantic, ConfigDict, is_pydantic_2_installed

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.jobs.scheduler_job_runner import TI
    from airflow.serialization.pydantic.taskinstance import TaskInstancePydantic
    from airflow.utils.state import TaskInstanceState
class DagRunPydantic(BaseModelPydantic):
    """Serializable representation of the DagRun ORM SqlAlchemyModel used by internal API."""

    id: int
    dag_id: str
    queued_at: Optional[datetime]
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    state: str
    run_id: str
    creating_job_id: Optional[int]
    external_trigger: bool
    run_type: str
    conf: dict
    data_interval_start: Optional[datetime]
    data_interval_end: Optional[datetime]
    last_scheduling_decision: Optional[datetime]
    dag_hash: Optional[str]
    updated_at: Optional[datetime]
    dag: Optional[PydanticDag]
    consumed_dataset_events: List[DatasetEventPydantic]  # noqa: UP006
    log_template_id: Optional[int]

    # from_attributes lets pydantic build this model directly from the DagRun
    # ORM object; arbitrary_types_allowed is needed for the non-pydantic field
    # types above.
    model_config = ConfigDict(from_attributes=True, arbitrary_types_allowed=True)

    @property
    def logical_date(self) -> datetime:
        """Alias of ``execution_date``, mirroring the DagRun ORM property."""
        return self.execution_date

    def get_task_instances(
        self,
        state: Iterable[TaskInstanceState | None] | None = None,
        session=None,
    ) -> list[TI]:
        """
        Return the task instances for this dag run.

        Redirect to DagRun.fetch_task_instances method.
        Keep this method because it is widely used across the code.

        :param state: if given, only return task instances in one of these states
        :param session: SQLAlchemy ORM session
        """
        task_ids = DagRun._get_partial_task_ids(self.dag)
        return DagRun.fetch_task_instances(
            dag_id=self.dag_id,
            run_id=self.run_id,
            task_ids=task_ids,
            state=state,
            session=session,
        )

    def get_task_instance(
        self,
        task_id: str,
        session: Session,
        *,
        map_index: int = -1,
    ) -> TI | TaskInstancePydantic | None:
        """
        Return the task instance specified by task_id for this dag run.

        Redirect to DagRun.fetch_task_instance method.

        :param task_id: the task id
        :param session: Sqlalchemy ORM Session
        :param map_index: map index of the task instance to fetch; -1 selects
            an unmapped task instance
        """
        # Uses the module-level DagRun import, consistent with
        # get_task_instances above (previously re-imported DagRun locally).
        return DagRun.fetch_task_instance(
            dag_id=self.dag_id,
            dag_run_id=self.run_id,
            task_id=task_id,
            session=session,
            map_index=map_index,
        )

    def get_log_template(self, session: Session):
        """
        Return the log template for this dag run.

        Redirect to DagRun._get_log_template method.

        :param session: Sqlalchemy ORM Session.  NOTE(review): currently
            unused — ``_get_log_template`` is called without it, so it is
            presumably expected to obtain its own session; confirm before
            threading it through.
        """
        # Uses the module-level DagRun import (previously re-imported locally).
        return DagRun._get_log_template(log_template_id=self.log_template_id)
# Resolve the deferred (string) annotations now that all referenced models are
# imported.  model_rebuild() is a Pydantic 2 API, hence the version guard.
if is_pydantic_2_installed():
    DagRunPydantic.model_rebuild()