delete_dag.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Delete DAGs APIs."""
  19. from __future__ import annotations
  20. import logging
  21. from typing import TYPE_CHECKING
  22. from sqlalchemy import and_, delete, or_, select
  23. from airflow import models
  24. from airflow.exceptions import AirflowException, DagNotFound
  25. from airflow.models import DagModel, TaskFail
  26. from airflow.models.errors import ParseImportError
  27. from airflow.models.serialized_dag import SerializedDagModel
  28. from airflow.utils.db import get_sqla_model_classes
  29. from airflow.utils.session import NEW_SESSION, provide_session
  30. from airflow.utils.state import TaskInstanceState
  31. if TYPE_CHECKING:
  32. from sqlalchemy.orm import Session
  33. log = logging.getLogger(__name__)
  34. @provide_session
  35. def delete_dag(dag_id: str, keep_records_in_log: bool = True, session: Session = NEW_SESSION) -> int:
  36. """
  37. Delete a DAG by a dag_id.
  38. :param dag_id: the dag_id of the DAG to delete
  39. :param keep_records_in_log: whether keep records of the given dag_id
  40. in the Log table in the backend database (for reasons like auditing).
  41. The default value is True.
  42. :param session: session used
  43. :return count of deleted dags
  44. """
  45. log.info("Deleting DAG: %s", dag_id)
  46. running_tis = session.scalar(
  47. select(models.TaskInstance.state)
  48. .where(models.TaskInstance.dag_id == dag_id)
  49. .where(models.TaskInstance.state == TaskInstanceState.RUNNING)
  50. .limit(1)
  51. )
  52. if running_tis:
  53. raise AirflowException("TaskInstances still running")
  54. dag = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))
  55. if dag is None:
  56. raise DagNotFound(f"Dag id {dag_id} not found")
  57. # deleting a DAG should also delete all of its subdags
  58. dags_to_delete_query = session.execute(
  59. select(DagModel.dag_id).where(
  60. or_(
  61. DagModel.dag_id == dag_id,
  62. and_(DagModel.dag_id.like(f"{dag_id}.%"), DagModel.is_subdag),
  63. )
  64. )
  65. )
  66. dags_to_delete = [dag_id for (dag_id,) in dags_to_delete_query]
  67. # Scheduler removes DAGs without files from serialized_dag table every dag_dir_list_interval.
  68. # There may be a lag, so explicitly removes serialized DAG here.
  69. if SerializedDagModel.has_dag(dag_id=dag_id, session=session):
  70. SerializedDagModel.remove_dag(dag_id=dag_id, session=session)
  71. count = 0
  72. for model in get_sqla_model_classes():
  73. if hasattr(model, "dag_id") and (not keep_records_in_log or model.__name__ != "Log"):
  74. count += session.execute(
  75. delete(model)
  76. .where(model.dag_id.in_(dags_to_delete))
  77. .execution_options(synchronize_session="fetch")
  78. ).rowcount
  79. if dag.is_subdag:
  80. parent_dag_id, task_id = dag_id.rsplit(".", 1)
  81. for model in TaskFail, models.TaskInstance:
  82. count += session.execute(
  83. delete(model).where(model.dag_id == parent_dag_id, model.task_id == task_id)
  84. ).rowcount
  85. # Delete entries in Import Errors table for a deleted DAG
  86. # This handles the case when the dag_id is changed in the file
  87. session.execute(
  88. delete(ParseImportError)
  89. .where(ParseImportError.filename == dag.fileloc)
  90. .execution_options(synchronize_session="fetch")
  91. )
  92. return count