# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Example DAGs demonstrating the behavior of the Datasets feature in Airflow, including conditional and
dataset expression-based scheduling.

Notes on usage:

Turn on all the DAGs.

dataset_produces_1 is scheduled to run daily. Once it completes, it triggers several DAGs because its
dataset is updated. dataset_consumes_1 is triggered immediately, as it depends solely on the dataset
produced by dataset_produces_1. consume_1_or_2_with_dataset_expressions is also triggered, as its
condition of either dataset_produces_1 or dataset_produces_2 being updated is satisfied by
dataset_produces_1.

dataset_consumes_1_and_2 is not triggered after dataset_produces_1 runs, because it also requires the
dataset from dataset_produces_2, which has no schedule and must be triggered manually.

After manually triggering dataset_produces_2, several DAGs are affected. dataset_consumes_1_and_2 should
run because both of its dataset dependencies are now met. consume_1_and_2_with_dataset_expressions is
triggered, as it requires both the dataset_produces_1 and dataset_produces_2 datasets to be updated.
consume_1_or_2_with_dataset_expressions is triggered again, since it is conditionally set to run when
either dataset is updated.

consume_1_or_both_2_and_3_with_dataset_expressions demonstrates complex dataset dependency logic.
This DAG triggers if dataset_produces_1 is updated, or if both dataset_produces_2 and dag3_dataset
are updated, highlighting the capability to combine updates from multiple datasets with logical
expressions for advanced scheduling.

conditional_dataset_and_time_based_timetable illustrates the integration of time-based scheduling with
dataset dependencies. This DAG is configured to execute either when both the dataset_produces_1 and
dataset_produces_2 datasets have been updated, or according to a specific cron schedule, showcasing
Airflow's versatility in handling mixed dataset and time-based triggers.

The DAGs dataset_consumes_1_never_scheduled and dataset_consumes_unknown_never_scheduled never run
automatically, as they depend on datasets that are never updated or are not produced by any scheduled task.
"""
from __future__ import annotations

import pendulum

from airflow.datasets import Dataset
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable

# [START dataset_def]
dag1_dataset = Dataset("s3://dag1/output_1.txt", extra={"hi": "bye"})
# [END dataset_def]
dag2_dataset = Dataset("s3://dag2/output_1.txt", extra={"hi": "bye"})
dag3_dataset = Dataset("s3://dag3/output_3.txt", extra={"hi": "bye"})
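
# The ``extra`` dict above attaches free-form metadata to the dataset definition. As a hedged
# aside (assuming Airflow 2.9+, where the ``triggering_dataset_events`` template variable is
# part of the task context), a consumer task could inspect the dataset events that caused its
# run, for example:
#
#     BashOperator(
#         task_id="print_triggering_dataset_events",
#         bash_command="echo {{ triggering_dataset_events }}",
#     )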

with DAG(
    dag_id="dataset_produces_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule="@daily",
    tags=["produces", "dataset-scheduled"],
) as dag1:
    # [START task_outlet]
    BashOperator(outlets=[dag1_dataset], task_id="producing_task_1", bash_command="sleep 5")
    # [END task_outlet]
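
# Any operator can declare ``outlets``. Airflow emits a dataset event when the producing task
# instance finishes successfully; it is that event, not the task's actual data, that triggers
# the consumer DAGs below.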

with DAG(
    dag_id="dataset_produces_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=None,
    tags=["produces", "dataset-scheduled"],
) as dag2:
    BashOperator(outlets=[dag2_dataset], task_id="producing_task_2", bash_command="sleep 5")

# [START dag_dep]
with DAG(
    dag_id="dataset_consumes_1",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[dag1_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag3:
    # [END dag_dep]
    BashOperator(
        outlets=[Dataset("s3://consuming_1_task/dataset_other.txt")],
        task_id="consuming_1",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_1_and_2",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    # A list of datasets has AND semantics: every dataset in the list must receive an
    # update before the DAG is triggered.
    schedule=[dag1_dataset, dag2_dataset],
    tags=["consumes", "dataset-scheduled"],
) as dag4:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consuming_2",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_1_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        dag1_dataset,
        Dataset("s3://unrelated/this-dataset-doesnt-get-triggered"),
    ],
    tags=["consumes", "dataset-scheduled"],
) as dag5:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consuming_3",
        bash_command="sleep 5",
    )

with DAG(
    dag_id="dataset_consumes_unknown_never_scheduled",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=[
        Dataset("s3://unrelated/dataset3.txt"),
        Dataset("s3://unrelated/dataset_other_unknown.txt"),
    ],
    tags=["dataset-scheduled"],
) as dag6:
    BashOperator(
        task_id="unrelated_task",
        outlets=[Dataset("s3://unrelated_task/dataset_other_unknown.txt")],
        bash_command="sleep 5",
    )

# ``&`` requires both datasets to have been updated before the DAG runs.
with DAG(
    dag_id="consume_1_and_2_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset & dag2_dataset),
) as dag7:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_and_2_with_dataset_expressions",
        bash_command="sleep 5",
    )

# ``|`` triggers the DAG as soon as either dataset is updated.
with DAG(
    dag_id="consume_1_or_2_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset | dag2_dataset),
) as dag8:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_or_2_with_dataset_expressions",
        bash_command="sleep 5",
    )

# Conditions can be nested: run if dag1_dataset is updated, or if both dag2_dataset
# and dag3_dataset are updated.
with DAG(
    dag_id="consume_1_or_both_2_and_3_with_dataset_expressions",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    schedule=(dag1_dataset | (dag2_dataset & dag3_dataset)),
) as dag9:
    BashOperator(
        outlets=[Dataset("s3://consuming_2_task/dataset_other_unknown.txt")],
        task_id="consume_1_or_both_2_and_3_with_dataset_expressions",
        bash_command="sleep 5",
    )
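
# A hedged aside: in Airflow 2.9 the ``|`` and ``&`` operators on Dataset build condition
# objects that can also be spelled explicitly as ``DatasetAny`` / ``DatasetAll`` from
# ``airflow.datasets``, which may read better for long expressions. A sketch equivalent to
# the schedule above:
#
#     from airflow.datasets import DatasetAll, DatasetAny
#
#     schedule = DatasetAny(dag1_dataset, DatasetAll(dag2_dataset, dag3_dataset))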

with DAG(
    dag_id="conditional_dataset_and_time_based_timetable",
    catchup=False,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    # Runs both on the weekly cron schedule (01:00 UTC every Wednesday) and whenever
    # both datasets have been updated.
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"),
        datasets=(dag1_dataset & dag2_dataset),
    ),
    tags=["dataset-time-based-timetable"],
) as dag10:
    BashOperator(
        outlets=[Dataset("s3://dataset_time_based/dataset_other_unknown.txt")],
        task_id="conditional_dataset_and_time_based_timetable",
        bash_command="sleep 5",
    )
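
# A minimal local smoke test, hedged: re-parsing this file through ``DagBag`` and printing any
# import errors is a quick way to validate the dataset expressions above. This assumes an
# initialized Airflow environment; the scheduler never executes this block because the module
# is not ``__main__`` when parsed as a DAG file.
if __name__ == "__main__":
    from airflow.models.dagbag import DagBag

    bag = DagBag(dag_folder=__file__, include_examples=False)
    print(sorted(bag.dag_ids))
    print(bag.import_errors)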