dagwarning.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from enum import Enum
  20. from typing import TYPE_CHECKING
  21. from sqlalchemy import Column, ForeignKeyConstraint, Index, String, Text, delete, false, select
  22. from airflow.api_internal.internal_api_call import internal_api_call
  23. from airflow.models.base import Base, StringID
  24. from airflow.utils import timezone
  25. from airflow.utils.retries import retry_db_transaction
  26. from airflow.utils.session import NEW_SESSION, provide_session
  27. from airflow.utils.sqlalchemy import UtcDateTime
  28. if TYPE_CHECKING:
  29. from sqlalchemy.orm import Session
  30. class DagWarning(Base):
  31. """
  32. A table to store DAG warnings.
  33. DAG warnings are problems that don't rise to the level of failing the DAG parse
  34. but which users should nonetheless be warned about. These warnings are recorded
  35. when parsing DAG and displayed on the Webserver in a flash message.
  36. """
  37. dag_id = Column(StringID(), primary_key=True)
  38. warning_type = Column(String(50), primary_key=True)
  39. message = Column(Text, nullable=False)
  40. timestamp = Column(UtcDateTime, nullable=False, default=timezone.utcnow)
  41. __tablename__ = "dag_warning"
  42. __table_args__ = (
  43. ForeignKeyConstraint(
  44. ("dag_id",),
  45. ["dag.dag_id"],
  46. name="dcw_dag_id_fkey",
  47. ondelete="CASCADE",
  48. ),
  49. Index("idx_dag_warning_dag_id", dag_id),
  50. )
  51. def __init__(self, dag_id: str, error_type: str, message: str, **kwargs):
  52. super().__init__(**kwargs)
  53. self.dag_id = dag_id
  54. self.warning_type = DagWarningType(error_type).value # make sure valid type
  55. self.message = message
  56. def __eq__(self, other) -> bool:
  57. return self.dag_id == other.dag_id and self.warning_type == other.warning_type
  58. def __hash__(self) -> int:
  59. return hash((self.dag_id, self.warning_type))
  60. @classmethod
  61. @internal_api_call
  62. @provide_session
  63. def purge_inactive_dag_warnings(cls, session: Session = NEW_SESSION) -> None:
  64. """
  65. Deactivate DagWarning records for inactive dags.
  66. :return: None
  67. """
  68. cls._purge_inactive_dag_warnings_with_retry(session)
  69. @classmethod
  70. @retry_db_transaction
  71. def _purge_inactive_dag_warnings_with_retry(cls, session: Session) -> None:
  72. from airflow.models.dag import DagModel
  73. if session.get_bind().dialect.name == "sqlite":
  74. dag_ids_stmt = select(DagModel.dag_id).where(DagModel.is_active == false())
  75. query = delete(cls).where(cls.dag_id.in_(dag_ids_stmt.scalar_subquery()))
  76. else:
  77. query = delete(cls).where(cls.dag_id == DagModel.dag_id, DagModel.is_active == false())
  78. session.execute(query.execution_options(synchronize_session=False))
  79. session.commit()
  80. class DagWarningType(str, Enum):
  81. """
  82. Enum for DAG warning types.
  83. This is the set of allowable values for the ``warning_type`` field
  84. in the DagWarning model.
  85. """
  86. NONEXISTENT_POOL = "non-existent pool"