dag.py 178 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327
73278327932803281328232833284328532863287328832893290329132923293329432953296329732983299330033013302330333043305330633073308330933103311331233133314331533163317331833193320332133223323332433253326332733283329333033313332333333343335333633373338333933403341334233433344334533463347334833493350335133523353335433553356335733583359336033613362336333643365336633673368336933703371337233733374337533763377337833793380338133823383338433853386338733883389339033913392339333943395339633973398339934003401340234033404340534063407340834093410341134123413341434153416341734183419342034213422342334243425342634273428342934303431343234333434343534363437343834393440344134423443344434453446344734483449345034513452345334543455345634573458345934603461346234633464346534663467346834693470347134723473347434753476347734783479348034813482348334843485348634873488348934903491349234933494349534963497349834993500350135023503350435053506350735083509351035113512351335143515351635173518351935203521352235233524352535263527352835293530353135323533353435353536353735383539354035413542354335443545354635473548354935503551355235533554355535563557355835593560356135623563356435653566356735683569357035713572357335743575357635773578357935803581358235833584358535863587358835893590359135923593359435953596359735983599360036013602360336043605360636073608360936103611361236133614361536163617361836193620362136223623362436253626362736283629363036313632363336343635363636373638363936403641364236433644364536463647364836493650365136523653365436553656365736583659366036613662366336643665366636673668366936703671367236733674367536763677367836793680368136823683368436853686368736883689369036913692369336943695369636973698369937003701370237033704370537063707370837093710371137123713371437153716371737183719372037213722372337243725372637273728372937303731373237333734373537363737373837393740374137423743374437453746374737483749375037513752375337543755375637573758375937603761376237633764376537663767376837693770377137723773377437753776377
73778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877387838793880388138823883388438853886388738883889389038913892389338943895389638973898389939003901390239033904390539063907390839093910391139123913391439153916391739183919392039213922392339243925392639273928392939303931393239333934393539363937393839393940394139423943394439453946394739483949395039513952395339543955395639573958395939603961396239633964396539663967396839693970397139723973397439753976397739783979398039813982398339843985398639873988398939903991399239933994399539963997399839994000400140024003400440054006400740084009401040114012401340144015401640174018401940204021402240234024402540264027402840294030403140324033403440354036403740384039404040414042404340444045404640474048404940504051405240534054405540564057405840594060406140624063406440654066406740684069407040714072407340744075407640774078407940804081408240834084408540864087408840894090409140924093409440954096409740984099410041014102410341044105410641074108410941104111411241134114411541164117411841194120412141224123412441254126412741284129413041314132413341344135413641374138413941404141414241434144414541464147414841494150415141524153415441554156415741584159416041614162416341644165416641674168416941704171417241734174417541764177417841794180418141824183418441854186418741884189419041914192419341944195419641974198419942004201420242034204420542064207420842094210421142124213421442154216421742184219422042214222422342244225422642274228422942304231423242334234423542364237423842394240424142424243424442454246424742484249425042514252425342544255425642574258425942604261426242634264426542664267426842694270427142724273427442754276427
74278427942804281428242834284428542864287428842894290429142924293429442954296429742984299430043014302430343044305430643074308430943104311431243134314431543164317431843194320432143224323432443254326432743284329433043314332433343344335433643374338433943404341434243434344434543464347434843494350435143524353435443554356435743584359436043614362436343644365436643674368436943704371437243734374437543764377437843794380438143824383438443854386438743884389439043914392439343944395439643974398439944004401440244034404440544064407440844094410441144124413441444154416441744184419442044214422442344244425442644274428442944304431443244334434443544364437443844394440444144424443444444454446444744484449445044514452
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import asyncio
  20. import copy
  21. import functools
  22. import itertools
  23. import logging
  24. import os
  25. import pathlib
  26. import pickle
  27. import sys
  28. import time
  29. import traceback
  30. import warnings
  31. import weakref
  32. from collections import abc, defaultdict, deque
  33. from contextlib import ExitStack
  34. from datetime import datetime, timedelta
  35. from inspect import signature
  36. from typing import (
  37. TYPE_CHECKING,
  38. Any,
  39. Callable,
  40. Collection,
  41. Container,
  42. Generator,
  43. Iterable,
  44. Iterator,
  45. List,
  46. Pattern,
  47. Sequence,
  48. Union,
  49. cast,
  50. overload,
  51. )
  52. from urllib.parse import urlsplit
  53. import jinja2
  54. import pendulum
  55. import re2
  56. import sqlalchemy_jsonfield
  57. from dateutil.relativedelta import relativedelta
  58. from packaging import version as packaging_version
  59. from sqlalchemy import (
  60. Boolean,
  61. Column,
  62. ForeignKey,
  63. Index,
  64. Integer,
  65. String,
  66. Text,
  67. and_,
  68. case,
  69. func,
  70. not_,
  71. or_,
  72. select,
  73. update,
  74. )
  75. from sqlalchemy.ext.associationproxy import association_proxy
  76. from sqlalchemy.ext.hybrid import hybrid_property
  77. from sqlalchemy.orm import backref, joinedload, load_only, relationship
  78. from sqlalchemy.sql import Select, expression
  79. import airflow.templates
  80. from airflow import settings, utils
  81. from airflow.api_internal.internal_api_call import internal_api_call
  82. from airflow.configuration import conf as airflow_conf, secrets_backend_list
  83. from airflow.datasets import BaseDataset, Dataset, DatasetAlias, DatasetAll
  84. from airflow.datasets.manager import dataset_manager
  85. from airflow.exceptions import (
  86. AirflowDagInconsistent,
  87. AirflowException,
  88. DuplicateTaskIdFound,
  89. FailStopDagInvalidTriggerRule,
  90. ParamValidationError,
  91. RemovedInAirflow3Warning,
  92. TaskDeferred,
  93. TaskNotFound,
  94. UnknownExecutorException,
  95. )
  96. from airflow.executors.executor_loader import ExecutorLoader
  97. from airflow.jobs.job import run_job
  98. from airflow.models.abstractoperator import AbstractOperator, TaskStateChangeCallback
  99. from airflow.models.base import Base, StringID
  100. from airflow.models.baseoperator import BaseOperator
  101. from airflow.models.dagcode import DagCode
  102. from airflow.models.dagpickle import DagPickle
  103. from airflow.models.dagrun import RUN_ID_REGEX, DagRun
  104. from airflow.models.dataset import (
  105. DatasetAliasModel,
  106. DatasetDagRunQueue,
  107. DatasetModel,
  108. )
  109. from airflow.models.param import DagParam, ParamsDict
  110. from airflow.models.taskinstance import (
  111. Context,
  112. TaskInstance,
  113. TaskInstanceKey,
  114. clear_task_instances,
  115. )
  116. from airflow.models.tasklog import LogTemplate
  117. from airflow.providers.fab import __version__ as FAB_VERSION
  118. from airflow.secrets.local_filesystem import LocalFilesystemBackend
  119. from airflow.security import permissions
  120. from airflow.settings import json
  121. from airflow.stats import Stats
  122. from airflow.timetables.base import DagRunInfo, DataInterval, TimeRestriction, Timetable
  123. from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable
  124. from airflow.timetables.simple import (
  125. ContinuousTimetable,
  126. DatasetTriggeredTimetable,
  127. NullTimetable,
  128. OnceTimetable,
  129. )
  130. from airflow.timetables.trigger import CronTriggerTimetable
  131. from airflow.utils import timezone
  132. from airflow.utils.dag_cycle_tester import check_cycle
  133. from airflow.utils.dates import cron_presets, date_range as utils_date_range
  134. from airflow.utils.decorators import fixup_decorator_warning_stack
  135. from airflow.utils.helpers import at_most_one, exactly_one, validate_instance_args, validate_key
  136. from airflow.utils.log.logging_mixin import LoggingMixin
  137. from airflow.utils.session import NEW_SESSION, provide_session
  138. from airflow.utils.sqlalchemy import (
  139. Interval,
  140. UtcDateTime,
  141. lock_rows,
  142. tuple_in_condition,
  143. with_row_locks,
  144. )
  145. from airflow.utils.state import DagRunState, State, TaskInstanceState
  146. from airflow.utils.trigger_rule import TriggerRule
  147. from airflow.utils.types import NOTSET, ArgNotSet, DagRunType, EdgeInfoType
  148. if TYPE_CHECKING:
  149. from types import ModuleType
  150. from pendulum.tz.timezone import FixedTimezone, Timezone
  151. from sqlalchemy.orm.query import Query
  152. from sqlalchemy.orm.session import Session
  153. from airflow.decorators import TaskDecoratorCollection
  154. from airflow.models.dagbag import DagBag
  155. from airflow.models.operator import Operator
  156. from airflow.models.slamiss import SlaMiss
  157. from airflow.serialization.pydantic.dag import DagModelPydantic
  158. from airflow.serialization.pydantic.dag_run import DagRunPydantic
  159. from airflow.typing_compat import Literal
  160. from airflow.utils.task_group import TaskGroup
  161. log = logging.getLogger(__name__)
  162. DEFAULT_VIEW_PRESETS = ["grid", "graph", "duration", "gantt", "landing_times"]
  163. ORIENTATION_PRESETS = ["LR", "TB", "RL", "BT"]
  164. TAG_MAX_LEN = 100
  165. DagStateChangeCallback = Callable[[Context], None]
  166. ScheduleInterval = Union[None, str, timedelta, relativedelta]
  167. # FIXME: Ideally this should be Union[Literal[NOTSET], ScheduleInterval],
  168. # but Mypy cannot handle that right now. Track progress of PEP 661 for progress.
  169. # See also: https://discuss.python.org/t/9126/7
  170. ScheduleIntervalArg = Union[ArgNotSet, ScheduleInterval]
  171. ScheduleArg = Union[
  172. ArgNotSet, ScheduleInterval, Timetable, BaseDataset, Collection[Union["Dataset", "DatasetAlias"]]
  173. ]
  174. SLAMissCallback = Callable[["DAG", str, str, List["SlaMiss"], List[TaskInstance]], None]
  175. # Backward compatibility: If neither schedule_interval nor timetable is
  176. # *provided by the user*, default to a one-day interval.
  177. DEFAULT_SCHEDULE_INTERVAL = timedelta(days=1)
  178. class InconsistentDataInterval(AirflowException):
  179. """
  180. Exception raised when a model populates data interval fields incorrectly.
  181. The data interval fields should either both be None (for runs scheduled
  182. prior to AIP-39), or both be datetime (for runs scheduled after AIP-39 is
  183. implemented). This is raised if exactly one of the fields is None.
  184. """
  185. _template = (
  186. "Inconsistent {cls}: {start[0]}={start[1]!r}, {end[0]}={end[1]!r}, "
  187. "they must be either both None or both datetime"
  188. )
  189. def __init__(self, instance: Any, start_field_name: str, end_field_name: str) -> None:
  190. self._class_name = type(instance).__name__
  191. self._start_field = (start_field_name, getattr(instance, start_field_name))
  192. self._end_field = (end_field_name, getattr(instance, end_field_name))
  193. def __str__(self) -> str:
  194. return self._template.format(cls=self._class_name, start=self._start_field, end=self._end_field)
  195. def _get_model_data_interval(
  196. instance: Any,
  197. start_field_name: str,
  198. end_field_name: str,
  199. ) -> DataInterval | None:
  200. start = timezone.coerce_datetime(getattr(instance, start_field_name))
  201. end = timezone.coerce_datetime(getattr(instance, end_field_name))
  202. if start is None:
  203. if end is not None:
  204. raise InconsistentDataInterval(instance, start_field_name, end_field_name)
  205. return None
  206. elif end is None:
  207. raise InconsistentDataInterval(instance, start_field_name, end_field_name)
  208. return DataInterval(start, end)
  209. def create_timetable(interval: ScheduleIntervalArg, timezone: Timezone | FixedTimezone) -> Timetable:
  210. """Create a Timetable instance from a ``schedule_interval`` argument."""
  211. if interval is NOTSET:
  212. return DeltaDataIntervalTimetable(DEFAULT_SCHEDULE_INTERVAL)
  213. if interval is None:
  214. return NullTimetable()
  215. if interval == "@once":
  216. return OnceTimetable()
  217. if interval == "@continuous":
  218. return ContinuousTimetable()
  219. if isinstance(interval, (timedelta, relativedelta)):
  220. return DeltaDataIntervalTimetable(interval)
  221. if isinstance(interval, str):
  222. if airflow_conf.getboolean("scheduler", "create_cron_data_intervals"):
  223. return CronDataIntervalTimetable(interval, timezone)
  224. else:
  225. return CronTriggerTimetable(interval, timezone=timezone)
  226. raise ValueError(f"{interval!r} is not a valid schedule_interval.")
  227. def get_last_dagrun(dag_id, session, include_externally_triggered=False):
  228. """
  229. Return the last dag run for a dag, None if there was none.
  230. Last dag run can be any type of run e.g. scheduled or backfilled.
  231. Overridden DagRuns are ignored.
  232. """
  233. DR = DagRun
  234. query = select(DR).where(DR.dag_id == dag_id)
  235. if not include_externally_triggered:
  236. query = query.where(DR.external_trigger == expression.false())
  237. query = query.order_by(DR.execution_date.desc())
  238. return session.scalar(query.limit(1))
  239. def get_dataset_triggered_next_run_info(
  240. dag_ids: list[str], *, session: Session
  241. ) -> dict[str, dict[str, int | str]]:
  242. """
  243. Get next run info for a list of dag_ids.
  244. Given a list of dag_ids, get string representing how close any that are dataset triggered are
  245. their next run, e.g. "1 of 2 datasets updated".
  246. """
  247. from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue as DDRQ, DatasetModel
  248. return {
  249. x.dag_id: {
  250. "uri": x.uri,
  251. "ready": x.ready,
  252. "total": x.total,
  253. }
  254. for x in session.execute(
  255. select(
  256. DagScheduleDatasetReference.dag_id,
  257. # This is a dirty hack to workaround group by requiring an aggregate,
  258. # since grouping by dataset is not what we want to do here...but it works
  259. case((func.count() == 1, func.max(DatasetModel.uri)), else_="").label("uri"),
  260. func.count().label("total"),
  261. func.sum(case((DDRQ.target_dag_id.is_not(None), 1), else_=0)).label("ready"),
  262. )
  263. .join(
  264. DDRQ,
  265. and_(
  266. DDRQ.dataset_id == DagScheduleDatasetReference.dataset_id,
  267. DDRQ.target_dag_id == DagScheduleDatasetReference.dag_id,
  268. ),
  269. isouter=True,
  270. )
  271. .join(DatasetModel, DatasetModel.id == DagScheduleDatasetReference.dataset_id)
  272. .group_by(DagScheduleDatasetReference.dag_id)
  273. .where(DagScheduleDatasetReference.dag_id.in_(dag_ids))
  274. ).all()
  275. }
  276. def _triggerer_is_healthy():
  277. from airflow.jobs.triggerer_job_runner import TriggererJobRunner
  278. job = TriggererJobRunner.most_recent_job()
  279. return job and job.is_alive()
  280. @provide_session
  281. def _create_orm_dagrun(
  282. dag,
  283. dag_id,
  284. run_id,
  285. logical_date,
  286. start_date,
  287. external_trigger,
  288. conf,
  289. state,
  290. run_type,
  291. dag_hash,
  292. creating_job_id,
  293. data_interval,
  294. session,
  295. ):
  296. run = DagRun(
  297. dag_id=dag_id,
  298. run_id=run_id,
  299. execution_date=logical_date,
  300. start_date=start_date,
  301. external_trigger=external_trigger,
  302. conf=conf,
  303. state=state,
  304. run_type=run_type,
  305. dag_hash=dag_hash,
  306. creating_job_id=creating_job_id,
  307. data_interval=data_interval,
  308. )
  309. # Load defaults into the following two fields to ensure result can be serialized detached
  310. run.log_template_id = int(session.scalar(select(func.max(LogTemplate.__table__.c.id))))
  311. run.consumed_dataset_events = []
  312. session.add(run)
  313. session.flush()
  314. run.dag = dag
  315. # create the associated task instances
  316. # state is None at the moment of creation
  317. run.verify_integrity(session=session)
  318. return run
  319. # TODO: The following mapping is used to validate that the arguments passed to the DAG are of the correct
  320. # type. This is a temporary solution until we find a more sophisticated method for argument validation.
  321. # One potential method is to use `get_type_hints` from the typing module. However, this is not fully
  322. # compatible with future annotations for Python versions below 3.10. Once we require a minimum Python
  323. # version that supports `get_type_hints` effectively or find a better approach, we can replace this
  324. # manual type-checking method.
  325. DAG_ARGS_EXPECTED_TYPES = {
  326. "dag_id": str,
  327. "description": str,
  328. "max_active_tasks": int,
  329. "max_active_runs": int,
  330. "max_consecutive_failed_dag_runs": int,
  331. "dagrun_timeout": timedelta,
  332. "default_view": str,
  333. "orientation": str,
  334. "catchup": bool,
  335. "doc_md": str,
  336. "is_paused_upon_creation": bool,
  337. "render_template_as_native_obj": bool,
  338. "tags": list,
  339. "auto_register": bool,
  340. "fail_stop": bool,
  341. "dag_display_name": str,
  342. }
  343. @functools.total_ordering
  344. class DAG(LoggingMixin):
  345. """
  346. A dag (directed acyclic graph) is a collection of tasks with directional dependencies.
  347. A dag also has a schedule, a start date and an end date (optional). For each schedule,
  348. (say daily or hourly), the DAG needs to run each individual tasks as their dependencies
  349. are met. Certain tasks have the property of depending on their own past, meaning that
  350. they can't run until their previous schedule (and upstream tasks) are completed.
  351. DAGs essentially act as namespaces for tasks. A task_id can only be
  352. added once to a DAG.
  353. Note that if you plan to use time zones all the dates provided should be pendulum
  354. dates. See :ref:`timezone_aware_dags`.
  355. .. versionadded:: 2.4
  356. The *schedule* argument to specify either time-based scheduling logic
  357. (timetable), or dataset-driven triggers.
  358. .. deprecated:: 2.4
  359. The arguments *schedule_interval* and *timetable*. Their functionalities
  360. are merged into the new *schedule* argument.
  361. :param dag_id: The id of the DAG; must consist exclusively of alphanumeric
  362. characters, dashes, dots and underscores (all ASCII)
  363. :param description: The description for the DAG to e.g. be shown on the webserver
  364. :param schedule: Defines the rules according to which DAG runs are scheduled. Can
  365. accept cron string, timedelta object, Timetable, or list of Dataset objects.
  366. If this is not provided, the DAG will be set to the default
  367. schedule ``timedelta(days=1)``. See also :doc:`/howto/timetable`.
  368. :param start_date: The timestamp from which the scheduler will
  369. attempt to backfill
  370. :param end_date: A date beyond which your DAG won't run, leave to None
  371. for open-ended scheduling
  372. :param template_searchpath: This list of folders (non-relative)
  373. defines where jinja will look for your templates. Order matters.
  374. Note that jinja/airflow includes the path of your DAG file by
  375. default
  376. :param template_undefined: Template undefined type.
  377. :param user_defined_macros: a dictionary of macros that will be exposed
  378. in your jinja templates. For example, passing ``dict(foo='bar')``
  379. to this argument allows you to ``{{ foo }}`` in all jinja
  380. templates related to this DAG. Note that you can pass any
  381. type of object here.
  382. :param user_defined_filters: a dictionary of filters that will be exposed
  383. in your jinja templates. For example, passing
  384. ``dict(hello=lambda name: 'Hello %s' % name)`` to this argument allows
  385. you to ``{{ 'world' | hello }}`` in all jinja templates related to
  386. this DAG.
  387. :param default_args: A dictionary of default parameters to be used
  388. as constructor keyword parameters when initialising operators.
  389. Note that operators have the same hook, and precede those defined
  390. here, meaning that if your dict contains `'depends_on_past': True`
  391. here and `'depends_on_past': False` in the operator's call
  392. `default_args`, the actual value will be `False`.
  393. :param params: a dictionary of DAG level parameters that are made
  394. accessible in templates, namespaced under `params`. These
  395. params can be overridden at the task level.
  396. :param max_active_tasks: the number of task instances allowed to run
  397. concurrently
  398. :param max_active_runs: maximum number of active DAG runs, beyond this
  399. number of DAG runs in a running state, the scheduler won't create
  400. new active DAG runs
  401. :param max_consecutive_failed_dag_runs: (experimental) maximum number of consecutive failed DAG runs,
  402. beyond this the scheduler will disable the DAG
  403. :param dagrun_timeout: specify how long a DagRun should be up before
  404. timing out / failing, so that new DagRuns can be created.
  405. :param sla_miss_callback: specify a function or list of functions to call when reporting SLA
  406. timeouts. See :ref:`sla_miss_callback<concepts:sla_miss_callback>` for
  407. more information about the function signature and parameters that are
  408. passed to the callback.
  409. :param default_view: Specify DAG default view (grid, graph, duration,
  410. gantt, landing_times), default grid
  411. :param orientation: Specify DAG orientation in graph view (LR, TB, RL, BT), default LR
  412. :param catchup: Perform scheduler catchup (or only run latest)? Defaults to True
  413. :param on_failure_callback: A function or list of functions to be called when a DagRun of this dag fails.
  414. A context dictionary is passed as a single parameter to this function.
  415. :param on_success_callback: Much like the ``on_failure_callback`` except
  416. that it is executed when the dag succeeds.
  417. :param access_control: Specify optional DAG-level actions, e.g.,
  418. "{'role1': {'can_read'}, 'role2': {'can_read', 'can_edit', 'can_delete'}}"
  419. or it can specify the resource name if there is a DAGs Run resource, e.g.,
  420. "{'role1': {'DAG Runs': {'can_create'}}, 'role2': {'DAGs': {'can_read', 'can_edit', 'can_delete'}}}"
  421. :param is_paused_upon_creation: Specifies if the dag is paused when created for the first time.
  422. If the dag exists already, this flag will be ignored. If this optional parameter
  423. is not specified, the global config setting will be used.
  424. :param jinja_environment_kwargs: additional configuration options to be passed to Jinja
  425. ``Environment`` for template rendering
  426. **Example**: to avoid Jinja from removing a trailing newline from template strings ::
  427. DAG(
  428. dag_id="my-dag",
  429. jinja_environment_kwargs={
  430. "keep_trailing_newline": True,
  431. # some other jinja2 Environment options here
  432. },
  433. )
  434. **See**: `Jinja Environment documentation
  435. <https://jinja.palletsprojects.com/en/2.11.x/api/#jinja2.Environment>`_
  436. :param render_template_as_native_obj: If True, uses a Jinja ``NativeEnvironment``
  437. to render templates as native Python types. If False, a Jinja
  438. ``Environment`` is used to render templates as string values.
  439. :param tags: List of tags to help filtering DAGs in the UI.
  440. :param owner_links: Dict of owners and their links, that will be clickable on the DAGs view UI.
  441. Can be used as an HTTP link (for example the link to your Slack channel), or a mailto link.
  442. e.g: {"dag_owner": "https://airflow.apache.org/"}
  443. :param auto_register: Automatically register this DAG when it is used in a ``with`` block
  444. :param fail_stop: Fails currently running tasks when task in DAG fails.
  445. **Warning**: A fail stop dag can only have tasks with the default trigger rule ("all_success").
  446. An exception will be thrown if any task in a fail stop dag has a non default trigger rule.
  447. :param dag_display_name: The display name of the DAG which appears on the UI.
  448. """
  # Names of the attributes that define DAG identity: ``__eq__`` compares them
  # pairwise and ``__hash__`` folds them into the hash value.
  _comps = {
      "dag_id",
      "task_ids",
      "parent_dag",
      "start_date",
      "end_date",
      "schedule_interval",
      "fileloc",
      "template_searchpath",
      "last_loaded",
  }

  # NOTE(review): presumably a lazily filled cache of serializable field names;
  # the code that populates it is outside this excerpt -- confirm.
  __serialized_fields: frozenset[str] | None = None

  fileloc: str
  """
  File path that needs to be imported to load this DAG or subdag.

  This may not be an actual file on disk in the case when this DAG is loaded
  from a ZIP file or other DAG distribution format.
  """

  parent_dag: DAG | None = None  # Gets set when DAGs are loaded

  # NOTE: When updating arguments here, please also keep arguments in @dag()
  # below in sync. (Search for 'def dag(' in this file.)
  def __init__(
      self,
      dag_id: str,
      description: str | None = None,
      schedule: ScheduleArg = NOTSET,
      schedule_interval: ScheduleIntervalArg = NOTSET,
      timetable: Timetable | None = None,
      start_date: datetime | None = None,
      end_date: datetime | None = None,
      full_filepath: str | None = None,
      template_searchpath: str | Iterable[str] | None = None,
      template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
      user_defined_macros: dict | None = None,
      user_defined_filters: dict | None = None,
      default_args: dict | None = None,
      concurrency: int | None = None,
      max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
      max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
      max_consecutive_failed_dag_runs: int = airflow_conf.getint(
          "core", "max_consecutive_failed_dag_runs_per_dag"
      ),
      dagrun_timeout: timedelta | None = None,
      sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
      default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
      orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"),
      catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
      on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
      on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
      doc_md: str | None = None,
      params: abc.MutableMapping | None = None,
      access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
      is_paused_upon_creation: bool | None = None,
      jinja_environment_kwargs: dict | None = None,
      render_template_as_native_obj: bool = False,
      tags: list[str] | None = None,
      owner_links: dict[str, str] | None = None,
      auto_register: bool = True,
      fail_stop: bool = False,
      dag_display_name: str | None = None,
  ):
      """
      Validate the constructor arguments and populate the DAG instance.

      The individual parameters are described in the class docstring. In order,
      this method: merges ``default_args["params"]`` into ``params``, records the
      caller's file as ``fileloc``, settles the DAG timezone from the start date,
      converts start/end dates to UTC, resolves the (possibly deprecated)
      scheduling arguments into ``self.timetable`` / ``self.schedule_interval``,
      validates the remaining presets, and finally type-checks the instance
      against ``DAG_ARGS_EXPECTED_TYPES``.

      Note that the config-derived defaults in the signature are evaluated once,
      when the function is defined, not on every call.

      :raises AirflowException: if a tag is longer than ``TAG_MAX_LEN``, if
          ``max_consecutive_failed_dag_runs`` is negative, if ``max_active_runs``
          exceeds the timetable's ``active_runs_limit``, if ``default_view`` or
          ``orientation`` is not a supported preset, or if an owner link is invalid.
      :raises TypeError: if a non-empty ``default_args`` is not a dict.
      :raises ValueError: if a schedule is given without any start date, if more
          than one scheduling argument is supplied, or if a collection passed as
          ``schedule`` contains something other than datasets / dataset aliases.
      """
      from airflow.utils.task_group import TaskGroup

      if tags and any(len(tag) > TAG_MAX_LEN for tag in tags):
          raise AirflowException(f"tag cannot be longer than {TAG_MAX_LEN} characters")

      self.owner_links = owner_links or {}
      self.user_defined_macros = user_defined_macros
      self.user_defined_filters = user_defined_filters
      if default_args and not isinstance(default_args, dict):
          raise TypeError("default_args must be a dict")
      # Deep copy: entries are rewritten below ("params" removal, date
      # normalisation) and those edits must not leak into the caller's dict.
      self.default_args = copy.deepcopy(default_args or {})
      params = params or {}

      # merging potentially conflicting default_args['params'] into params
      # NOTE(review): when the caller passed a non-empty ``params`` mapping, the
      # update below modifies that very object in place.
      if "params" in self.default_args:
          params.update(self.default_args["params"])
          del self.default_args["params"]

      # check self.params and convert them into ParamsDict
      self.params = ParamsDict(params)

      if full_filepath:
          warnings.warn(
              "Passing full_filepath to DAG() is deprecated and has no effect",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )

      validate_key(dag_id)

      self._dag_id = dag_id
      self._dag_display_property_value = dag_display_name

      if concurrency:
          # TODO: Remove in Airflow 3.0
          warnings.warn(
              "The 'concurrency' parameter is deprecated. Please use 'max_active_tasks'.",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
          # The deprecated argument, when truthy, overrides max_active_tasks.
          max_active_tasks = concurrency
      self._max_active_tasks = max_active_tasks
      self._pickle_id: int | None = None

      self._description = description
      # set file location to caller source path
      back = sys._getframe().f_back
      self.fileloc = back.f_code.co_filename if back else ""
      self.task_dict: dict[str, Operator] = {}

      # set timezone from start_date
      tz = None
      if start_date and start_date.tzinfo:
          # ``start_date.tzinfo`` is truthy on this branch, so the conditional
          # below always evaluates to None here.
          tzinfo = None if start_date.tzinfo else settings.TIMEZONE
          tz = pendulum.instance(start_date, tz=tzinfo).timezone
      elif date := self.default_args.get("start_date"):
          # Fallback: no timezone-aware ``start_date`` argument, but
          # default_args carries a start date (possibly as a string).
          if not isinstance(date, datetime):
              date = timezone.parse(date)
              self.default_args["start_date"] = date
              start_date = date

          tzinfo = None if date.tzinfo else settings.TIMEZONE
          tz = pendulum.instance(date, tz=tzinfo).timezone
      self.timezone: Timezone | FixedTimezone = tz or settings.TIMEZONE

      # Apply the timezone we settled on to end_date if it wasn't supplied
      if isinstance(_end_date := self.default_args.get("end_date"), str):
          self.default_args["end_date"] = timezone.parse(_end_date, timezone=self.timezone)

      self.start_date = timezone.convert_to_utc(start_date)
      self.end_date = timezone.convert_to_utc(end_date)

      # also convert tasks
      if "start_date" in self.default_args:
          self.default_args["start_date"] = timezone.convert_to_utc(self.default_args["start_date"])
      if "end_date" in self.default_args:
          self.default_args["end_date"] = timezone.convert_to_utc(self.default_args["end_date"])

      # sort out DAG's scheduling behavior
      scheduling_args = [schedule_interval, timetable, schedule]

      # A scheduling argument only counts if it was supplied *and* is truthy.
      has_scheduling_args = any(a is not NOTSET and bool(a) for a in scheduling_args)
      has_empty_start_date = not ("start_date" in self.default_args or self.start_date)

      if has_scheduling_args and has_empty_start_date:
          raise ValueError("DAG is missing the start_date parameter")

      # NOTE(review): ``at_most_one`` is defined elsewhere; presumably it treats
      # NOTSET/None as "not given" -- confirm against the helper.
      if not at_most_one(*scheduling_args):
          raise ValueError("At most one allowed for args 'schedule_interval', 'timetable', and 'schedule'.")
      if schedule_interval is not NOTSET:
          warnings.warn(
              "Param `schedule_interval` is deprecated and will be removed in a future release. "
              "Please use `schedule` instead. ",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
      if timetable is not None:
          warnings.warn(
              "Param `timetable` is deprecated and will be removed in a future release. "
              "Please use `schedule` instead. ",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )

      # Fold the deprecated arguments into ``schedule``; ``timetable`` wins over
      # ``schedule_interval`` (only one of them can be set at this point).
      if timetable is not None:
          schedule = timetable
      elif schedule_interval is not NOTSET:
          schedule = schedule_interval

      # Kept for compatibility. Do not use in new code.
      self.schedule_interval: ScheduleInterval

      if isinstance(schedule, Timetable):
          self.timetable = schedule
          self.schedule_interval = schedule.summary
      elif isinstance(schedule, BaseDataset):
          self.timetable = DatasetTriggeredTimetable(schedule)
          self.schedule_interval = self.timetable.summary
      elif isinstance(schedule, Collection) and not isinstance(schedule, str):
          # A non-string collection must consist solely of datasets / aliases.
          if not all(isinstance(x, (Dataset, DatasetAlias)) for x in schedule):
              raise ValueError("All elements in 'schedule' should be datasets or dataset aliases")
          self.timetable = DatasetTriggeredTimetable(DatasetAll(*schedule))
          self.schedule_interval = self.timetable.summary
      elif isinstance(schedule, ArgNotSet):
          # No scheduling argument at all: fall back to the implicit default.
          warnings.warn(
              "Creating a DAG with an implicit schedule is deprecated, and will stop working "
              "in a future release. Set `schedule=datetime.timedelta(days=1)` explicitly.",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
          self.timetable = create_timetable(schedule, self.timezone)
          self.schedule_interval = DEFAULT_SCHEDULE_INTERVAL
      else:
          self.timetable = create_timetable(schedule, self.timezone)
          self.schedule_interval = schedule

      if isinstance(template_searchpath, str):
          template_searchpath = [template_searchpath]
      self.template_searchpath = template_searchpath
      self.template_undefined = template_undefined
      self.last_loaded: datetime = timezone.utcnow()
      self.safe_dag_id = dag_id.replace(".", "__dot__")
      self.max_active_runs = max_active_runs
      self.max_consecutive_failed_dag_runs = max_consecutive_failed_dag_runs
      # An explicit 0 is replaced by the value from the configuration.
      if self.max_consecutive_failed_dag_runs == 0:
          self.max_consecutive_failed_dag_runs = airflow_conf.getint(
              "core", "max_consecutive_failed_dag_runs_per_dag"
          )
      if self.max_consecutive_failed_dag_runs < 0:
          # NOTE(review): the two message fragments are joined without a space
          # after the full stop.
          raise AirflowException(
              f"Invalid max_consecutive_failed_dag_runs: {self.max_consecutive_failed_dag_runs}."
              f"Requires max_consecutive_failed_dag_runs >= 0"
          )
      if self.timetable.active_runs_limit is not None:
          if self.timetable.active_runs_limit < self.max_active_runs:
              raise AirflowException(
                  f"Invalid max_active_runs: {type(self.timetable)} "
                  f"requires max_active_runs <= {self.timetable.active_runs_limit}"
              )
      self.dagrun_timeout = dagrun_timeout
      self.sla_miss_callback = sla_miss_callback
      if default_view in DEFAULT_VIEW_PRESETS:
          self._default_view: str = default_view
      elif default_view == "tree":
          # Legacy name: accepted with a warning and mapped to "grid".
          warnings.warn(
              "`default_view` of 'tree' has been renamed to 'grid' -- please update your DAG",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
          self._default_view = "grid"
      else:
          raise AirflowException(
              f"Invalid values of dag.default_view: only support "
              f"{DEFAULT_VIEW_PRESETS}, but get {default_view}"
          )
      if orientation in ORIENTATION_PRESETS:
          self.orientation = orientation
      else:
          raise AirflowException(
              f"Invalid values of dag.orientation: only support "
              f"{ORIENTATION_PRESETS}, but get {orientation}"
          )
      self.catchup: bool = catchup

      self.partial: bool = False

      self.on_success_callback = on_success_callback
      self.on_failure_callback = on_failure_callback

      # Keeps track of any extra edge metadata (sparse; will not contain all
      # edges, so do not iterate over it for that). Outer key is upstream
      # task ID, inner key is downstream task ID.
      self.edge_info: dict[str, dict[str, EdgeInfoType]] = {}

      # To keep it in parity with Serialized DAGs
      # and identify if DAG has on_*_callback without actually storing them in Serialized JSON
      self.has_on_success_callback: bool = self.on_success_callback is not None
      self.has_on_failure_callback: bool = self.on_failure_callback is not None

      self._access_control = DAG._upgrade_outdated_dag_access_control(access_control)
      self.is_paused_upon_creation = is_paused_upon_creation
      self.auto_register = auto_register

      self.fail_stop: bool = fail_stop

      self.jinja_environment_kwargs = jinja_environment_kwargs
      self.render_template_as_native_obj = render_template_as_native_obj

      # ``doc_md`` may be a path ending in ".md"; get_doc_md resolves it.
      self.doc_md = self.get_doc_md(doc_md)

      self.tags = tags or []
      self._task_group = TaskGroup.create_root(self)
      self.validate_schedule_and_params()
      wrong_links = dict(self.iter_invalid_owner_links())
      if wrong_links:
          raise AirflowException(
              "Wrong link format was used for the owner. Use a valid link \n"
              f"Bad formatted links are: {wrong_links}"
          )

      # this will only be set at serialization time
      # its only use is for determining the relative
      # fileloc based only on the serialize dag
      self._processor_dags_folder = None

      # Final step: type-check the instance against the expected-types table.
      validate_instance_args(self, DAG_ARGS_EXPECTED_TYPES)
  def get_doc_md(self, doc_md: str | None) -> str | None:
      """
      Resolve the documentation text for this DAG.

      A value ending in ``.md`` is treated as a file path and the file's
      contents are returned. If no such file exists, the value is assumed to be
      literal Markdown that merely ends in ``.md`` and is returned unchanged.

      :param doc_md: Markdown text, a path to a ``.md`` file, or None.
      :return: The resolved Markdown text, or None when ``doc_md`` is None.
      """
      if doc_md is None or not doc_md.endswith(".md"):
          return doc_md

      try:
          # Use a context manager so the handle is closed deterministically
          # instead of being left to the garbage collector.
          with open(doc_md) as doc_file:
              return doc_file.read()
      except FileNotFoundError:
          return doc_md
  def _check_schedule_interval_matches_timetable(self) -> bool:
      """
      Tell whether ``schedule_interval`` and ``timetable`` still agree.

      Either attribute can be reassigned after the DAG is constructed, which
      leaves the two out of sync, e.g.

      .. code-block:: python

          dag1 = DAG("d1", timetable=MyTimetable())
          dag1.schedule_interval = "@once"

          dag2 = DAG("d2", schedule="@once")
          dag2.timetable = MyTimetable()

      The check first compares ``schedule_interval`` with the timetable's
      ``summary`` directly. Failing that, a timetable is built from
      ``schedule_interval`` and the two summaries are compared. This is a
      heuristic: a custom timetable without a useful ``summary`` can defeat it.

      :return: True if the two are considered consistent, False otherwise
          (including when no timetable can be built from ``schedule_interval``).
      """
      directly_equal = self.schedule_interval == self.timetable.summary
      if not directly_equal:
          try:
              rebuilt = create_timetable(self.schedule_interval, self.timezone)
          except ValueError:
              return False
          return rebuilt.summary == self.timetable.summary
      return True
  def validate(self):
      """
      Check that the DAG is coherently configured.

      The DAG bag calls this before bagging the DAG. The schedule consistency
      check runs first; the executor, schedule/params, timetable and
      setup/teardown validations follow in that order.

      :raises AirflowDagInconsistent: if ``timetable`` and ``schedule_interval``
          no longer match.
      """
      schedule_is_consistent = self._check_schedule_interval_matches_timetable()
      if not schedule_is_consistent:
          raise AirflowDagInconsistent(
              f"inconsistent schedule: timetable {self.timetable.summary!r} "
              f"does not match schedule_interval {self.schedule_interval!r}",
          )

      remaining_checks = (
          self.validate_executor_field,
          self.validate_schedule_and_params,
          self.timetable.validate,
          self.validate_setup_teardown,
      )
      for run_check in remaining_checks:
          run_check()
  def validate_executor_field(self):
      """
      Verify that every task naming an executor refers to a known one.

      Tasks whose ``executor`` is falsy are skipped.

      :raises UnknownExecutorException: if the executor lookup fails for a task;
          the message names both the executor and the task.
      """
      for task in self.tasks:
          if not task.executor:
              continue
          try:
              ExecutorLoader.lookup_executor_name_by_str(task.executor)
          except UnknownExecutorException:
              # Re-raise with a message that tells the user which task is affected.
              raise UnknownExecutorException(
                  f"The specified executor {task.executor} for task {task.task_id} is not "
                  "configured. Review the core.executors Airflow configuration to add it or "
                  "update the executor configuration for this task."
              )
  def validate_setup_teardown(self):
      """
      Check trigger rules around setup tasks and fail-stop constraints.

      For each setup task, every directly downstream task that is not a teardown
      must use the ``ALL_SUCCESS`` trigger rule. Every task, setup or not, is
      additionally passed to ``FailStopDagInvalidTriggerRule.check``.

      :raises ValueError: if a non-teardown task downstream of a setup task uses
          a trigger rule other than ``ALL_SUCCESS``.

      :meta private:
      """
      for task in self.tasks:
          # todo: we can relax this to allow out-of-scope tasks to have other trigger rules
          # this is required to ensure consistent behavior of dag
          # when clearing an indirect setup
          if task.is_setup and any(
              not downstream.is_teardown and downstream.trigger_rule != TriggerRule.ALL_SUCCESS
              for downstream in task.downstream_list
          ):
              raise ValueError("Setup tasks must be followed with trigger rule ALL_SUCCESS.")
          FailStopDagInvalidTriggerRule.check(dag=self, trigger_rule=task.trigger_rule)
  def __repr__(self):
      """Return a short debugging representation containing the ``dag_id``."""
      dag_id = self.dag_id
      return f"<DAG: {dag_id}>"
  def __eq__(self, other):
      """
      Compare two DAGs attribute by attribute.

      Objects of a different exact type are never equal. Otherwise every name
      listed in ``_comps`` must compare equal; a missing attribute counts as None.
      """
      if type(self) is not type(other):
          return False
      # Use getattr() instead of __dict__ as __dict__ doesn't return
      # correct values for properties.
      for attr in self._comps:
          if not getattr(self, attr, None) == getattr(other, attr, None):
              return False
      return True
  def __ne__(self, other):
      """Return the logical negation of ``self == other``."""
      are_equal = self == other
      return not are_equal
  def __lt__(self, other):
      """Order DAGs by comparing their ``dag_id`` values."""
      own_id = self.dag_id
      other_id = other.dag_id
      return own_id < other_id
  def __hash__(self):
      """
      Hash the DAG from its exact type plus every attribute named in ``_comps``.

      ``task_ids`` is represented by the tuple of ``task_dict`` keys. Any value
      that turns out to be unhashable is replaced by its ``repr``.
      """

      def _as_hashable(value):
          try:
              hash(value)
          except TypeError:
              return repr(value)
          return value

      parts = [type(self)]
      for attr in self._comps:
          # task_ids returns a list and lists can't be hashed
          raw = tuple(self.task_dict) if attr == "task_ids" else getattr(self, attr, None)
          parts.append(_as_hashable(raw))
      return hash(tuple(parts))
  800. # Context Manager -----------------------------------------------
  def __enter__(self):
      """Push this DAG onto ``DagContext`` and return it for the ``with`` block."""
      DagContext.push_context_managed_dag(self)
      return self
  def __exit__(self, _type, _value, _tb):
      """Pop the context-managed DAG from ``DagContext``; the exception info is ignored."""
      DagContext.pop_context_managed_dag()
  806. # /Context Manager ----------------------------------------------
  @staticmethod
  def _upgrade_outdated_dag_access_control(access_control=None):
      """
      Look for outdated dag level actions in DAG access_controls and replace them with updated actions.

      For example, in DAG access_control {'role1': {'can_dag_read'}} 'can_dag_read'
      will be replaced with 'can_read', in {'role2': {'can_dag_read', 'can_dag_edit'}}
      'can_dag_edit' will be replaced with 'can_edit', etc.

      With FAB >= 1.3.0 the result always uses the resource-keyed format: a bare
      set/list of actions is stored under ``permissions.RESOURCE_DAG``. With an
      older FAB only the flat format (a collection of actions per role) is accepted.

      The mapping passed in is never modified; per-role dicts are shallow-copied
      before the DAG-resource actions are rewritten.

      :param access_control: mapping of role name to permissions, or None.
      :return: a new mapping with deprecated actions replaced, or None if
          ``access_control`` is None.
      :raises AirflowException: if a role uses the resource-keyed (dict) format
          while the installed FAB version is older than 1.3.0.
      """
      if access_control is None:
          return None

      new_dag_perm_mapping = {
          permissions.DEPRECATED_ACTION_CAN_DAG_READ: permissions.ACTION_CAN_READ,
          permissions.DEPRECATED_ACTION_CAN_DAG_EDIT: permissions.ACTION_CAN_EDIT,
      }

      def update_old_perm(permission: str):
          new_perm = new_dag_perm_mapping.get(permission, permission)
          if new_perm != permission:
              warnings.warn(
                  f"The '{permission}' permission is deprecated. Please use '{new_perm}'.",
                  RemovedInAirflow3Warning,
                  stacklevel=3,
              )
          return new_perm

      # The installed FAB version does not change between roles, so parse and
      # compare it once instead of once per loop iteration.
      supports_resource_format = packaging_version.parse(FAB_VERSION) >= packaging_version.parse("1.3.0")

      updated_access_control = {}
      for role, perms in access_control.items():
          if supports_resource_format:
              if isinstance(perms, (set, list)):
                  # Support for old-style access_control where only the actions are specified
                  role_perms = {permissions.RESOURCE_DAG: set(perms)}
              else:
                  # Shallow copy so that rewriting the DAG-resource entry below
                  # does not mutate the object supplied by the caller.
                  role_perms = copy.copy(perms)
              if permissions.RESOURCE_DAG in role_perms:
                  role_perms[permissions.RESOURCE_DAG] = {
                      update_old_perm(perm) for perm in role_perms[permissions.RESOURCE_DAG]
                  }
              updated_access_control[role] = role_perms
          elif isinstance(perms, dict):
              # Not allow new access control format with old FAB versions
              raise AirflowException(
                  "Please upgrade the FAB provider to a version >= 1.3.0 to allow "
                  "use the Dag Level Access Control new format."
              )
          else:
              updated_access_control[role] = {update_old_perm(perm) for perm in perms}

      return updated_access_control
  def date_range(
      self,
      start_date: pendulum.DateTime,
      num: int | None = None,
      end_date: datetime | None = None,
  ) -> list[datetime]:
      """
      Return a list of dates for this DAG's schedule (deprecated).

      When ``num`` is given, the list comes from ``utils_date_range`` using the
      DAG's ``normalized_schedule_interval`` as the step. Otherwise the logical
      dates of the runs between ``start_date`` and ``end_date`` (now, if
      omitted) are returned. A ``RemovedInAirflow3Warning`` is emitted either way.

      :param start_date: first date of the range
      :param num: number of dates to produce; takes precedence over ``end_date``
      :param end_date: last date of the range when ``num`` is not given
      """
      base_message = "`DAG.date_range()` is deprecated."

      if num is None:
          warnings.warn(
              base_message + " Please use `DAG.iter_dagrun_infos_between(..., align=False)` instead.",
              category=RemovedInAirflow3Warning,
              stacklevel=2,
          )
          upper_bound = timezone.utcnow() if end_date is None else end_date
          run_infos = self.iter_dagrun_infos_between(start_date, pendulum.instance(upper_bound), align=False)
          return [info.logical_date for info in run_infos]

      warnings.warn(base_message, category=RemovedInAirflow3Warning, stacklevel=2)
      # The helper is deprecated as well; silence its warning so callers only
      # see the one emitted above.
      with warnings.catch_warnings():
          warnings.simplefilter("ignore", RemovedInAirflow3Warning)
          return utils_date_range(
              start_date=start_date,
              num=num,
              delta=self.normalized_schedule_interval,
          )
  def is_fixed_time_schedule(self):
      """
      Report whether the schedule fires at a fixed time of day (deprecated).

      Timetables that are not cron-based are always reported as fixed. For cron
      timetables the next two trigger times are "peeked"; the schedule is fixed
      when both share the same hour and minute, in which case the DST fix is
      *not* needed.

      This assumes DST happens on whole minute changes (e.g. 12:59 -> 12:00).

      Do not try to understand what this actually means. It is old logic that
      should not be used anywhere.
      """
      warnings.warn(
          "`DAG.is_fixed_time_schedule()` is deprecated.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )

      from airflow.timetables._cron import CronMixin

      if not isinstance(self.timetable, CronMixin):
          return True

      from croniter import croniter

      trigger_times = croniter(self.timetable._expression)
      first = trigger_times.get_next(datetime)
      second = trigger_times.get_next(datetime)
      return (second.minute, second.hour) == (first.minute, first.hour)
  def following_schedule(self, dttm):
      """
      Calculate the schedule following ``dttm`` for this dag, in UTC (deprecated).

      The data interval for ``dttm`` is inferred first, then the next run is
      looked up with ``restricted=False``.

      :param dttm: utc datetime
      :return: utc datetime at which the next data interval starts, or None
          when there is no next run
      """
      warnings.warn(
          "`DAG.following_schedule()` is deprecated. Use `DAG.next_dagrun_info(restricted=False)` instead.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      inferred_interval = self.infer_automated_data_interval(timezone.coerce_datetime(dttm))
      upcoming = self.next_dagrun_info(inferred_interval, restricted=False)
      return None if upcoming is None else upcoming.data_interval.start
  def previous_schedule(self, dttm):
      """
      Return the schedule preceding ``dttm`` (deprecated).

      Only data-interval timetables can answer this; for any other timetable
      the result is None.

      :param dttm: datetime to look back from
      """
      from airflow.timetables.interval import _DataIntervalTimetable

      warnings.warn(
          "`DAG.previous_schedule()` is deprecated.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      if isinstance(self.timetable, _DataIntervalTimetable):
          return self.timetable._get_prev(timezone.coerce_datetime(dttm))
      return None
  def get_next_data_interval(self, dag_model: DagModel) -> DataInterval | None:
      """
      Return the data interval of the next scheduled run, if there is one.

      Runs scheduled before AIP-39 carry no explicit data interval; for those
      the interval is inferred from the DAG's schedule and the run's logical date.

      This function is private to Airflow core and should not be depended on as a
      part of the Python API.

      :param dag_model: the ``DagModel`` row belonging to this DAG
      :return: the next run's data interval, or None when no run is scheduled
      :raises ValueError: if ``dag_model`` belongs to a different DAG

      :meta private:
      """
      if self.dag_id != dag_model.dag_id:
          raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {dag_model.dag_id}")

      next_logical_date = dag_model.next_dagrun
      if next_logical_date is None:  # Next run not scheduled.
          return None

      explicit_interval = dag_model.next_dagrun_data_interval
      if explicit_interval is None:
          # Compatibility: A run was scheduled without an explicit data interval.
          # This means the run was scheduled before AIP-39 implementation. Try to
          # infer from the logical date.
          return self.infer_automated_data_interval(next_logical_date)
      return explicit_interval
  def get_run_data_interval(self, run: DagRun | DagRunPydantic) -> DataInterval:
      """
      Return the data interval of ``run``.

      Runs created before AIP-39 have no explicit data interval; for those the
      interval is inferred from the DAG's schedule and the run's execution date.

      This function is private to Airflow core and should not be depended on as a
      part of the Python API.

      :param run: the DAG run to inspect
      :raises ValueError: if ``run`` has a ``dag_id`` that differs from this DAG's

      :meta private:
      """
      run_dag_id = run.dag_id
      if run_dag_id is not None and run_dag_id != self.dag_id:
          raise ValueError(f"Arguments refer to different DAGs: {self.dag_id} != {run.dag_id}")

      explicit_interval = _get_model_data_interval(run, "data_interval_start", "data_interval_end")
      if explicit_interval is None:
          # Compatibility: runs created before AIP-39 implementation don't have an
          # explicit data interval. Try to infer from the logical date.
          return self.infer_automated_data_interval(run.execution_date)
      return explicit_interval
  def infer_automated_data_interval(self, logical_date: datetime) -> DataInterval:
      """
      Infer the data interval of a run of this DAG from its logical date.

      This bridges runs created prior to the AIP-39 implementation, which have
      no explicit data interval, so only ``schedule_interval`` values valid
      before Airflow 2.2 are considered: null, once and dataset-triggered
      timetables yield an exact (zero-length) interval, while cron and delta
      data-interval timetables yield the interval from the logical date to the
      timetable's next point.

      DO NOT call this method if there is a known data interval.

      :param logical_date: logical date of the run
      :raises ValueError: if the DAG's timetable is none of the types above

      :meta private:
      """
      timetable_cls = type(self.timetable)
      start = timezone.coerce_datetime(logical_date)

      if issubclass(timetable_cls, (NullTimetable, OnceTimetable, DatasetTriggeredTimetable)):
          return DataInterval.exact(start)

      if issubclass(timetable_cls, CronDataIntervalTimetable):
          return DataInterval(start, cast(CronDataIntervalTimetable, self.timetable)._get_next(start))

      if issubclass(timetable_cls, DeltaDataIntervalTimetable):
          return DataInterval(start, cast(DeltaDataIntervalTimetable, self.timetable)._get_next(start))

      # Contributors: When the exception below is raised, you might want to
      # add another branch above to handle custom timetables. Stop! The bug
      # you're looking for is instead at when the DAG run (represented by
      # logical_date) was created. See GH-31969 for an example:
      # * Wrong fix: GH-32074 (modifies this function).
      # * Correct fix: GH-32118 (modifies the DAG run creation code).
      raise ValueError(f"Not a valid timetable: {self.timetable!r}")
  def next_dagrun_info(
      self,
      last_automated_dagrun: None | datetime | DataInterval,
      *,
      restricted: bool = True,
  ) -> DagRunInfo | None:
      """
      Get information about the next DagRun of this dag after ``last_automated_dagrun``.

      The result describes the time interval the next DagRun should operate on
      and when it can be scheduled, as computed by the dag's timetable from
      start_date, end_date, etc. Limits such as max active runs or
      "max_active_tasks" are not considered; only the date and interval fields
      of this dag and its tasks matter.

      :param last_automated_dagrun: The data interval of the latest existing
          "automated" DagRun for this dag (scheduled or backfill, but not
          manual). Passing a plain datetime is deprecated; the interval is
          then inferred from it.
      :param restricted: If set to *False* (default is *True*), ignore
          ``start_date``, ``end_date``, and ``catchup`` specified on the DAG
          or tasks.
      :return: DagRunInfo of the next dagrun, or None if a dagrun is not
          going to be scheduled (which includes subdags and any failure of
          the timetable, which is logged).
      """
      # Never schedule a subdag. It will be scheduled by its parent dag.
      if self.is_subdag:
          return None

      if isinstance(last_automated_dagrun, datetime):
          warnings.warn(
              "Passing a datetime to DAG.next_dagrun_info is deprecated. Use a DataInterval instead.",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
          previous_interval = self.infer_automated_data_interval(
              timezone.coerce_datetime(last_automated_dagrun)
          )
      else:
          previous_interval = last_automated_dagrun

      restriction = (
          self._time_restriction
          if restricted
          else TimeRestriction(earliest=None, latest=None, catchup=True)
      )

      try:
          return self.timetable.next_dagrun_info(
              last_automated_data_interval=previous_interval,
              restriction=restriction,
          )
      except Exception:
          # A failing timetable must not propagate; log it and report "no run".
          self.log.exception(
              "Failed to fetch run info after data interval %s for DAG %r",
              previous_interval,
              self.dag_id,
          )
          return None
  def next_dagrun_after_date(self, date_last_automated_dagrun: pendulum.DateTime | None):
      """
      Return the ``run_after`` of the next DagRun (deprecated).

      :param date_last_automated_dagrun: Date of the last automated run, or
          *None* when there is none.
      :return: ``run_after`` of the next run info, or *None* if
          ``next_dagrun_info`` returns nothing.
      """
      warnings.warn(
          "`DAG.next_dagrun_after_date()` is deprecated. Please use `DAG.next_dagrun_info()` instead.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      data_interval = (
          None
          if date_last_automated_dagrun is None
          else self.infer_automated_data_interval(date_last_automated_dagrun)
      )
      info = self.next_dagrun_info(data_interval)
      return None if info is None else info.run_after
  @functools.cached_property
  def _time_restriction(self) -> TimeRestriction:
      """
      Build the TimeRestriction from the DAG's and its tasks' start/end dates.

      ``earliest`` is the minimum of all set start dates (tasks and DAG).
      ``latest`` is the DAG's ``end_date`` unless every task has its own
      ``end_date``, in which case it is the maximum of all set end dates.
      """
      candidate_starts = [task.start_date for task in self.tasks if task.start_date]
      if self.start_date is not None:
          candidate_starts.append(self.start_date)
      earliest = timezone.coerce_datetime(min(candidate_starts)) if candidate_starts else None

      latest = self.end_date
      candidate_ends = [task.end_date for task in self.tasks if task.end_date]
      # Only consider task end dates when no task has a null end_date.
      if len(candidate_ends) == len(self.tasks):
          if self.end_date is not None:
              candidate_ends.append(self.end_date)
          if candidate_ends:
              latest = timezone.coerce_datetime(max(candidate_ends))

      return TimeRestriction(earliest, latest, self.catchup)
  def iter_dagrun_infos_between(
      self,
      earliest: pendulum.DateTime | None,
      latest: pendulum.DateTime,
      *,
      align: bool = True,
  ) -> Iterable[DagRunInfo]:
      """
      Yield DagRunInfo using this DAG's timetable between given interval.

      DagRunInfo instances yielded if their ``logical_date`` is not earlier
      than ``earliest``, nor later than ``latest``. The instances are ordered
      by their ``logical_date`` from earliest to latest.

      If ``align`` is ``False``, the first run will happen immediately on
      ``earliest``, even if it does not fall on the logical timetable schedule.
      The default is ``True``, but subdags will ignore this value and always
      behave as if this is set to ``False`` for backward compatibility.

      Example: A DAG is scheduled to run every midnight (``0 0 * * *``). If
      ``earliest`` is ``2021-06-03 23:00:00``, the first DagRunInfo would be
      ``2021-06-03 23:00:00`` if ``align=False``, and ``2021-06-04 00:00:00``
      if ``align=True``.

      :param earliest: Lower bound; when *None*, falls back to
          ``self._time_restriction.earliest``.
      :param latest: Upper bound.
      :param align: Whether the first run must fall on the timetable schedule.
      :raises ValueError: If ``earliest`` is *None* and the time restriction
          has no earliest value either.
      """
      if earliest is None:
          earliest = self._time_restriction.earliest
      if earliest is None:
          raise ValueError("earliest was None and we had no value in time_restriction to fallback on")
      earliest = timezone.coerce_datetime(earliest)
      latest = timezone.coerce_datetime(latest)

      # catchup=True is always passed here, independent of the DAG's own setting.
      restriction = TimeRestriction(earliest, latest, catchup=True)

      # HACK: Sub-DAGs are currently scheduled differently. For example, say
      # the schedule is @daily and start is 2021-06-03 22:16:00, a top-level
      # DAG should be first scheduled to run on midnight 2021-06-04, but a
      # sub-DAG should be first scheduled to run RIGHT NOW. We can change
      # this, but since sub-DAGs are going away in 3.0 anyway, let's keep
      # compatibility for now and remove this entirely later.
      if self.is_subdag:
          align = False

      # First run in the window; a timetable failure is logged and treated as "no run".
      try:
          info = self.timetable.next_dagrun_info(
              last_automated_data_interval=None,
              restriction=restriction,
          )
      except Exception:
          self.log.exception(
              "Failed to fetch run info after data interval %s for DAG %r",
              None,
              self.dag_id,
          )
          info = None

      if info is None:
          # No runs to be scheduled between the user-supplied timeframe. But
          # if align=False, "invent" a data interval for the timeframe itself.
          if not align:
              yield DagRunInfo.interval(earliest, latest)
          return

      # If align=False and earliest does not fall on the timetable's logical
      # schedule, "invent" a data interval for it.
      if not align and info.logical_date != earliest:
          yield DagRunInfo.interval(earliest, info.data_interval.start)

      # Generate naturally according to schedule.
      while info is not None:
          yield info
          try:
              info = self.timetable.next_dagrun_info(
                  last_automated_data_interval=info.data_interval,
                  restriction=restriction,
              )
          except Exception:
              # Stop iterating on a timetable failure instead of propagating it.
              self.log.exception(
                  "Failed to fetch run info after data interval %s for DAG %r",
                  info.data_interval if info else "<NONE>",
                  self.dag_id,
              )
              break
  def get_run_dates(self, start_date, end_date=None) -> list:
      """
      Return the logical dates of the runs scheduled inside an interval (deprecated).

      Returned dates can be used for execution dates.

      :param start_date: The start date of the interval.
      :param end_date: The end date of the interval. When *None*, the current
          time from ``pendulum.now(timezone.utc)`` is used.
      :return: A list of dates within the interval following the dag's schedule.
      """
      warnings.warn(
          "`DAG.get_run_dates()` is deprecated. Please use `DAG.iter_dagrun_infos_between()` instead.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      earliest = timezone.coerce_datetime(start_date)
      latest = pendulum.now(timezone.utc) if end_date is None else timezone.coerce_datetime(end_date)
      run_dates = []
      for run_info in self.iter_dagrun_infos_between(earliest, latest):
          run_dates.append(run_info.logical_date)
      return run_dates
  def normalize_schedule(self, dttm):
      """
      Return ``dttm`` if it round-trips through the schedule, else the following schedule (deprecated).

      ``dttm`` is returned unchanged when ``following_schedule`` yields nothing
      or when the schedule preceding the following one equals ``dttm``.
      """
      warnings.warn(
          "`DAG.normalize_schedule()` is deprecated.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      # The helper methods emit their own deprecation warnings; silence them here.
      with warnings.catch_warnings():
          warnings.simplefilter("ignore", RemovedInAirflow3Warning)
          next_point = self.following_schedule(dttm)
          if not next_point:  # in case of @once
              return dttm
          round_trip = self.previous_schedule(next_point)
      return next_point if round_trip != dttm else dttm
  @provide_session
  def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
      """Delegate to the module-level ``get_last_dagrun`` for this DAG's ``dag_id``."""
      last_run = get_last_dagrun(
          self.dag_id,
          session=session,
          include_externally_triggered=include_externally_triggered,
      )
      return last_run
  @provide_session
  def has_dag_runs(self, session=NEW_SESSION, include_externally_triggered=True) -> bool:
      """Return whether the module-level ``get_last_dagrun`` finds any run for this DAG."""
      last_run = get_last_dagrun(
          self.dag_id,
          session=session,
          include_externally_triggered=include_externally_triggered,
      )
      return last_run is not None
  @property
  def dag_id(self) -> str:
      """Identifier of this DAG, stored on ``_dag_id``."""
      return self._dag_id

  @dag_id.setter
  def dag_id(self, value: str) -> None:
      """Store ``value`` on ``_dag_id``."""
      self._dag_id = value
  @property
  def is_subdag(self) -> bool:
      """Whether this DAG has a ``parent_dag`` set."""
      return not (self.parent_dag is None)
  @property
  def full_filepath(self) -> str:
      """
      Deprecated alias that reads ``fileloc``.

      :meta private:
      """
      warnings.warn(
          "DAG.full_filepath is deprecated in favour of fileloc",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      location = self.fileloc
      return location

  @full_filepath.setter
  def full_filepath(self, value) -> None:
      """Deprecated alias that writes ``fileloc``."""
      warnings.warn(
          "DAG.full_filepath is deprecated in favour of fileloc",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      self.fileloc = value
  @property
  def concurrency(self) -> int:
      """Deprecated alias that reads ``_max_active_tasks``."""
      # TODO: Remove in Airflow 3.0
      warnings.warn(
          "The 'DAG.concurrency' attribute is deprecated. Please use 'DAG.max_active_tasks'.",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      limit = self._max_active_tasks
      return limit

  @concurrency.setter
  def concurrency(self, value: int):
      """Store ``value`` on ``_max_active_tasks`` (no warning is emitted on write)."""
      self._max_active_tasks = value
  @property
  def max_active_tasks(self) -> int:
      """Task limit stored on ``_max_active_tasks``."""
      return self._max_active_tasks

  @max_active_tasks.setter
  def max_active_tasks(self, value: int):
      """Store ``value`` on ``_max_active_tasks``."""
      self._max_active_tasks = value
  @property
  def access_control(self):
      """Access-control value stored on ``_access_control``."""
      return self._access_control

  @access_control.setter
  def access_control(self, value):
      """Store ``value`` after passing it through ``DAG._upgrade_outdated_dag_access_control``."""
      upgraded = DAG._upgrade_outdated_dag_access_control(value)
      self._access_control = upgraded
  @property
  def dag_display_name(self) -> str:
      """Display name if one is set (truthy), otherwise the ``_dag_id``."""
      display_value = self._dag_display_property_value
      return display_value if display_value else self._dag_id
  @property
  def description(self) -> str | None:
      """Description stored on ``_description`` (may be *None*)."""
      return self._description
  @property
  def default_view(self) -> str:
      """Default view name stored on ``_default_view``."""
      return self._default_view
  @property
  def pickle_id(self) -> int | None:
      """Pickle id stored on ``_pickle_id`` (may be *None*)."""
      return self._pickle_id

  @pickle_id.setter
  def pickle_id(self, value: int) -> None:
      """Store ``value`` on ``_pickle_id``."""
      self._pickle_id = value
  def param(self, name: str, default: Any = NOTSET) -> DagParam:
      """
      Build a DagParam bound to this dag.

      :param name: dag parameter name.
      :param default: fallback value for dag parameter.
      :return: DagParam instance for specified name and current dag.
      """
      dag_param = DagParam(current_dag=self, name=name, default=default)
      return dag_param
  @property
  def tasks(self) -> list[Operator]:
      """A new list holding the values of ``task_dict``."""
      return [*self.task_dict.values()]

  @tasks.setter
  def tasks(self, val):
      """Reject assignment; always raises ``AttributeError``."""
      raise AttributeError("DAG.tasks can not be modified. Use dag.add_task() instead.")
  @property
  def task_ids(self) -> list[str]:
      """A new list holding the keys of ``task_dict``."""
      return [*self.task_dict]
  @property
  def teardowns(self) -> list[Operator]:
      """Tasks whose ``is_teardown`` attribute is present and truthy."""
      return [
          candidate
          for candidate in self.tasks
          if getattr(candidate, "is_teardown", None)
      ]
  @property
  def tasks_upstream_of_teardowns(self) -> list[Operator]:
      """Upstream tasks of every teardown, excluding upstreams that are teardowns themselves."""
      return [
          upstream
          for teardown in self.teardowns
          for upstream in teardown.upstream_list
          if not getattr(upstream, "is_teardown", None)
      ]
  @property
  def task_group(self) -> TaskGroup:
      """Task group stored on ``_task_group``."""
      return self._task_group
  @property
  def filepath(self) -> str:
      """
      Deprecated string form of ``relative_fileloc``.

      :meta private:
      """
      warnings.warn(
          "filepath is deprecated, use relative_fileloc instead",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      relative = self.relative_fileloc
      return str(relative)
  @property
  def relative_fileloc(self) -> pathlib.Path:
      """
      File location of the importable dag 'file' relative to the configured DAGs folder.

      The folder is ``_processor_dags_folder`` when set, otherwise
      ``settings.DAGS_FOLDER``. The unmodified ``fileloc`` path is returned when
      it is not inside that folder or when it is the folder itself.
      """
      absolute = pathlib.Path(self.fileloc)
      dags_root = self._processor_dags_folder or settings.DAGS_FOLDER
      try:
          relative = absolute.relative_to(dags_root)
      except ValueError:
          # Not relative to DAGS_FOLDER.
          return absolute
      return absolute if relative == pathlib.Path(".") else relative
  @property
  def folder(self) -> str:
      """Directory part of ``fileloc``, as computed by ``os.path.dirname``."""
      location = self.fileloc
      return os.path.dirname(location)
  @property
  def owner(self) -> str:
      """
      Return the distinct owners of this DAG's tasks as one string.

      Owners are de-duplicated and sorted so the result is deterministic
      across calls and processes (joining a bare ``set`` has no defined order).

      :return: Comma separated list of owners in DAG tasks
      """
      distinct_owners = {task.owner for task in self.tasks}
      return ", ".join(sorted(distinct_owners))
  @property
  def allow_future_exec_dates(self) -> bool:
      """True only if ``settings.ALLOW_FUTURE_EXEC_DATES`` is set and the timetable cannot be scheduled."""
      return settings.ALLOW_FUTURE_EXEC_DATES and (not self.timetable.can_be_scheduled)
  @provide_session
  def get_concurrency_reached(self, session=NEW_SESSION) -> bool:
      """
      Return a boolean indicating whether the max_active_tasks limit for this DAG has been reached.

      Counts this DAG's task instances in the RUNNING state and compares the
      count against ``max_active_tasks``.
      """
      TI = TaskInstance
      running_count_stmt = select(func.count(TI.task_id)).where(
          TI.dag_id == self.dag_id,
          TI.state == TaskInstanceState.RUNNING,
      )
      running_count = session.scalar(running_count_stmt)
      return running_count >= self.max_active_tasks
  @property
  def concurrency_reached(self):
      """Use `airflow.models.DAG.get_concurrency_reached`, this attribute is deprecated."""
      warnings.warn(
          "This attribute is deprecated. Please use `airflow.models.DAG.get_concurrency_reached` method.",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      reached = self.get_concurrency_reached()
      return reached
  @provide_session
  def get_is_active(self, session=NEW_SESSION) -> bool | None:
      """
      Return a boolean indicating whether this DAG is active.

      :param session: Database session
      :return: ``DagModel.is_active`` for this ``dag_id`` as returned by
          ``session.scalar``; *None* if the query yields no row.
      """
      return session.scalar(select(DagModel.is_active).where(DagModel.dag_id == self.dag_id))
  @provide_session
  def get_is_paused(self, session=NEW_SESSION) -> bool | None:
      """
      Return a boolean indicating whether this DAG is paused.

      :param session: Database session
      :return: ``DagModel.is_paused`` for this ``dag_id`` as returned by
          ``session.scalar``; *None* if the query yields no row.
      """
      return session.scalar(select(DagModel.is_paused).where(DagModel.dag_id == self.dag_id))
  @property
  def is_paused(self):
      """Use `airflow.models.DAG.get_is_paused`, this attribute is deprecated."""
      warnings.warn(
          "This attribute is deprecated. Please use `airflow.models.DAG.get_is_paused` method.",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      paused = self.get_is_paused()
      return paused
  @property
  def normalized_schedule_interval(self) -> ScheduleInterval:
      """
      Return ``schedule_interval`` with preset names resolved (deprecated).

      A string found in ``cron_presets`` is replaced by its preset value,
      ``"@once"`` becomes *None*, anything else is returned unchanged.
      """
      warnings.warn(
          "DAG.normalized_schedule_interval() is deprecated.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      if isinstance(self.schedule_interval, str) and self.schedule_interval in cron_presets:
          return cron_presets.get(self.schedule_interval)
      if self.schedule_interval == "@once":
          return None
      return self.schedule_interval
  @staticmethod
  @internal_api_call
  @provide_session
  def fetch_callback(
      dag: DAG,
      dag_run_id: str,
      success: bool = True,
      reason: str | None = None,
      *,
      session: Session = NEW_SESSION,
  ) -> tuple[list[TaskStateChangeCallback], Context] | None:
      """
      Fetch the appropriate callbacks depending on the value of success.

      This method gets the context of a single TaskInstance part of this DagRun and returns it along
      the list of callbacks.

      :param dag: DAG object
      :param dag_run_id: The DAG run ID
      :param success: Flag to specify if failure or success callback should be called
      :param reason: Completion reason
      :param session: Database session
      :return: ``(callbacks, context)``, or *None* when the DAG has no callback
          configured for the requested outcome.
      """
      configured = dag.on_success_callback if success else dag.on_failure_callback
      if not configured:
          return None

      dagrun = DAG.fetch_dagrun(dag_id=dag.dag_id, run_id=dag_run_id, session=session)
      # A single callable is normalised to a one-element list.
      callbacks = configured if isinstance(configured, list) else [configured]

      candidates = dagrun.get_task_instances(session=session)
      # tis from a dagrun may not be a part of dag.partial_subset,
      # since dag.partial_subset is a subset of the dag.
      # This ensures that we will only use the accessible TI
      # context for the callback.
      if dag.partial:
          candidates = [ti for ti in candidates if ti.state != State.NONE]
      # filter out removed tasks
      candidates = [ti for ti in candidates if ti.state != TaskInstanceState.REMOVED]

      # The last remaining task instance supplies the template context.
      ti = candidates[-1]
      ti.task = dag.get_task(ti.task_id)
      context = ti.get_template_context(session=session)
      context["reason"] = reason
      return callbacks, context
  @provide_session
  def handle_callback(self, dagrun: DagRun, success=True, reason=None, session=NEW_SESSION):
      """
      Triggers on_failure_callback or on_success_callback as appropriate.

      This method gets the context of a single TaskInstance part of this DagRun
      and passes that to the callable along with a 'reason', primarily to
      differentiate DagRun failures.

      .. note: The logs end up in
          ``$AIRFLOW_HOME/logs/scheduler/latest/PROJECT/DAG_FILE.py.log``

      :param dagrun: DagRun object
      :param success: Flag to specify if failure or success callback should be called
      :param reason: Completion reason
      :param session: Database session
      """
      fetched = DAG.fetch_callback(
          dag=self,
          dag_run_id=dagrun.run_id,
          success=success,
          reason=reason,
          session=session,
      )
      # fetch_callback returns None when nothing is configured; fall back to a no-op pair.
      callbacks, context = fetched or (None, None)
      DAG.execute_callback(callbacks, context, self.dag_id)
  @classmethod
  def execute_callback(cls, callbacks: list[Callable] | None, context: Context | None, dag_id: str):
      """
      Triggers the callbacks with the given context.

      Nothing happens when either ``callbacks`` or ``context`` is empty or
      *None*. An exception raised by one callback is logged and counted, and
      the remaining callbacks still run.

      :param callbacks: List of callbacks to call
      :param context: Context to pass to all callbacks
      :param dag_id: The dag_id used to tag the ``dag.callback_exceptions`` metric.
      """
      if not (callbacks and context):
          return
      for callback in callbacks:
          cls.logger().info("Executing dag callback function: %s", callback)
          try:
              callback(context)
          except Exception:
              cls.logger().exception("failed to invoke dag state update callback")
              Stats.incr("dag.callback_exceptions", tags={"dag_id": dag_id})
  def get_active_runs(self):
      """
      Return a list of dag run execution dates currently running.

      :return: List of execution dates
      """
      running = DagRun.find(dag_id=self.dag_id, state=DagRunState.RUNNING)
      return [dag_run.execution_date for dag_run in running]
  @provide_session
  def get_num_active_runs(self, external_trigger=None, only_running=True, session=NEW_SESSION):
      """
      Return the number of active dag runs of this DAG.

      :param external_trigger: When not *None*, count only runs whose
          ``external_trigger`` flag matches this value.
      :param only_running: When true count RUNNING runs only, otherwise count
          both RUNNING and QUEUED runs.
      :param session:
      :return: The count produced by the query.
      """
      if only_running:
          state_filter = DagRun.state == DagRunState.RUNNING
      else:
          state_filter = DagRun.state.in_({DagRunState.RUNNING, DagRunState.QUEUED})

      query = select(func.count()).where(DagRun.dag_id == self.dag_id).where(state_filter)

      if external_trigger is not None:
          wanted_flag = expression.true() if external_trigger else expression.false()
          query = query.where(DagRun.external_trigger == wanted_flag)

      return session.scalar(query)
  @staticmethod
  @internal_api_call
  @provide_session
  def fetch_dagrun(
      dag_id: str,
      execution_date: datetime | None = None,
      run_id: str | None = None,
      session: Session = NEW_SESSION,
  ) -> DagRun | DagRunPydantic | None:
      """
      Return the dag run for a given execution date or run_id if it exists, otherwise none.

      When both ``execution_date`` and ``run_id`` are given, a run must match
      both of them.

      :param dag_id: The dag_id of the DAG to find.
      :param execution_date: The execution date of the DagRun to find.
      :param run_id: The run_id of the DagRun to find.
      :param session:
      :return: The DagRun if found, otherwise None.
      :raises TypeError: If neither ``execution_date`` nor ``run_id`` is provided.
      """
      if not (execution_date or run_id):
          raise TypeError("You must provide either the execution_date or the run_id")

      # The dag_id filter applies to every lookup, so add it exactly once.
      query = select(DagRun).where(DagRun.dag_id == dag_id)
      if execution_date:
          query = query.where(DagRun.execution_date == execution_date)
      if run_id:
          query = query.where(DagRun.run_id == run_id)
      return session.scalar(query)
  @provide_session
  def get_dagrun(
      self,
      execution_date: datetime | None = None,
      run_id: str | None = None,
      session: Session = NEW_SESSION,
  ) -> DagRun | DagRunPydantic | None:
      """
      Return this DAG's run for the given execution date or run_id.

      Thin wrapper around ``DAG.fetch_dagrun`` using this DAG's ``dag_id``.

      :param execution_date: The execution date of the DagRun to find.
      :param run_id: The run_id of the DagRun to find.
      :param session: Database session
      :return: Whatever ``DAG.fetch_dagrun`` returns for these arguments.
      """
      return DAG.fetch_dagrun(
          dag_id=self.dag_id, execution_date=execution_date, run_id=run_id, session=session
      )
  @provide_session
  def get_dagruns_between(self, start_date, end_date, session=NEW_SESSION):
      """
      Return the list of dag runs between start_date (inclusive) and end_date (inclusive).

      :param start_date: The starting execution date of the DagRun to find.
      :param end_date: The ending execution date of the DagRun to find.
      :param session:
      :return: The list of DagRuns found.
      """
      in_range = select(DagRun).where(
          DagRun.dag_id == self.dag_id,
          DagRun.execution_date >= start_date,
          DagRun.execution_date <= end_date,
      )
      return session.scalars(in_range).all()
  @provide_session
  def get_latest_execution_date(self, session: Session = NEW_SESSION) -> pendulum.DateTime | None:
      """Return the latest date for which at least one dag run exists."""
      latest_stmt = select(func.max(DagRun.execution_date)).where(DagRun.dag_id == self.dag_id)
      return session.scalar(latest_stmt)
  @property
  def latest_execution_date(self):
      """Use `airflow.models.DAG.get_latest_execution_date`, this attribute is deprecated."""
      warnings.warn(
          "This attribute is deprecated. Please use `airflow.models.DAG.get_latest_execution_date`.",
          RemovedInAirflow3Warning,
          stacklevel=2,
      )
      latest = self.get_latest_execution_date()
      return latest
  @property
  def subdags(self):
      """Return a list of the subdag objects associated to this DAG (nested subdags included)."""
      # Imported here rather than at module level.
      from airflow.operators.subdag import SubDagOperator

      def _is_subdag_operator(task) -> bool:
          # Match by instance, by class name, or by the recorded task_type.
          return (
              isinstance(task, SubDagOperator)
              or type(task).__name__ == "SubDagOperator"
              or task.task_type == "SubDagOperator"
          )

      collected = []
      for task in self.tasks:
          if _is_subdag_operator(task):
              collected.append(task.subdag)
              # Recurse so that subdags of subdags are reported as well.
              collected += task.subdag.subdags
      return collected
  def resolve_template_files(self):
      """Call ``resolve_template_files()`` on every task of this DAG, in order."""
      for task in self.tasks:
          task.resolve_template_files()
  def get_template_env(self, *, force_sandboxed: bool = False) -> jinja2.Environment:
      """
      Build a Jinja2 environment.

      :param force_sandboxed: When true, always build a sandboxed environment
          even if ``render_template_as_native_obj`` is set.
      :return: Environment with the DAG's user-defined macros and filters installed.
      """
      # Collect directories to search for template files
      searchpath = [self.folder]
      if self.template_searchpath:
          searchpath += self.template_searchpath

      # Default values (for backward compatibility); user kwargs override them.
      jinja_env_options = {
          "loader": jinja2.FileSystemLoader(searchpath),
          "undefined": self.template_undefined,
          "extensions": ["jinja2.ext.do"],
          "cache_size": 0,
      }
      if self.jinja_environment_kwargs:
          jinja_env_options.update(self.jinja_environment_kwargs)

      use_native = self.render_template_as_native_obj and not force_sandboxed
      env_factory = (
          airflow.templates.NativeEnvironment if use_native else airflow.templates.SandboxedEnvironment
      )
      env: jinja2.Environment = env_factory(**jinja_env_options)

      # Add any user defined items. Safe to edit globals as long as no templates are rendered yet.
      # http://jinja.pocoo.org/docs/2.10/api/#jinja2.Environment.globals
      if self.user_defined_macros:
          env.globals.update(self.user_defined_macros)
      if self.user_defined_filters:
          env.filters.update(self.user_defined_filters)

      return env
  def set_dependency(self, upstream_task_id, downstream_task_id):
      """Set dependency between two tasks that already have been added to the DAG using add_task()."""
      upstream_task = self.get_task(upstream_task_id)
      downstream_task = self.get_task(downstream_task_id)
      upstream_task.set_downstream(downstream_task)
  @provide_session
  def get_task_instances_before(
      self,
      base_date: datetime,
      num: int,
      *,
      session: Session = NEW_SESSION,
  ) -> list[TaskInstance]:
      """
      Get ``num`` task instances before (including) ``base_date``.

      The ``num`` most recent DAG run execution dates at or before
      ``base_date`` determine the window; task instances between the oldest of
      those dates and ``base_date`` are returned. When no such run exists the
      window collapses to ``base_date`` itself.
      """
      recent_rows: list[Any] = session.execute(
          select(DagRun.execution_date)
          .where(
              DagRun.dag_id == self.dag_id,
              DagRun.execution_date <= base_date,
          )
          .order_by(DagRun.execution_date.desc())
          .limit(num)
      ).all()

      if recent_rows:
          # Rows are ordered newest first, so the last row carries the oldest date.
          window_start: datetime | None = recent_rows[-1]._mapping.get("execution_date")
      else:
          window_start = base_date

      return self.get_task_instances(start_date=window_start, end_date=base_date, session=session)
  @provide_session
  def get_task_instances(
      self,
      start_date: datetime | None = None,
      end_date: datetime | None = None,
      state: list[TaskInstanceState] | None = None,
      session: Session = NEW_SESSION,
  ) -> list[TaskInstance]:
      """
      Return this DAG's task instances ordered by DAG run execution date.

      :param start_date: Lower bound; when falsy, midnight 30 days before
          ``timezone.utcnow()`` is used.
      :param end_date: Upper bound passed through to ``_get_task_instances``.
      :param state: Optional states to filter on.
      :param session: Database session
      """
      if not start_date:
          thirty_days_ago = timezone.utcnow() - timedelta(30)
          start_date = thirty_days_ago.replace(hour=0, minute=0, second=0, microsecond=0)

      query = self._get_task_instances(
          task_ids=None,
          start_date=start_date,
          end_date=end_date,
          run_id=None,
          state=state or (),
          include_subdags=False,
          include_parentdag=False,
          include_dependent_dags=False,
          exclude_task_ids=(),
          session=session,
      )
      ordered = cast(Select, query).order_by(DagRun.execution_date)
      return session.scalars(ordered).all()
  # Typing overload: called without ``as_pk_tuple``, the declared result is an
  # iterable of TaskInstance objects.
  @overload
  def _get_task_instances(
      self,
      *,
      task_ids: Collection[str | tuple[str, int]] | None,
      start_date: datetime | None,
      end_date: datetime | None,
      run_id: str | None,
      state: TaskInstanceState | Sequence[TaskInstanceState],
      include_subdags: bool,
      include_parentdag: bool,
      include_dependent_dags: bool,
      exclude_task_ids: Collection[str | tuple[str, int]] | None,
      session: Session,
      dag_bag: DagBag | None = ...,
  ) -> Iterable[TaskInstance]: ...  # pragma: no cover

  # Typing overload: called with ``as_pk_tuple=True``, the declared result is a
  # set of TaskInstanceKey; this form also accepts the recursion bookkeeping
  # arguments.
  @overload
  def _get_task_instances(
      self,
      *,
      task_ids: Collection[str | tuple[str, int]] | None,
      as_pk_tuple: Literal[True],
      start_date: datetime | None,
      end_date: datetime | None,
      run_id: str | None,
      state: TaskInstanceState | Sequence[TaskInstanceState],
      include_subdags: bool,
      include_parentdag: bool,
      include_dependent_dags: bool,
      exclude_task_ids: Collection[str | tuple[str, int]] | None,
      session: Session,
      dag_bag: DagBag | None = ...,
      recursion_depth: int = ...,
      max_recursion_depth: int = ...,
      visited_external_tis: set[TaskInstanceKey] = ...,
  ) -> set[TaskInstanceKey]: ...  # pragma: no cover
  1689. def _get_task_instances(
  1690. self,
  1691. *,
  1692. task_ids: Collection[str | tuple[str, int]] | None,
  1693. as_pk_tuple: Literal[True, None] = None,
  1694. start_date: datetime | None,
  1695. end_date: datetime | None,
  1696. run_id: str | None,
  1697. state: TaskInstanceState | Sequence[TaskInstanceState],
  1698. include_subdags: bool,
  1699. include_parentdag: bool,
  1700. include_dependent_dags: bool,
  1701. exclude_task_ids: Collection[str | tuple[str, int]] | None,
  1702. session: Session,
  1703. dag_bag: DagBag | None = None,
  1704. recursion_depth: int = 0,
  1705. max_recursion_depth: int | None = None,
  1706. visited_external_tis: set[TaskInstanceKey] | None = None,
  1707. ) -> Iterable[TaskInstance] | set[TaskInstanceKey]:
  1708. TI = TaskInstance
  1709. # If we are looking at subdags/dependent dags we want to avoid UNION calls
  1710. # in SQL (it doesn't play nice with fields that have no equality operator,
  1711. # like JSON types), we instead build our result set separately.
  1712. #
  1713. # This will be empty if we are only looking at one dag, in which case
  1714. # we can return the filtered TI query object directly.
  1715. result: set[TaskInstanceKey] = set()
  1716. # Do we want full objects, or just the primary columns?
  1717. if as_pk_tuple:
  1718. tis = select(TI.dag_id, TI.task_id, TI.run_id, TI.map_index)
  1719. else:
  1720. tis = select(TaskInstance)
  1721. tis = tis.join(TaskInstance.dag_run)
  1722. if include_subdags:
  1723. # Crafting the right filter for dag_id and task_ids combo
  1724. conditions = []
  1725. for dag in [*self.subdags, self]:
  1726. conditions.append(
  1727. (TaskInstance.dag_id == dag.dag_id) & TaskInstance.task_id.in_(dag.task_ids)
  1728. )
  1729. tis = tis.where(or_(*conditions))
  1730. elif self.partial:
  1731. tis = tis.where(TaskInstance.dag_id == self.dag_id, TaskInstance.task_id.in_(self.task_ids))
  1732. else:
  1733. tis = tis.where(TaskInstance.dag_id == self.dag_id)
  1734. if run_id:
  1735. tis = tis.where(TaskInstance.run_id == run_id)
  1736. if start_date:
  1737. tis = tis.where(DagRun.execution_date >= start_date)
  1738. if task_ids is not None:
  1739. tis = tis.where(TaskInstance.ti_selector_condition(task_ids))
  1740. # This allows allow_trigger_in_future config to take affect, rather than mandating exec_date <= UTC
  1741. if end_date or not self.allow_future_exec_dates:
  1742. end_date = end_date or timezone.utcnow()
  1743. tis = tis.where(DagRun.execution_date <= end_date)
  1744. if state:
  1745. if isinstance(state, (str, TaskInstanceState)):
  1746. tis = tis.where(TaskInstance.state == state)
  1747. elif len(state) == 1:
  1748. tis = tis.where(TaskInstance.state == state[0])
  1749. else:
  1750. # this is required to deal with NULL values
  1751. if None in state:
  1752. if all(x is None for x in state):
  1753. tis = tis.where(TaskInstance.state.is_(None))
  1754. else:
  1755. not_none_state = [s for s in state if s]
  1756. tis = tis.where(
  1757. or_(TaskInstance.state.in_(not_none_state), TaskInstance.state.is_(None))
  1758. )
  1759. else:
  1760. tis = tis.where(TaskInstance.state.in_(state))
  1761. # Next, get any of them from our parent DAG (if there is one)
  1762. if include_parentdag and self.parent_dag is not None:
  1763. if visited_external_tis is None:
  1764. visited_external_tis = set()
  1765. p_dag = self.parent_dag.partial_subset(
  1766. task_ids_or_regex=r"^{}$".format(self.dag_id.split(".")[1]),
  1767. include_upstream=False,
  1768. include_downstream=True,
  1769. )
  1770. result.update(
  1771. p_dag._get_task_instances(
  1772. task_ids=task_ids,
  1773. start_date=start_date,
  1774. end_date=end_date,
  1775. run_id=None,
  1776. state=state,
  1777. include_subdags=include_subdags,
  1778. include_parentdag=False,
  1779. include_dependent_dags=include_dependent_dags,
  1780. as_pk_tuple=True,
  1781. exclude_task_ids=exclude_task_ids,
  1782. session=session,
  1783. dag_bag=dag_bag,
  1784. recursion_depth=recursion_depth,
  1785. max_recursion_depth=max_recursion_depth,
  1786. visited_external_tis=visited_external_tis,
  1787. )
  1788. )
  1789. if include_dependent_dags:
  1790. # Recursively find external tasks indicated by ExternalTaskMarker
  1791. from airflow.sensors.external_task import ExternalTaskMarker
  1792. query = tis
  1793. if as_pk_tuple:
  1794. all_tis = session.execute(query).all()
  1795. condition = TI.filter_for_tis(TaskInstanceKey(*cols) for cols in all_tis)
  1796. if condition is not None:
  1797. query = select(TI).where(condition)
  1798. if visited_external_tis is None:
  1799. visited_external_tis = set()
  1800. external_tasks = session.scalars(query.where(TI.operator == ExternalTaskMarker.__name__))
  1801. for ti in external_tasks:
  1802. ti_key = ti.key.primary
  1803. if ti_key in visited_external_tis:
  1804. continue
  1805. visited_external_tis.add(ti_key)
  1806. task: ExternalTaskMarker = cast(ExternalTaskMarker, copy.copy(self.get_task(ti.task_id)))
  1807. ti.task = task
  1808. if max_recursion_depth is None:
  1809. # Maximum recursion depth allowed is the recursion_depth of the first
  1810. # ExternalTaskMarker in the tasks to be visited.
  1811. max_recursion_depth = task.recursion_depth
  1812. if recursion_depth + 1 > max_recursion_depth:
  1813. # Prevent cycles or accidents.
  1814. raise AirflowException(
  1815. f"Maximum recursion depth {max_recursion_depth} reached for "
  1816. f"{ExternalTaskMarker.__name__} {ti.task_id}. "
  1817. f"Attempted to clear too many tasks or there may be a cyclic dependency."
  1818. )
  1819. ti.render_templates()
  1820. external_tis = session.scalars(
  1821. select(TI)
  1822. .join(TI.dag_run)
  1823. .where(
  1824. TI.dag_id == task.external_dag_id,
  1825. TI.task_id == task.external_task_id,
  1826. DagRun.execution_date == pendulum.parse(task.execution_date),
  1827. )
  1828. )
  1829. for tii in external_tis:
  1830. if not dag_bag:
  1831. from airflow.models.dagbag import DagBag
  1832. dag_bag = DagBag(read_dags_from_db=True)
  1833. external_dag = dag_bag.get_dag(tii.dag_id, session=session)
  1834. if not external_dag:
  1835. raise AirflowException(f"Could not find dag {tii.dag_id}")
  1836. downstream = external_dag.partial_subset(
  1837. task_ids_or_regex=[tii.task_id],
  1838. include_upstream=False,
  1839. include_downstream=True,
  1840. )
  1841. result.update(
  1842. downstream._get_task_instances(
  1843. task_ids=None,
  1844. run_id=tii.run_id,
  1845. start_date=None,
  1846. end_date=None,
  1847. state=state,
  1848. include_subdags=include_subdags,
  1849. include_dependent_dags=include_dependent_dags,
  1850. include_parentdag=False,
  1851. as_pk_tuple=True,
  1852. exclude_task_ids=exclude_task_ids,
  1853. dag_bag=dag_bag,
  1854. session=session,
  1855. recursion_depth=recursion_depth + 1,
  1856. max_recursion_depth=max_recursion_depth,
  1857. visited_external_tis=visited_external_tis,
  1858. )
  1859. )
  1860. if result or as_pk_tuple:
  1861. # Only execute the `ti` query if we have also collected some other results (i.e. subdags etc.)
  1862. if as_pk_tuple:
  1863. tis_query = session.execute(tis).all()
  1864. result.update(TaskInstanceKey(**cols._mapping) for cols in tis_query)
  1865. else:
  1866. result.update(ti.key for ti in session.scalars(tis))
  1867. if exclude_task_ids is not None:
  1868. result = {
  1869. task
  1870. for task in result
  1871. if task.task_id not in exclude_task_ids
  1872. and (task.task_id, task.map_index) not in exclude_task_ids
  1873. }
  1874. if as_pk_tuple:
  1875. return result
  1876. if result:
  1877. # We've been asked for objects, lets combine it all back in to a result set
  1878. ti_filters = TI.filter_for_tis(result)
  1879. if ti_filters is not None:
  1880. tis = select(TI).where(ti_filters)
  1881. elif exclude_task_ids is None:
  1882. pass # Disable filter if not set.
  1883. elif isinstance(next(iter(exclude_task_ids), None), str):
  1884. tis = tis.where(TI.task_id.notin_(exclude_task_ids))
  1885. else:
  1886. tis = tis.where(not_(tuple_in_condition((TI.task_id, TI.map_index), exclude_task_ids)))
  1887. return tis
  @provide_session
  def set_task_instance_state(
      self,
      *,
      task_id: str,
      map_indexes: Collection[int] | None = None,
      execution_date: datetime | None = None,
      run_id: str | None = None,
      state: TaskInstanceState,
      upstream: bool = False,
      downstream: bool = False,
      future: bool = False,
      past: bool = False,
      commit: bool = True,
      session=NEW_SESSION,
  ) -> list[TaskInstance]:
      """
      Mark one task's instances with ``state``, then clear its failed downstream tasks.

      The state change itself is delegated to ``set_state``. When ``commit`` is true the
      session is flushed and the downstream part of the DAG (starting at ``task_id``) is
      cleared with ``only_failed=True``; ``task_id`` itself is excluded from that clear.

      :param task_id: Task ID of the TaskInstance
      :param map_indexes: Only set TaskInstance if its map_index matches.
          If None (default), all mapped TaskInstances of the task are set.
      :param execution_date: Execution date of the TaskInstance
      :param run_id: The run_id of the TaskInstance
      :param state: State to set the TaskInstance to
      :param upstream: Include all upstream tasks of the given task_id
      :param downstream: Include all downstream tasks of the given task_id
      :param future: Include all future TaskInstances of the given task_id
      :param past: Include all past TaskInstances of the given task_id
      :param commit: Commit changes
      :param session: The sqlalchemy session to use
      :return: the value returned by ``set_state``
      :raises ValueError: if the ``exactly_one`` check on ``execution_date`` / ``run_id`` fails
      """
      from airflow.api.common.mark_tasks import set_state

      if not exactly_one(execution_date, run_id):
          raise ValueError("Exactly one of execution_date or run_id must be provided")

      operator = self.get_task(task_id)
      operator.dag = self

      # Without map indexes the operator is passed on its own; otherwise one
      # (operator, map_index) pair is passed per requested index.
      tasks_to_set_state: list[Operator | tuple[Operator, int]] = (
          [operator] if map_indexes is None else [(operator, idx) for idx in map_indexes]
      )

      altered = set_state(
          tasks=tasks_to_set_state,
          execution_date=execution_date,
          run_id=run_id,
          upstream=upstream,
          downstream=downstream,
          future=future,
          past=past,
          state=state,
          commit=commit,
          session=session,
      )

      if commit:
          # Clear downstream tasks that are in failed/upstream_failed state to resume them.
          # Flush the session so that the tasks marked success are reflected in the db.
          session.flush()

          downstream_subset = self.partial_subset(
              task_ids_or_regex={task_id},
              include_downstream=True,
              include_upstream=False,
          )

          if execution_date is not None:
              logical_date = execution_date
          else:
              dag_run = session.scalars(
                  select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
              ).one()  # Raises an error if not found
              logical_date = dag_run.execution_date

          downstream_subset.clear(
              # An open bound (None) is used on the side that ``past`` / ``future`` extends.
              start_date=None if past else logical_date,
              end_date=None if future else logical_date,
              include_subdags=True,
              include_parentdag=True,
              only_failed=True,
              session=session,
              # Exclude the task itself from being cleared
              exclude_task_ids=frozenset({task_id}),
          )

      return altered
  @provide_session
  def set_task_group_state(
      self,
      *,
      group_id: str,
      execution_date: datetime | None = None,
      run_id: str | None = None,
      state: TaskInstanceState,
      upstream: bool = False,
      downstream: bool = False,
      future: bool = False,
      past: bool = False,
      commit: bool = True,
      session: Session = NEW_SESSION,
  ) -> list[TaskInstance]:
      """
      Set TaskGroup to the given state and clear downstream tasks in failed or upstream_failed state.

      :param group_id: The group_id of the TaskGroup
      :param execution_date: Execution date of the TaskInstance
      :param run_id: The run_id of the TaskInstance
      :param state: State to set the TaskInstance to
      :param upstream: Include all upstream tasks of the given task_id
      :param downstream: Include all downstream tasks of the given task_id
      :param future: Include all future TaskInstances of the given task_id
      :param past: Include all past TaskInstances of the given task_id
      :param commit: Commit changes
      :param session: new session
      :return: the value returned by ``set_state``
      :raises ValueError: if the ``exactly_one`` check on ``execution_date`` / ``run_id``
          fails, or if no TaskGroup with ``group_id`` exists in this DAG
      """
      from airflow.api.common.mark_tasks import set_state

      if not exactly_one(execution_date, run_id):
          raise ValueError("Exactly one of execution_date or run_id must be provided")

      if execution_date is None:
          dag_run = session.scalars(
              select(DagRun).where(DagRun.run_id == run_id, DagRun.dag_id == self.dag_id)
          ).one()  # Raises an error if not found
          resolve_execution_date = dag_run.execution_date
      else:
          resolve_execution_date = execution_date

      end_date = resolve_execution_date if not future else None
      start_date = resolve_execution_date if not past else None

      task_group_dict = self.task_group.get_task_group_dict()
      task_group = task_group_dict.get(group_id)
      if task_group is None:
          # The message must be an f-string, otherwise the literal text "{group_id}" is reported.
          raise ValueError(f"TaskGroup {group_id} could not be found")

      # Only BaseOperator instances are handed to set_state; every task id of the group is
      # still used below to build the downstream subset and the exclusion list.
      tasks_to_set_state: list[BaseOperator | tuple[BaseOperator, int]] = [
          task for task in task_group.iter_tasks() if isinstance(task, BaseOperator)
      ]
      task_ids: list[str] = [task.task_id for task in task_group.iter_tasks()]

      dag_runs_query = select(DagRun.id).where(DagRun.dag_id == self.dag_id)
      if start_date is None and end_date is None:
          # NOTE(review): start_date is None here, so this compares execution_date with None;
          # presumably that selects no rows to lock -- confirm this is intended.
          dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
      else:
          if start_date is not None:
              dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
          if end_date is not None:
              dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)

      with lock_rows(dag_runs_query, session):
          altered = set_state(
              tasks=tasks_to_set_state,
              execution_date=execution_date,
              run_id=run_id,
              upstream=upstream,
              downstream=downstream,
              future=future,
              past=past,
              state=state,
              commit=commit,
              session=session,
          )
          if not commit:
              return altered

          # Clear downstream tasks that are in failed/upstream_failed state to resume them.
          # Flush the session so that the tasks marked success are reflected in the db.
          session.flush()
          task_subset = self.partial_subset(
              task_ids_or_regex=task_ids,
              include_downstream=True,
              include_upstream=False,
          )

          task_subset.clear(
              start_date=start_date,
              end_date=end_date,
              include_subdags=True,
              include_parentdag=True,
              only_failed=True,
              session=session,
              # Exclude the task from the current group from being cleared
              exclude_task_ids=frozenset(task_ids),
          )

      return altered
  @property
  def roots(self) -> list[Operator]:
      """Return the tasks with an empty ``upstream_list``: the root nodes, which execute first."""
      entry_points = []
      for candidate in self.tasks:
          if candidate.upstream_list:
              continue
          entry_points.append(candidate)
      return entry_points
  @property
  def leaves(self) -> list[Operator]:
      """Return the tasks with an empty ``downstream_list``: the leaf nodes, which execute last."""
      return list(filter(lambda node: not node.downstream_list, self.tasks))
  def topological_sort(self, include_subdag_tasks: bool = False):
      """
      Return the DAG's tasks ordered so that a task comes after any of its upstream dependencies.

      The ordering is taken from ``task_group.topological_sort``; nested TaskGroups are
      expanded in place so the result is one flat tuple.

      Deprecated in place of ``task_group.topological_sort``
      """
      from airflow.utils.task_group import TaskGroup

      def flatten(group):
          ordered = group.topological_sort(_include_subdag_tasks=include_subdag_tasks)
          for item in ordered:
              if not isinstance(item, TaskGroup):
                  yield item
                  continue
              # A nested group contributes its own (recursively flattened) members.
              yield from flatten(item)

      return tuple(flatten(self.task_group))
  @provide_session
  def set_dag_runs_state(
      self,
      state: DagRunState = DagRunState.RUNNING,
      session: Session = NEW_SESSION,
      start_date: datetime | None = None,
      end_date: datetime | None = None,
      dag_ids: list[str] | None = None,
  ) -> None:
      """
      Bulk-update the state of DagRuns (deprecated; emits ``RemovedInAirflow3Warning``).

      :param state: value written to ``DagRun.state``
      :param session: The sqlalchemy session to use
      :param start_date: if truthy, only runs with ``execution_date >= start_date`` are updated
      :param end_date: if truthy, only runs with ``execution_date <= end_date`` are updated
      :param dag_ids: dag ids whose runs are updated; falls back to this DAG's own id when empty
      """
      warnings.warn(
          "This method is deprecated and will be removed in a future version.",
          RemovedInAirflow3Warning,
          stacklevel=3,
      )
      if not dag_ids:
          dag_ids = [self.dag_id]

      stmt = update(DagRun).where(DagRun.dag_id.in_(dag_ids))
      if start_date:
          stmt = stmt.where(DagRun.execution_date >= start_date)
      if end_date:
          stmt = stmt.where(DagRun.execution_date <= end_date)

      stmt = stmt.values(state=state).execution_options(synchronize_session="fetch")
      session.execute(stmt)
  @provide_session
  def clear(
      self,
      task_ids: Collection[str | tuple[str, int]] | None = None,
      start_date: datetime | None = None,
      end_date: datetime | None = None,
      only_failed: bool = False,
      only_running: bool = False,
      confirm_prompt: bool = False,
      include_subdags: bool = True,
      include_parentdag: bool = True,
      dag_run_state: DagRunState = DagRunState.QUEUED,
      dry_run: bool = False,
      session: Session = NEW_SESSION,
      get_tis: bool = False,
      recursion_depth: int = 0,
      max_recursion_depth: int | None = None,
      dag_bag: DagBag | None = None,
      exclude_task_ids: frozenset[str] | frozenset[tuple[str, int]] | None = frozenset(),
  ) -> int | Iterable[TaskInstance]:
      """
      Clear a set of task instances associated with the current dag for a specified date range.

      :param task_ids: List of task ids or (``task_id``, ``map_index``) tuples to clear
      :param start_date: The minimum execution_date to clear
      :param end_date: The maximum execution_date to clear
      :param only_failed: Only clear failed tasks
      :param only_running: Only clear running tasks.
      :param confirm_prompt: Ask for confirmation
      :param include_subdags: Clear tasks in subdags and clear external tasks
          indicated by ExternalTaskMarker
      :param include_parentdag: Clear tasks in the parent dag of the subdag.
      :param dag_run_state: state to set DagRun to. If set to False, dagrun state will not
          be changed.
      :param dry_run: Find the tasks to clear but don't clear them.
      :param session: The sqlalchemy session to use
      :param get_tis: Deprecated; a truthy value emits a warning and forces ``dry_run``.
      :param recursion_depth: Deprecated; a truthy value only emits a warning.
      :param max_recursion_depth: Deprecated; a truthy value only emits a warning.
      :param dag_bag: The DagBag used to find the dags subdags (Optional)
      :param exclude_task_ids: A set of ``task_id`` or (``task_id``, ``map_index``)
          tuples that should not be cleared
      :return: the matching task instances when ``dry_run`` is in effect, otherwise the
          number of task instances cleared (0 if none matched or the prompt was declined)
      """
      if get_tis:
          warnings.warn(
              "Passing `get_tis` to dag.clear() is deprecated. Use `dry_run` parameter instead.",
              RemovedInAirflow3Warning,
              stacklevel=2,
          )
          dry_run = True

      # The remaining deprecated arguments are accepted but have no effect beyond the warning.
      for passed_value, message in (
          (recursion_depth, "Passing `recursion_depth` to dag.clear() is deprecated."),
          (max_recursion_depth, "Passing `max_recursion_depth` to dag.clear() is deprecated."),
      ):
          if passed_value:
              warnings.warn(message, RemovedInAirflow3Warning, stacklevel=2)

      state: list[TaskInstanceState] = []
      if only_failed:
          state.extend((TaskInstanceState.FAILED, TaskInstanceState.UPSTREAM_FAILED))
      if only_running:
          # Yes, accumulating here doesn't make sense, but this was the existing behaviour
          state.append(TaskInstanceState.RUNNING)

      ti_query = self._get_task_instances(
          task_ids=task_ids,
          start_date=start_date,
          end_date=end_date,
          run_id=None,
          state=state,
          include_subdags=include_subdags,
          include_parentdag=include_parentdag,
          include_dependent_dags=include_subdags,  # compat, yes this is not a typo
          session=session,
          dag_bag=dag_bag,
          exclude_task_ids=exclude_task_ids,
      )
      found_tis = session.scalars(ti_query).all()

      if dry_run:
          return found_tis

      count = len(list(found_tis))
      if count == 0:
          return 0

      if confirm_prompt:
          ti_list = "\n".join(map(str, found_tis))
          question = f"You are about to delete these {count} tasks:\n{ti_list}\n\nAre you sure? [y/n]"
          confirmed = utils.helpers.ask_yesno(question)
      else:
          confirmed = True

      if not confirmed:
          count = 0
          print("Cancelled, nothing was cleared.")
      else:
          clear_task_instances(
              list(found_tis),
              session,
              dag=self,
              dag_run_state=dag_run_state,
          )

      session.flush()
      return count
  @classmethod
  def clear_dags(
      cls,
      dags,
      start_date=None,
      end_date=None,
      only_failed=False,
      only_running=False,
      confirm_prompt=False,
      include_subdags=True,
      include_parentdag=False,
      dag_run_state=DagRunState.QUEUED,
      dry_run=False,
  ):
      """
      Clear task instances of several DAGs, with one shared confirmation step.

      Each DAG is first asked (via a dry-run ``clear``) which task instances would be
      cleared. The combined list is returned as-is for a dry run; otherwise it is
      optionally confirmed and every DAG is cleared with the same arguments.

      :param dags: the DAGs to clear; iterated twice, so it must be re-iterable
      :param start_date: The minimum execution_date to clear
      :param end_date: The maximum execution_date to clear
      :param only_failed: Only clear failed tasks
      :param only_running: Only clear running tasks.
      :param confirm_prompt: Ask for confirmation once, for all DAGs together
      :param include_subdags: forwarded to ``clear``
      :param include_parentdag: forwarded to ``clear``
      :param dag_run_state: forwarded to ``clear``
      :param dry_run: Find the tasks to clear but don't clear them.
      :return: the list of task instances found when ``dry_run`` is true, otherwise the
          number of task instances found (0 if there were none or the prompt was declined)
      """
      all_tis = []
      for dag in dags:
          tis = dag.clear(
              start_date=start_date,
              end_date=end_date,
              only_failed=only_failed,
              only_running=only_running,
              confirm_prompt=False,
              include_subdags=include_subdags,
              include_parentdag=include_parentdag,
              dag_run_state=dag_run_state,
              dry_run=True,
          )
          all_tis.extend(tis)

      if dry_run:
          return all_tis

      count = len(all_tis)
      do_it = True
      if count == 0:
          print("Nothing to clear.")
          return 0
      if confirm_prompt:
          ti_list = "\n".join(str(t) for t in all_tis)
          question = f"You are about to delete these {count} tasks:\n{ti_list}\n\nAre you sure? [y/n]"
          do_it = utils.helpers.ask_yesno(question)

      if do_it:
          for dag in dags:
              dag.clear(
                  start_date=start_date,
                  end_date=end_date,
                  only_failed=only_failed,
                  only_running=only_running,
                  confirm_prompt=False,
                  include_subdags=include_subdags,
                  # Must match the dry-run pass above: if omitted, ``clear`` uses its own
                  # default (True) and may clear parent-DAG tasks that were never listed.
                  include_parentdag=include_parentdag,
                  dag_run_state=dag_run_state,
                  dry_run=False,
              )
      else:
          count = 0
          print("Cancelled, nothing was cleared.")
      return count
  def __deepcopy__(self, memo):
      """
      Deep-copy the DAG while sharing a few attributes with the original.

      ``user_defined_macros``, ``user_defined_filters`` and ``_log`` are assigned by
      reference instead of being deep-copied; every other instance attribute is
      deep-copied with the given ``memo``.
      """
      shared_attrs = ("user_defined_macros", "user_defined_filters", "_log")

      klass = self.__class__
      clone = klass.__new__(klass)
      # Register the clone before copying attributes so references back to this DAG resolve to it.
      memo[id(self)] = clone

      for name, value in self.__dict__.items():
          if name in shared_attrs:
              continue
          setattr(clone, name, copy.deepcopy(value, memo))

      clone.user_defined_macros = self.user_defined_macros
      clone.user_defined_filters = self.user_defined_filters
      if hasattr(self, "_log"):
          clone._log = self._log
      return clone
  def sub_dag(self, *args, **kwargs):
      """Deprecated alias: warn, then forward all arguments to ``partial_subset``."""
      deprecation_message = (
          "This method is deprecated and will be removed in a future version. Please use partial_subset"
      )
      warnings.warn(deprecation_message, RemovedInAirflow3Warning, stacklevel=2)
      subset = self.partial_subset(*args, **kwargs)
      return subset
  def partial_subset(
      self,
      task_ids_or_regex: str | Pattern | Iterable[str],
      include_downstream=False,
      include_upstream=True,
      include_direct_upstream=False,
  ):
      """
      Return a subset of the current dag based on regex matching one or more tasks.

      Returns a subset of the current dag as a deep copy of the current dag
      based on a regex that should match one or many tasks, and includes
      upstream and downstream neighbours based on the flag passed.

      :param task_ids_or_regex: Either a list of task_ids, or a regex to
          match against task ids (as a string, or compiled regex pattern).
      :param include_downstream: Include all downstream tasks of matched
          tasks, in addition to matched tasks.
      :param include_upstream: Include all upstream tasks of matched tasks,
          in addition to matched tasks.
      :param include_direct_upstream: Include all tasks directly upstream of matched
          and downstream (if include_downstream = True) tasks
      :return: the copied dag; its ``partial`` attribute is set to True when it holds
          fewer tasks than this dag
      """
      from airflow.models.baseoperator import BaseOperator
      from airflow.models.mappedoperator import MappedOperator

      # deep-copying self.task_dict and self._task_group takes a long time, and we don't want all
      # the tasks anyway, so we copy the tasks manually later
      memo = {id(self.task_dict): None, id(self._task_group): None}
      dag = copy.deepcopy(self, memo)  # type: ignore

      # A str / compiled pattern is searched inside each task_id; any other value is
      # treated as a container of exact task ids.
      if isinstance(task_ids_or_regex, (str, Pattern)):
          matched_tasks = [t for t in self.tasks if re2.findall(task_ids_or_regex, t.task_id)]
      else:
          matched_tasks = [t for t in self.tasks if t.task_id in task_ids_or_regex]

      # Ids of tasks pulled in because of their relation to a matched task.
      also_include_ids: set[str] = set()
      for t in matched_tasks:
          if include_downstream:
              for rel in t.get_flat_relatives(upstream=False):
                  also_include_ids.add(rel.task_id)
                  if rel not in matched_tasks:  # if it's in there, we're already processing it
                      # need to include setups and teardowns for tasks that are in multiple
                      # non-collinear setup/teardown paths
                      if not rel.is_setup and not rel.is_teardown:
                          also_include_ids.update(
                              x.task_id for x in rel.get_upstreams_only_setups_and_teardowns()
                          )
          if include_upstream:
              also_include_ids.update(x.task_id for x in t.get_upstreams_follow_setups())
          else:
              if not t.is_setup and not t.is_teardown:
                  also_include_ids.update(x.task_id for x in t.get_upstreams_only_setups_and_teardowns())
          # A matched setup task brings along its directly-downstream teardown tasks
          # when downstream tasks are not being included wholesale.
          if t.is_setup and not include_downstream:
              also_include_ids.update(x.task_id for x in t.downstream_list if x.is_teardown)

      also_include: list[Operator] = [self.task_dict[x] for x in also_include_ids]
      direct_upstreams: list[Operator] = []
      if include_direct_upstream:
          for t in itertools.chain(matched_tasks, also_include):
              upstream = (u for u in t.upstream_list if isinstance(u, (BaseOperator, MappedOperator)))
              direct_upstreams.extend(upstream)

      # Compiling the unique list of tasks that made the cut
      # Make sure to not recursively deepcopy the dag or task_group while copying the task.
      # task_group is reset later
      def _deepcopy_task(t) -> Operator:
          memo.setdefault(id(t.task_group), None)
          return copy.deepcopy(t, memo)

      # Keyed by task_id, so a task reached through several of the three lists is kept once.
      dag.task_dict = {
          t.task_id: _deepcopy_task(t)
          for t in itertools.chain(matched_tasks, also_include, direct_upstreams)
      }

      def filter_task_group(group, parent_group):
          """Exclude tasks not included in the subdag from the given TaskGroup."""
          # We want to deepcopy _most but not all_ attributes of the task group, so we create a shallow copy
          # and then manually deep copy the instances. (memo argument to deepcopy only works for instances
          # of classes, not "native" properties of an instance)
          copied = copy.copy(group)

          # Children are replaced by a fresh empty dict; it is refilled selectively below.
          memo[id(group.children)] = {}
          if parent_group:
              memo[id(group.parent_group)] = parent_group
          for attr, value in copied.__dict__.items():
              if id(value) in memo:
                  value = memo[id(value)]
              else:
                  value = copy.deepcopy(value, memo)
              copied.__dict__[attr] = value

          # Tasks and child groups receive a weak proxy of the copied group as their parent reference.
          proxy = weakref.proxy(copied)

          for child in group.children.values():
              if isinstance(child, AbstractOperator):
                  if child.task_id in dag.task_dict:
                      task = copied.children[child.task_id] = dag.task_dict[child.task_id]
                      task.task_group = proxy
                  else:
                      copied.used_group_ids.discard(child.task_id)
              else:
                  filtered_child = filter_task_group(child, proxy)

                  # Only include this child TaskGroup if it is non-empty.
                  if filtered_child.children:
                      copied.children[child.group_id] = filtered_child

          return copied

      dag._task_group = filter_task_group(self.task_group, None)

      # Removing upstream/downstream references to tasks and TaskGroups that did not make
      # the cut.
      subdag_task_groups = dag.task_group.get_task_group_dict()
      for group in subdag_task_groups.values():
          group.upstream_group_ids.intersection_update(subdag_task_groups)
          group.downstream_group_ids.intersection_update(subdag_task_groups)
          group.upstream_task_ids.intersection_update(dag.task_dict)
          group.downstream_task_ids.intersection_update(dag.task_dict)

      for t in dag.tasks:
          # Removing upstream/downstream references to tasks that did not
          # make the cut
          t.upstream_task_ids.intersection_update(dag.task_dict)
          t.downstream_task_ids.intersection_update(dag.task_dict)

      if len(dag.tasks) < len(self.tasks):
          dag.partial = True

      return dag
  def has_task(self, task_id: str):
      """Return whether ``task_id`` is a key of this DAG's ``task_dict``."""
      known_tasks = self.task_dict
      return task_id in known_tasks
  def has_task_group(self, task_group_id: str) -> bool:
      """Return whether ``task_group_id`` is a key of this DAG's ``task_group_dict``."""
      known_groups = self.task_group_dict
      return task_group_id in known_groups
  @functools.cached_property
  def task_group_dict(self):
      """Map group id to TaskGroup for every group of this DAG, leaving out the ``None`` key."""
      named_groups = {}
      for group_id, group in self._task_group.get_task_group_dict().items():
          if group_id is None:
              continue
          named_groups[group_id] = group
      return named_groups
  def get_task(self, task_id: str, include_subdags: bool = False) -> Operator:
      """
      Look up a task by id.

      :param task_id: id of the task to return
      :param include_subdags: when true, also search the ``task_dict`` of each DAG in
          ``self.subdags`` if this DAG itself does not hold the task
      :raises TaskNotFound: if no task with that id is found
      """
      own_tasks = self.task_dict
      if task_id in own_tasks:
          return own_tasks[task_id]

      if include_subdags:
          for subdag in self.subdags:
              nested_tasks = subdag.task_dict
              if task_id in nested_tasks:
                  return nested_tasks[task_id]

      raise TaskNotFound(f"Task {task_id} not found")
  def pickle_info(self):
      """
      Try to pickle this DAG and report the outcome as a dict.

      On success the dict holds ``is_picklable=True``, ``pickle_len`` and
      ``pickling_duration``; on any exception it holds ``is_picklable=False`` and the
      formatted ``stacktrace`` (the exception is also logged at debug level).
      """
      info = {"is_picklable": True}
      try:
          started_at = timezone.utcnow()
          payload = pickle.dumps(self)
          info["pickle_len"] = len(payload)
          info["pickling_duration"] = str(timezone.utcnow() - started_at)
      except Exception as err:
          self.log.debug(err)
          info.update(is_picklable=False, stacktrace=traceback.format_exc())
      return info
  @provide_session
  def pickle(self, session=NEW_SESSION) -> DagPickle:
      """
      Return the ``DagPickle`` row for this DAG, creating and committing a new one if needed.

      The row referenced by the DagModel's ``pickle_id`` is reused when its ``pickle``
      compares equal to this DAG. Otherwise a new row is added and committed, and
      ``last_pickled`` / ``pickle_id`` are updated on this DAG.
      """
      dag_model = session.scalar(select(DagModel).where(DagModel.dag_id == self.dag_id).limit(1))

      stored = None
      if dag_model and dag_model.pickle_id:
          stored = session.scalar(select(DagPickle).where(DagPickle.id == dag_model.pickle_id).limit(1))

      needs_new_pickle = not stored or stored.pickle != self
      if needs_new_pickle:
          stored = DagPickle(dag=self)
          session.add(stored)
          self.last_pickled = timezone.utcnow()
          session.commit()
          # The id is read only after the commit above.
          self.pickle_id = stored.id

      return stored
  def tree_view(self) -> None:
      """Print an ASCII tree representation of the DAG, one line per task (deprecated)."""
      warnings.warn(
          "`tree_view` is deprecated and will be removed in Airflow 3.0.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      tree_lines = self._generate_tree_view()
      for line in tree_lines:
          print(line)
  def _generate_tree_view(self) -> Generator[str, None, None]:
      """
      Yield the lines of the DAG's ASCII tree.

      Roots are visited in ``task_id`` order; each task is followed by its downstream
      tasks (also in ``task_id`` order), indented four spaces per level.
      """

      def walk(node, depth=0) -> Generator[str, None, None]:
          indent = " " * depth * 4
          yield indent + str(node)
          children = sorted(node.downstream_list, key=lambda child: child.task_id)
          for child in children:
              yield from walk(child, depth + 1)

      for root in sorted(self.roots, key=lambda r: r.task_id):
          yield from walk(root)
  def get_tree_view(self) -> str:
      """Return an ASCII tree representation of the DAG, each line newline-terminated (deprecated)."""
      warnings.warn(
          "`get_tree_view` is deprecated and will be removed in Airflow 3.0.",
          category=RemovedInAirflow3Warning,
          stacklevel=2,
      )
      return "".join(line + "\n" for line in self._generate_tree_view())
  @property
  def task(self) -> TaskDecoratorCollection:
      """Return the ``airflow.decorators.task`` decorator with ``dag`` pre-bound to this DAG."""
      from airflow.decorators import task as task_decorator

      bound_decorator = functools.partial(task_decorator, dag=self)
      return cast("TaskDecoratorCollection", bound_decorator)
  def add_task(self, task: Operator) -> None:
      """
      Add a task to the DAG.

      The task's start/end dates are reconciled with the DAG's, a task without a task
      group is attached to the current task group (if any), and the task is registered
      in ``task_dict`` under its (possibly group-prefixed) id.

      :param task: the task you want to add
      :raises DuplicateTaskIdFound: if the id already belongs to a different task or is
          already present in the task group's ``used_group_ids``
      """
      FailStopDagInvalidTriggerRule.check(dag=self, trigger_rule=task.trigger_rule)

      from airflow.utils.task_group import TaskGroupContext

      # Start date: inherit the DAG's when the task has none, otherwise use the later
      # of the task's and the DAG's start dates.
      if task.start_date:
          if self.start_date:
              task.start_date = max(task.start_date, self.start_date)
      else:
          task.start_date = self.start_date

      # End date: inherit the DAG's when the task has none, otherwise use the earlier
      # of the task's and the DAG's end dates.
      if task.end_date:
          if self.end_date:
              task.end_date = min(task.end_date, self.end_date)
      else:
          task.end_date = self.end_date

      task_id = task.task_id
      if not task.task_group:
          current_group = TaskGroupContext.get_current_task_group(self)
          if current_group:
              task_id = current_group.child_id(task_id)
              current_group.add(task)

      id_held_by_other_task = task_id in self.task_dict and self.task_dict[task_id] is not task
      if id_held_by_other_task or task_id in self._task_group.used_group_ids:
          raise DuplicateTaskIdFound(f"Task id '{task_id}' has already been added to the DAG")

      self.task_dict[task_id] = task
      task.dag = self
      # Add task_id to used_group_ids to prevent group_id and task_id collisions.
      self._task_group.used_group_ids.add(task_id)

      self.task_count = len(self.task_dict)
  def add_tasks(self, tasks: Iterable[Operator]) -> None:
      """
      Add each of the given tasks to the DAG by calling ``add_task`` on it, in order.

      :param tasks: the tasks you want to add
      """
      for operator in tasks:
          self.add_task(operator)
  def _remove_task(self, task_id: str) -> None:
      """
      Drop ``task_id`` from ``task_dict`` (and from its task group, if it has one).

      This is "private" as removing could leave a hole in dependencies if done
      incorrectly, and this doesn't guard against that.
      """
      removed = self.task_dict.pop(task_id)

      owning_group = getattr(removed, "task_group", None)
      if owning_group:
          owning_group._remove(removed)

      self.task_count = len(self.task_dict)
  def run(
      self,
      start_date=None,
      end_date=None,
      mark_success=False,
      local=False,
      donot_pickle=airflow_conf.getboolean("core", "donot_pickle"),
      ignore_task_deps=False,
      ignore_first_depends_on_past=True,
      pool=None,
      delay_on_limit_secs=1.0,
      verbose=False,
      conf=None,
      rerun_failed_tasks=False,
      run_backwards=False,
      run_at_least_once=False,
      continue_on_failures=False,
      disable_retry=False,
  ):
      """
      Run the DAG.

      A ``Job`` and a ``BackfillJobRunner`` bound to this DAG are created, and the runner's
      ``_execute`` is handed to ``run_job``. Every option below except ``local`` is forwarded
      unchanged to ``BackfillJobRunner``.

      :param start_date: the start date of the range to run
      :param end_date: the end date of the range to run
      :param mark_success: True to mark jobs as succeeded without running them
      :param local: True to install a ``LocalExecutor`` as the default executor before running
      :param donot_pickle: True to avoid pickling DAG object and send to workers
      :param ignore_task_deps: True to skip upstream tasks
      :param ignore_first_depends_on_past: True to ignore depends_on_past
          dependencies for the first set of tasks only
      :param pool: Resource pool to use
      :param delay_on_limit_secs: Time in seconds to wait before next attempt to run
          dag run when max_active_runs limit has been reached
      :param verbose: Make logging output more verbose
      :param conf: user defined dictionary passed from CLI
      :param rerun_failed_tasks: forwarded to ``BackfillJobRunner``
      :param run_backwards: forwarded to ``BackfillJobRunner``
      :param run_at_least_once: If true, always run the DAG at least once even
          if no logical run exists within the time range.
      :param continue_on_failures: forwarded to ``BackfillJobRunner``
      :param disable_retry: forwarded to ``BackfillJobRunner``
      """
      from airflow.executors.executor_loader import ExecutorLoader
      from airflow.jobs.backfill_job_runner import BackfillJobRunner

      if local:
          from airflow.executors.local_executor import LocalExecutor

          ExecutorLoader.set_default_executor(LocalExecutor())

      from airflow.jobs.job import Job

      # Everything the runner receives besides the job and the DAG itself.
      backfill_options = {
          "start_date": start_date,
          "end_date": end_date,
          "mark_success": mark_success,
          "donot_pickle": donot_pickle,
          "ignore_task_deps": ignore_task_deps,
          "ignore_first_depends_on_past": ignore_first_depends_on_past,
          "pool": pool,
          "delay_on_limit_secs": delay_on_limit_secs,
          "verbose": verbose,
          "conf": conf,
          "rerun_failed_tasks": rerun_failed_tasks,
          "run_backwards": run_backwards,
          "run_at_least_once": run_at_least_once,
          "continue_on_failures": continue_on_failures,
          "disable_retry": disable_retry,
      }
      backfill_job = Job()
      runner = BackfillJobRunner(job=backfill_job, dag=self, **backfill_options)
      run_job(job=backfill_job, execute_callable=runner._execute)
  def cli(self):
      """
      Exposes a CLI specific to this DAG.

      The DAG is cycle-checked first; the command line is then parsed with the DAG-level
      parser and the selected command's ``func`` is called with the parsed args and this DAG.
      """
      check_cycle(self)

      from airflow.cli import cli_parser

      parsed_args = cli_parser.get_parser(dag_parser=True).parse_args()
      parsed_args.func(parsed_args, self)
  @provide_session
  def test(
      self,
      execution_date: datetime | None = None,
      run_conf: dict[str, Any] | None = None,
      conn_file_path: str | None = None,
      variable_file_path: str | None = None,
      use_executor: bool = False,
      mark_success_pattern: Pattern | str | None = None,
      session: Session = NEW_SESSION,
  ) -> DagRun:
      """
      Execute one single DagRun for a given DAG and execution date.

      Existing task instances for the execution date are cleared, a manual DagRun is fetched
      or created, and a minimal scheduling loop runs until the DagRun leaves the ``RUNNING``
      state. Exceptions raised while running a task locally are logged and do not abort the loop.

      :param execution_date: execution date for the DAG run; defaults to the current UTC time
      :param run_conf: configuration to pass to newly created dagrun
      :param conn_file_path: file path to a connection file in either yaml or json
      :param variable_file_path: file path to a variable file in either yaml or json
      :param use_executor: if set, uses an executor to test the DAG
      :param mark_success_pattern: regex of task_ids to mark as success instead of running;
          a task matches only when the whole ``task_id`` matches the pattern
      :param session: database connection (optional)
      :return: the DagRun that was executed
      """

      def add_logger_if_needed(ti: TaskInstance):
          """
          Add a formatted logger to the task instance.

          This allows all logs to surface to the command line, instead of into
          a task file. Since this is a local test run, it is much better for
          the user to see logs in the command line, rather than needing to
          search for a log file.

          :param ti: The task instance that will receive a logger.
          """
          # only add log handler once
          if any(isinstance(h, logging.StreamHandler) for h in ti.log.handlers):
              return
          # Build the handler only when it is actually going to be attached.
          formatter = logging.Formatter(
              "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
          )
          handler = logging.StreamHandler(sys.stdout)
          handler.level = logging.INFO
          handler.setFormatter(formatter)
          self.log.debug("Adding Streamhandler to taskinstance %s", ti.task_id)
          ti.log.addHandler(handler)

      # Compile the pattern once, up front: it is identical for every task instance, and an
      # invalid pattern is rejected before any task instance is cleared or rescheduled.
      success_matcher = re2.compile(mark_success_pattern) if mark_success_pattern is not None else None

      exit_stack = ExitStack()
      if conn_file_path or variable_file_path:
          local_secrets = LocalFilesystemBackend(
              variables_file_path=variable_file_path, connections_file_path=conn_file_path
          )
          # Put the file-based backend first in the list; the callback removes it again on exit.
          secrets_backend_list.insert(0, local_secrets)
          exit_stack.callback(lambda: secrets_backend_list.pop(0))

      with exit_stack:
          execution_date = execution_date or timezone.utcnow()
          self.validate()
          self.log.debug("Clearing existing task instances for execution date %s", execution_date)
          self.clear(
              start_date=execution_date,
              end_date=execution_date,
              dag_run_state=False,  # type: ignore
              session=session,
          )
          self.log.debug("Getting dagrun for dag %s", self.dag_id)
          logical_date = timezone.coerce_datetime(execution_date)
          data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date)
          dr: DagRun = _get_or_create_dagrun(
              dag=self,
              start_date=execution_date,
              execution_date=execution_date,
              run_id=DagRun.generate_run_id(DagRunType.MANUAL, execution_date),
              session=session,
              conf=run_conf,
              data_interval=data_interval,
          )

          tasks = self.task_dict
          self.log.debug("starting dagrun")
          # Instead of starting a scheduler, we run the minimal loop possible to check
          # for task readiness and dependency management. This is notably faster
          # than creating a BackfillJob and allows us to surface logs to the user

          # ``Dag.test()`` works in two different modes depending on ``use_executor``:
          # - if ``use_executor`` is False, runs the task locally with no executor using ``_run_task``
          # - if ``use_executor`` is True, sends the task instances to the executor with
          #   ``BaseExecutor.queue_task_instance``
          if use_executor:
              executor = ExecutorLoader.get_default_executor()
              executor.start()

          while dr.state == DagRunState.RUNNING:
              session.expire_all()
              schedulable_tis, _ = dr.update_state(session=session)
              for s in schedulable_tis:
                  # A rescheduled task instance keeps its current try number.
                  if s.state != TaskInstanceState.UP_FOR_RESCHEDULE:
                      s.try_number += 1
                  s.state = TaskInstanceState.SCHEDULED
              session.commit()
              # triggerer may mark tasks scheduled so we read from DB
              all_tis = set(dr.get_task_instances(session=session))
              scheduled_tis = {x for x in all_tis if x.state == TaskInstanceState.SCHEDULED}
              ids_unrunnable = {x for x in all_tis if x.state not in State.finished} - scheduled_tis
              if not scheduled_tis and ids_unrunnable:
                  self.log.warning("No tasks to run. unrunnable tasks: %s", ids_unrunnable)
                  time.sleep(1)

              triggerer_running = _triggerer_is_healthy()
              for ti in scheduled_tis:
                  ti.task = tasks[ti.task_id]

                  mark_success = (
                      success_matcher is not None and success_matcher.fullmatch(ti.task_id) is not None
                  )

                  if use_executor:
                      if executor.has_task(ti):
                          continue
                      # Send the task to the executor
                      executor.queue_task_instance(ti, ignore_ti_state=True)
                  else:
                      # Run the task locally
                      try:
                          add_logger_if_needed(ti)
                          _run_task(
                              ti=ti,
                              inline_trigger=not triggerer_running,
                              session=session,
                              mark_success=mark_success,
                          )
                      except Exception:
                          self.log.exception("Task failed; ti=%s", ti)
              if use_executor:
                  executor.heartbeat()
          if use_executor:
              executor.end()
      return dr
  @provide_session
  def create_dagrun(
      self,
      state: DagRunState,
      execution_date: datetime | None = None,
      run_id: str | None = None,
      start_date: datetime | None = None,
      external_trigger: bool | None = False,
      conf: dict | None = None,
      run_type: DagRunType | None = None,
      session: Session = NEW_SESSION,
      dag_hash: str | None = None,
      creating_job_id: int | None = None,
      data_interval: tuple[datetime, datetime] | None = None,
  ):
      """
      Create a dag run from this dag including the tasks associated with this dag.

      Returns the dag run.

      Either ``run_id`` or both ``run_type`` and ``execution_date`` must be supplied. When
      ``run_id`` is missing it is generated by the timetable; when ``run_type`` is missing it
      is inferred from ``run_id``. Calling without ``data_interval`` but with an
      ``execution_date`` emits a ``RemovedInAirflow3Warning`` and the interval is inferred.

      :param run_id: defines the run id for this dag run
      :param run_type: type of DagRun; a plain ``str`` is still accepted and converted
      :param execution_date: the execution date of this dag run
      :param state: the state of the dag run
      :param start_date: the date this dag run should be evaluated
      :param external_trigger: whether this dag run is externally triggered
      :param conf: Dict containing configuration/parameters to pass to the DAG
      :param creating_job_id: id of the job creating this DagRun
      :param session: database session
      :param dag_hash: Hash of Serialized DAG
      :param data_interval: Data interval of the DagRun
      :raises ValueError: if ``run_type`` is neither ``None``, a ``DagRunType`` nor a ``str``;
          if ``run_id`` is truthy but not a ``str``; or if a manual ``run_type`` is combined
          with a ``run_id`` that ``DagRunType.from_run_id`` classifies as non-manual
      :raises AirflowException: if neither ``run_id`` nor both ``run_type`` and
          ``execution_date`` are given, or if ``run_id`` matches neither ``RUN_ID_REGEX`` nor
          the configured ``[scheduler] allowed_run_id_pattern``
      """
      logical_date = timezone.coerce_datetime(execution_date)

      # Normalise a plain (start, end) pair into a DataInterval with coerced datetimes.
      if data_interval and not isinstance(data_interval, DataInterval):
          data_interval = DataInterval(*map(timezone.coerce_datetime, data_interval))

      if data_interval is None and logical_date is not None:
          warnings.warn(
              "Calling `DAG.create_dagrun()` without an explicit data interval is deprecated",
              RemovedInAirflow3Warning,
              stacklevel=3,
          )
          # run_type has not been normalised yet at this point, so this compares the raw argument.
          if run_type == DagRunType.MANUAL:
              data_interval = self.timetable.infer_manual_data_interval(run_after=logical_date)
          else:
              data_interval = self.infer_automated_data_interval(logical_date)

      if run_type is None or isinstance(run_type, DagRunType):
          pass
      elif isinstance(run_type, str):  # Compatibility: run_type used to be a str.
          run_type = DagRunType(run_type)
      else:
          raise ValueError(f"`run_type` should be a DagRunType, not {type(run_type)}")

      if run_id:  # Infer run_type from run_id if needed.
          if not isinstance(run_id, str):
              raise ValueError(f"`run_id` should be a str, not {type(run_id)}")
          inferred_run_type = DagRunType.from_run_id(run_id)
          if run_type is None:
              # No explicit type given, use the inferred type.
              run_type = inferred_run_type
          elif run_type == DagRunType.MANUAL and inferred_run_type != DagRunType.MANUAL:
              # Prevent a manual run from using an ID that looks like a scheduled run.
              raise ValueError(
                  f"A {run_type.value} DAG run cannot use ID {run_id!r} since it "
                  f"is reserved for {inferred_run_type.value} runs"
              )
      elif run_type and logical_date is not None:  # Generate run_id from run_type and execution_date.
          run_id = self.timetable.generate_run_id(
              run_type=run_type, logical_date=logical_date, data_interval=data_interval
          )
      else:
          raise AirflowException(
              "Creating DagRun needs either `run_id` or both `run_type` and `execution_date`"
          )

      regex = airflow_conf.get("scheduler", "allowed_run_id_pattern")

      # The built-in pattern is tried first; the configured pattern is only a fallback and is
      # ignored when it is blank.
      if run_id and not re2.match(RUN_ID_REGEX, run_id):
          if not regex.strip() or not re2.match(regex.strip(), run_id):
              raise AirflowException(
                  f"The provided run ID '{run_id}' is invalid. It does not match either "
                  f"the configured pattern: '{regex}' or the built-in pattern: '{RUN_ID_REGEX}'"
              )

      # create a copy of params before validating
      # (the copy is only validated here; the untouched `conf` is what gets stored on the run)
      copied_params = copy.deepcopy(self.params)
      copied_params.update(conf or {})
      copied_params.validate()

      run = _create_orm_dagrun(
          dag=self,
          dag_id=self.dag_id,
          run_id=run_id,
          logical_date=logical_date,
          start_date=start_date,
          external_trigger=external_trigger,
          conf=conf,
          state=state,
          run_type=run_type,
          dag_hash=dag_hash,
          creating_job_id=creating_job_id,
          data_interval=data_interval,
          session=session,
      )
      return run
  @classmethod
  @provide_session
  def bulk_sync_to_db(
      cls,
      dags: Collection[DAG],
      session=NEW_SESSION,
  ):
      """
      Use `airflow.models.DAG.bulk_write_to_db`, this method is deprecated.

      Emits a ``RemovedInAirflow3Warning`` and delegates to ``bulk_write_to_db``.

      :param dags: the DAG objects to save to the DB
      :param session: database session
      :return: whatever ``bulk_write_to_db`` returns
      """
      deprecation_message = (
          "This method is deprecated and will be removed in a future version. Please use bulk_write_to_db"
      )
      warnings.warn(deprecation_message, RemovedInAirflow3Warning, stacklevel=2)
      result = cls.bulk_write_to_db(dags=dags, session=session)
      return result
  @classmethod
  @provide_session
  def bulk_write_to_db(
      cls,
      dags: Collection[DAG],
      processor_subdir: str | None = None,
      session=NEW_SESSION,
  ):
      """
      Ensure the DagModel rows for the given dags are up-to-date in the dag table in the DB.

      Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

      Besides the ``DagModel`` rows themselves, this also reconciles tags, owner links, DAG code,
      dataset / dataset-alias models and the dataset references of DAGs and tasks, then recurses
      into each DAG's ``subdags``. The session is flushed but not committed here.

      :param dags: the DAG objects to save to the DB
      :param processor_subdir: value written to ``processor_subdir`` on every synced row
      :param session: database session
      :return: None
      """
      if not dags:
          return

      log.info("Sync %s DAGs", len(dags))
      dag_by_ids = {dag.dag_id: dag for dag in dags}

      # Load the existing rows (with tags and dataset references) under row locks.
      dag_ids = set(dag_by_ids)
      query = (
          select(DagModel)
          .options(joinedload(DagModel.tags, innerjoin=False))
          .where(DagModel.dag_id.in_(dag_ids))
          .options(joinedload(DagModel.schedule_dataset_references))
          .options(joinedload(DagModel.schedule_dataset_alias_references))
          .options(joinedload(DagModel.task_outlet_dataset_references))
      )
      query = with_row_locks(query, of=DagModel, session=session)
      orm_dags: list[DagModel] = session.scalars(query).unique().all()
      existing_dags: dict[str, DagModel] = {x.dag_id: x for x in orm_dags}
      missing_dag_ids = dag_ids.difference(existing_dags.keys())

      # Create rows for DAGs that are not in the DB yet. They are appended to ``orm_dags`` but
      # not to ``existing_dags``, which keeps describing only the rows that were already stored.
      for missing_dag_id in missing_dag_ids:
          orm_dag = DagModel(dag_id=missing_dag_id)
          dag = dag_by_ids[missing_dag_id]
          if dag.is_paused_upon_creation is not None:
              orm_dag.is_paused = dag.is_paused_upon_creation
          orm_dag.tags = []
          log.info("Creating ORM DAG for %s", dag.dag_id)
          session.add(orm_dag)
          orm_dags.append(orm_dag)

      latest_runs: dict[str, DagRun] = {}
      num_active_runs: dict[str, int] = {}
      # Skip these queries entirely if no DAGs can be scheduled to save time.
      if any(dag.timetable.can_be_scheduled for dag in dags):
          # Get the latest automated dag run for each existing dag as a single query (avoid n+1 query)
          query = cls._get_latest_runs_stmt(dags=list(existing_dags.keys()))
          latest_runs = {run.dag_id: run for run in session.scalars(query)}

          # Get number of active dagruns for all dags we are processing as a single query.
          num_active_runs = DagRun.active_runs_of_dags(dag_ids=existing_dags, session=session)

      filelocs = []

      # Copy the attributes of every DAG onto its row, in dag_id order.
      for orm_dag in sorted(orm_dags, key=lambda d: d.dag_id):
          dag = dag_by_ids[orm_dag.dag_id]
          filelocs.append(dag.fileloc)
          if dag.is_subdag:
              # A subdag row takes file location, root id and owners from its parent DAG.
              orm_dag.is_subdag = True
              orm_dag.fileloc = dag.parent_dag.fileloc  # type: ignore
              orm_dag.root_dag_id = dag.parent_dag.dag_id  # type: ignore
              orm_dag.owners = dag.parent_dag.owner  # type: ignore
          else:
              orm_dag.is_subdag = False
              orm_dag.fileloc = dag.fileloc
              orm_dag.owners = dag.owner
          orm_dag.is_active = True
          orm_dag.has_import_errors = False
          orm_dag.last_parsed_time = timezone.utcnow()
          orm_dag.default_view = dag.default_view
          orm_dag._dag_display_property_value = dag._dag_display_property_value
          orm_dag.description = dag.description
          orm_dag.max_active_tasks = dag.max_active_tasks
          orm_dag.max_active_runs = dag.max_active_runs
          orm_dag.max_consecutive_failed_dag_runs = dag.max_consecutive_failed_dag_runs
          orm_dag.has_task_concurrency_limits = any(
              t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None
              for t in dag.tasks
          )
          orm_dag.schedule_interval = dag.schedule_interval
          orm_dag.timetable_description = dag.timetable.description
          orm_dag.dataset_expression = dag.timetable.dataset_condition.as_expression()

          orm_dag.processor_subdir = processor_subdir

          last_automated_run: DagRun | None = latest_runs.get(dag.dag_id)
          if last_automated_run is None:
              last_automated_data_interval = None
          else:
              last_automated_data_interval = dag.get_run_data_interval(last_automated_run)
          # With max_active_runs already reached no next run is offered; otherwise the
          # next-run fields are recomputed from the last automated data interval.
          if num_active_runs.get(dag.dag_id, 0) >= orm_dag.max_active_runs:
              orm_dag.next_dagrun_create_after = None
          else:
              orm_dag.calculate_dagrun_date_fields(dag, last_automated_data_interval)

          # Tags: delete stored tags the DAG no longer declares, then add the new ones.
          dag_tags = set(dag.tags or {})
          orm_dag_tags = list(orm_dag.tags or [])
          for orm_tag in orm_dag_tags:
              if orm_tag.name not in dag_tags:
                  session.delete(orm_tag)
                  orm_dag.tags.remove(orm_tag)
          orm_tag_names = {t.name for t in orm_dag_tags}
          for dag_tag in dag_tags:
              if dag_tag not in orm_tag_names:
                  dag_tag_orm = DagTag(name=dag_tag, dag_id=dag.dag_id)
                  orm_dag.tags.append(dag_tag_orm)
                  session.add(dag_tag_orm)

          # Owner links.
          # NOTE(review): `orm_dag_link` is a stored row while `dag.owner_links` is iterated with
          # `.items()` below, so this membership test compares a row against the mapping's keys;
          # confirm this is the intended comparison.
          orm_dag_links = orm_dag.dag_owner_links or []
          for orm_dag_link in orm_dag_links:
              if orm_dag_link not in dag.owner_links:
                  session.delete(orm_dag_link)
          for owner_name, owner_link in dag.owner_links.items():
              dag_owner_orm = DagOwnerAttributes(dag_id=dag.dag_id, owner=owner_name, link=owner_link)
              session.add(dag_owner_orm)

      DagCode.bulk_sync_to_db(filelocs, session=session)

      from airflow.datasets import Dataset
      from airflow.models.dataset import (
          DagScheduleDatasetAliasReference,
          DagScheduleDatasetReference,
          DatasetModel,
          TaskOutletDatasetReference,
      )

      # dag_id -> {("dataset", uri) | ("dataset-alias", name)} the DAG is scheduled on
      dag_references: dict[str, set[tuple[Literal["dataset", "dataset-alias"], str]]] = defaultdict(set)
      # (dag_id, task_id) -> dataset URIs the task lists as outlets
      outlet_references = defaultdict(set)
      # We can't use a set here as we want to preserve order
      outlet_dataset_models: dict[DatasetModel, None] = {}
      input_dataset_models: dict[DatasetModel, None] = {}
      outlet_dataset_alias_models: set[DatasetAliasModel] = set()
      input_dataset_alias_models: set[DatasetAliasModel] = set()

      # here we go through dags and tasks to check for dataset references
      # if there are now None and previously there were some, we delete them
      # if there are now *any*, we add them to the above data structures, and
      # later we'll persist them to the database.
      for dag in dags:
          curr_orm_dag = existing_dags.get(dag.dag_id)
          if not (dataset_condition := dag.timetable.dataset_condition):
              if curr_orm_dag:
                  if curr_orm_dag.schedule_dataset_references:
                      curr_orm_dag.schedule_dataset_references = []
                  if curr_orm_dag.schedule_dataset_alias_references:
                      curr_orm_dag.schedule_dataset_alias_references = []
          else:
              for _, dataset in dataset_condition.iter_datasets():
                  dag_references[dag.dag_id].add(("dataset", dataset.uri))
                  input_dataset_models[DatasetModel.from_public(dataset)] = None

              for dataset_alias in dataset_condition.iter_dataset_aliases():
                  dag_references[dag.dag_id].add(("dataset-alias", dataset_alias.name))
                  input_dataset_alias_models.add(DatasetAliasModel.from_public(dataset_alias))

          # Falsy when the DAG has no stored row or the row has no outlet references.
          curr_outlet_references = curr_orm_dag and curr_orm_dag.task_outlet_dataset_references
          for task in dag.tasks:
              dataset_outlets: list[Dataset] = []
              dataset_alias_outlets: set[DatasetAlias] = set()
              for outlet in task.outlets:
                  if isinstance(outlet, Dataset):
                      dataset_outlets.append(outlet)
                  elif isinstance(outlet, DatasetAlias):
                      dataset_alias_outlets.add(outlet)

              # The task no longer has dataset outlets: drop its stored outlet references.
              if not dataset_outlets:
                  if curr_outlet_references:
                      this_task_outlet_refs = [
                          x
                          for x in curr_outlet_references
                          if x.dag_id == dag.dag_id and x.task_id == task.task_id
                      ]
                      for ref in this_task_outlet_refs:
                          curr_outlet_references.remove(ref)

              for d in dataset_outlets:
                  outlet_dataset_models[DatasetModel.from_public(d)] = None
                  outlet_references[(task.dag_id, task.task_id)].add(d.uri)

              for d_a in dataset_alias_outlets:
                  outlet_dataset_alias_models.add(DatasetAliasModel.from_public(d_a))

      # Note: this is the same dict object as ``outlet_dataset_models``, extended in place.
      all_dataset_models = outlet_dataset_models
      all_dataset_models.update(input_dataset_models)

      # store datasets
      stored_dataset_models: dict[str, DatasetModel] = {}
      new_dataset_models: list[DatasetModel] = []
      # One lookup query is issued per referenced dataset URI.
      for dataset in all_dataset_models:
          stored_dataset_model = session.scalar(
              select(DatasetModel).where(DatasetModel.uri == dataset.uri).limit(1)
          )
          if stored_dataset_model:
              # Some datasets may have been previously unreferenced, and therefore orphaned by the
              # scheduler. But if we're here, then we have found that dataset again in our DAGs, which
              # means that it is no longer an orphan, so set is_orphaned to False.
              stored_dataset_model.is_orphaned = expression.false()
              stored_dataset_models[stored_dataset_model.uri] = stored_dataset_model
          else:
              new_dataset_models.append(dataset)
      dataset_manager.create_datasets(dataset_models=new_dataset_models, session=session)
      stored_dataset_models.update(
          {dataset_model.uri: dataset_model for dataset_model in new_dataset_models}
      )

      del new_dataset_models
      del all_dataset_models

      # store dataset aliases
      all_datasets_alias_models = input_dataset_alias_models | outlet_dataset_alias_models
      stored_dataset_alias_models: dict[str, DatasetAliasModel] = {}
      new_dataset_alias_models: set[DatasetAliasModel] = set()
      if all_datasets_alias_models:
          all_dataset_alias_names = {
              dataset_alias_model.name for dataset_alias_model in all_datasets_alias_models
          }

          stored_dataset_alias_models = {
              dsa_m.name: dsa_m
              for dsa_m in session.scalars(
                  select(DatasetAliasModel).where(DatasetAliasModel.name.in_(all_dataset_alias_names))
              ).fetchall()
          }

          if stored_dataset_alias_models:
              new_dataset_alias_models = {
                  dataset_alias_model
                  for dataset_alias_model in all_datasets_alias_models
                  if dataset_alias_model.name not in stored_dataset_alias_models.keys()
              }
          else:
              new_dataset_alias_models = all_datasets_alias_models

          session.add_all(new_dataset_alias_models)
      # Flush before the ``.id`` of dataset / alias models is read for the references below.
      session.flush()
      stored_dataset_alias_models.update(
          {
              dataset_alias_model.name: dataset_alias_model
              for dataset_alias_model in new_dataset_alias_models
          }
      )

      del new_dataset_alias_models
      del all_datasets_alias_models

      # reconcile dag-schedule-on-dataset and dag-schedule-on-dataset-alias references
      for dag_id, base_dataset_list in dag_references.items():
          dag_refs_needed = {
              DagScheduleDatasetReference(
                  dataset_id=stored_dataset_models[base_dataset_identifier].id, dag_id=dag_id
              )
              if base_dataset_type == "dataset"
              else DagScheduleDatasetAliasReference(
                  alias_id=stored_dataset_alias_models[base_dataset_identifier].id, dag_id=dag_id
              )
              for base_dataset_type, base_dataset_identifier in base_dataset_list
          }

          # References already stored for this DAG (empty for a DAG that had no row yet).
          dag_refs_stored = (
              set(existing_dags.get(dag_id).schedule_dataset_references)  # type: ignore
              | set(existing_dags.get(dag_id).schedule_dataset_alias_references)  # type: ignore
              if existing_dags.get(dag_id)
              else set()
          )
          dag_refs_to_add = dag_refs_needed - dag_refs_stored
          session.bulk_save_objects(dag_refs_to_add)
          for obj in dag_refs_stored - dag_refs_needed:
              session.delete(obj)

      # Index the stored task-outlet references by (dag_id, task_id).
      existing_task_outlet_refs_dict = defaultdict(set)
      for dag_id, orm_dag in existing_dags.items():
          for todr in orm_dag.task_outlet_dataset_references:
              existing_task_outlet_refs_dict[(dag_id, todr.task_id)].add(todr)

      # reconcile task-outlet-dataset references
      for (dag_id, task_id), uri_list in outlet_references.items():
          task_refs_needed = {
              TaskOutletDatasetReference(
                  dataset_id=stored_dataset_models[uri].id, dag_id=dag_id, task_id=task_id
              )
              for uri in uri_list
          }
          task_refs_stored = existing_task_outlet_refs_dict[(dag_id, task_id)]
          task_refs_to_add = {x for x in task_refs_needed if x not in task_refs_stored}
          session.bulk_save_objects(task_refs_to_add)
          for obj in task_refs_stored - task_refs_needed:
              session.delete(obj)

      # Issue SQL/finish "Unit of Work", but let @provide_session commit (or if passed a session, let caller
      # decide when to commit
      session.flush()

      # Recurse into subdags; an empty ``subdags`` collection returns immediately at the top.
      for dag in dags:
          cls.bulk_write_to_db(dag.subdags, processor_subdir=processor_subdir, session=session)
  @classmethod
  def _get_latest_runs_stmt(cls, dags: list[str]) -> Select:
      """
      Build a select statement to retrieve the last automated run for each dag.

      "Automated" means a run whose type is ``BACKFILL_JOB`` or ``SCHEDULED``; the last run is
      the one with the greatest ``execution_date``. Only ``dag_id``, ``execution_date`` and the
      two data-interval columns are loaded.

      :param dags: dags to query
      """
      is_automated = DagRun.run_type.in_((DagRunType.BACKFILL_JOB, DagRunType.SCHEDULED))
      newest_date = func.max(DagRun.execution_date).label("max_execution_date")

      if len(dags) == 1:
          # Index optimized fast path to avoid more complicated & slower groupby queryplan
          only_dag_id = dags[0]
          newest_for_dag = (
              select(newest_date).where(DagRun.dag_id == only_dag_id, is_automated).scalar_subquery()
          )
          stmt = select(DagRun).where(
              DagRun.dag_id == only_dag_id,
              DagRun.execution_date == newest_for_dag,
          )
      else:
          newest_per_dag = (
              select(DagRun.dag_id, newest_date)
              .where(DagRun.dag_id.in_(dags), is_automated)
              .group_by(DagRun.dag_id)
              .subquery()
          )
          stmt = select(DagRun).where(
              DagRun.dag_id == newest_per_dag.c.dag_id,
              DagRun.execution_date == newest_per_dag.c.max_execution_date,
          )

      loaded_columns = load_only(
          DagRun.dag_id,
          DagRun.execution_date,
          DagRun.data_interval_start,
          DagRun.data_interval_end,
      )
      return stmt.options(loaded_columns)
  @provide_session
  def sync_to_db(self, processor_subdir: str | None = None, session=NEW_SESSION):
      """
      Save attributes about this DAG to the DB.

      Note that this method can be called for both DAGs and SubDAGs. A SubDag is actually a SubDagOperator.

      This is ``bulk_write_to_db`` applied to a one-element list holding this DAG.

      :param processor_subdir: forwarded to ``bulk_write_to_db``
      :param session: database session
      :return: None
      """
      only_this_dag = [self]
      self.bulk_write_to_db(only_this_dag, processor_subdir=processor_subdir, session=session)
  def get_default_view(self):
      """
      Allow backward compatible jinja2 templates.

      :return: ``self.default_view`` when it is set, otherwise the lower-cased
          ``[webserver] dag_default_view`` configuration value
      """
      if self.default_view is not None:
          return self.default_view
      configured_view = airflow_conf.get("webserver", "dag_default_view")
      return configured_view.lower()
  @staticmethod
  @provide_session
  def deactivate_unknown_dags(active_dag_ids, session=NEW_SESSION):
      """
      Given a list of known DAGs, deactivate any other DAGs stored in the ORM.

      Every ``DagModel`` row whose ``dag_id`` is not in ``active_dag_ids`` gets
      ``is_active = False``, and the session is committed once afterwards. An empty
      ``active_dag_ids`` is a no-op.

      :param active_dag_ids: list of DAG IDs that are active
      :param session: database session
      :return: None
      """
      if not active_dag_ids:
          return

      unknown_dags_stmt = select(DagModel).where(~DagModel.dag_id.in_(active_dag_ids))
      for dag_model in session.scalars(unknown_dags_stmt).all():
          dag_model.is_active = False
          session.merge(dag_model)
      session.commit()
  @staticmethod
  @provide_session
  def deactivate_stale_dags(expiration_date, session=NEW_SESSION):
      """
      Deactivate any DAGs that were last touched by the scheduler before the expiration date.

      These DAGs were likely deleted.

      Only rows that are currently active and whose ``last_parsed_time`` is earlier than
      ``expiration_date`` are selected; each one is logged and marked inactive.

      :param expiration_date: set inactive DAGs that were touched before this time
      :param session: database session
      :return: None
      """
      for dag in session.scalars(
          select(DagModel).where(DagModel.last_parsed_time < expiration_date, DagModel.is_active)
      ):
          log.info(
              "Deactivating DAG ID %s since it was last touched by the scheduler at %s",
              dag.dag_id,
              dag.last_parsed_time.isoformat(),
          )
          dag.is_active = False
          session.merge(dag)
          # NOTE(review): the commit is issued once per deactivated row here — confirm that a
          # per-row commit (rather than a single commit after the loop) is intended.
          session.commit()
  @staticmethod
  @provide_session
  def get_num_task_instances(dag_id, run_id=None, task_ids=None, states=None, session=NEW_SESSION) -> int:
      """
      Return the number of task instances in the given DAG.

      Each optional filter is applied only when its argument is truthy. ``states`` may
      contain ``None``, which matches task instances whose state is NULL.

      :param session: ORM session
      :param dag_id: ID of the DAG to count task instances of
      :param run_id: ID of the DAG run to restrict the count to
      :param task_ids: A list of valid task IDs for the given DAG
      :param states: A list of states to filter by if supplied
      :return: The number of task instances matching all supplied filters
      """
      count_stmt = select(func.count(TaskInstance.task_id)).where(TaskInstance.dag_id == dag_id)

      if run_id:
          count_stmt = count_stmt.where(TaskInstance.run_id == run_id)
      if task_ids:
          count_stmt = count_stmt.where(TaskInstance.task_id.in_(task_ids))

      if states:
          if None not in states:
              count_stmt = count_stmt.where(TaskInstance.state.in_(states))
          elif all(x is None for x in states):
              # Only NULL states were requested.
              count_stmt = count_stmt.where(TaskInstance.state.is_(None))
          else:
              # Mixed request: match the concrete states or a NULL state.
              not_none_states = [state for state in states if state]
              count_stmt = count_stmt.where(
                  or_(TaskInstance.state.in_(not_none_states), TaskInstance.state.is_(None))
              )

      return session.scalar(count_stmt)
  @classmethod
  def get_serialized_fields(cls):
      """
      Stringified DAGs and operators contain exactly these fields.

      The set is computed on first use from the instance attributes of a throwaway ``DAG``
      minus a fixed exclusion list, cached on the class, and returned from the cache afterwards.
      """
      if cls.__serialized_fields:
          return cls.__serialized_fields

      excluded_fields = {
          "parent_dag",
          "schedule_dataset_references",
          "schedule_dataset_alias_references",
          "task_outlet_dataset_references",
          "_old_context_manager_dags",
          "safe_dag_id",
          "last_loaded",
          "user_defined_filters",
          "user_defined_macros",
          "partial",
          "params",
          "_pickle_id",
          "_log",
          "task_dict",
          "template_searchpath",
          "sla_miss_callback",
          "on_success_callback",
          "on_failure_callback",
          "template_undefined",
          "jinja_environment_kwargs",
          # has_on_*_callback are only stored if the value is True, as the default is False
          "has_on_success_callback",
          "has_on_failure_callback",
          "auto_register",
          "fail_stop",
      }
      template_dag = DAG(dag_id="test", schedule=None)
      cls.__serialized_fields = frozenset(vars(template_dag)) - excluded_fields
      return cls.__serialized_fields
  3247. def get_edge_info(self, upstream_task_id: str, downstream_task_id: str) -> EdgeInfoType:
  3248. """Return edge information for the given pair of tasks or an empty edge if there is no information."""
  3249. # Note - older serialized DAGs may not have edge_info being a dict at all
  3250. empty = cast(EdgeInfoType, {})
  3251. if self.edge_info:
  3252. return self.edge_info.get(upstream_task_id, {}).get(downstream_task_id, empty)
  3253. else:
  3254. return empty
  3255. def set_edge_info(self, upstream_task_id: str, downstream_task_id: str, info: EdgeInfoType):
  3256. """
  3257. Set the given edge information on the DAG.
  3258. Note that this will overwrite, rather than merge with, existing info.
  3259. """
  3260. self.edge_info.setdefault(upstream_task_id, {})[downstream_task_id] = info
  3261. def validate_schedule_and_params(self):
  3262. """
  3263. Validate Param values when the DAG has schedule defined.
  3264. Raise exception if there are any Params which can not be resolved by their schema definition.
  3265. """
  3266. if not self.timetable.can_be_scheduled:
  3267. return
  3268. try:
  3269. self.params.validate()
  3270. except ParamValidationError as pverr:
  3271. raise AirflowException(
  3272. "DAG is not allowed to define a Schedule, "
  3273. "if there are any required params without default values or default values are not valid."
  3274. ) from pverr
  3275. def iter_invalid_owner_links(self) -> Iterator[tuple[str, str]]:
  3276. """
  3277. Parse a given link, and verifies if it's a valid URL, or a 'mailto' link.
  3278. Returns an iterator of invalid (owner, link) pairs.
  3279. """
  3280. for owner, link in self.owner_links.items():
  3281. result = urlsplit(link)
  3282. if result.scheme == "mailto":
  3283. # netloc is not existing for 'mailto' link, so we are checking that the path is parsed
  3284. if not result.path:
  3285. yield result.path, link
  3286. elif not result.scheme or not result.netloc:
  3287. yield owner, link
  3288. class DagTag(Base):
  3289. """A tag name per dag, to allow quick filtering in the DAG view."""
  3290. __tablename__ = "dag_tag"
  3291. name = Column(String(TAG_MAX_LEN), primary_key=True)
  3292. dag_id = Column(
  3293. StringID(),
  3294. ForeignKey("dag.dag_id", name="dag_tag_dag_id_fkey", ondelete="CASCADE"),
  3295. primary_key=True,
  3296. )
  3297. __table_args__ = (Index("idx_dag_tag_dag_id", dag_id),)
  3298. def __repr__(self):
  3299. return self.name
  3300. class DagOwnerAttributes(Base):
  3301. """
  3302. Table defining different owner attributes.
  3303. For example, a link for an owner that will be passed as a hyperlink to the "DAGs" view.
  3304. """
  3305. __tablename__ = "dag_owner_attributes"
  3306. dag_id = Column(
  3307. StringID(),
  3308. ForeignKey("dag.dag_id", name="dag.dag_id", ondelete="CASCADE"),
  3309. nullable=False,
  3310. primary_key=True,
  3311. )
  3312. owner = Column(String(500), primary_key=True, nullable=False)
  3313. link = Column(String(500), nullable=False)
  3314. def __repr__(self):
  3315. return f"<DagOwnerAttributes: dag_id={self.dag_id}, owner={self.owner}, link={self.link}>"
  3316. @classmethod
  3317. def get_all(cls, session) -> dict[str, dict[str, str]]:
  3318. dag_links: dict = defaultdict(dict)
  3319. for obj in session.scalars(select(cls)):
  3320. dag_links[obj.dag_id].update({obj.owner: obj.link})
  3321. return dag_links
  3322. class DagModel(Base):
  3323. """Table containing DAG properties."""
  3324. __tablename__ = "dag"
  3325. """
  3326. These items are stored in the database for state related information
  3327. """
  3328. dag_id = Column(StringID(), primary_key=True)
  3329. root_dag_id = Column(StringID())
  3330. # A DAG can be paused from the UI / DB
  3331. # Set this default value of is_paused based on a configuration value!
  3332. is_paused_at_creation = airflow_conf.getboolean("core", "dags_are_paused_at_creation")
  3333. is_paused = Column(Boolean, default=is_paused_at_creation)
  3334. # Whether the DAG is a subdag
  3335. is_subdag = Column(Boolean, default=False)
  3336. # Whether that DAG was seen on the last DagBag load
  3337. is_active = Column(Boolean, default=False)
  3338. # Last time the scheduler started
  3339. last_parsed_time = Column(UtcDateTime)
  3340. # Last time this DAG was pickled
  3341. last_pickled = Column(UtcDateTime)
  3342. # Time when the DAG last received a refresh signal
  3343. # (e.g. the DAG's "refresh" button was clicked in the web UI)
  3344. last_expired = Column(UtcDateTime)
  3345. # Whether (one of) the scheduler is scheduling this DAG at the moment
  3346. scheduler_lock = Column(Boolean)
  3347. # Foreign key to the latest pickle_id
  3348. pickle_id = Column(Integer)
  3349. # The location of the file containing the DAG object
  3350. # Note: Do not depend on fileloc pointing to a file; in the case of a
  3351. # packaged DAG, it will point to the subpath of the DAG within the
  3352. # associated zip.
  3353. fileloc = Column(String(2000))
  3354. # The base directory used by Dag Processor that parsed this dag.
  3355. processor_subdir = Column(String(2000), nullable=True)
  3356. # String representing the owners
  3357. owners = Column(String(2000))
  3358. # Display name of the dag
  3359. _dag_display_property_value = Column("dag_display_name", String(2000), nullable=True)
  3360. # Description of the dag
  3361. description = Column(Text)
  3362. # Default view of the DAG inside the webserver
  3363. default_view = Column(String(25))
  3364. # Schedule interval
  3365. schedule_interval = Column(Interval)
  3366. # Timetable/Schedule Interval description
  3367. timetable_description = Column(String(1000), nullable=True)
  3368. # Dataset expression based on dataset triggers
  3369. dataset_expression = Column(sqlalchemy_jsonfield.JSONField(json=json), nullable=True)
  3370. # Tags for view filter
  3371. tags = relationship("DagTag", cascade="all, delete, delete-orphan", backref=backref("dag"))
  3372. # Dag owner links for DAGs view
  3373. dag_owner_links = relationship(
  3374. "DagOwnerAttributes", cascade="all, delete, delete-orphan", backref=backref("dag")
  3375. )
  3376. max_active_tasks = Column(Integer, nullable=False)
  3377. max_active_runs = Column(Integer, nullable=True)
  3378. max_consecutive_failed_dag_runs = Column(Integer, nullable=False)
  3379. has_task_concurrency_limits = Column(Boolean, nullable=False)
  3380. has_import_errors = Column(Boolean(), default=False, server_default="0")
  3381. # The logical date of the next dag run.
  3382. next_dagrun = Column(UtcDateTime)
  3383. # Must be either both NULL or both datetime.
  3384. next_dagrun_data_interval_start = Column(UtcDateTime)
  3385. next_dagrun_data_interval_end = Column(UtcDateTime)
  3386. # Earliest time at which this ``next_dagrun`` can be created.
  3387. next_dagrun_create_after = Column(UtcDateTime)
  3388. __table_args__ = (
  3389. Index("idx_root_dag_id", root_dag_id, unique=False),
  3390. Index("idx_next_dagrun_create_after", next_dagrun_create_after, unique=False),
  3391. )
  3392. parent_dag = relationship(
  3393. "DagModel", remote_side=[dag_id], primaryjoin=root_dag_id == dag_id, foreign_keys=[root_dag_id]
  3394. )
  3395. schedule_dataset_references = relationship(
  3396. "DagScheduleDatasetReference",
  3397. back_populates="dag",
  3398. cascade="all, delete, delete-orphan",
  3399. )
  3400. schedule_dataset_alias_references = relationship(
  3401. "DagScheduleDatasetAliasReference",
  3402. back_populates="dag",
  3403. cascade="all, delete, delete-orphan",
  3404. )
  3405. schedule_datasets = association_proxy("schedule_dataset_references", "dataset")
  3406. task_outlet_dataset_references = relationship(
  3407. "TaskOutletDatasetReference",
  3408. cascade="all, delete, delete-orphan",
  3409. )
  3410. NUM_DAGS_PER_DAGRUN_QUERY = airflow_conf.getint(
  3411. "scheduler", "max_dagruns_to_create_per_loop", fallback=10
  3412. )
  3413. def __init__(self, concurrency=None, **kwargs):
  3414. super().__init__(**kwargs)
  3415. if self.max_active_tasks is None:
  3416. if concurrency:
  3417. warnings.warn(
  3418. "The 'DagModel.concurrency' parameter is deprecated. Please use 'max_active_tasks'.",
  3419. RemovedInAirflow3Warning,
  3420. stacklevel=2,
  3421. )
  3422. self.max_active_tasks = concurrency
  3423. else:
  3424. self.max_active_tasks = airflow_conf.getint("core", "max_active_tasks_per_dag")
  3425. if self.max_active_runs is None:
  3426. self.max_active_runs = airflow_conf.getint("core", "max_active_runs_per_dag")
  3427. if self.max_consecutive_failed_dag_runs is None:
  3428. self.max_consecutive_failed_dag_runs = airflow_conf.getint(
  3429. "core", "max_consecutive_failed_dag_runs_per_dag"
  3430. )
  3431. if self.has_task_concurrency_limits is None:
  3432. # Be safe -- this will be updated later once the DAG is parsed
  3433. self.has_task_concurrency_limits = True
  3434. def __repr__(self):
  3435. return f"<DAG: {self.dag_id}>"
  3436. @property
  3437. def next_dagrun_data_interval(self) -> DataInterval | None:
  3438. return _get_model_data_interval(
  3439. self,
  3440. "next_dagrun_data_interval_start",
  3441. "next_dagrun_data_interval_end",
  3442. )
  3443. @next_dagrun_data_interval.setter
  3444. def next_dagrun_data_interval(self, value: tuple[datetime, datetime] | None) -> None:
  3445. if value is None:
  3446. self.next_dagrun_data_interval_start = self.next_dagrun_data_interval_end = None
  3447. else:
  3448. self.next_dagrun_data_interval_start, self.next_dagrun_data_interval_end = value
  3449. @property
  3450. def timezone(self):
  3451. return settings.TIMEZONE
  3452. @staticmethod
  3453. @provide_session
  3454. def get_dagmodel(dag_id: str, session: Session = NEW_SESSION) -> DagModel | None:
  3455. return session.get(
  3456. DagModel,
  3457. dag_id,
  3458. options=[joinedload(DagModel.parent_dag)],
  3459. )
  3460. @classmethod
  3461. @internal_api_call
  3462. @provide_session
  3463. def get_current(cls, dag_id: str, session=NEW_SESSION) -> DagModel | DagModelPydantic:
  3464. return session.scalar(select(cls).where(cls.dag_id == dag_id))
  3465. @provide_session
  3466. def get_last_dagrun(self, session=NEW_SESSION, include_externally_triggered=False):
  3467. return get_last_dagrun(
  3468. self.dag_id, session=session, include_externally_triggered=include_externally_triggered
  3469. )
  3470. def get_is_paused(self, *, session: Session | None = None) -> bool:
  3471. """Provide interface compatibility to 'DAG'."""
  3472. return self.is_paused
  3473. def get_is_active(self, *, session: Session | None = None) -> bool:
  3474. """Provide interface compatibility to 'DAG'."""
  3475. return self.is_active
  3476. @staticmethod
  3477. @internal_api_call
  3478. @provide_session
  3479. def get_paused_dag_ids(dag_ids: list[str], session: Session = NEW_SESSION) -> set[str]:
  3480. """
  3481. Given a list of dag_ids, get a set of Paused Dag Ids.
  3482. :param dag_ids: List of Dag ids
  3483. :param session: ORM Session
  3484. :return: Paused Dag_ids
  3485. """
  3486. paused_dag_ids = session.execute(
  3487. select(DagModel.dag_id)
  3488. .where(DagModel.is_paused == expression.true())
  3489. .where(DagModel.dag_id.in_(dag_ids))
  3490. )
  3491. paused_dag_ids = {paused_dag_id for (paused_dag_id,) in paused_dag_ids}
  3492. return paused_dag_ids
  3493. def get_default_view(self) -> str:
  3494. """Get the Default DAG View, returns the default config value if DagModel does not have a value."""
  3495. # This is for backwards-compatibility with old dags that don't have None as default_view
  3496. return self.default_view or airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower()
  3497. @property
  3498. def safe_dag_id(self):
  3499. return self.dag_id.replace(".", "__dot__")
  3500. @property
  3501. def relative_fileloc(self) -> pathlib.Path | None:
  3502. """File location of the importable dag 'file' relative to the configured DAGs folder."""
  3503. if self.fileloc is None:
  3504. return None
  3505. path = pathlib.Path(self.fileloc)
  3506. try:
  3507. return path.relative_to(settings.DAGS_FOLDER)
  3508. except ValueError:
  3509. # Not relative to DAGS_FOLDER.
  3510. return path
  3511. @provide_session
  3512. def set_is_paused(self, is_paused: bool, including_subdags: bool = True, session=NEW_SESSION) -> None:
  3513. """
  3514. Pause/Un-pause a DAG.
  3515. :param is_paused: Is the DAG paused
  3516. :param including_subdags: whether to include the DAG's subdags
  3517. :param session: session
  3518. """
  3519. filter_query = [
  3520. DagModel.dag_id == self.dag_id,
  3521. ]
  3522. if including_subdags:
  3523. filter_query.append(DagModel.root_dag_id == self.dag_id)
  3524. session.execute(
  3525. update(DagModel)
  3526. .where(or_(*filter_query))
  3527. .values(is_paused=is_paused)
  3528. .execution_options(synchronize_session="fetch")
  3529. )
  3530. session.commit()
  3531. @hybrid_property
  3532. def dag_display_name(self) -> str:
  3533. return self._dag_display_property_value or self.dag_id
  3534. @classmethod
  3535. @internal_api_call
  3536. @provide_session
  3537. def deactivate_deleted_dags(
  3538. cls,
  3539. alive_dag_filelocs: Container[str],
  3540. processor_subdir: str,
  3541. session: Session = NEW_SESSION,
  3542. ) -> None:
  3543. """
  3544. Set ``is_active=False`` on the DAGs for which the DAG files have been removed.
  3545. :param alive_dag_filelocs: file paths of alive DAGs
  3546. :param processor_subdir: dag processor subdir
  3547. :param session: ORM Session
  3548. """
  3549. log.debug("Deactivating DAGs (for which DAG files are deleted) from %s table ", cls.__tablename__)
  3550. dag_models = session.scalars(
  3551. select(cls).where(
  3552. cls.fileloc.is_not(None),
  3553. or_(
  3554. cls.processor_subdir.is_(None),
  3555. cls.processor_subdir == processor_subdir,
  3556. ),
  3557. )
  3558. )
  3559. for dag_model in dag_models:
  3560. if dag_model.fileloc not in alive_dag_filelocs:
  3561. dag_model.is_active = False
  3562. @classmethod
  3563. def dags_needing_dagruns(cls, session: Session) -> tuple[Query, dict[str, tuple[datetime, datetime]]]:
  3564. """
  3565. Return (and lock) a list of Dag objects that are due to create a new DagRun.
  3566. This will return a resultset of rows that is row-level-locked with a "SELECT ... FOR UPDATE" query,
  3567. you should ensure that any scheduling decisions are made in a single transaction -- as soon as the
  3568. transaction is committed it will be unlocked.
  3569. """
  3570. from airflow.models.serialized_dag import SerializedDagModel
  3571. def dag_ready(dag_id: str, cond: BaseDataset, statuses: dict) -> bool | None:
  3572. # if dag was serialized before 2.9 and we *just* upgraded,
  3573. # we may be dealing with old version. In that case,
  3574. # just wait for the dag to be reserialized.
  3575. try:
  3576. return cond.evaluate(statuses)
  3577. except AttributeError:
  3578. log.warning("dag '%s' has old serialization; skipping DAG run creation.", dag_id)
  3579. return None
  3580. # this loads all the DDRQ records.... may need to limit num dags
  3581. all_records = session.scalars(select(DatasetDagRunQueue)).all()
  3582. by_dag = defaultdict(list)
  3583. for r in all_records:
  3584. by_dag[r.target_dag_id].append(r)
  3585. del all_records
  3586. dag_statuses = {}
  3587. for dag_id, records in by_dag.items():
  3588. dag_statuses[dag_id] = {x.dataset.uri: True for x in records}
  3589. ser_dags = session.scalars(
  3590. select(SerializedDagModel).where(SerializedDagModel.dag_id.in_(dag_statuses.keys()))
  3591. ).all()
  3592. for ser_dag in ser_dags:
  3593. dag_id = ser_dag.dag_id
  3594. statuses = dag_statuses[dag_id]
  3595. if not dag_ready(dag_id, cond=ser_dag.dag.timetable.dataset_condition, statuses=statuses):
  3596. del by_dag[dag_id]
  3597. del dag_statuses[dag_id]
  3598. del dag_statuses
  3599. dataset_triggered_dag_info = {}
  3600. for dag_id, records in by_dag.items():
  3601. times = sorted(x.created_at for x in records)
  3602. dataset_triggered_dag_info[dag_id] = (times[0], times[-1])
  3603. del by_dag
  3604. dataset_triggered_dag_ids = set(dataset_triggered_dag_info.keys())
  3605. if dataset_triggered_dag_ids:
  3606. exclusion_list = set(
  3607. session.scalars(
  3608. select(DagModel.dag_id)
  3609. .join(DagRun.dag_model)
  3610. .where(DagRun.state.in_((DagRunState.QUEUED, DagRunState.RUNNING)))
  3611. .where(DagModel.dag_id.in_(dataset_triggered_dag_ids))
  3612. .group_by(DagModel.dag_id)
  3613. .having(func.count() >= func.max(DagModel.max_active_runs))
  3614. )
  3615. )
  3616. if exclusion_list:
  3617. dataset_triggered_dag_ids -= exclusion_list
  3618. dataset_triggered_dag_info = {
  3619. k: v for k, v in dataset_triggered_dag_info.items() if k not in exclusion_list
  3620. }
  3621. # We limit so that _one_ scheduler doesn't try to do all the creation of dag runs
  3622. query = (
  3623. select(cls)
  3624. .where(
  3625. cls.is_paused == expression.false(),
  3626. cls.is_active == expression.true(),
  3627. cls.has_import_errors == expression.false(),
  3628. or_(
  3629. cls.next_dagrun_create_after <= func.now(),
  3630. cls.dag_id.in_(dataset_triggered_dag_ids),
  3631. ),
  3632. )
  3633. .order_by(cls.next_dagrun_create_after)
  3634. .limit(cls.NUM_DAGS_PER_DAGRUN_QUERY)
  3635. )
  3636. return (
  3637. session.scalars(with_row_locks(query, of=cls, session=session, skip_locked=True)),
  3638. dataset_triggered_dag_info,
  3639. )
  3640. def calculate_dagrun_date_fields(
  3641. self,
  3642. dag: DAG,
  3643. last_automated_dag_run: None | datetime | DataInterval,
  3644. ) -> None:
  3645. """
  3646. Calculate ``next_dagrun`` and `next_dagrun_create_after``.
  3647. :param dag: The DAG object
  3648. :param last_automated_dag_run: DataInterval (or datetime) of most recent run of this dag, or none
  3649. if not yet scheduled.
  3650. """
  3651. last_automated_data_interval: DataInterval | None
  3652. if isinstance(last_automated_dag_run, datetime):
  3653. warnings.warn(
  3654. "Passing a datetime to `DagModel.calculate_dagrun_date_fields` is deprecated. "
  3655. "Provide a data interval instead.",
  3656. RemovedInAirflow3Warning,
  3657. stacklevel=2,
  3658. )
  3659. last_automated_data_interval = dag.infer_automated_data_interval(last_automated_dag_run)
  3660. else:
  3661. last_automated_data_interval = last_automated_dag_run
  3662. next_dagrun_info = dag.next_dagrun_info(last_automated_data_interval)
  3663. if next_dagrun_info is None:
  3664. self.next_dagrun_data_interval = self.next_dagrun = self.next_dagrun_create_after = None
  3665. else:
  3666. self.next_dagrun_data_interval = next_dagrun_info.data_interval
  3667. self.next_dagrun = next_dagrun_info.logical_date
  3668. self.next_dagrun_create_after = next_dagrun_info.run_after
  3669. log.info(
  3670. "Setting next_dagrun for %s to %s, run_after=%s",
  3671. dag.dag_id,
  3672. self.next_dagrun,
  3673. self.next_dagrun_create_after,
  3674. )
  3675. @provide_session
  3676. def get_dataset_triggered_next_run_info(self, *, session=NEW_SESSION) -> dict[str, int | str] | None:
  3677. if self.schedule_interval != "Dataset":
  3678. return None
  3679. return get_dataset_triggered_next_run_info([self.dag_id], session=session)[self.dag_id]
  3680. # NOTE: Please keep the list of arguments in sync with DAG.__init__.
  3681. # Only exception: dag_id here should have a default value, but not in DAG.
  3682. def dag(
  3683. dag_id: str = "",
  3684. description: str | None = None,
  3685. schedule: ScheduleArg = NOTSET,
  3686. schedule_interval: ScheduleIntervalArg = NOTSET,
  3687. timetable: Timetable | None = None,
  3688. start_date: datetime | None = None,
  3689. end_date: datetime | None = None,
  3690. full_filepath: str | None = None,
  3691. template_searchpath: str | Iterable[str] | None = None,
  3692. template_undefined: type[jinja2.StrictUndefined] = jinja2.StrictUndefined,
  3693. user_defined_macros: dict | None = None,
  3694. user_defined_filters: dict | None = None,
  3695. default_args: dict | None = None,
  3696. concurrency: int | None = None,
  3697. max_active_tasks: int = airflow_conf.getint("core", "max_active_tasks_per_dag"),
  3698. max_active_runs: int = airflow_conf.getint("core", "max_active_runs_per_dag"),
  3699. max_consecutive_failed_dag_runs: int = airflow_conf.getint(
  3700. "core", "max_consecutive_failed_dag_runs_per_dag"
  3701. ),
  3702. dagrun_timeout: timedelta | None = None,
  3703. sla_miss_callback: None | SLAMissCallback | list[SLAMissCallback] = None,
  3704. default_view: str = airflow_conf.get_mandatory_value("webserver", "dag_default_view").lower(),
  3705. orientation: str = airflow_conf.get_mandatory_value("webserver", "dag_orientation"),
  3706. catchup: bool = airflow_conf.getboolean("scheduler", "catchup_by_default"),
  3707. on_success_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
  3708. on_failure_callback: None | DagStateChangeCallback | list[DagStateChangeCallback] = None,
  3709. doc_md: str | None = None,
  3710. params: abc.MutableMapping | None = None,
  3711. access_control: dict[str, dict[str, Collection[str]]] | dict[str, Collection[str]] | None = None,
  3712. is_paused_upon_creation: bool | None = None,
  3713. jinja_environment_kwargs: dict | None = None,
  3714. render_template_as_native_obj: bool = False,
  3715. tags: list[str] | None = None,
  3716. owner_links: dict[str, str] | None = None,
  3717. auto_register: bool = True,
  3718. fail_stop: bool = False,
  3719. dag_display_name: str | None = None,
  3720. ) -> Callable[[Callable], Callable[..., DAG]]:
  3721. """
  3722. Python dag decorator which wraps a function into an Airflow DAG.
  3723. Accepts kwargs for operator kwarg. Can be used to parameterize DAGs.
  3724. :param dag_args: Arguments for DAG object
  3725. :param dag_kwargs: Kwargs for DAG object.
  3726. """
  3727. def wrapper(f: Callable) -> Callable[..., DAG]:
  3728. @functools.wraps(f)
  3729. def factory(*args, **kwargs):
  3730. # Generate signature for decorated function and bind the arguments when called
  3731. # we do this to extract parameters, so we can annotate them on the DAG object.
  3732. # In addition, this fails if we are missing any args/kwargs with TypeError as expected.
  3733. f_sig = signature(f).bind(*args, **kwargs)
  3734. # Apply defaults to capture default values if set.
  3735. f_sig.apply_defaults()
  3736. # Initialize DAG with bound arguments
  3737. with DAG(
  3738. dag_id or f.__name__,
  3739. description=description,
  3740. schedule_interval=schedule_interval,
  3741. timetable=timetable,
  3742. start_date=start_date,
  3743. end_date=end_date,
  3744. full_filepath=full_filepath,
  3745. template_searchpath=template_searchpath,
  3746. template_undefined=template_undefined,
  3747. user_defined_macros=user_defined_macros,
  3748. user_defined_filters=user_defined_filters,
  3749. default_args=default_args,
  3750. concurrency=concurrency,
  3751. max_active_tasks=max_active_tasks,
  3752. max_active_runs=max_active_runs,
  3753. max_consecutive_failed_dag_runs=max_consecutive_failed_dag_runs,
  3754. dagrun_timeout=dagrun_timeout,
  3755. sla_miss_callback=sla_miss_callback,
  3756. default_view=default_view,
  3757. orientation=orientation,
  3758. catchup=catchup,
  3759. on_success_callback=on_success_callback,
  3760. on_failure_callback=on_failure_callback,
  3761. doc_md=doc_md,
  3762. params=params,
  3763. access_control=access_control,
  3764. is_paused_upon_creation=is_paused_upon_creation,
  3765. jinja_environment_kwargs=jinja_environment_kwargs,
  3766. render_template_as_native_obj=render_template_as_native_obj,
  3767. tags=tags,
  3768. schedule=schedule,
  3769. owner_links=owner_links,
  3770. auto_register=auto_register,
  3771. fail_stop=fail_stop,
  3772. dag_display_name=dag_display_name,
  3773. ) as dag_obj:
  3774. # Set DAG documentation from function documentation if it exists and doc_md is not set.
  3775. if f.__doc__ and not dag_obj.doc_md:
  3776. dag_obj.doc_md = f.__doc__
  3777. # Generate DAGParam for each function arg/kwarg and replace it for calling the function.
  3778. # All args/kwargs for function will be DAGParam object and replaced on execution time.
  3779. f_kwargs = {}
  3780. for name, value in f_sig.arguments.items():
  3781. f_kwargs[name] = dag_obj.param(name, value)
  3782. # set file location to caller source path
  3783. back = sys._getframe().f_back
  3784. dag_obj.fileloc = back.f_code.co_filename if back else ""
  3785. # Invoke function to create operators in the DAG scope.
  3786. f(**f_kwargs)
  3787. # Return dag object such that it's accessible in Globals.
  3788. return dag_obj
  3789. # Ensure that warnings from inside DAG() are emitted from the caller, not here
  3790. fixup_decorator_warning_stack(factory)
  3791. return factory
  3792. return wrapper
# The flag is assigned ``True`` here and immediately reset to ``False`` on the next line
# through ``globals()``, using a key built by reversing and upper-casing a string (which
# spells "STATICA_HACK"). At runtime the ``if`` body below is therefore never executed.
# NOTE(review): presumably the literal ``True`` is there so that static analysis tools,
# which do not evaluate the ``globals()`` write, still see the relationship -- confirm.
STATICA_HACK = True
globals()["kcah_acitats"[::-1].upper()] = False
if STATICA_HACK:  # pragma: no cover
    from airflow.models.serialized_dag import SerializedDagModel

    DagModel.serialized_dag = relationship(SerializedDagModel)
    """:sphinx-autoapi-skip:"""
  3799. class DagContext:
  3800. """
  3801. DAG context is used to keep the current DAG when DAG is used as ContextManager.
  3802. You can use DAG as context:
  3803. .. code-block:: python
  3804. with DAG(
  3805. dag_id="example_dag",
  3806. default_args=default_args,
  3807. schedule="0 0 * * *",
  3808. dagrun_timeout=timedelta(minutes=60),
  3809. ) as dag:
  3810. ...
  3811. If you do this the context stores the DAG and whenever new task is created, it will use
  3812. such stored DAG as the parent DAG.
  3813. """
  3814. _context_managed_dags: deque[DAG] = deque()
  3815. autoregistered_dags: set[tuple[DAG, ModuleType]] = set()
  3816. current_autoregister_module_name: str | None = None
  3817. @classmethod
  3818. def push_context_managed_dag(cls, dag: DAG):
  3819. cls._context_managed_dags.appendleft(dag)
  3820. @classmethod
  3821. def pop_context_managed_dag(cls) -> DAG | None:
  3822. dag = cls._context_managed_dags.popleft()
  3823. # In a few cases around serialization we explicitly push None in to the stack
  3824. if cls.current_autoregister_module_name is not None and dag and dag.auto_register:
  3825. mod = sys.modules[cls.current_autoregister_module_name]
  3826. cls.autoregistered_dags.add((dag, mod))
  3827. return dag
  3828. @classmethod
  3829. def get_current_dag(cls) -> DAG | None:
  3830. try:
  3831. return cls._context_managed_dags[0]
  3832. except IndexError:
  3833. return None
  3834. def _run_inline_trigger(trigger):
  3835. async def _run_inline_trigger_main():
  3836. async for event in trigger.run():
  3837. return event
  3838. return asyncio.run(_run_inline_trigger_main())
  3839. def _run_task(
  3840. *, ti: TaskInstance, inline_trigger: bool = False, mark_success: bool = False, session: Session
  3841. ):
  3842. """
  3843. Run a single task instance, and push result to Xcom for downstream tasks.
  3844. Bypasses a lot of extra steps used in `task.run` to keep our local running as fast as
  3845. possible. This function is only meant for the `dag.test` function as a helper function.
  3846. Args:
  3847. ti: TaskInstance to run
  3848. """
  3849. log.info("[DAG TEST] starting task_id=%s map_index=%s", ti.task_id, ti.map_index)
  3850. while True:
  3851. try:
  3852. log.info("[DAG TEST] running task %s", ti)
  3853. ti._run_raw_task(session=session, raise_on_defer=inline_trigger, mark_success=mark_success)
  3854. break
  3855. except TaskDeferred as e:
  3856. log.info("[DAG TEST] running trigger in line")
  3857. event = _run_inline_trigger(e.trigger)
  3858. ti.next_method = e.method_name
  3859. ti.next_kwargs = {"event": event.payload} if event else e.kwargs
  3860. log.info("[DAG TEST] Trigger completed")
  3861. session.merge(ti)
  3862. session.commit()
  3863. log.info("[DAG TEST] end task task_id=%s map_index=%s", ti.task_id, ti.map_index)
  3864. def _get_or_create_dagrun(
  3865. dag: DAG,
  3866. conf: dict[Any, Any] | None,
  3867. start_date: datetime,
  3868. execution_date: datetime,
  3869. run_id: str,
  3870. session: Session,
  3871. data_interval: tuple[datetime, datetime] | None = None,
  3872. ) -> DagRun:
  3873. """
  3874. Create a DAG run, replacing an existing instance if needed to prevent collisions.
  3875. This function is only meant to be used by :meth:`DAG.test` as a helper function.
  3876. :param dag: DAG to be used to find run.
  3877. :param conf: Configuration to pass to newly created run.
  3878. :param start_date: Start date of new run.
  3879. :param execution_date: Logical date for finding an existing run.
  3880. :param run_id: Run ID for the new DAG run.
  3881. :return: The newly created DAG run.
  3882. """
  3883. log.info("dagrun id: %s", dag.dag_id)
  3884. dr: DagRun = session.scalar(
  3885. select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date == execution_date)
  3886. )
  3887. if dr:
  3888. session.delete(dr)
  3889. session.commit()
  3890. dr = dag.create_dagrun(
  3891. state=DagRunState.RUNNING,
  3892. execution_date=execution_date,
  3893. run_id=run_id,
  3894. start_date=start_date or execution_date,
  3895. session=session,
  3896. conf=conf,
  3897. data_interval=data_interval,
  3898. )
  3899. log.info("created dagrun %s", dr)
  3900. return dr