# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. """Taskfail tracks the failed run durations of each task instance.""" from __future__ import annotations from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text from sqlalchemy.orm import relationship from airflow.models.base import StringID, TaskInstanceDependencies from airflow.utils.sqlalchemy import UtcDateTime class TaskFail(TaskInstanceDependencies): """TaskFail tracks the failed run durations of each task instance.""" __tablename__ = "task_fail" id = Column(Integer, primary_key=True) task_id = Column(StringID(), nullable=False) dag_id = Column(StringID(), nullable=False) run_id = Column(StringID(), nullable=False) map_index = Column(Integer, nullable=False, server_default=text("-1")) start_date = Column(UtcDateTime) end_date = Column(UtcDateTime) duration = Column(Integer) __table_args__ = ( Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index), ForeignKeyConstraint( [dag_id, task_id, run_id, map_index], [ "task_instance.dag_id", "task_instance.task_id", "task_instance.run_id", "task_instance.map_index", ], name="task_fail_ti_fkey", ondelete="CASCADE", ), ) # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining # the relationship we can more easily find the execution date for these rows dag_run = relationship( "DagRun", primaryjoin="""and_( TaskFail.dag_id == foreign(DagRun.dag_id), TaskFail.run_id == foreign(DagRun.run_id), )""", viewonly=True, ) def __init__(self, ti): self.dag_id = ti.dag_id self.task_id = ti.task_id self.run_id = ti.run_id self.map_index = ti.map_index self.start_date = ti.start_date self.end_date = ti.end_date if self.end_date and self.start_date: self.duration = int((self.end_date - self.start_date).total_seconds()) else: self.duration = None def __repr__(self): prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}" if self.map_index != -1: prefix += f" map_index={self.map_index}" return prefix + ">"