taskfail.py 3.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Taskfail tracks the failed run durations of each task instance."""
  19. from __future__ import annotations
  20. from sqlalchemy import Column, ForeignKeyConstraint, Index, Integer, text
  21. from sqlalchemy.orm import relationship
  22. from airflow.models.base import StringID, TaskInstanceDependencies
  23. from airflow.utils.sqlalchemy import UtcDateTime
  24. class TaskFail(TaskInstanceDependencies):
  25. """TaskFail tracks the failed run durations of each task instance."""
  26. __tablename__ = "task_fail"
  27. id = Column(Integer, primary_key=True)
  28. task_id = Column(StringID(), nullable=False)
  29. dag_id = Column(StringID(), nullable=False)
  30. run_id = Column(StringID(), nullable=False)
  31. map_index = Column(Integer, nullable=False, server_default=text("-1"))
  32. start_date = Column(UtcDateTime)
  33. end_date = Column(UtcDateTime)
  34. duration = Column(Integer)
  35. __table_args__ = (
  36. Index("idx_task_fail_task_instance", dag_id, task_id, run_id, map_index),
  37. ForeignKeyConstraint(
  38. [dag_id, task_id, run_id, map_index],
  39. [
  40. "task_instance.dag_id",
  41. "task_instance.task_id",
  42. "task_instance.run_id",
  43. "task_instance.map_index",
  44. ],
  45. name="task_fail_ti_fkey",
  46. ondelete="CASCADE",
  47. ),
  48. )
  49. # We don't need a DB level FK here, as we already have that to TI (which has one to DR) but by defining
  50. # the relationship we can more easily find the execution date for these rows
  51. dag_run = relationship(
  52. "DagRun",
  53. primaryjoin="""and_(
  54. TaskFail.dag_id == foreign(DagRun.dag_id),
  55. TaskFail.run_id == foreign(DagRun.run_id),
  56. )""",
  57. viewonly=True,
  58. )
  59. def __init__(self, ti):
  60. self.dag_id = ti.dag_id
  61. self.task_id = ti.task_id
  62. self.run_id = ti.run_id
  63. self.map_index = ti.map_index
  64. self.start_date = ti.start_date
  65. self.end_date = ti.end_date
  66. if self.end_date and self.start_date:
  67. self.duration = int((self.end_date - self.start_date).total_seconds())
  68. else:
  69. self.duration = None
  70. def __repr__(self):
  71. prefix = f"<{self.__class__.__name__}: {self.dag_id}.{self.task_id} {self.run_id}"
  72. if self.map_index != -1:
  73. prefix += f" map_index={self.map_index}"
  74. return prefix + ">"