trigger_dag.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """Triggering DAG runs APIs."""
  19. from __future__ import annotations
  20. import json
  21. from typing import TYPE_CHECKING
  22. from airflow.api_internal.internal_api_call import internal_api_call
  23. from airflow.exceptions import DagNotFound, DagRunAlreadyExists
  24. from airflow.models import DagBag, DagModel, DagRun
  25. from airflow.utils import timezone
  26. from airflow.utils.session import NEW_SESSION, provide_session
  27. from airflow.utils.state import DagRunState
  28. from airflow.utils.types import DagRunType
  29. if TYPE_CHECKING:
  30. from datetime import datetime
  31. from sqlalchemy.orm.session import Session
  32. def _trigger_dag(
  33. dag_id: str,
  34. dag_bag: DagBag,
  35. run_id: str | None = None,
  36. conf: dict | str | None = None,
  37. execution_date: datetime | None = None,
  38. replace_microseconds: bool = True,
  39. ) -> list[DagRun | None]:
  40. """
  41. Triggers DAG run.
  42. :param dag_id: DAG ID
  43. :param dag_bag: DAG Bag model
  44. :param run_id: ID of the dag_run
  45. :param conf: configuration
  46. :param execution_date: date of execution
  47. :param replace_microseconds: whether microseconds should be zeroed
  48. :return: list of triggered dags
  49. """
  50. dag = dag_bag.get_dag(dag_id) # prefetch dag if it is stored serialized
  51. if dag is None or dag_id not in dag_bag.dags:
  52. raise DagNotFound(f"Dag id {dag_id} not found")
  53. execution_date = execution_date or timezone.utcnow()
  54. if not timezone.is_localized(execution_date):
  55. raise ValueError("The execution_date should be localized")
  56. if replace_microseconds:
  57. execution_date = execution_date.replace(microsecond=0)
  58. if dag.default_args and "start_date" in dag.default_args:
  59. min_dag_start_date = dag.default_args["start_date"]
  60. if min_dag_start_date and execution_date < min_dag_start_date:
  61. raise ValueError(
  62. f"The execution_date [{execution_date.isoformat()}] should be >= start_date "
  63. f"[{min_dag_start_date.isoformat()}] from DAG's default_args"
  64. )
  65. logical_date = timezone.coerce_datetime(execution_date)
  66. data_interval = dag.timetable.infer_manual_data_interval(run_after=logical_date)
  67. run_id = run_id or dag.timetable.generate_run_id(
  68. run_type=DagRunType.MANUAL, logical_date=logical_date, data_interval=data_interval
  69. )
  70. dag_run = DagRun.find_duplicate(dag_id=dag_id, execution_date=execution_date, run_id=run_id)
  71. if dag_run:
  72. raise DagRunAlreadyExists(dag_run=dag_run, execution_date=execution_date, run_id=run_id)
  73. run_conf = None
  74. if conf:
  75. run_conf = conf if isinstance(conf, dict) else json.loads(conf)
  76. dag_runs = []
  77. dags_to_run = [dag, *dag.subdags]
  78. for _dag in dags_to_run:
  79. dag_run = _dag.create_dagrun(
  80. run_id=run_id,
  81. execution_date=execution_date,
  82. state=DagRunState.QUEUED,
  83. conf=run_conf,
  84. external_trigger=True,
  85. dag_hash=dag_bag.dags_hash.get(dag_id),
  86. data_interval=data_interval,
  87. )
  88. dag_runs.append(dag_run)
  89. return dag_runs
  90. @internal_api_call
  91. @provide_session
  92. def trigger_dag(
  93. dag_id: str,
  94. run_id: str | None = None,
  95. conf: dict | str | None = None,
  96. execution_date: datetime | None = None,
  97. replace_microseconds: bool = True,
  98. session: Session = NEW_SESSION,
  99. ) -> DagRun | None:
  100. """
  101. Triggers execution of DAG specified by dag_id.
  102. :param dag_id: DAG ID
  103. :param run_id: ID of the dag_run
  104. :param conf: configuration
  105. :param execution_date: date of execution
  106. :param replace_microseconds: whether microseconds should be zeroed
  107. :param session: Unused. Only added in compatibility with database isolation mode
  108. :return: first dag run triggered - even if more than one Dag Runs were triggered or None
  109. """
  110. dag_model = DagModel.get_current(dag_id)
  111. if dag_model is None:
  112. raise DagNotFound(f"Dag id {dag_id} not found in DagModel")
  113. dagbag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
  114. triggers = _trigger_dag(
  115. dag_id=dag_id,
  116. dag_bag=dagbag,
  117. run_id=run_id,
  118. conf=conf,
  119. execution_date=execution_date,
  120. replace_microseconds=replace_microseconds,
  121. )
  122. return triggers[0] if triggers else None