#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import collections.abc
import contextlib
import copy
import datetime
import itertools
import json
import logging
import math
import operator
import os
import sys
import traceback
import warnings
from bisect import insort_left
from collections import defaultdict
from functools import cached_property
from json import JSONDecodeError
from pathlib import Path
from typing import TYPE_CHECKING, Any, Collection, Iterator, Mapping, MutableMapping, Sequence
from urllib.parse import unquote, urlencode, urljoin, urlparse, urlsplit

import configupdater
import flask.json
import lazy_object_proxy
import re2
import sqlalchemy as sqla
from croniter import croniter
from flask import (
    Response,
    abort,
    before_render_template,
    current_app,
    flash,
    g,
    has_request_context,
    make_response,
    redirect,
    render_template,
    request,
    send_from_directory,
    session as flask_session,
    url_for,
)
from flask_appbuilder import BaseView, ModelView, expose
from flask_appbuilder._compat import as_unicode
from flask_appbuilder.actions import action
from flask_appbuilder.const import FLAMSG_ERR_SEC_ACCESS_DENIED
from flask_appbuilder.models.sqla.filters import BaseFilter
from flask_appbuilder.urltools import get_order_args, get_page_args, get_page_size_args
from flask_appbuilder.widgets import FormWidget
from flask_babel import lazy_gettext
from itsdangerous import URLSafeSerializer
from jinja2.utils import htmlsafe_json_dumps, pformat  # type: ignore
from markupsafe import Markup, escape
from pendulum.datetime import DateTime
from pendulum.parsing.exceptions import ParserError
from sqlalchemy import and_, case, desc, func, inspect, or_, select, union_all
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import joinedload
from wtforms import BooleanField, validators

import airflow
from airflow import models, plugins_manager, settings
from airflow.api.common.airflow_health import get_airflow_health
from airflow.api.common.mark_tasks import (
    set_dag_run_state_to_failed,
    set_dag_run_state_to_queued,
    set_dag_run_state_to_success,
    set_state,
)
from airflow.auth.managers.models.resource_details import AccessView, DagAccessEntity, DagDetails
from airflow.compat.functools import cache
from airflow.configuration import AIRFLOW_CONFIG, conf
from airflow.datasets import Dataset, DatasetAlias
from airflow.exceptions import (
    AirflowConfigException,
    AirflowException,
    AirflowNotFoundException,
    ParamValidationError,
    RemovedInAirflow3Warning,
)
from airflow.executors.executor_loader import ExecutorLoader
from airflow.hooks.base import BaseHook
from airflow.jobs.job import Job
from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models import Connection, DagModel, DagTag, Log, SlaMiss, Trigger, XCom
from airflow.models.dag import get_dataset_triggered_next_run_info
from airflow.models.dagrun import RUN_ID_REGEX, DagRun, DagRunType
from airflow.models.dataset import DagScheduleDatasetReference, DatasetDagRunQueue, DatasetEvent, DatasetModel
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote
from airflow.plugins_manager import PLUGINS_ATTRIBUTES_TO_DUMP
from airflow.providers_manager import ProvidersManager
from airflow.security import permissions
from airflow.ti_deps.dep_context import DepContext
from airflow.ti_deps.dependencies_deps import SCHEDULER_QUEUED_DEPS
from airflow.timetables._cron import CronMixin
from airflow.timetables.base import DataInterval, TimeRestriction
from airflow.timetables.simple import ContinuousTimetable
from airflow.utils import json as utils_json, timezone, yaml
from airflow.utils.airflow_flask_app import get_airflow_app
from airflow.utils.dag_edges import dag_edges
from airflow.utils.db import get_query_count
from airflow.utils.docs import get_doc_url_for_provider, get_docs_url
from airflow.utils.helpers import exactly_one
from airflow.utils.log import secrets_masker
from airflow.utils.log.log_reader import TaskLogReader
from airflow.utils.net import get_hostname
from airflow.utils.session import NEW_SESSION, create_session, provide_session
from airflow.utils.state import DagRunState, State, TaskInstanceState
from airflow.utils.strings import to_boolean
from airflow.utils.task_group import TaskGroup, task_group_to_dict
from airflow.utils.timezone import td_format, utcnow
from airflow.utils.types import NOTSET
from airflow.version import version
from airflow.www import auth, utils as wwwutils
from airflow.www.decorators import action_logging, gzipped
from airflow.www.extensions.init_auth_manager import get_auth_manager, is_auth_manager_initialized
from airflow.www.forms import (
    DagRunEditForm,
    DateTimeForm,
    TaskInstanceEditForm,
    create_connection_form_class,
)
from airflow.www.widgets import AirflowModelListWidget, AirflowVariableShowWidget

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.auth.managers.models.batch_apis import IsAuthorizedDagRequest
    from airflow.models.dag import DAG
    from airflow.models.operator import Operator

PAGE_SIZE = conf.getint("webserver", "page_size")
FILTER_TAGS_COOKIE = "tags_filter"
FILTER_STATUS_COOKIE = "dag_status_filter"
FILTER_LASTRUN_COOKIE = "last_run_filter"
LINECHART_X_AXIS_TICKFORMAT = (
    "function (d, i) { let xLabel;"
    "if (i === undefined) {xLabel = d3.time.format('%H:%M, %d %b %Y')(new Date(parseInt(d)));"
    "} else {xLabel = d3.time.format('%H:%M, %d %b')(new Date(parseInt(d)));} return xLabel;}"
)

SENSITIVE_FIELD_PLACEHOLDER = "RATHER_LONG_SENSITIVE_FIELD_PLACEHOLDER"

logger = logging.getLogger(__name__)


def sanitize_args(args: dict[str, Any]) -> dict[str, Any]:
    """
    Remove all parameters starting with `_`.

    :param args: arguments of request
    :return: copy of the dictionary passed as input with args starting with `_` removed.
    """
    return {key: value for key, value in args.items() if not key.startswith("_")}
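
# For example, sanitize_args({"_csrf_token": "x", "dag_id": "example"}) returns
# {"dag_id": "example"}.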


# Following the release of the fix for https://github.com/python/cpython/issues/102153 in Python 3.8.17
# and 3.9.17 on June 6, 2023, we are adding extra sanitization of the urls passed to the get_safe_url
# method to make it work the same way regardless of whether the user runs the latest Python patchlevel
# versions or not. This also follows a recommended solution by the Python core team.
#
# From: https://github.com/python/cpython/commit/d28bafa2d3e424b6fdcfd7ae7cde8e71d7177369
#
# We recommend that users of these APIs where the values may be used anywhere
# with security implications code defensively. Do some verification within your
# code before trusting a returned component part. Does that ``scheme`` make
# sense? Is that a sensible ``path``? Is there anything strange about that
# ``hostname``? etc.
#
# C0 control and space characters, to be stripped per the WHATWG spec.
# == "".join([chr(i) for i in range(0, 0x20 + 1)])
_WHATWG_C0_CONTROL_OR_SPACE = (
    "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c"
    "\r\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f "
)


def get_safe_url(url):
    """Given a user-supplied URL, ensure it points to our web server."""
    if not url:
        return url_for("Airflow.index")

    # If the url contains a semicolon, redirect to the homepage to avoid
    # potential XSS. (Similar to https://github.com/python/cpython/pull/24297/files (bpo-42967))
    if ";" in unquote(url):
        return url_for("Airflow.index")

    url = url.lstrip(_WHATWG_C0_CONTROL_OR_SPACE)
    host_url = urlsplit(request.host_url)
    redirect_url = urlsplit(urljoin(request.host_url, url))
    if not (redirect_url.scheme in ("http", "https") and host_url.netloc == redirect_url.netloc):
        return url_for("Airflow.index")

    # This will ensure we only redirect to the right scheme/netloc
    return redirect_url.geturl()
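
# For example, a cross-host URL like "https://attacker.example/x" or a
# "javascript:" URL falls back to the index page, while a same-host relative
# path such as "/home" is returned as an absolute URL on request.host_url.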


def get_date_time_num_runs_dag_runs_form_data(www_request, session, dag):
    """Get Execution Data, Base Date & Number of runs from a Request."""
    date_time = www_request.args.get("execution_date")
    run_id = www_request.args.get("run_id")
    # First check the run id, then the execution date; if neither is given, fall back on the latest dagrun
    if run_id:
        dagrun = dag.get_dagrun(run_id=run_id, session=session)
        date_time = dagrun.execution_date
    elif date_time:
        date_time = _safe_parse_datetime(date_time)
    else:
        date_time = dag.get_latest_execution_date(session=session) or timezone.utcnow()

    base_date = www_request.args.get("base_date")
    if base_date:
        base_date = _safe_parse_datetime(base_date)
    else:
        # The DateTimeField widget truncates milliseconds and would lose
        # the first dag run. Round to the next second.
        base_date = (date_time + datetime.timedelta(seconds=1)).replace(microsecond=0)

    default_dag_run = conf.getint("webserver", "default_dag_run_display_number")
    num_runs = www_request.args.get("num_runs", default=default_dag_run, type=int)

    # When base_date has been rounded up because of the DateTimeField widget, we want
    # to use the execution_date as the starting point for our query just to ensure a
    # link targeting a specific dag run actually loads that dag run. If there are
    # more than num_runs dag runs in the "rounded period" then those dagruns would get
    # loaded and the actual requested run would be excluded by the limit(). Once
    # the user has changed the base date to be anything else we want to use that instead.
    query_date = base_date
    if date_time < base_date <= date_time + datetime.timedelta(seconds=1):
        query_date = date_time

    drs = session.scalars(
        select(DagRun)
        .where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= query_date)
        .order_by(desc(DagRun.execution_date))
        .limit(num_runs)
    ).all()
    dr_choices = []
    dr_state = None
    for dr in drs:
        dr_choices.append((dr.execution_date.isoformat(), dr.run_id))
        if date_time == dr.execution_date:
            dr_state = dr.state

    # Happens if base_date was changed and the selected dag run is not in the results
    if not dr_state and drs:
        dr = drs[0]
        date_time = dr.execution_date
        dr_state = dr.state

    return {
        "dttm": date_time,
        "base_date": base_date,
        "num_runs": num_runs,
        "execution_date": date_time.isoformat(),
        "dr_choices": dr_choices,
        "dr_state": dr_state,
    }


def _safe_parse_datetime(v, *, allow_empty=False, strict=True) -> datetime.datetime | None:
    """
    Parse a datetime string, aborting the request with a 400 error if the value is invalid.

    :param v: the string value to be parsed
    :param allow_empty: set True to return None for an empty string or None
    :param strict: if False, fall back on the dateutil parser when pendulum cannot parse the value
    """
    if allow_empty is True and not v:
        return None
    try:
        return timezone.parse(v, strict=strict)
    except (TypeError, ParserError):
        abort(400, f"Invalid datetime: {v!r}")
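

# Builds a node payload for the graph view; "rx"/"ry" appear to be the SVG
# rounded-corner radii used by the frontend when drawing the node.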
def node_dict(node_id, label, node_class):
    return {
        "id": node_id,
        "value": {"label": label, "rx": 5, "ry": 5, "class": node_class},
    }


def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) -> dict[str, Any]:
    """
    Create a nested dict representation of the DAG's TaskGroup and its children.

    Used to construct the Graph and Grid views.
    """
    query = session.execute(
        select(
            TaskInstance.task_id,
            TaskInstance.run_id,
            TaskInstance.state,
            case(
                (TaskInstance.map_index == -1, TaskInstance.try_number),
                else_=None,
            ).label("try_number"),
            func.min(TaskInstanceNote.content).label("note"),
            func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"),
            func.min(TaskInstance.queued_dttm).label("queued_dttm"),
            func.min(TaskInstance.start_date).label("start_date"),
            func.max(TaskInstance.end_date).label("end_date"),
        )
        .join(TaskInstance.task_instance_note, isouter=True)
        .where(
            TaskInstance.dag_id == dag.dag_id,
            TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
        )
        .group_by(
            TaskInstance.task_id,
            TaskInstance.run_id,
            TaskInstance.state,
            case(
                (TaskInstance.map_index == -1, TaskInstance.try_number),
                else_=None,
            ),
        )
        .order_by(TaskInstance.task_id, TaskInstance.run_id)
    )

    grouped_tis: dict[str, list[TaskInstance]] = collections.defaultdict(
        list,
        ((task_id, list(tis)) for task_id, tis in itertools.groupby(query, key=lambda ti: ti.task_id)),
    )
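    # Note: itertools.groupby only merges consecutive rows, so the grouping
    # relies on the ORDER BY task_id in the query above.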

    @cache
    def get_task_group_children_getter() -> operator.methodcaller:
        sort_order = conf.get("webserver", "grid_view_sorting_order", fallback="topological")
        if sort_order == "topological":
            return operator.methodcaller("topological_sort")
        if sort_order == "hierarchical_alphabetical":
            return operator.methodcaller("hierarchical_alphabetical_sort")
        raise AirflowConfigException(f"Unsupported grid_view_sorting_order: {sort_order}")

    def task_group_to_grid(item: Operator | TaskGroup) -> dict[str, Any]:
        if not isinstance(item, TaskGroup):

            def _mapped_summary(ti_summaries: list[TaskInstance]) -> Iterator[dict[str, Any]]:
                run_id = ""
                record: dict[str, Any] = {}

                def set_overall_state(record):
                    for state in wwwutils.priority:
                        if state in record["mapped_states"]:
                            record["state"] = state
                            break
                    # When turning the dict into JSON we can't have None as a key,
                    # so use the string that the UI does.
                    with contextlib.suppress(KeyError):
                        record["mapped_states"]["no_status"] = record["mapped_states"].pop(None)

                for ti_summary in ti_summaries:
                    if run_id != ti_summary.run_id:
                        run_id = ti_summary.run_id
                        if record:
                            set_overall_state(record)
                            yield record
                        record = {
                            "task_id": ti_summary.task_id,
                            "run_id": run_id,
                            "queued_dttm": ti_summary.queued_dttm,
                            "start_date": ti_summary.start_date,
                            "end_date": ti_summary.end_date,
                            "mapped_states": {ti_summary.state: ti_summary.state_count},
                            "state": None,  # We change this before yielding
                        }
                        continue
                    record["queued_dttm"] = min(
                        filter(None, [record["queued_dttm"], ti_summary.queued_dttm]), default=None
                    )
                    record["start_date"] = min(
                        filter(None, [record["start_date"], ti_summary.start_date]), default=None
                    )
                    # Sometimes the start date of a group might be before the queued date of the group
                    if (
                        record["queued_dttm"]
                        and record["start_date"]
                        and record["queued_dttm"] > record["start_date"]
                    ):
                        record["queued_dttm"] = None
                    record["end_date"] = max(
                        filter(None, [record["end_date"], ti_summary.end_date]), default=None
                    )
                    record["mapped_states"][ti_summary.state] = ti_summary.state_count
                if record:
                    set_overall_state(record)
                    yield record

            if item_is_mapped := item.get_needs_expansion():
                instances = list(_mapped_summary(grouped_tis[item.task_id]))
            else:
                instances = [
                    {
                        "task_id": task_instance.task_id,
                        "run_id": task_instance.run_id,
                        "state": task_instance.state,
                        "queued_dttm": task_instance.queued_dttm,
                        "start_date": task_instance.start_date,
                        "end_date": task_instance.end_date,
                        "try_number": task_instance.try_number,
                        "note": task_instance.note,
                    }
                    for task_instance in grouped_tis[item.task_id]
                ]

            setup_teardown_type = {}
            if item.is_setup is True:
                setup_teardown_type["setupTeardownType"] = "setup"
            elif item.is_teardown is True:
                setup_teardown_type["setupTeardownType"] = "teardown"

            return {
                "id": item.task_id,
                "instances": instances,
                "label": item.label,
                "extra_links": item.extra_links,
                "is_mapped": item_is_mapped,
                "has_outlet_datasets": any(
                    isinstance(i, (Dataset, DatasetAlias)) for i in (item.outlets or [])
                ),
                "operator": item.operator_name,
                "trigger_rule": item.trigger_rule,
                **setup_teardown_type,
            }

        # Task Group
        task_group = item
        children = [task_group_to_grid(child) for child in get_task_group_children_getter()(item)]

        def get_summary(dag_run: DagRun):
            child_instances = [
                item
                for sublist in (child["instances"] for child in children if "instances" in child)
                for item in sublist
                if item["run_id"] == dag_run.run_id
                if item
            ]

            children_queued_dttms = (item["queued_dttm"] for item in child_instances)
            children_start_dates = (item["start_date"] for item in child_instances)
            children_end_dates = (item["end_date"] for item in child_instances)
            children_states = {item["state"] for item in child_instances}

            group_state = next((state for state in wwwutils.priority if state in children_states), None)
            group_queued_dttm = min(filter(None, children_queued_dttms), default=None)
            group_start_date = min(filter(None, children_start_dates), default=None)
            group_end_date = max(filter(None, children_end_dates), default=None)
            # Sometimes the start date of a group might be before the queued date of the group
            if group_queued_dttm and group_start_date and group_queued_dttm > group_start_date:
                group_queued_dttm = None

            return {
                "task_id": task_group.group_id,
                "run_id": dag_run.run_id,
                "state": group_state,
                "queued_dttm": group_queued_dttm,
                "start_date": group_start_date,
                "end_date": group_end_date,
            }

        def get_mapped_group_summaries():
            mapped_ti_query = session.execute(
                select(TaskInstance.task_id, TaskInstance.state, TaskInstance.run_id, TaskInstance.map_index)
                .where(
                    TaskInstance.dag_id == dag.dag_id,
                    TaskInstance.task_id.in_(child["id"] for child in children),
                    TaskInstance.run_id.in_(r.run_id for r in dag_runs),
                )
                .order_by(TaskInstance.task_id, TaskInstance.run_id)
            )
            # Group tis by run_id, and then map_index.
            mapped_tis: Mapping[str, Mapping[int, list[TaskInstance]]] = defaultdict(
                lambda: defaultdict(list)
            )
            for ti in mapped_ti_query:
                mapped_tis[ti.run_id][ti.map_index].append(ti)

            def get_mapped_group_summary(run_id: str, mapped_instances: Mapping[int, list[TaskInstance]]):
                child_instances = [
                    item
                    for sublist in (child["instances"] for child in children if "instances" in child)
                    for item in sublist
                    if item and item["run_id"] == run_id
                ]

                children_queued_dttms = (item["queued_dttm"] for item in child_instances)
                children_start_dates = (item["start_date"] for item in child_instances)
                children_end_dates = (item["end_date"] for item in child_instances)
                children_states = {item["state"] for item in child_instances}

                # TODO: This assumes TI map index has a one-to-one mapping to
                # its parent mapped task group, which will not be true when we
                # allow nested mapping in the future.
                mapped_states: MutableMapping[str, int] = defaultdict(int)
                for mi_values in mapped_instances.values():
                    child_states = {mi.state for mi in mi_values}
                    state = next(s for s in wwwutils.priority if s in child_states)
                    value = state.value if state is not None else "no_status"
                    mapped_states[value] += 1

                group_state = next((state for state in wwwutils.priority if state in children_states), None)
                group_queued_dttm = min(filter(None, children_queued_dttms), default=None)
                group_start_date = min(filter(None, children_start_dates), default=None)
                group_end_date = max(filter(None, children_end_dates), default=None)

                return {
                    "task_id": task_group.group_id,
                    "run_id": run_id,
                    "state": group_state,
                    "queued_dttm": group_queued_dttm,
                    "start_date": group_start_date,
                    "end_date": group_end_date,
                    "mapped_states": mapped_states,
                }

            return [get_mapped_group_summary(run_id, tis) for run_id, tis in mapped_tis.items()]

        # We don't need to calculate summaries for the root
        if task_group.group_id is None:
            return {
                "id": task_group.group_id,
                "label": task_group.label,
                "children": children,
                "instances": [],
            }
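
        # A non-None first element means this group is, or is nested inside, a mapped task group.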
        if next(task_group.iter_mapped_task_groups(), None) is not None:
            return {
                "id": task_group.group_id,
                "label": task_group.label,
                "children": children,
                "tooltip": task_group.tooltip,
                "instances": get_mapped_group_summaries(),
                "is_mapped": True,
            }

        group_summaries = [get_summary(dr) for dr in dag_runs]

        return {
            "id": task_group.group_id,
            "label": task_group.label,
            "children": children,
            "tooltip": task_group.tooltip,
            "instances": group_summaries,
        }

    return task_group_to_grid(dag.task_group)


def get_key_paths(input_dict):
    """Yield dot-separated paths to every leaf value in a nested dictionary."""
    for key, value in input_dict.items():
        if isinstance(value, dict):
            for sub_key in get_key_paths(value):
                yield f"{key}.{sub_key}"
        else:
            yield key
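
# For example, get_key_paths({"a": {"b": 1}, "c": 2}) yields "a.b" and "c".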


def get_value_from_path(key_path, content):
    """Return the value from a dictionary based on a dot-separated path of keys."""
    elem = content
    for x in key_path.strip(".").split("."):
        try:
            x = int(x)
            elem = elem[x]
        except ValueError:
            elem = elem.get(x)
    return elem
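
# For example, get_value_from_path("a.b", {"a": {"b": 1}}) returns 1; a numeric
# segment like "items.0" is used as a sequence index.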


def get_task_stats_from_query(qry):
    """
    Return a dict of the task quantity, grouped by dag id and task status.

    :param qry: The data in the format (<dag id>, <task state>, <is dag running>, <task count>),
        ordered by <dag id> and <is dag running>
    """
    data = {}
    last_dag_id = None
    has_running_dags = False
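    # Prefer stats from running dag runs: once a row with is_dag_running=True has
    # been seen for a dag_id, later non-running rows for that dag are skipped.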
    for dag_id, state, is_dag_running, count in qry:
        if last_dag_id != dag_id:
            last_dag_id = dag_id
            has_running_dags = False
        elif not is_dag_running and has_running_dags:
            continue
        if is_dag_running:
            has_running_dags = True
        if dag_id not in data:
            data[dag_id] = {}
        data[dag_id][state] = count
    return data


def redirect_or_json(origin, msg, status="", status_code=200):
    """
    Return a JSON response when the client accepts JSON; otherwise flash the message and redirect.

    This allows us to handle side effects in-page more elegantly, since some
    endpoints are called by javascript.
    """
    if request.headers.get("Accept") == "application/json":
        if status == "error" and status_code == 200:
            status_code = 500
        return Response(response=msg, status=status_code, mimetype="application/json")
    else:
        if status:
            flash(msg, status)
        else:
            flash(msg)
        return redirect(origin)


######################################################################################
# Error handlers
######################################################################################


def not_found(error):
    """Show Not Found on screen for any error in the Webserver."""
    return (
        render_template(
            "airflow/error.html",
            hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
            status_code=404,
            error_message="Page cannot be found.",
        ),
        404,
    )


def method_not_allowed(error):
    """Show Method Not Allowed on screen for any error in the Webserver."""
    return (
        render_template(
            "airflow/error.html",
            hostname=get_hostname() if conf.getboolean("webserver", "EXPOSE_HOSTNAME") else "",
            status_code=405,
            error_message="Received an invalid request.",
        ),
        405,
    )


def show_traceback(error):
    """Show Traceback for a given error."""
    if not is_auth_manager_initialized():
        # This is the case where the internal API component is used and the auth manager is not initialized
        return ("Error calling the API", 500)
    is_logged_in = get_auth_manager().is_logged_in()
    return (
        render_template(
            "airflow/traceback.html",
            python_version=sys.version.split(" ")[0] if is_logged_in else "redacted",
            airflow_version=version if is_logged_in else "redacted",
            hostname=(
                get_hostname()
                if conf.getboolean("webserver", "EXPOSE_HOSTNAME") and is_logged_in
                else "redacted"
            ),
            info=(
                traceback.format_exc()
                if conf.getboolean("webserver", "EXPOSE_STACKTRACE") and is_logged_in
                else "Error! Please contact server admin."
            ),
        ),
        500,
    )


######################################################################################
# BaseViews
######################################################################################


class AirflowBaseView(BaseView):
    """Base View to set Airflow related properties."""

    from airflow import macros

    route_base = ""

    extra_args = {
        # Make our macros available to our UI templates too.
        "macros": macros,
        "get_docs_url": get_docs_url,
    }

    if not conf.getboolean("core", "unit_test_mode"):
        executor, _ = ExecutorLoader.import_default_executor_cls()
        extra_args["sqlite_warning"] = settings.engine and (settings.engine.dialect.name == "sqlite")
        if not executor.is_production:
            extra_args["production_executor_warning"] = executor.__name__
        extra_args["otel_metrics_on"] = conf.getboolean("metrics", "otel_on")
        extra_args["otel_traces_on"] = conf.getboolean("traces", "otel_on")

    line_chart_attr = {
        "legend.maxKeyLength": 200,
    }

    def render_template(self, *args, **kwargs):
        # Add triggerer_job only if we need it
        if TriggererJobRunner.is_needed():
            kwargs["triggerer_job"] = lazy_object_proxy.Proxy(TriggererJobRunner.most_recent_job)

        if "dag" in kwargs:
            kwargs["can_edit_dag"] = get_auth_manager().is_authorized_dag(
                method="PUT", details=DagDetails(id=kwargs["dag"].dag_id)
            )
            url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"])
            kwargs["dag_file_token"] = url_serializer.dumps(kwargs["dag"].fileloc)

        return super().render_template(
            *args,
            # Cache this at most once per request, not for the lifetime of the view instance
            scheduler_job=lazy_object_proxy.Proxy(SchedulerJobRunner.most_recent_job),
            **kwargs,
        )


class Airflow(AirflowBaseView):
    """Main Airflow application."""

    @expose("/health")
    def health(self):
        """
        Check the health status of the Airflow instance.

        Includes metadatabase, scheduler and triggerer.
        """
        airflow_health_status = get_airflow_health()
        return flask.json.jsonify(airflow_health_status)

    @expose("/home")
    @auth.has_access_view()
    def index(self):
        """Home view."""
        from airflow.models.dag import DagOwnerAttributes

        hide_paused_dags_by_default = conf.getboolean("webserver", "hide_paused_dags_by_default")
        default_dag_run = conf.getint("webserver", "default_dag_run_display_number")

        num_runs = request.args.get("num_runs", default=default_dag_run, type=int)
        current_page = request.args.get("page", default=0, type=int)
        arg_search_query = request.args.get("search")
        arg_tags_filter = request.args.getlist("tags")
        arg_status_filter = request.args.get("status")
        arg_lastrun_filter = request.args.get("lastrun")
        arg_sorting_key = request.args.get("sorting_key", "dag_id")
        arg_sorting_direction = request.args.get("sorting_direction", default="asc")

        if request.args.get("reset_tags") is not None:
            flask_session[FILTER_TAGS_COOKIE] = None
            # Remove the reset_tags=reset from the URL
            return redirect(url_for("Airflow.index"))

        if arg_lastrun_filter == "reset_filter":
            flask_session[FILTER_LASTRUN_COOKIE] = None
            return redirect(url_for("Airflow.index"))

        filter_tags_cookie_val = flask_session.get(FILTER_TAGS_COOKIE)
        filter_lastrun_cookie_val = flask_session.get(FILTER_LASTRUN_COOKIE)

        # Update filter args in the URL from session values if needed
        if (not arg_tags_filter and filter_tags_cookie_val) or (
            not arg_lastrun_filter and filter_lastrun_cookie_val
        ):
            tags = arg_tags_filter or (filter_tags_cookie_val and filter_tags_cookie_val.split(","))
            lastrun = arg_lastrun_filter or filter_lastrun_cookie_val
            return redirect(url_for("Airflow.index", tags=tags, lastrun=lastrun))

        if arg_tags_filter:
            flask_session[FILTER_TAGS_COOKIE] = ",".join(arg_tags_filter)

        if arg_lastrun_filter:
            arg_lastrun_filter = arg_lastrun_filter.strip().lower()
            flask_session[FILTER_LASTRUN_COOKIE] = arg_lastrun_filter

        if arg_status_filter is None:
            filter_status_cookie_val = flask_session.get(FILTER_STATUS_COOKIE)
            if filter_status_cookie_val:
                arg_status_filter = filter_status_cookie_val
            else:
                arg_status_filter = "active" if hide_paused_dags_by_default else "all"
                flask_session[FILTER_STATUS_COOKIE] = arg_status_filter
        else:
            status = arg_status_filter.strip().lower()
            flask_session[FILTER_STATUS_COOKIE] = status
            arg_status_filter = status

        dags_per_page = PAGE_SIZE
        start = current_page * dags_per_page
        end = start + dags_per_page

        # Get all the dag ids the user could access
        filter_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        with create_session() as session:
            # read orm_dags from the db
            dags_query = select(DagModel).where(~DagModel.is_subdag, DagModel.is_active)
            if arg_search_query:
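                # "_" is a single-character wildcard in SQL LIKE, so escape it to
                # match literal underscores in dag ids, display names and owners.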
                escaped_arg_search_query = arg_search_query.replace("_", r"\_")
                dags_query = dags_query.where(
                    DagModel.dag_id.ilike("%" + escaped_arg_search_query + "%", escape="\\")
                    | DagModel._dag_display_property_value.ilike(
                        "%" + escaped_arg_search_query + "%", escape="\\"
                    )
                    | DagModel.owners.ilike("%" + escaped_arg_search_query + "%", escape="\\")
                )

            if arg_tags_filter:
                dags_query = dags_query.where(DagModel.tags.any(DagTag.name.in_(arg_tags_filter)))

            dags_query = dags_query.where(DagModel.dag_id.in_(filter_dag_ids))
            filtered_dag_count = get_query_count(dags_query, session=session)
            if filtered_dag_count == 0 and len(arg_tags_filter):
                flash(
                    "No matching DAG tags found.",
                    "warning",
                )
                flask_session[FILTER_TAGS_COOKIE] = None
                return redirect(url_for("Airflow.index"))

            # find DAGs which have a RUNNING or QUEUED DagRun
            running_dags = dags_query.join(DagRun, DagModel.dag_id == DagRun.dag_id).where(
                (DagRun.state == DagRunState.RUNNING) | (DagRun.state == DagRunState.QUEUED)
            )
            lastrun_running_is_paused = session.execute(
                running_dags.with_only_columns(DagModel.dag_id, DagModel.is_paused).distinct(DagModel.dag_id)
            ).all()
            lastrun_running_count_active = len(
                list(filter(lambda x: not x.is_paused, lastrun_running_is_paused))
            )
            lastrun_running_count_paused = len(
                list(filter(lambda x: x.is_paused, lastrun_running_is_paused))
            )

            # find DAGs for which the latest DagRun is FAILED
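            # A dag's latest run is failed when the max start_date over all of its
            # runs equals the max start_date over its FAILED runs; hence the join below.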
            subq_all = (
                select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
                .group_by(DagRun.dag_id)
                .subquery()
            )
            subq_failed = (
                select(DagRun.dag_id, func.max(DagRun.start_date).label("start_date"))
                .where(DagRun.state == DagRunState.FAILED)
                .group_by(DagRun.dag_id)
                .subquery()
            )
            subq_join = (
                select(subq_all.c.dag_id, subq_all.c.start_date)
                .join(
                    subq_failed,
                    and_(
                        subq_all.c.dag_id == subq_failed.c.dag_id,
                        subq_all.c.start_date == subq_failed.c.start_date,
                    ),
                )
                .subquery()
            )
            failed_dags = dags_query.join(subq_join, DagModel.dag_id == subq_join.c.dag_id)
            lastrun_failed_is_paused_count = dict(
                session.execute(
                    failed_dags.with_only_columns(DagModel.is_paused, func.count()).group_by(
                        DagModel.is_paused
                    )
                ).all()
            )
            lastrun_failed_count_active = lastrun_failed_is_paused_count.get(False, 0)
            lastrun_failed_count_paused = lastrun_failed_is_paused_count.get(True, 0)

            if arg_lastrun_filter == "running":
                dags_query = running_dags
            elif arg_lastrun_filter == "failed":
                dags_query = failed_dags

            all_dags = dags_query
            active_dags = dags_query.where(~DagModel.is_paused)
            paused_dags = dags_query.where(DagModel.is_paused)

            status_is_paused = session.execute(
                all_dags.with_only_columns(DagModel.dag_id, DagModel.is_paused).distinct(DagModel.dag_id)
            ).all()
            status_count_active = len(list(filter(lambda x: not x.is_paused, status_is_paused)))
            status_count_paused = len(list(filter(lambda x: x.is_paused, status_is_paused)))
            all_dags_count = status_count_active + status_count_paused

            if arg_status_filter == "active":
                current_dags = active_dags
                num_of_all_dags = status_count_active
                lastrun_count_running = lastrun_running_count_active
                lastrun_count_failed = lastrun_failed_count_active
            elif arg_status_filter == "paused":
                current_dags = paused_dags
                num_of_all_dags = status_count_paused
                lastrun_count_running = lastrun_running_count_paused
                lastrun_count_failed = lastrun_failed_count_paused
            else:
                current_dags = all_dags
                num_of_all_dags = all_dags_count
                lastrun_count_running = lastrun_running_count_active + lastrun_running_count_paused
                lastrun_count_failed = lastrun_failed_count_active + lastrun_failed_count_paused

            if arg_sorting_key == "dag_id":
                if arg_sorting_direction == "desc":
                    current_dags = current_dags.order_by(
                        func.coalesce(DagModel.dag_display_name, DagModel.dag_id).desc()
                    )
                else:
                    current_dags = current_dags.order_by(
                        func.coalesce(DagModel.dag_display_name, DagModel.dag_id)
                    )
            elif arg_sorting_key == "last_dagrun":
                dag_run_subquery = (
                    select(
                        DagRun.dag_id,
                        sqla.func.max(DagRun.execution_date).label("max_execution_date"),
                    )
                    .group_by(DagRun.dag_id)
                    .subquery()
                )
                current_dags = current_dags.outerjoin(
                    dag_run_subquery, and_(dag_run_subquery.c.dag_id == DagModel.dag_id)
                )
                null_case = case((dag_run_subquery.c.max_execution_date.is_(None), 1), else_=0)
                if arg_sorting_direction == "desc":
                    current_dags = current_dags.order_by(
                        null_case, dag_run_subquery.c.max_execution_date.desc()
                    )
                else:
                    current_dags = current_dags.order_by(null_case, dag_run_subquery.c.max_execution_date)
            else:
                sort_column = DagModel.__table__.c.get(arg_sorting_key)
                if sort_column is not None:
                    null_case = case((sort_column.is_(None), 1), else_=0)
                    if arg_sorting_direction == "desc":
                        current_dags = current_dags.order_by(null_case, sort_column.desc())
                    else:
                        current_dags = current_dags.order_by(null_case, sort_column)

            dags = (
                session.scalars(
                    current_dags.options(joinedload(DagModel.tags)).offset(start).limit(dags_per_page)
                )
                .unique()
                .all()
            )
            dataset_triggered_dag_ids = {dag.dag_id for dag in dags if dag.schedule_interval == "Dataset"}
  854. if dataset_triggered_dag_ids:
  855. dataset_triggered_next_run_info = get_dataset_triggered_next_run_info(
  856. dataset_triggered_dag_ids, session=session
  857. )
  858. else:
  859. dataset_triggered_next_run_info = {}
  860. file_tokens = {}
  861. for dag in dags:
  862. dag.can_edit = get_auth_manager().is_authorized_dag(
  863. method="PUT", details=DagDetails(id=dag.dag_id), user=g.user
  864. )
  865. can_create_dag_run = get_auth_manager().is_authorized_dag(
  866. method="POST",
  867. access_entity=DagAccessEntity.RUN,
  868. details=DagDetails(id=dag.dag_id),
  869. user=g.user,
  870. )
  871. dag.can_trigger = dag.can_edit and can_create_dag_run
  872. dag.can_delete = get_auth_manager().is_authorized_dag(
  873. method="DELETE", details=DagDetails(id=dag.dag_id), user=g.user
  874. )
  875. url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"])
  876. file_tokens[dag.dag_id] = url_serializer.dumps(dag.fileloc)
  877. dagtags = session.execute(select(func.distinct(DagTag.name)).order_by(DagTag.name)).all()
  878. tags = [
  879. {"name": name, "selected": bool(arg_tags_filter and name in arg_tags_filter)}
  880. for (name,) in dagtags
  881. ]
  882. owner_links_dict = DagOwnerAttributes.get_all(session)
  883. if get_auth_manager().is_authorized_view(access_view=AccessView.IMPORT_ERRORS):
  884. import_errors = select(ParseImportError).order_by(ParseImportError.id)
  885. can_read_all_dags = get_auth_manager().is_authorized_dag(method="GET")
  886. if not can_read_all_dags:
  887. # if the user doesn't have access to all DAGs, only display errors from visible DAGs
  888. import_errors = import_errors.where(
  889. ParseImportError.filename.in_(
  890. select(DagModel.fileloc).distinct().where(DagModel.dag_id.in_(filter_dag_ids))
  891. )
  892. )
  893. import_errors = session.scalars(import_errors)
  894. for import_error in import_errors:
  895. stacktrace = import_error.stacktrace
  896. if not can_read_all_dags:
  897. # Check if user has read access to all the DAGs defined in the file
  898. file_dag_ids = (
  899. session.query(DagModel.dag_id)
  900. .filter(DagModel.fileloc == import_error.filename)
  901. .all()
  902. )
  903. requests: Sequence[IsAuthorizedDagRequest] = [
  904. {
  905. "method": "GET",
  906. "details": DagDetails(id=dag_id[0]),
  907. }
  908. for dag_id in file_dag_ids
  909. ]
  910. if not get_auth_manager().batch_is_authorized_dag(requests):
  911. stacktrace = "REDACTED - you do not have read permission on all DAGs in the file"
  912. flash(
  913. f"Broken DAG: [{import_error.filename}]\r{stacktrace}",
  914. "dag_import_error",
  915. )
  916. from airflow.plugins_manager import import_errors as plugin_import_errors
  917. for filename, stacktrace in plugin_import_errors.items():
  918. flash(
  919. f"Broken plugin: [{filename}] {stacktrace}",
  920. "error",
  921. )
  922. num_of_pages = math.ceil(num_of_all_dags / dags_per_page)
  923. state_color_mapping = State.state_color.copy()
  924. state_color_mapping["null"] = state_color_mapping.pop(None)
  925. page_title = conf.get(section="webserver", key="instance_name", fallback="DAGs")
  926. page_title_has_markup = conf.getboolean(
  927. section="webserver", key="instance_name_has_markup", fallback=False
  928. )
  929. dashboard_alerts = [
  930. fm for fm in settings.DASHBOARD_UIALERTS if fm.should_show(get_airflow_app().appbuilder)
  931. ]
  932. def _iter_parsed_moved_data_table_names():
  933. for table_name in inspect(session.get_bind()).get_table_names():
  934. segments = table_name.split("__", 3)
  935. if len(segments) >= 3:
  936. if segments[0] == settings.AIRFLOW_MOVED_TABLE_PREFIX:
  937. # Second segment is a version marker that we don't need to show.
  938. yield segments[-1], table_name
  939. if get_auth_manager().is_authorized_configuration(method="GET", user=g.user) and conf.getboolean(
  940. "webserver", "warn_deployment_exposure"
  941. ):
  942. robots_file_access_count = (
  943. select(Log)
  944. .where(Log.event == "robots")
  945. .where(Log.dttm > (utcnow() - datetime.timedelta(days=7)))
  946. )
  947. robots_file_access_count = get_query_count(robots_file_access_count, session=session)
  948. if robots_file_access_count > 0:
  949. flash(
  950. Markup(
  951. "Recent requests have been made to /robots.txt. "
  952. "This indicates that this deployment may be accessible to the public internet. "
  953. "This warning can be disabled by setting webserver.warn_deployment_exposure=False in "
  954. "airflow.cfg. Read more about web deployment security <a href="
  955. f'"{get_docs_url("security/webserver.html")}">'
  956. "here</a>"
  957. ),
  958. "warning",
  959. )
  960. return self.render_template(
  961. "airflow/dags.html",
  962. dags=dags,
  963. show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
  964. dashboard_alerts=dashboard_alerts,
  965. migration_moved_data_alerts=sorted(set(_iter_parsed_moved_data_table_names())),
  966. current_page=current_page,
  967. search_query=arg_search_query or "",
  968. page_title=Markup(page_title) if page_title_has_markup else page_title,
  969. page_size=dags_per_page,
  970. num_of_pages=num_of_pages,
  971. num_dag_from=min(start + 1, num_of_all_dags),
  972. num_dag_to=min(end, num_of_all_dags),
  973. num_of_all_dags=num_of_all_dags,
  974. paging=wwwutils.generate_pages(
  975. current_page,
  976. num_of_pages,
  977. search=escape(arg_search_query) if arg_search_query else None,
  978. status=arg_status_filter or None,
  979. tags=arg_tags_filter or None,
  980. sorting_key=arg_sorting_key or None,
  981. sorting_direction=arg_sorting_direction or None,
  982. ),
  983. num_runs=num_runs,
  984. tags=tags,
  985. owner_links=owner_links_dict,
  986. state_color=state_color_mapping,
  987. status_filter=arg_status_filter,
  988. status_count_all=all_dags_count,
  989. status_count_active=status_count_active,
  990. status_count_paused=status_count_paused,
  991. lastrun_filter=arg_lastrun_filter,
  992. lastrun_count_running=lastrun_count_running,
  993. lastrun_count_failed=lastrun_count_failed,
  994. tags_filter=arg_tags_filter,
  995. sorting_key=arg_sorting_key,
  996. sorting_direction=arg_sorting_direction,
  997. auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
  998. dataset_triggered_next_run_info=dataset_triggered_next_run_info,
  999. file_tokens=file_tokens,
  1000. )
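
    # Illustrative pagination example (hypothetical values, not from this module): with
    # PAGE_SIZE = 25, current_page = 2 and num_of_all_dags = 60, index() computes
    # start = 50, end = 75 and num_of_pages = math.ceil(60 / 25) = 3, so the footer
    # renders "51-60 of 60" via num_dag_from = min(51, 60) and num_dag_to = min(75, 60).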

    @expose("/datasets")
    @auth.has_access_dataset("GET")
    def datasets(self):
        """Datasets view."""
        state_color_mapping = State.state_color.copy()
        state_color_mapping["null"] = state_color_mapping.pop(None)
        return self.render_template(
            "airflow/datasets.html",
            auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
            state_color_mapping=state_color_mapping,
        )

    @expose("/cluster_activity")
    @auth.has_access_view(AccessView.CLUSTER_ACTIVITY)
    def cluster_activity(self):
        """Cluster Activity view."""
        state_color_mapping = State.state_color.copy()
        state_color_mapping["no_status"] = state_color_mapping.pop(None)
        standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
        return self.render_template(
            "airflow/cluster_activity.html",
            auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
            state_color_mapping=state_color_mapping,
            standalone_dag_processor=standalone_dag_processor,
        )

    @expose("/next_run_datasets_summary", methods=["POST"])
    @provide_session
    def next_run_datasets_summary(self, session: Session = NEW_SESSION):
        """Next run info for dataset triggered DAGs."""
        allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        if not allowed_dag_ids:
            return flask.json.jsonify({})

        # Filter by post parameters
        selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist("dag_ids") if dag_id}

        if selected_dag_ids:
            filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
        else:
            filter_dag_ids = allowed_dag_ids

        dataset_triggered_dag_ids = [
            dag_id
            for dag_id in (
                session.scalars(
                    select(DagModel.dag_id)
                    .where(DagModel.dag_id.in_(filter_dag_ids))
                    .where(DagModel.schedule_interval == "Dataset")
                )
            )
        ]

        dataset_triggered_next_run_info = get_dataset_triggered_next_run_info(
            dataset_triggered_dag_ids, session=session
        )

        return flask.json.jsonify(dataset_triggered_next_run_info)
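
    # Illustrative response shape for /next_run_datasets_summary (hypothetical dag_id;
    # the value layout is whatever get_dataset_triggered_next_run_info returns per DAG,
    # roughly the "N of M datasets updated" summary shown on the DAGs page):
    #   {"my_dataset_dag": {"uri": "...", "ready": 1, "total": 2}}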

    @expose("/dag_stats", methods=["POST"])
    @auth.has_access_dag("GET", DagAccessEntity.RUN)
    @provide_session
    def dag_stats(self, session: Session = NEW_SESSION):
        """Dag statistics."""
        allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        # Filter by post parameters
        selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist("dag_ids") if dag_id}

        if selected_dag_ids:
            filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
        else:
            filter_dag_ids = allowed_dag_ids

        if not filter_dag_ids:
            return flask.json.jsonify({})

        dag_state_stats = session.execute(
            select(DagRun.dag_id, DagRun.state, sqla.func.count(DagRun.state))
            .group_by(DagRun.dag_id, DagRun.state)
            .where(DagRun.dag_id.in_(filter_dag_ids))
        )
        dag_state_data = {(dag_id, state): count for dag_id, state, count in dag_state_stats}

        payload = {
            dag_id: [
                {"state": state, "count": dag_state_data.get((dag_id, state), 0)}
                for state in State.dag_states
            ]
            for dag_id in filter_dag_ids
        }
        return flask.json.jsonify(payload)
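
    # Illustrative /dag_stats response (hypothetical dag_id): every requested DAG gets
    # one entry per state in State.dag_states, with zero filled in for missing counts:
    #   {"example_dag": [{"state": "queued", "count": 0}, {"state": "running", "count": 1}, ...]}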

    @expose("/task_stats", methods=["POST"])
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def task_stats(self, session: Session = NEW_SESSION):
        """Task Statistics."""
        allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        if not allowed_dag_ids:
            return flask.json.jsonify({})

        # Filter by post parameters
        selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist("dag_ids") if dag_id}

        if selected_dag_ids:
            filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
        else:
            filter_dag_ids = allowed_dag_ids

        running_dag_run_query_result = (
            select(DagRun.dag_id, DagRun.run_id)
            .join(DagModel, DagModel.dag_id == DagRun.dag_id)
            .where(DagRun.state == DagRunState.RUNNING, DagModel.is_active)
        )

        running_dag_run_query_result = running_dag_run_query_result.where(DagRun.dag_id.in_(filter_dag_ids))

        running_dag_run_query_result = running_dag_run_query_result.subquery("running_dag_run")

        # Select all task_instances from active dag_runs.
        running_task_instance_query_result = select(
            TaskInstance.dag_id.label("dag_id"),
            TaskInstance.state.label("state"),
            sqla.literal(True).label("is_dag_running"),
        ).join(
            running_dag_run_query_result,
            and_(
                running_dag_run_query_result.c.dag_id == TaskInstance.dag_id,
                running_dag_run_query_result.c.run_id == TaskInstance.run_id,
            ),
        )

        if conf.getboolean("webserver", "SHOW_RECENT_STATS_FOR_COMPLETED_RUNS", fallback=True):
            last_dag_run = (
                select(DagRun.dag_id, sqla.func.max(DagRun.execution_date).label("execution_date"))
                .join(DagModel, DagModel.dag_id == DagRun.dag_id)
                .where(DagRun.state != DagRunState.RUNNING, DagModel.is_active)
                .group_by(DagRun.dag_id)
            )

            last_dag_run = last_dag_run.where(DagRun.dag_id.in_(filter_dag_ids))
            last_dag_run = last_dag_run.subquery("last_dag_run")

            # Select all task_instances from active dag_runs.
            # If no dag_run is active, return task instances from most recent dag_run.
            last_task_instance_query_result = (
                select(
                    TaskInstance.dag_id.label("dag_id"),
                    TaskInstance.state.label("state"),
                    sqla.literal(False).label("is_dag_running"),
                )
                .join(TaskInstance.dag_run)
                .join(
                    last_dag_run,
                    and_(
                        last_dag_run.c.dag_id == TaskInstance.dag_id,
                        last_dag_run.c.execution_date == DagRun.execution_date,
                    ),
                )
            )

            final_task_instance_query_result = union_all(
                last_task_instance_query_result, running_task_instance_query_result
            ).alias("final_ti")
        else:
            final_task_instance_query_result = running_task_instance_query_result.subquery("final_ti")

        qry = session.execute(
            select(
                final_task_instance_query_result.c.dag_id,
                final_task_instance_query_result.c.state,
                final_task_instance_query_result.c.is_dag_running,
                sqla.func.count(),
            )
            .group_by(
                final_task_instance_query_result.c.dag_id,
                final_task_instance_query_result.c.state,
                final_task_instance_query_result.c.is_dag_running,
            )
            .order_by(
                final_task_instance_query_result.c.dag_id,
                final_task_instance_query_result.c.is_dag_running.desc(),
            )
        )
        data = get_task_stats_from_query(qry)
        payload: dict[str, list[dict[str, Any]]] = defaultdict(list)
        for dag_id, state in itertools.product(filter_dag_ids, State.task_states):
            payload[dag_id].append({"state": state, "count": data.get(dag_id, {}).get(state, 0)})
        return flask.json.jsonify(payload)
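
    # Illustrative /task_stats response (hypothetical dag_id): as with /dag_stats, each
    # DAG maps to one {"state", "count"} entry per state in State.task_states, counting
    # task instances from running DAG runs (plus, when SHOW_RECENT_STATS_FOR_COMPLETED_RUNS
    # is enabled, from each DAG's most recent completed run):
    #   {"example_dag": [{"state": "success", "count": 3}, {"state": "failed", "count": 0}, ...]}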

    @expose("/last_dagruns", methods=["POST"])
    @auth.has_access_dag("GET", DagAccessEntity.RUN)
    @provide_session
    def last_dagruns(self, session: Session = NEW_SESSION):
        """Last DAG runs."""
        allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        # Filter by post parameters
        selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist("dag_ids") if dag_id}

        if selected_dag_ids:
            filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
        else:
            filter_dag_ids = allowed_dag_ids

        if not filter_dag_ids:
            return flask.json.jsonify({})

        last_runs_subquery = (
            select(
                DagRun.dag_id,
                sqla.func.max(DagRun.execution_date).label("max_execution_date"),
            )
            .group_by(DagRun.dag_id)
            .where(DagRun.dag_id.in_(filter_dag_ids))  # Only include accessible/selected DAGs.
            .subquery("last_runs")
        )

        query = session.execute(
            select(
                DagRun.dag_id,
                DagRun.start_date,
                DagRun.end_date,
                DagRun.state,
                DagRun.execution_date,
                DagRun.data_interval_start,
                DagRun.data_interval_end,
            ).join(
                last_runs_subquery,
                and_(
                    last_runs_subquery.c.dag_id == DagRun.dag_id,
                    last_runs_subquery.c.max_execution_date == DagRun.execution_date,
                ),
            )
        )

        resp = {
            r.dag_id.replace(".", "__dot__"): {
                "dag_id": r.dag_id,
                "state": r.state,
                "execution_date": wwwutils.datetime_to_string(r.execution_date),
                "start_date": wwwutils.datetime_to_string(r.start_date),
                "end_date": wwwutils.datetime_to_string(r.end_date),
                "data_interval_start": wwwutils.datetime_to_string(r.data_interval_start),
                "data_interval_end": wwwutils.datetime_to_string(r.data_interval_end),
            }
            for r in query
        }
        return flask.json.jsonify(resp)
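
    # Illustrative "__dot__" key mapping above (hypothetical id): a dotted DAG id such
    # as "parent.child" becomes the response key "parent__dot__child", presumably so the
    # frontend can use it safely in lookups, while the original id is preserved in the
    # nested "dag_id" field.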

    @expose("/code")
    def legacy_code(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.code", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/code")
    @auth.has_access_dag("GET", DagAccessEntity.CODE)
    def code(self, dag_id):
        """Dag Code."""
        kwargs = {
            **sanitize_args(request.args),
            "dag_id": dag_id,
            "tab": "code",
        }

        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/dag_details")
    def legacy_dag_details(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.dag_details", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/details")
    def dag_details(self, dag_id):
        """Get Dag details."""
        return redirect(url_for("Airflow.grid", dag_id=dag_id))

    @expose("/rendered-templates")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def rendered_templates(self, session):
        """Get rendered Dag."""
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        map_index = request.args.get("map_index", -1, type=int)
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)
        form = DateTimeForm(data={"execution_date": dttm})
        root = request.args.get("root", "")

        logger.info("Retrieving rendered templates.")
        dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
        dag_run = dag.get_dagrun(execution_date=dttm)
        raw_task = dag.get_task(task_id).prepare_for_execution()

        no_dagrun = False

        url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"])
        title = "Rendered Template"
        html_dict = {}

        ti: TaskInstance
        if dag_run is None:
            # No DAG run matching given logical date. This usually means this
            # DAG has never been run. Task instance rendering does not really
            # make sense in this situation, but "works" prior to AIP-39. This
            # "fakes" a temporary DagRun-TaskInstance association (not saved to
            # database) for presentation only.
            ti = TaskInstance(raw_task, map_index=map_index)
            ti.dag_run = DagRun(dag_id=dag_id, execution_date=dttm)
            no_dagrun = True
        else:
            ti = dag_run.get_task_instance(task_id=task_id, map_index=map_index, session=session)
            if ti:
                ti.refresh_from_task(raw_task)
            else:
                flash(f"There is no task instance with the provided map_index {map_index}", "error")
                return self.render_template(
                    "airflow/ti_code.html",
                    show_trigger_form_if_no_params=conf.getboolean(
                        "webserver", "show_trigger_form_if_no_params"
                    ),
                    dag_run_id=dag_run.run_id if dag_run else "",
                    html_dict=html_dict,
                    dag=dag,
                    task_id=task_id,
                    execution_date=execution_date,
                    map_index=map_index,
                    form=form,
                    root=root,
                    title=title,
                )

        try:
            ti.get_rendered_template_fields(session=session)
        except AirflowException as e:
            if not e.__cause__:
                flash(f"Error rendering template: {e}", "error")
            else:
                msg = Markup("Error rendering template: {0}<br><br>Original error: {0.__cause__}").format(e)
                flash(msg, "error")
        except Exception as e:
            flash(f"Error rendering template: {e}", "error")

        # Ensure we are rendering the unmapped operator. Unmapping should be
        # done automatically if template fields are rendered successfully; this
        # only matters if get_rendered_template_fields() raised an exception.
        # The following rendering won't show useful values in this case anyway,
        # but we'll display some quasi-meaningful field names.
        task = ti.task.unmap(None)

        renderers = wwwutils.get_attr_renderer()

        for template_field in task.template_fields:
            content = getattr(task, template_field)
            renderer = task.template_fields_renderers.get(template_field, template_field)
            if renderer in renderers:
                html_dict[template_field] = renderers[renderer](content) if not no_dagrun else ""
            else:
                html_dict[template_field] = Markup("<pre><code>{}</code></pre>").format(
                    pformat(content) if not no_dagrun else ""
                )

            if isinstance(content, dict):
                if template_field == "op_kwargs":
                    for key, value in content.items():
                        renderer = task.template_fields_renderers.get(key, key)
                        if renderer in renderers:
                            html_dict[f"{template_field}.{key}"] = (
                                renderers[renderer](value) if not no_dagrun else ""
                            )
                        else:
                            html_dict[f"{template_field}.{key}"] = Markup(
                                "<pre><code>{}</code></pre>"
                            ).format(pformat(value) if not no_dagrun else "")
                else:
                    for dict_keys in get_key_paths(content):
                        template_path = f"{template_field}.{dict_keys}"
                        renderer = task.template_fields_renderers.get(template_path, template_path)
                        if renderer in renderers:
                            content_value = get_value_from_path(dict_keys, content)
                            html_dict[template_path] = (
                                renderers[renderer](content_value) if not no_dagrun else ""
                            )
        return self.render_template(
            "airflow/ti_code.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            html_dict=html_dict,
            dag_run_id=dag_run.run_id if dag_run else "",
            dag=dag,
            task_id=task_id,
            task_display_name=task.task_display_name,
            execution_date=execution_date,
            map_index=map_index,
            form=form,
            root=root,
            title=title,
            dag_file_token=url_serializer.dumps(dag.fileloc),
        )
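
    # Illustrative html_dict keys produced above (hypothetical field names): a
    # dict-valued "op_kwargs" template field yields both an "op_kwargs" entry and
    # per-key entries such as "op_kwargs.sql", while other dict-valued fields yield
    # dotted key paths like "env.PATH" via get_key_paths().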

    @expose("/rendered-k8s")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def rendered_k8s(self, *, session: Session = NEW_SESSION):
        """Get rendered k8s yaml."""
        if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR:
            abort(404)

        # This part is only used for k8s executor so providers.cncf.kubernetes must be installed
        # with the get_rendered_k8s_spec method
        from airflow.providers.cncf.kubernetes.template_rendering import get_rendered_k8s_spec

        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        if task_id is None:
            logger.warning("Task id not passed in the request")
            abort(400)
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)
        form = DateTimeForm(data={"execution_date": dttm})
        root = request.args.get("root", "")
        map_index = request.args.get("map_index", -1, type=int)
        logger.info("Retrieving rendered k8s.")

        dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
        task = dag.get_task(task_id)
        dag_run = dag.get_dagrun(execution_date=dttm, session=session)
        ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)

        if not ti:
            raise AirflowException(f"Task instance {task.task_id} not found.")

        pod_spec = None
        if not isinstance(ti, TaskInstance):
            raise ValueError("not a TaskInstance")
        try:
            pod_spec = get_rendered_k8s_spec(ti, session=session)
        except AirflowException as e:
            if not e.__cause__:
                flash(f"Error rendering Kubernetes POD Spec: {e}", "error")
            else:
                tmp = Markup("Error rendering Kubernetes POD Spec: {0}<br><br>Original error: {0.__cause__}")
                flash(tmp.format(e), "error")
        except Exception as e:
            flash(f"Error rendering Kubernetes Pod Spec: {e}", "error")
        title = "Rendered K8s Pod Spec"

        if pod_spec:
            content = wwwutils.get_attr_renderer()["yaml"](yaml.dump(pod_spec))
        else:
            content = Markup("<pre><code>Error rendering Kubernetes POD Spec</code></pre>")

        return self.render_template(
            "airflow/ti_code.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            dag_run_id=dag_run.run_id if dag_run else "",
            html_dict={"k8s": content},
            dag=dag,
            task_id=task_id,
            task_display_name=task.task_display_name,
            execution_date=execution_date,
            map_index=map_index,
            form=form,
            root=root,
            title=title,
        )

    @expose("/object/rendered-k8s")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def rendered_k8s_data(self, *, session: Session = NEW_SESSION):
        """Get rendered k8s yaml."""
        if not settings.IS_K8S_OR_K8SCELERY_EXECUTOR:
            return {"error": "Not a k8s or k8s_celery executor"}, 404

        # This part is only used for k8s executor so providers.cncf.kubernetes must be installed
        # with the get_rendered_k8s_spec method
        from airflow.providers.cncf.kubernetes.template_rendering import get_rendered_k8s_spec

        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        if task_id is None:
            return {"error": "Task id not passed in the request"}, 404
        run_id = request.args.get("run_id")
        map_index = request.args.get("map_index", -1, type=int)
        logger.info("Retrieving rendered k8s data.")

        dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)
        task = dag.get_task(task_id)
        dag_run = dag.get_dagrun(run_id=run_id, session=session)
        ti = dag_run.get_task_instance(task_id=task.task_id, map_index=map_index, session=session)

        if not ti:
            return {"error": f"can't find task instance {task.task_id}"}, 404

        pod_spec = None
        if not isinstance(ti, TaskInstance):
            return {"error": f"{task.task_id} is not a task instance"}, 500

        try:
            pod_spec = get_rendered_k8s_spec(ti, session=session)
        except AirflowException as e:
            if not e.__cause__:
                return {"error": f"Error rendering Kubernetes POD Spec: {e}"}, 500
            else:
                tmp = Markup("Error rendering Kubernetes POD Spec: {0}<br><br>Original error: {0.__cause__}")
                return {"error": tmp.format(e)}, 500
        except Exception as e:
            return {"error": f"Error rendering Kubernetes Pod Spec: {e}"}, 500

        return pod_spec

    @expose("/get_logs_with_metadata")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
    @provide_session
    def get_logs_with_metadata(self, session: Session = NEW_SESSION):
        """Retrieve logs including metadata."""
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        execution_date_str = request.args["execution_date"]
        map_index = request.args.get("map_index", -1, type=int)
        try_number = request.args.get("try_number", type=int)
        metadata_str = request.args.get("metadata", "{}")
        response_format = request.args.get("format", "json")

        # Validate JSON metadata
        try:
            metadata: dict = json.loads(metadata_str) or {}
        except json.decoder.JSONDecodeError:
            return {"error": "Invalid JSON metadata"}, 400

        # Convert string datetime into actual datetime
        try:
            execution_date = timezone.parse(execution_date_str, strict=True)
        except ValueError:
            error_message = (
                f"Given execution date {execution_date_str!r} could not be identified as a date. "
                "Example date format: 2015-11-16T14:34:15+00:00"
            )
            return {"error": error_message}, 400

        task_log_reader = TaskLogReader()
        if not task_log_reader.supports_read:
            return {
                "message": "Task log handler does not support reading logs.",
                "error": True,
                "metadata": {"end_of_log": True},
            }

        ti = session.scalar(
            select(models.TaskInstance)
            .where(
                TaskInstance.task_id == task_id,
                TaskInstance.dag_id == dag_id,
                TaskInstance.execution_date == execution_date,
                TaskInstance.map_index == map_index,
            )
            .join(TaskInstance.dag_run)
            .options(joinedload(TaskInstance.trigger).joinedload(Trigger.triggerer_job))
            .limit(1)
        )

        if ti is None:
            return {
                "message": "*** Task instance did not exist in the DB\n",
                "error": True,
                "metadata": {"end_of_log": True},
            }

        try:
            dag = get_airflow_app().dag_bag.get_dag(dag_id)
            if dag:
                ti.task = dag.get_task(ti.task_id)

            if response_format == "json":
                logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
                message = logs[0] if try_number is not None else logs
                return {"message": message, "metadata": metadata}

            metadata["download_logs"] = True
            attachment_filename = task_log_reader.render_log_filename(ti, try_number, session=session)
            log_stream = task_log_reader.read_log_stream(ti, try_number, metadata)
            return Response(
                response=log_stream,
                mimetype="text/plain",
                headers={"Content-Disposition": f"attachment; filename={attachment_filename}"},
            )
        except AttributeError as e:
            error_messages = [f"Task log handler does not support reading logs.\n{e}\n"]
            metadata["end_of_log"] = True
            return {"message": error_messages, "error": True, "metadata": metadata}
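
    # Illustrative /get_logs_with_metadata round trip (hypothetical values): the first
    # call passes metadata={} and receives {"message": ..., "metadata": {...}}; clients
    # poll by echoing the returned metadata back until it contains "end_of_log": True.
    # With format != "json", the same logs stream back as a text/plain attachment instead.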

    @expose("/log")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
    @provide_session
    def log(self, session: Session = NEW_SESSION):
        """Retrieve log."""
        dag_id = request.args["dag_id"]
        task_id = request.args.get("task_id")
        map_index = request.args.get("map_index", -1, type=int)
        execution_date = request.args.get("execution_date")

        if execution_date:
            dttm = _safe_parse_datetime(execution_date)
        else:
            dttm = None

        form = DateTimeForm(data={"execution_date": dttm})
        dag_model = DagModel.get_dagmodel(dag_id)

        ti: TaskInstance = session.scalar(
            select(models.TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
            .limit(1)
        )

        num_logs = 0
        if ti is not None:
            num_logs = ti.try_number
        logs = [""] * num_logs
        root = request.args.get("root", "")
        return self.render_template(
            "airflow/ti_log.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            logs=logs,
            dag=dag_model,
            dag_run_id=ti.run_id if ti else "",
            title="Log by attempts",
            dag_id=dag_id,
            task_id=task_id,
            task_display_name=ti.task_display_name if ti else "",
            execution_date=execution_date,
            map_index=map_index,
            form=form,
            root=root,
            wrapped=conf.getboolean("webserver", "default_wrap"),
        )

    @expose("/redirect_to_external_log")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_LOGS)
    @provide_session
    def redirect_to_external_log(self, session: Session = NEW_SESSION):
        """Redirect to external log."""
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)
        map_index = request.args.get("map_index", -1, type=int)
        try_number = request.args.get("try_number", 1)

        ti = session.scalar(
            select(models.TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
            .limit(1)
        )

        if not ti:
            flash(f"Task [{dag_id}.{task_id}] does not exist", "error")
            return redirect(url_for("Airflow.index"))

        task_log_reader = TaskLogReader()
        if not task_log_reader.supports_external_link:
            flash("Task log handler does not support external links", "error")
            return redirect(url_for("Airflow.index"))

        handler = task_log_reader.log_handler
        url = handler.get_external_log_url(ti, try_number)
        return redirect(url)

    @expose("/task")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def task(self, session: Session = NEW_SESSION):
        """Retrieve task."""
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)
        map_index = request.args.get("map_index", -1, type=int)
        form = DateTimeForm(data={"execution_date": dttm})
        root = request.args.get("root", "")
        dag = get_airflow_app().dag_bag.get_dag(dag_id)

        if not dag or task_id not in dag.task_ids:
            flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
            return redirect(url_for("Airflow.index"))

        task = copy.copy(dag.get_task(task_id))
        task.resolve_template_files()

        ti: TaskInstance | None = session.scalar(
            select(TaskInstance)
            .options(
                # HACK: Eager-load relationships. This is needed because
                # multiple properties mis-use provide_session() in a way that
                # destroys the session object ti is bound to.
                joinedload(TaskInstance.queued_by_job, innerjoin=False),
                joinedload(TaskInstance.trigger, innerjoin=False),
            )
            .filter_by(execution_date=dttm, dag_id=dag_id, task_id=task_id, map_index=map_index)
        )
        if ti is None:
            ti_attrs: list[tuple[str, Any]] | None = None
        else:
            ti.refresh_from_task(task)
            ti_attrs_to_skip = [
                "dag_id",
                "key",
                "mark_success_url",
                "log",
                "log_url",
                "task",
                "trigger",
                "triggerer_job",
            ]
            # Some fields on TI are deprecated, but we don't want those warnings here.
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", RemovedInAirflow3Warning)
                all_ti_attrs = (
                    # fetching the value of _try_number to be shown under name try_number in UI
                    (name, getattr(ti, name))
                    for name in dir(ti)
                    if not name.startswith("_") and name not in ti_attrs_to_skip
                )
                ti_attrs = sorted((name, attr) for name, attr in all_ti_attrs if not callable(attr))

        attr_renderers = wwwutils.get_attr_renderer()

        attrs_to_skip: frozenset[str] = getattr(task, "HIDE_ATTRS_FROM_UI", frozenset())

        def include_task_attrs(attr_name):
            return not (
                attr_name == "HIDE_ATTRS_FROM_UI"
                or attr_name.startswith("_")
                or attr_name in attr_renderers
                or attr_name in attrs_to_skip
            )

        task_attrs = [
            (attr_name, secrets_masker.redact(attr, attr_name))
            for attr_name, attr in (
                (attr_name, getattr(task, attr_name)) for attr_name in filter(include_task_attrs, dir(task))
            )
            if not callable(attr)
        ]

        # Color coding the special attributes that are code
        special_attrs_rendered = {
            attr_name: renderer(getattr(task, attr_name))
            for attr_name, renderer in attr_renderers.items()
            if hasattr(task, attr_name)
        }

        no_failed_deps_result = [
            (
                "Unknown",
                "All dependencies are met but the task instance is not running. In most "
                "cases this just means that the task will probably be scheduled soon "
                "unless:<br>\n- The scheduler is down or under heavy load<br>\n{}\n"
                "<br>\nIf this task instance does not start soon please contact your "
                "Airflow administrator for assistance.".format(
                    "- This task instance already ran and had its state changed manually "
                    "(e.g. cleared in the UI)<br>"
                    if ti and ti.state is None
                    else ""
                ),
            )
        ]

        # Use the scheduler's context to figure out which dependencies are not met
        if ti is None:
            failed_dep_reasons: list[tuple[str, str]] = []
        else:
            dep_context = DepContext(SCHEDULER_QUEUED_DEPS)
            failed_dep_reasons = [
                (dep.dep_name, dep.reason) for dep in ti.get_failed_dep_statuses(dep_context=dep_context)
            ]
        title = "Task Instance Details"

        return self.render_template(
            "airflow/task.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            task_attrs=task_attrs,
            ti_attrs=ti_attrs,
            dag_run_id=ti.run_id if ti else "",
            failed_dep_reasons=failed_dep_reasons or no_failed_deps_result,
            task_id=task_id,
            execution_date=execution_date,
            map_index=map_index,
            special_attrs_rendered=special_attrs_rendered,
            form=form,
            root=root,
            dag=dag,
            title=title,
            task_display_name=task.task_display_name,
        )

    @expose("/xcom")
    @auth.has_access_dag("GET", DagAccessEntity.XCOM)
    @provide_session
    def xcom(self, session: Session = NEW_SESSION):
        """Retrieve XCOM."""
        dag_id = request.args["dag_id"]
        task_id = request.args.get("task_id")
        map_index = request.args.get("map_index", -1, type=int)
        # Carrying execution_date through, even though it's irrelevant for
        # this context
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)

        form = DateTimeForm(data={"execution_date": dttm})
        root = request.args.get("root", "")
        dag = DagModel.get_dagmodel(dag_id)
        ti: TaskInstance = session.scalar(
            select(TaskInstance).filter_by(dag_id=dag_id, task_id=task_id).limit(1)
        )

        if not ti:
            flash(f"Task [{dag_id}.{task_id}] doesn't seem to exist at the moment", "error")
            return redirect(url_for("Airflow.index"))

        xcom_query = session.scalars(
            select(XCom).where(
                XCom.dag_id == dag_id,
                XCom.task_id == task_id,
                XCom.execution_date == dttm,
                XCom.map_index == map_index,
            )
        )
        attributes = [(xcom.key, xcom.value) for xcom in xcom_query if not xcom.key.startswith("_")]

        title = "XCom"
        return self.render_template(
            "airflow/xcom.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            attributes=attributes,
            task_id=task_id,
            dag_run_id=ti.run_id if ti else "",
            task_display_name=ti.task_display_name,
            execution_date=execution_date,
            map_index=map_index,
            form=form,
            root=root,
            dag=dag,
            title=title,
        )

    @expose("/delete", methods=["POST"])
    @auth.has_access_dag("DELETE")
    @action_logging
    def delete(self):
        """Delete DAG."""
        from airflow.api.common import delete_dag
        from airflow.exceptions import DagNotFound

        dag_id = request.values.get("dag_id")
        origin = get_safe_url(request.values.get("origin"))
        redirect_url = get_safe_url(request.values.get("redirect_url"))

        try:
            delete_dag.delete_dag(dag_id)
        except DagNotFound:
            flash(f"DAG with id {dag_id} not found. Cannot delete", "error")
            return redirect(redirect_url)
        except AirflowException:
            flash(
                f"Cannot delete DAG with id {dag_id} because some task instances of the DAG "
                "are still running. Please mark the task instances as "
                "failed/succeeded before deleting the DAG",
                "error",
            )
            return redirect(redirect_url)

        flash(f"Deleting DAG with id {dag_id}. It may take a couple of minutes to fully disappear.")

        # Upon success return to origin.
        return redirect(origin)

    @expose("/dags/<string:dag_id>/trigger", methods=["POST", "GET"])
    @auth.has_access_dag("POST", DagAccessEntity.RUN)
    @action_logging
    @provide_session
    def trigger(self, dag_id: str, session: Session = NEW_SESSION):
        """Trigger a DAG run."""
        run_id = request.values.get("run_id", "")
        origin = get_safe_url(request.values.get("origin"))
        unpause = request.values.get("unpause")
        request_conf = request.values.get("conf")
        request_execution_date = request.values.get("execution_date", default=timezone.utcnow().isoformat())
        is_dag_run_conf_overrides_params = conf.getboolean("core", "dag_run_conf_overrides_params")
        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        dag_orm: DagModel = session.scalar(select(DagModel).where(DagModel.dag_id == dag_id).limit(1))

        # Prepare form fields with param struct details to render a proper form with schema information
        form_fields = {}
        allow_raw_html_descriptions = conf.getboolean("webserver", "allow_raw_html_descriptions")
        form_trust_problems = []
        for k, v in dag.params.items():
            form_fields[k] = v.dump()
            form_field: dict = form_fields[k]
            # If no schema is provided, auto-detect on default values
            if "schema" not in form_field:
                form_field["schema"] = {}
            form_field_schema: dict = form_field["schema"]
            if "type" not in form_field_schema:
                form_field_value = form_field["value"]
                if isinstance(form_field_value, bool):
                    form_field_schema["type"] = "boolean"
                elif isinstance(form_field_value, int):
                    form_field_schema["type"] = ["integer", "null"]
                elif isinstance(form_field_value, list):
                    form_field_schema["type"] = ["array", "null"]
                elif isinstance(form_field_value, dict):
                    form_field_schema["type"] = ["object", "null"]
            # Mark HTML fields as safe if allowed
            if allow_raw_html_descriptions:
                if "description_html" in form_field_schema:
                    form_field["description"] = Markup(form_field_schema["description_html"])
                if "custom_html_form" in form_field_schema:
                    form_field_schema["custom_html_form"] = Markup(form_field_schema["custom_html_form"])
            else:
                if "description_html" in form_field_schema and "description_md" not in form_field_schema:
                    form_trust_problems.append(f"Field {k} uses HTML description")
                    form_field["description"] = form_field_schema.pop("description_html")
                if "custom_html_form" in form_field_schema:
                    form_trust_problems.append(f"Field {k} uses custom HTML form definition")
                    form_field_schema.pop("custom_html_form")
            if "description_md" in form_field_schema:
                form_field["description"] = wwwutils.wrapped_markdown(form_field_schema["description_md"])
            # Check for default values and pre-populate
            if k in request.values:
                if form_field_schema.get("type", None) in [
                    "boolean",
                    "array",
                    ["array", "null"],
                    "object",
                    ["object", "null"],
                ]:
                    try:
                        form_field["value"] = json.loads(request.values.get(k, ""))
                    except JSONDecodeError:
                        flash(
                            f'Could not pre-populate field "{k}" due to a parsing error '
                            f'of value "{request.values.get(k)}"'
                        )
                else:
                    form_field["value"] = request.values.get(k)
        if form_trust_problems:
            flash(
                Markup(
                    "At least one field in the trigger form uses a raw HTML form definition. This is not allowed for "
                    "security. Please switch to markdown description via <code>description_md</code>. "
                    "Raw HTML is deprecated and must be enabled via "
                    "<code>webserver.allow_raw_html_descriptions</code> configuration parameter. Using plain text "
                    "as fallback for these fields. "
                    f"<ul><li>{'</li><li>'.join(form_trust_problems)}</li></ul>"
                ),
                "warning",
            )
        if allow_raw_html_descriptions and any("description_html" in p.schema for p in dag.params.values()):
            flash(
                Markup(
                    "The form params use raw HTML in <code>description_html</code> which is deprecated. "
                    "Please migrate to <code>description_md</code>."
                ),
                "warning",
            )
        if allow_raw_html_descriptions and any("custom_html_form" in p.schema for p in dag.params.values()):
            flash(
                Markup(
                    "The form params use <code>custom_html_form</code> definition. "
                    "This is deprecated with Airflow 2.8.0 and will be removed in a future release."
                ),
                "warning",
            )

        ui_fields_defined = any("const" not in f["schema"] for f in form_fields.values())
        show_trigger_form_if_no_params = conf.getboolean("webserver", "show_trigger_form_if_no_params")

        if not dag_orm:
            flash(f"Cannot find dag {dag_id}")
            return redirect(origin)

        if dag_orm.has_import_errors:
            flash(f"Cannot create dagruns because the dag {dag_id} has import errors", "error")
            return redirect(origin)

        num_recent_confs = conf.getint("webserver", "num_recent_configurations_for_trigger")
        recent_runs = session.execute(
            select(DagRun.conf, func.max(DagRun.run_id).label("run_id"), func.max(DagRun.execution_date))
            .where(
                DagRun.dag_id == dag_id,
                DagRun.run_type == DagRunType.MANUAL,
                DagRun.conf.isnot(None),
            )
            .group_by(DagRun.conf)
            .order_by(func.max(DagRun.execution_date).desc())
            .limit(num_recent_confs)
        )
        recent_confs = {
            run_id: json.dumps(run_conf, cls=utils_json.WebEncoder)
            for run_id, run_conf in ((run.run_id, run.conf) for run in recent_runs)
            if isinstance(run_conf, dict) and any(run_conf)
        }

        render_params = {
            "dag": dag,
            "dag_id": dag_id,
            "run_id": run_id,
            "origin": origin,
            "doc_md": wwwutils.wrapped_markdown(getattr(dag, "doc_md", None)),
            "recent_confs": recent_confs,
            "is_dag_run_conf_overrides_params": is_dag_run_conf_overrides_params,
        }

        if request.method == "GET" or (
            not request_conf and (ui_fields_defined or show_trigger_form_if_no_params)
        ):
            # Populate conf textarea with conf requests parameter, or dag.params
            default_conf = ""

            form = DateTimeForm(data={"execution_date": request_execution_date})

            if request_conf:
                default_conf = request_conf
            else:
                try:
                    default_conf = json.dumps(
                        {
                            str(k): v.resolve(
                                value=request.values.get(k, default=NOTSET), suppress_exception=True
                            )
                            for k, v in dag.params.items()
                        },
                        indent=4,
                        ensure_ascii=False,
                        cls=utils_json.WebEncoder,
                    )
                except TypeError:
                    flash("Could not pre-populate conf field due to non-JSON-serializable data-types")
            return self.render_template(
                "airflow/trigger.html",
                form_fields=form_fields,
                **render_params,
                conf=default_conf,
                form=form,
            )

        try:
            execution_date = timezone.parse(request_execution_date, strict=True)
        except ParserError:
            flash("Invalid execution date", "error")
            form = DateTimeForm(data={"execution_date": timezone.utcnow().isoformat()})
            return self.render_template(
                "airflow/trigger.html",
                form_fields=form_fields,
                **render_params,
                conf=request_conf or {},
                form=form,
            )

        dr = DagRun.find_duplicate(dag_id=dag_id, run_id=run_id, execution_date=execution_date)
        if dr:
            if dr.run_id == run_id:
                message = f"The run ID {run_id} already exists"
            else:
                message = f"The logical date {execution_date} already exists"
            flash(message, "error")
            return redirect(origin)

        regex = conf.get("scheduler", "allowed_run_id_pattern")
        if run_id and not re2.match(RUN_ID_REGEX, run_id):
            if not regex.strip() or not re2.match(regex.strip(), run_id):
                flash(
                    f"The provided run ID '{run_id}' is invalid. It does not match either "
                    f"the configured pattern: '{regex}' or the built-in pattern: '{RUN_ID_REGEX}'",
                    "error",
                )

                form = DateTimeForm(data={"execution_date": execution_date})
                return self.render_template(
                    "airflow/trigger.html",
                    form_fields=form_fields,
                    **render_params,
                    conf=request_conf,
                    form=form,
                )

        run_conf = {}
        if request_conf:
            try:
                run_conf = json.loads(request_conf)
                if not isinstance(run_conf, dict):
                    flash("Invalid JSON configuration, must be a dict", "error")
                    form = DateTimeForm(data={"execution_date": execution_date})
                    return self.render_template(
                        "airflow/trigger.html",
                        form_fields=form_fields,
                        **render_params,
                        conf=request_conf,
                        form=form,
                    )
            except json.decoder.JSONDecodeError:
                flash("Invalid JSON configuration, not parseable", "error")
                form = DateTimeForm(data={"execution_date": execution_date})
                return self.render_template(
                    "airflow/trigger.html",
                    form_fields=form_fields,
                    **render_params,
                    conf=request_conf,
                    form=form,
                )

        if dag.get_is_paused():
            if unpause or not ui_fields_defined:
                flash(f"Unpaused DAG {dag_id}.")
                dag_model = models.DagModel.get_dagmodel(dag_id)
                if dag_model is not None:
                    dag_model.set_is_paused(is_paused=False)
            else:
                flash(
                    f"DAG {dag_id} is paused; unpause it if you want the triggered run to be executed.",
                    "warning",
                )

        try:
            dag_run = dag.create_dagrun(
                run_type=DagRunType.MANUAL,
                execution_date=execution_date,
                data_interval=dag.timetable.infer_manual_data_interval(run_after=execution_date),
                state=DagRunState.QUEUED,
                conf=run_conf,
                external_trigger=True,
                dag_hash=get_airflow_app().dag_bag.dags_hash.get(dag_id),
                run_id=run_id,
            )
        except (ValueError, ParamValidationError) as ve:
            flash(f"{ve}", "error")
            form = DateTimeForm(data={"execution_date": execution_date})
            # Take over "bad" submitted fields for new form display
            for k in form_fields:
                if k in run_conf:
                    form_fields[k]["value"] = run_conf[k]
            return self.render_template(
                "airflow/trigger.html",
                form_fields=form_fields,
                **render_params,
                conf=request_conf,
                form=form,
            )

        flash(f"Triggered {dag_id} with new Run ID {dag_run.run_id}; it should start any moment now.")
        if "/grid?" in origin:
            path, query = origin.split("?", 1)
            params = {param.split("=")[0]: param.split("=")[1] for param in query.split("&")}
            params["dag_run_id"] = dag_run.run_id
            origin = f"{path}?{urlencode(params)}"
        elif origin.endswith("/grid"):
            origin += f"?{urlencode({'dag_run_id': dag_run.run_id})}"
        return redirect(origin)
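
    # Illustrative origin rewrite performed above (hypothetical URL and run id):
    # triggering from "/dags/example/grid?tab=graph" redirects back to
    # "/dags/example/grid?tab=graph&dag_run_id=manual__2024-01-01T00:00:00+00:00",
    # so the grid view focuses on the newly created run.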

    def _clear_dag_tis(
        self,
        dag: DAG,
        start_date: datetime.datetime | None,
        end_date: datetime.datetime | None,
        *,
        origin: str | None,
        task_ids: Collection[str | tuple[str, int]] | None = None,
        recursive: bool = False,
        confirmed: bool = False,
        only_failed: bool = False,
        session: Session,
    ):
        if confirmed:
            count = dag.clear(
                start_date=start_date,
                end_date=end_date,
                task_ids=task_ids,
                include_subdags=recursive,
                include_parentdag=recursive,
                only_failed=only_failed,
                session=session,
            )
            msg = f"{count} task instances have been cleared"
            return redirect_or_json(origin, msg)

        try:
            tis = dag.clear(
                start_date=start_date,
                end_date=end_date,
                task_ids=task_ids,
                include_subdags=recursive,
                include_parentdag=recursive,
                only_failed=only_failed,
                dry_run=True,
                session=session,
            )
        except AirflowException as ex:
            return redirect_or_json(origin, msg=str(ex), status="error", status_code=500)

        if not isinstance(tis, collections.abc.Iterable):
            raise AssertionError(
                f"Expected dag.clear() to return an iterable for dry runs, got {tis} instead."
            )

        details = [str(t) for t in tis]
        if not details:
            return redirect_or_json(origin, "No task instances to clear", status="error", status_code=404)
        elif request.headers.get("Accept") == "application/json":
            if confirmed:
                return htmlsafe_json_dumps(details, separators=(",", ":"))
            return htmlsafe_json_dumps(
                [{"task_id": ti.task_id, "map_index": ti.map_index, "run_id": ti.run_id} for ti in tis],
                separators=(",", ":"),
            )
        return self.render_template(
            "airflow/confirm.html",
            endpoint=None,
            message="Task instances you are about to clear:",
            details="\n".join(details),
        )
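
    # Shape of the dry-run JSON answer above (illustrative values only): each
    # affected TaskInstance is reduced to its identifying triple, e.g.
    #
    #     [{"task_id": "extract", "map_index": -1, "run_id": "scheduled__2024-01-01T00:00:00+00:00"}]
    #
    # map_index is -1 for unmapped tasks; mapped tasks contribute one entry
    # per expanded index.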

    @expose("/clear", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    @provide_session
    def clear(self, *, session: Session = NEW_SESSION):
        """Clear DAG tasks."""
        dag_id = request.form.get("dag_id")
        task_id = request.form.get("task_id")
        origin = get_safe_url(request.form.get("origin"))
        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        group_id = request.form.get("group_id")

        if "map_index" not in request.form:
            map_indexes: list[int] | None = None
        else:
            map_indexes = request.form.getlist("map_index", type=int)

        execution_date_str = request.form.get("execution_date")
        execution_date = _safe_parse_datetime(execution_date_str)
        confirmed = request.form.get("confirmed") == "true"
        upstream = request.form.get("upstream") == "true"
        downstream = request.form.get("downstream") == "true"
        future = request.form.get("future") == "true"
        past = request.form.get("past") == "true"
        recursive = request.form.get("recursive") == "true"
        only_failed = request.form.get("only_failed") == "true"

        task_ids: list[str | tuple[str, int]] = []

        end_date = execution_date if not future else None
        start_date = execution_date if not past else None

        locked_dag_run_ids: list[int] = []

        if group_id is not None:
            task_group_dict = dag.task_group.get_task_group_dict()
            task_group = task_group_dict.get(group_id)
            if task_group is None:
                return redirect_or_json(
                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
                )
            task_ids = task_ids_or_regex = [t.task_id for t in task_group.iter_tasks()]

            # Lock the related dag runs to prevent a possible deadlock.
            # https://github.com/apache/airflow/pull/26658
            dag_runs_query = select(DagRun.id).where(DagRun.dag_id == dag_id).with_for_update()
            if start_date is None and end_date is None:
                dag_runs_query = dag_runs_query.where(DagRun.execution_date == start_date)
            else:
                if start_date is not None:
                    dag_runs_query = dag_runs_query.where(DagRun.execution_date >= start_date)
                if end_date is not None:
                    dag_runs_query = dag_runs_query.where(DagRun.execution_date <= end_date)
            locked_dag_run_ids = session.scalars(dag_runs_query).all()
        elif task_id:
            if map_indexes is None:
                task_ids = [task_id]
            else:
                task_ids = [(task_id, map_index) for map_index in map_indexes]
            task_ids_or_regex = [task_id]

        dag = dag.partial_subset(
            task_ids_or_regex=task_ids_or_regex,
            include_downstream=downstream,
            include_upstream=upstream,
        )

        if len(dag.task_dict) > 1:
            # If we had upstream/downstream etc then also include those!
            task_ids.extend(tid for tid in dag.task_dict if tid != task_id)

        response = self._clear_dag_tis(
            dag,
            start_date,
            end_date,
            origin=origin,
            task_ids=task_ids,
            recursive=recursive,
            confirmed=confirmed,
            only_failed=only_failed,
            session=session,
        )

        del locked_dag_run_ids
        return response
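
    # The task_ids list built above can mix both accepted shapes (values are
    # illustrative): plain ids select every map index of a task, while
    # (task_id, map_index) tuples select single expansions, e.g.
    #
    #     ["transform", ("extract", 0), ("extract", 2)]
    #
    # matching the Collection[str | tuple[str, int]] signature of dag.clear().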

    @expose("/dagrun_clear", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    @provide_session
    def dagrun_clear(self, *, session: Session = NEW_SESSION):
        """Clear the DagRun."""
        dag_id = request.form.get("dag_id")
        dag_run_id = request.form.get("dag_run_id")
        confirmed = request.form.get("confirmed") == "true"
        only_failed = request.form.get("only_failed") == "true"

        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        dr = dag.get_dagrun(run_id=dag_run_id)
        start_date = dr.logical_date
        end_date = dr.logical_date

        return self._clear_dag_tis(
            dag,
            start_date,
            end_date,
            origin=None,
            recursive=True,
            confirmed=confirmed,
            only_failed=only_failed,
            session=session,
        )

    @expose("/blocked", methods=["POST"])
    @auth.has_access_dag("GET", DagAccessEntity.RUN)
    @provide_session
    def blocked(self, session: Session = NEW_SESSION):
        """Retrieve active_dag_runs and max_active_runs information for running Dags."""
        allowed_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)

        # Filter by post parameters
        selected_dag_ids = {unquote(dag_id) for dag_id in request.form.getlist("dag_ids") if dag_id}

        if selected_dag_ids:
            filter_dag_ids = selected_dag_ids.intersection(allowed_dag_ids)
        else:
            filter_dag_ids = allowed_dag_ids

        if not filter_dag_ids:
            return flask.json.jsonify([])

        dags = session.execute(
            select(DagRun.dag_id, sqla.func.count(DagRun.id))
            .where(DagRun.state == DagRunState.RUNNING)
            .where(DagRun.dag_id.in_(filter_dag_ids))
            .group_by(DagRun.dag_id)
        )

        payload = []
        for dag_id, active_dag_runs in dags:
            max_active_runs = 0
            dag = get_airflow_app().dag_bag.get_dag(dag_id)
            if dag:
                # TODO: Make max_active_runs a column so we can query for it directly
                max_active_runs = dag.max_active_runs
            payload.append(
                {
                    "dag_id": dag_id,
                    "active_dag_run": active_dag_runs,
                    "max_active_runs": max_active_runs,
                }
            )
        return flask.json.jsonify(payload)
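
    # Example /blocked response body (values are illustrative). The UI uses it
    # to warn when a DAG already has as many running runs as it allows:
    #
    #     [{"dag_id": "example_dag", "active_dag_run": 16, "max_active_runs": 16}]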

    def _mark_dagrun_state_as_failed(self, dag_id, dag_run_id, confirmed):
        if not dag_run_id:
            return {"status": "error", "message": "Invalid dag_run_id"}

        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            return {"status": "error", "message": f"Cannot find DAG: {dag_id}"}

        new_dag_state = set_dag_run_state_to_failed(dag=dag, run_id=dag_run_id, commit=confirmed)

        if confirmed:
            return {"status": "success", "message": f"Marked failed on {len(new_dag_state)} task instances"}
        else:
            details = [str(t) for t in new_dag_state]
            return htmlsafe_json_dumps(details, separators=(",", ":"))

    def _mark_dagrun_state_as_success(self, dag_id, dag_run_id, confirmed):
        if not dag_run_id:
            return {"status": "error", "message": "Invalid dag_run_id"}

        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            return {"status": "error", "message": f"Cannot find DAG: {dag_id}"}

        new_dag_state = set_dag_run_state_to_success(dag=dag, run_id=dag_run_id, commit=confirmed)

        if confirmed:
            return {"status": "success", "message": f"Marked success on {len(new_dag_state)} task instances"}
        else:
            details = [str(t) for t in new_dag_state]
            return htmlsafe_json_dumps(details, separators=(",", ":"))

    @provide_session
    def _mark_dagrun_state_as_queued(
        self,
        dag_id: str,
        dag_run_id: str,
        confirmed: bool,
        session: Session = NEW_SESSION,
    ):
        if not dag_run_id:
            return {"status": "error", "message": "Invalid dag_run_id"}

        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            return {"status": "error", "message": f"Cannot find DAG: {dag_id}"}

        set_dag_run_state_to_queued(dag=dag, run_id=dag_run_id, commit=confirmed)

        if confirmed:
            return {"status": "success", "message": "Marked the DagRun as queued."}
        else:
            # Identify tasks that will be queued up to run when confirmed
            all_task_ids = [task.task_id for task in dag.tasks]

            existing_tis = session.execute(
                select(TaskInstance.task_id).where(
                    TaskInstance.dag_id == dag.dag_id,
                    TaskInstance.run_id == dag_run_id,
                )
            )

            completed_tis_ids = [task_id for (task_id,) in existing_tis]
            tasks_with_no_state = list(set(all_task_ids) - set(completed_tis_ids))
            details = [str(t) for t in tasks_with_no_state]

            return htmlsafe_json_dumps(details, separators=(",", ":"))
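
    # The dry-run branch above is a plain set difference (illustrative values):
    #
    #     all_task_ids = ["extract", "transform", "load"]
    #     completed_tis_ids = ["extract"]
    #     sorted(set(all_task_ids) - set(completed_tis_ids))
    #     # -> ['load', 'transform']
    #
    # i.e. the tasks that have no TaskInstance row yet and would be queued once
    # the state change is confirmed.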

    @expose("/dagrun_failed", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.RUN)
    @action_logging
    def dagrun_failed(self):
        """Mark DagRun failed."""
        dag_id = request.form.get("dag_id")
        dag_run_id = request.form.get("dag_run_id")
        confirmed = request.form.get("confirmed") == "true"
        return self._mark_dagrun_state_as_failed(dag_id, dag_run_id, confirmed)

    @expose("/dagrun_success", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.RUN)
    @action_logging
    def dagrun_success(self):
        """Mark DagRun success."""
        dag_id = request.form.get("dag_id")
        dag_run_id = request.form.get("dag_run_id")
        confirmed = request.form.get("confirmed") == "true"
        return self._mark_dagrun_state_as_success(dag_id, dag_run_id, confirmed)

    @expose("/dagrun_queued", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.RUN)
    @action_logging
    def dagrun_queued(self):
        """Queue DagRun so tasks that haven't run yet can be started."""
        dag_id = request.form.get("dag_id")
        dag_run_id = request.form.get("dag_run_id")
        confirmed = request.form.get("confirmed") == "true"
        return self._mark_dagrun_state_as_queued(dag_id, dag_run_id, confirmed)

    @expose("/dagrun_details")
    def dagrun_details(self):
        """Redirect to the Grid DagRun page. This avoids breaking links."""
        dag_id = request.args.get("dag_id")
        run_id = request.args.get("run_id")
        return redirect(url_for("Airflow.grid", dag_id=dag_id, dag_run_id=run_id))

    def _mark_task_instance_state(
        self,
        *,
        dag_id: str,
        run_id: str,
        task_id: str,
        map_indexes: list[int] | None,
        origin: str,
        upstream: bool,
        downstream: bool,
        future: bool,
        past: bool,
        state: TaskInstanceState,
    ):
        dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)

        if not run_id:
            flash(f"Cannot mark tasks as {state}, as it seems that DAG {dag_id} has never run", "error")
            return redirect(origin)

        altered = dag.set_task_instance_state(
            task_id=task_id,
            map_indexes=map_indexes,
            run_id=run_id,
            state=state,
            upstream=upstream,
            downstream=downstream,
            future=future,
            past=past,
        )

        flash(f"Marked {state} on {len(altered)} task instances")
        return redirect(origin)

    def _mark_task_group_state(
        self,
        *,
        dag_id: str,
        run_id: str,
        group_id: str,
        origin: str,
        upstream: bool,
        downstream: bool,
        future: bool,
        past: bool,
        state: TaskInstanceState,
    ):
        dag: DAG = get_airflow_app().dag_bag.get_dag(dag_id)

        if not run_id:
            flash(f"Cannot mark tasks as {state}, as DAG {dag_id} has never run", "error")
            return redirect(origin)

        altered = dag.set_task_group_state(
            group_id=group_id,
            run_id=run_id,
            state=state,
            upstream=upstream,
            downstream=downstream,
            future=future,
            past=past,
        )

        flash(f"Marked {state} on {len(altered)} task instances")
        return redirect(origin)

    @expose("/confirm", methods=["GET"])
    @auth.has_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def confirm(self):
        """Show confirmation page for marking tasks as success or failed."""
        args = request.args
        dag_id = args.get("dag_id")
        task_id = args.get("task_id")
        dag_run_id = args.get("dag_run_id")
        state = args.get("state")
        origin = get_safe_url(args.get("origin"))
        group_id = args.get("group_id")

        if "map_index" not in args:
            map_indexes: list[int] | None = None
        else:
            map_indexes = args.getlist("map_index", type=int)

        upstream = to_boolean(args.get("upstream"))
        downstream = to_boolean(args.get("downstream"))
        future = to_boolean(args.get("future"))
        past = to_boolean(args.get("past"))
        origin = origin or url_for("Airflow.index")

        if not exactly_one(task_id, group_id):
            raise ValueError("Exactly one of task_id or group_id must be provided")

        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            msg = f"DAG {dag_id} not found"
            return redirect_or_json(origin, msg, status="error", status_code=404)

        if state not in ("success", "failed"):
            msg = f"Invalid state {state}, must be either 'success' or 'failed'"
            return redirect_or_json(origin, msg, status="error", status_code=400)

        latest_execution_date = dag.get_latest_execution_date()
        if not latest_execution_date:
            msg = f"Cannot mark tasks as {state}, as it seems that DAG {dag_id} has never run"
            return redirect_or_json(origin, msg, status="error", status_code=400)

        tasks: list[Operator | tuple[Operator, int]] = []

        if group_id:
            task_group_dict = dag.task_group.get_task_group_dict()
            task_group = task_group_dict.get(group_id)
            if task_group is None:
                return redirect_or_json(
                    origin, msg=f"TaskGroup {group_id} could not be found", status="error", status_code=404
                )
            tasks = list(task_group.iter_tasks())
        elif task_id:
            try:
                task = dag.get_task(task_id)
            except airflow.exceptions.TaskNotFound:
                msg = f"Task {task_id} not found"
                return redirect_or_json(origin, msg, status="error", status_code=404)
            task.dag = dag
            if map_indexes is None:
                tasks = [task]
            else:
                tasks = [(task, map_index) for map_index in map_indexes]

        to_be_altered = set_state(
            tasks=tasks,
            run_id=dag_run_id,
            upstream=upstream,
            downstream=downstream,
            future=future,
            past=past,
            state=state,
            commit=False,
        )

        if request.headers.get("Accept") == "application/json":
            return htmlsafe_json_dumps(
                [
                    {"task_id": ti.task_id, "map_index": ti.map_index, "run_id": ti.run_id}
                    for ti in to_be_altered
                ],
                separators=(",", ":"),
            )

        details = "\n".join(str(t) for t in to_be_altered)

        response = self.render_template(
            "airflow/confirm.html",
            endpoint=url_for(f"Airflow.{state}"),
            message=f"Task instances you are about to mark as {state}:",
            details=details,
        )

        return response
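
    # With "Accept: application/json" the /confirm endpoint answers with the
    # same triple-per-TaskInstance shape as the clear dry run (illustrative):
    #
    #     [{"task_id": "load", "map_index": -1, "run_id": "manual__2024-01-01T00:00:00+00:00"}]
    #
    # Without that header, the list is rendered on airflow/confirm.html instead.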

    @expose("/failed", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def failed(self):
        """Mark task or task_group as failed."""
        args = request.form
        dag_id = args.get("dag_id")
        task_id = args.get("task_id")
        run_id = args.get("dag_run_id")
        group_id = args.get("group_id")

        if not exactly_one(task_id, group_id):
            raise ValueError("Exactly one of task_id or group_id must be provided")

        if "map_index" not in args:
            map_indexes: list[int] | None = None
        else:
            map_indexes = args.getlist("map_index", type=int)

        origin = get_safe_url(args.get("origin"))
        upstream = to_boolean(args.get("upstream"))
        downstream = to_boolean(args.get("downstream"))
        future = to_boolean(args.get("future"))
        past = to_boolean(args.get("past"))

        if task_id:
            return self._mark_task_instance_state(
                dag_id=dag_id,
                run_id=run_id,
                task_id=task_id,
                map_indexes=map_indexes,
                origin=origin,
                upstream=upstream,
                downstream=downstream,
                future=future,
                past=past,
                state=TaskInstanceState.FAILED,
            )
        elif group_id:
            return self._mark_task_group_state(
                dag_id=dag_id,
                run_id=run_id,
                group_id=group_id,
                origin=origin,
                upstream=upstream,
                downstream=downstream,
                future=future,
                past=past,
                state=TaskInstanceState.FAILED,
            )

    @expose("/success", methods=["POST"])
    @auth.has_access_dag("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def success(self):
        """Mark task or task_group as success."""
        args = request.form
        dag_id = args.get("dag_id")
        task_id = args.get("task_id")
        run_id = args.get("dag_run_id")
        group_id = args.get("group_id")

        if not exactly_one(task_id, group_id):
            raise ValueError("Exactly one of task_id or group_id must be provided")

        if "map_index" not in args:
            map_indexes: list[int] | None = None
        else:
            map_indexes = args.getlist("map_index", type=int)

        origin = get_safe_url(args.get("origin"))
        upstream = to_boolean(args.get("upstream"))
        downstream = to_boolean(args.get("downstream"))
        future = to_boolean(args.get("future"))
        past = to_boolean(args.get("past"))

        if task_id:
            return self._mark_task_instance_state(
                dag_id=dag_id,
                run_id=run_id,
                task_id=task_id,
                map_indexes=map_indexes,
                origin=origin,
                upstream=upstream,
                downstream=downstream,
                future=future,
                past=past,
                state=TaskInstanceState.SUCCESS,
            )
        elif group_id:
            return self._mark_task_group_state(
                dag_id=dag_id,
                run_id=run_id,
                group_id=group_id,
                origin=origin,
                upstream=upstream,
                downstream=downstream,
                future=future,
                past=past,
                state=TaskInstanceState.SUCCESS,
            )

    @expose("/dags/<string:dag_id>")
    def dag(self, dag_id):
        """Redirect to default DAG view."""
        kwargs = {**sanitize_args(request.args), "dag_id": dag_id}
        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/tree")
    def legacy_tree(self):
        """Redirect to the replacement - grid view. Kept for backwards compatibility."""
        return redirect(url_for("Airflow.grid", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/grid")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @gzipped
    @provide_session
    def grid(self, dag_id: str, session: Session = NEW_SESSION):
        """Get Dag's grid view."""
        color_log_error_keywords = conf.get("logging", "color_log_error_keywords", fallback="")
        color_log_warning_keywords = conf.get("logging", "color_log_warning_keywords", fallback="")

        dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
        url_serializer = URLSafeSerializer(current_app.config["SECRET_KEY"])
        dag_model = DagModel.get_dagmodel(dag_id, session=session)
        if not dag:
            flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
            return redirect(url_for("Airflow.index"))
        wwwutils.check_import_errors(dag.fileloc, session)
        wwwutils.check_dag_warnings(dag.dag_id, session)

        included_events_raw = conf.get("webserver", "audit_view_included_events", fallback="")
        excluded_events_raw = conf.get("webserver", "audit_view_excluded_events", fallback="")

        root = request.args.get("root")
        if root:
            dag = dag.partial_subset(task_ids_or_regex=root, include_downstream=False, include_upstream=True)

        num_runs = request.args.get("num_runs", type=int)
        if num_runs is None:
            num_runs = conf.getint("webserver", "default_dag_run_display_number")

        doc_md = wwwutils.wrapped_markdown(getattr(dag, "doc_md", None))

        task_log_reader = TaskLogReader()
        if task_log_reader.supports_external_link:
            external_log_name = task_log_reader.log_handler.log_name
        else:
            external_log_name = None

        default_dag_run_display_number = conf.getint("webserver", "default_dag_run_display_number")

        num_runs_options = [5, 25, 50, 100, 365]

        if default_dag_run_display_number not in num_runs_options:
            insort_left(num_runs_options, default_dag_run_display_number)

        can_edit_taskinstance = get_auth_manager().is_authorized_dag(
            method="PUT",
            access_entity=DagAccessEntity.TASK_INSTANCE,
        )

        return self.render_template(
            "airflow/grid.html",
            show_trigger_form_if_no_params=conf.getboolean("webserver", "show_trigger_form_if_no_params"),
            root=root,
            dag=dag,
            doc_md=doc_md,
            num_runs=num_runs,
            can_edit_taskinstance=can_edit_taskinstance,
            show_external_log_redirect=task_log_reader.supports_external_link,
            external_log_name=external_log_name,
            dag_model=dag_model,
            auto_refresh_interval=conf.getint("webserver", "auto_refresh_interval"),
            default_dag_run_display_number=default_dag_run_display_number,
            default_wrap=conf.getboolean("webserver", "default_wrap"),
            filters_drop_down_values=htmlsafe_json_dumps(
                {
                    "taskStates": [state.value for state in TaskInstanceState],
                    "dagStates": [state.value for state in State.dag_states],
                    "runTypes": [run_type.value for run_type in DagRunType],
                    "numRuns": num_runs_options,
                }
            ),
            included_events_raw=included_events_raw,
            excluded_events_raw=excluded_events_raw,
            color_log_error_keywords=color_log_error_keywords,
            color_log_warning_keywords=color_log_warning_keywords,
            dag_file_token=url_serializer.dumps(dag.fileloc),
        )
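
    # bisect.insort_left keeps the drop-down options sorted when the configured
    # default is injected above (a sketch; 16 stands in for any non-standard
    # default_dag_run_display_number):
    #
    #     from bisect import insort_left
    #     options = [5, 25, 50, 100, 365]
    #     insort_left(options, 16)
    #     options
    #     # -> [5, 16, 25, 50, 100, 365]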

    @expose("/calendar")
    def legacy_calendar(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.calendar", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/calendar")
    def calendar(self, dag_id: str):
        """Redirect to the replacement - grid + calendar. Kept for backwards compatibility."""
        kwargs = {**sanitize_args(request.args), "dag_id": dag_id, "tab": "calendar"}
        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/object/calendar_data")
    @auth.has_access_dag("GET", DagAccessEntity.RUN)
    @gzipped
    @provide_session
    def calendar_data(self, session: Session = NEW_SESSION):
        """Get DAG runs as calendar."""
        dag_id = request.args.get("dag_id")
        dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
        if not dag:
            return {"error": f"can't find dag {dag_id}"}, 404

        dag_states = session.execute(
            select(
                func.date(DagRun.execution_date).label("date"),
                DagRun.state,
                func.max(DagRun.data_interval_start).label("data_interval_start"),
                func.max(DagRun.data_interval_end).label("data_interval_end"),
                func.count("*").label("count"),
            )
            .where(DagRun.dag_id == dag.dag_id)
            .group_by(func.date(DagRun.execution_date), DagRun.state)
            .order_by(func.date(DagRun.execution_date).asc())
        ).all()

        data_dag_states = [
            {
                # DATE() in SQLite and MySQL behave differently:
                # SQLite returns a string, MySQL returns a date.
                "date": dr.date if isinstance(dr.date, str) else dr.date.isoformat(),
                "state": dr.state,
                "count": dr.count,
            }
            for dr in dag_states
        ]

        # Upper limit of how many planned runs we should iterate through
        max_planned_runs = 2000
        total_planned = 0

        # Interpret the schedule and show planned dag runs in calendar
        if (
            dag_states
            and dag_states[-1].data_interval_start
            and dag_states[-1].data_interval_end
            and not isinstance(dag.timetable, ContinuousTimetable)
        ):
            last_automated_data_interval = DataInterval(
                timezone.coerce_datetime(dag_states[-1].data_interval_start),
                timezone.coerce_datetime(dag_states[-1].data_interval_end),
            )

            year = last_automated_data_interval.end.year
            restriction = TimeRestriction(dag.start_date, dag.end_date, False)
            dates: dict[datetime.date, int] = collections.Counter()

            if isinstance(dag.timetable, CronMixin):
                # Optimized calendar generation for timetables based on a cron expression.
                dates_iter: Iterator[datetime.datetime | None] = croniter(
                    dag.timetable._expression,
                    start_time=last_automated_data_interval.end,
                    ret_type=datetime.datetime,
                )
                for dt in dates_iter:
                    if dt is None:
                        break
                    if dt.year != year:
                        break
                    if dag.end_date and dt > dag.end_date:
                        break
                    dates[dt.date()] += 1
            else:
                prev_logical_date = DateTime.min
                while True:
                    curr_info = dag.timetable.next_dagrun_info(
                        last_automated_data_interval=last_automated_data_interval,
                        restriction=restriction,
                    )
                    if curr_info is None:
                        break  # Reached the end.
                    if curr_info.logical_date <= prev_logical_date:
                        break  # We're not progressing. Maybe a malformed timetable? Give up.
                    if curr_info.logical_date.year != year:
                        break  # Crossed the year boundary.
                    last_automated_data_interval = curr_info.data_interval
                    dates[curr_info.logical_date.date()] += 1
                    prev_logical_date = curr_info.logical_date
                    total_planned += 1
                    if total_planned > max_planned_runs:
                        break

            data_dag_states.extend(
                {"date": date.isoformat(), "state": "planned", "count": count}
                for (date, count) in dates.items()
            )

        data = {
            "dag_states": data_dag_states,
        }

        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )
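
    # A croniter object is directly iterable, which is what the optimized cron
    # branch above relies on (a sketch with a made-up schedule and start time):
    #
    #     import datetime
    #     from croniter import croniter
    #     it = croniter("0 0 * * *", start_time=datetime.datetime(2024, 5, 1), ret_type=datetime.datetime)
    #     next(it)  # -> datetime.datetime(2024, 5, 2, 0, 0)
    #     next(it)  # -> datetime.datetime(2024, 5, 3, 0, 0)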

    @expose("/graph")
    def legacy_graph(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.graph", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/graph")
    @gzipped
    @provide_session
    def graph(self, dag_id: str, session: Session = NEW_SESSION):
        """Redirect to the replacement - grid + graph. Kept for backwards compatibility."""
        dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
        if not dag:
            flash(f'DAG "{dag_id}" seems to be missing from DagBag.', "error")
            return redirect(url_for("Airflow.index"))

        dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
        dttm = dt_nr_dr_data["dttm"]
        dag_run = dag.get_dagrun(execution_date=dttm)
        dag_run_id = dag_run.run_id if dag_run else None

        kwargs = {
            **sanitize_args(request.args),
            "dag_id": dag_id,
            "tab": "graph",
            "dag_run_id": dag_run_id,
        }

        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/duration")
    def legacy_duration(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.duration", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/duration")
    def duration(self, dag_id: str):
        """Redirect to Grid view."""
        return redirect(url_for("Airflow.grid", dag_id=dag_id))

    @expose("/tries")
    def legacy_tries(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.tries", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/tries")
    def tries(self, dag_id: str):
        """Redirect to grid view."""
        kwargs = {
            **sanitize_args(request.args),
            "dag_id": dag_id,
        }
        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/landing_times")
    def legacy_landing_times(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.landing_times", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/landing-times")
    def landing_times(self, dag_id: str):
        """Redirect to run duration page."""
        kwargs = {
            **sanitize_args(request.args),
            "dag_id": dag_id,
            "tab": "run_duration",
        }
        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/paused", methods=["POST"])
    @auth.has_access_dag("PUT")
    @action_logging
    def paused(self):
        """Toggle paused."""
        dag_id = request.args.get("dag_id")
        is_paused = request.args.get("is_paused") == "false"
        models.DagModel.get_dagmodel(dag_id).set_is_paused(is_paused=is_paused)
        return "OK"

    @expose("/gantt")
    def legacy_gantt(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.gantt", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/gantt")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def gantt(self, dag_id: str, session: Session = NEW_SESSION):
        """Redirect to the replacement - grid + gantt. Kept for backwards compatibility."""
        dag = get_airflow_app().dag_bag.get_dag(dag_id, session=session)
        dt_nr_dr_data = get_date_time_num_runs_dag_runs_form_data(request, session, dag)
        dttm = dt_nr_dr_data["dttm"]
        dag_run = dag.get_dagrun(execution_date=dttm)
        dag_run_id = dag_run.run_id if dag_run else None

        kwargs = {**sanitize_args(request.args), "dag_id": dag_id, "tab": "gantt", "dag_run_id": dag_run_id}

        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/extra_links")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    def extra_links(self, *, session: Session = NEW_SESSION):
        """
        Return external links for a given Operator.

        It queries the operator that sent the request for the links it wishes
        to provide for a given external link name.

        API: GET
        Args: dag_id: The id of the dag containing the task in question
              task_id: The id of the task in question
              execution_date: The date of execution of the task
              link_name: The name of the link reference to find the actual URL for

        Returns:
            200: {url: <url of link>, error: None} - returned when there was no problem
                finding the URL
            404: {url: None, error: <error message>} - returned when the operator does
                not return a URL
        """
        dag_id = request.args.get("dag_id")
        task_id = request.args.get("task_id")
        map_index = request.args.get("map_index", -1, type=int)
        execution_date = request.args.get("execution_date")
        dttm = _safe_parse_datetime(execution_date)
        dag = get_airflow_app().dag_bag.get_dag(dag_id)

        if not dag or task_id not in dag.task_ids:
            return {"url": None, "error": f"can't find dag {dag} or task_id {task_id}"}, 404

        task = dag.get_task(task_id)
        link_name = request.args.get("link_name")
        if link_name is None:
            return {"url": None, "error": "Link name not passed"}, 400

        ti = session.scalar(
            select(TaskInstance)
            .filter_by(dag_id=dag_id, task_id=task_id, execution_date=dttm, map_index=map_index)
            .options(joinedload(TaskInstance.dag_run))
            .limit(1)
        )

        if not ti:
            return {"url": None, "error": "Task Instances not found"}, 404

        try:
            url = task.get_extra_links(ti, link_name)
        except ValueError as err:
            return {"url": None, "error": str(err)}, 404

        if url:
            return {"error": None, "url": url}
        else:
            return {"url": None, "error": f"No URL found for {link_name}"}, 404
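
    # Example round trip for the endpoint above (all values illustrative):
    #
    #     GET /extra_links?dag_id=example&task_id=fetch
    #         &execution_date=2024-01-01T00%3A00%3A00%2B00%3A00&link_name=Logs
    #
    # answers 200 with {"error": null, "url": "https://..."} when the operator
    # resolves the link, and 404 with {"url": null, "error": "..."} otherwise.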

    @expose("/object/graph_data")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    @gzipped
    def graph_data(self):
        """Get Graph Data."""
        dag_id = request.args.get("dag_id")
        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        root = request.args.get("root")
        if root:
            filter_upstream = request.args.get("filter_upstream") == "true"
            filter_downstream = request.args.get("filter_downstream") == "true"
            dag = dag.partial_subset(
                task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
            )

        nodes = task_group_to_dict(dag.task_group)
        edges = dag_edges(dag)

        data = {
            "arrange": dag.orientation,
            "nodes": nodes,
            "edges": edges,
        }
        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )

    @expose("/object/task_instances")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    def task_instances(self):
        """Show task instances."""
        dag_id = request.args.get("dag_id")
        dag = get_airflow_app().dag_bag.get_dag(dag_id)

        dttm = request.args.get("execution_date")
        if dttm:
            dttm = _safe_parse_datetime(dttm)
        else:
            return {"error": f"Invalid execution_date {dttm}"}, 400

        with create_session() as session:
            task_instances = {
                ti.task_id: wwwutils.get_instance_with_map(ti, session)
                for ti in dag.get_task_instances(dttm, dttm)
            }

        return flask.json.jsonify(task_instances)

    @expose("/object/grid_data")
    @auth.has_access_dag("GET", DagAccessEntity.TASK_INSTANCE)
    def grid_data(self):
        """Return grid data."""
        dag_id = request.args.get("dag_id")
        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            return {"error": f"can't find dag {dag_id}"}, 404

        root = request.args.get("root")
        if root:
            filter_upstream = request.args.get("filter_upstream") == "true"
            filter_downstream = request.args.get("filter_downstream") == "true"
            dag = dag.partial_subset(
                task_ids_or_regex=root, include_upstream=filter_upstream, include_downstream=filter_downstream
            )

        num_runs = request.args.get("num_runs", type=int)
        if num_runs is None:
            num_runs = conf.getint("webserver", "default_dag_run_display_number")

        try:
            base_date = timezone.parse(request.args["base_date"], strict=True)
        except (KeyError, ValueError):
            base_date = dag.get_latest_execution_date() or timezone.utcnow()

        with create_session() as session:
            query = select(DagRun).where(DagRun.dag_id == dag.dag_id, DagRun.execution_date <= base_date)

            run_types = request.args.getlist("run_type")
            if run_types:
                query = query.where(DagRun.run_type.in_(run_types))

            run_states = request.args.getlist("run_state")
            if run_states:
                query = query.where(DagRun.state.in_(run_states))

            # Retrieve, sort and encode the previous DAG Runs
            dag_runs = wwwutils.sorted_dag_runs(
                query, ordering=dag.timetable.run_ordering, limit=num_runs, session=session
            )

            encoded_runs = []
            encoding_errors = []
            for dr in dag_runs:
                encoded_dr, error = wwwutils.encode_dag_run(dr, json_encoder=utils_json.WebEncoder)
                if error:
                    encoding_errors.append(error)
                else:
                    encoded_runs.append(encoded_dr)

            data = {
                "groups": dag_to_grid(dag, dag_runs, session),
                "dag_runs": encoded_runs,
                "ordering": dag.timetable.run_ordering,
                "errors": encoding_errors,
            }

        # avoid spaces to reduce payload size
        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )

    @expose("/object/historical_metrics_data")
    @auth.has_access_view(AccessView.CLUSTER_ACTIVITY)
    def historical_metrics_data(self):
        """Return cluster activity historical metrics."""
        start_date = _safe_parse_datetime(request.args.get("start_date"))
        end_date = _safe_parse_datetime(request.args.get("end_date"))

        with create_session() as session:
            # DagRuns
            dag_run_types = session.execute(
                select(DagRun.run_type, func.count(DagRun.run_id))
                .where(
                    DagRun.start_date >= start_date,
                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
                )
                .group_by(DagRun.run_type)
            ).all()

            dag_run_states = session.execute(
                select(DagRun.state, func.count(DagRun.run_id))
                .where(
                    DagRun.start_date >= start_date,
                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
                )
                .group_by(DagRun.state)
            ).all()

            # TaskInstances
            task_instance_states = session.execute(
                select(TaskInstance.state, func.count(TaskInstance.run_id))
                .join(TaskInstance.dag_run)
                .where(
                    DagRun.start_date >= start_date,
                    func.coalesce(DagRun.end_date, timezone.utcnow()) <= end_date,
                )
                .group_by(TaskInstance.state)
            ).all()

            data = {
                "dag_run_types": {
                    **{dag_run_type.value: 0 for dag_run_type in DagRunType},
                    **dict(dag_run_types),
                },
                "dag_run_states": {
                    **{dag_run_state.value: 0 for dag_run_state in DagRunState},
                    **dict(dag_run_states),
                },
                "task_instance_states": {
                    "no_status": 0,
                    **{ti_state.value: 0 for ti_state in TaskInstanceState},
                    **{ti_state or "no_status": sum_value for ti_state, sum_value in task_instance_states},
                },
            }

        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )
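
    # The dict merges above zero-fill every known enum value before overlaying
    # the actual counts, so absent states still appear (illustrative values):
    #
    #     >>> {**{"queued": 0, "running": 0, "success": 0}, **dict([("success", 7)])}
    #     {'queued': 0, 'running': 0, 'success': 7}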

    @expose("/object/next_run_datasets/<string:dag_id>")
    @auth.has_access_dag("GET", DagAccessEntity.RUN)
    @auth.has_access_dataset("GET")
    def next_run_datasets(self, dag_id):
        """Return datasets necessary, and their status, for the next dag run."""
        dag = get_airflow_app().dag_bag.get_dag(dag_id)
        if not dag:
            return {"error": f"can't find dag {dag_id}"}, 404

        with create_session() as session:
            dag_model = DagModel.get_dagmodel(dag_id, session=session)
            latest_run = dag_model.get_last_dagrun(session=session)

            events = [
                dict(info._mapping)
                for info in session.execute(
                    select(
                        DatasetModel.id,
                        DatasetModel.uri,
                        func.max(DatasetEvent.timestamp).label("lastUpdate"),
                    )
                    .join(
                        DagScheduleDatasetReference, DagScheduleDatasetReference.dataset_id == DatasetModel.id
                    )
                    .join(
                        DatasetDagRunQueue,
                        and_(
                            DatasetDagRunQueue.dataset_id == DatasetModel.id,
                            DatasetDagRunQueue.target_dag_id == DagScheduleDatasetReference.dag_id,
                        ),
                        isouter=True,
                    )
                    .join(
                        DatasetEvent,
                        and_(
                            DatasetEvent.dataset_id == DatasetModel.id,
                            (
                                DatasetEvent.timestamp >= latest_run.execution_date
                                if latest_run and latest_run.execution_date
                                else True
                            ),
                        ),
                        isouter=True,
                    )
                    .where(DagScheduleDatasetReference.dag_id == dag_id, ~DatasetModel.is_orphaned)
                    .group_by(DatasetModel.id, DatasetModel.uri)
                    .order_by(DatasetModel.uri)
                )
            ]
            data = {"dataset_expression": dag_model.dataset_expression, "events": events}

        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )

    @expose("/object/dataset_dependencies")
    @auth.has_access_dag("GET", DagAccessEntity.DEPENDENCIES)
    def dataset_dependencies(self):
        """Return dataset dependencies graph."""
        nodes_dict: dict[str, Any] = {}
        edge_tuples: set[tuple[str, str]] = set()

        for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
            dag_node_id = f"dag:{dag}"
            if dag_node_id not in nodes_dict:
                for dep in dependencies:
                    if dep.dependency_type in ("dag", "dataset", "dataset-alias"):
                        # add node
                        nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, "dag")
                        if dep.node_id not in nodes_dict:
                            nodes_dict[dep.node_id] = node_dict(
                                dep.node_id, dep.dependency_id, dep.dependency_type
                            )

                        # add edge
                        # not start dep
                        if dep.source != dep.dependency_type:
                            source = dep.source if ":" in dep.source else f"dag:{dep.source}"
                            target = dep.node_id
                            edge_tuples.add((source, target))

                        # not end dep
                        if dep.target != dep.dependency_type:
                            source = dep.node_id
                            target = dep.target if ":" in dep.target else f"dag:{dep.target}"
                            edge_tuples.add((source, target))

        nodes = list(nodes_dict.values())
        edges = [{"source": source, "target": target} for source, target in edge_tuples]

        data = {
            "nodes": nodes,
            "edges": edges,
        }

        return (
            htmlsafe_json_dumps(data, separators=(",", ":"), dumps=flask.json.dumps),
            {"Content-Type": "application/json; charset=utf-8"},
        )
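
    # Sketch of the graph payload produced above (ids are illustrative): a DAG
    # publishing to a dataset consumed by another DAG yields
    #
    #     edge_tuples = {("dag:producer", "dataset:s3://bucket/key"),
    #                    ("dataset:s3://bucket/key", "dag:consumer")}
    #     edges -> [{"source": "dag:producer", "target": "dataset:s3://bucket/key"}, ...]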

    @expose("/object/datasets_summary")
    @auth.has_access_dataset("GET")
    def datasets_summary(self):
        """
        Get a summary of datasets.

        Includes the datetime they were last updated and how many updates they've ever had.
        """
        allowed_attrs = ["uri", "last_dataset_update"]

        # Grab query parameters
        limit = int(request.args.get("limit", 25))
        offset = int(request.args.get("offset", 0))
        order_by = request.args.get("order_by", "uri")
        uri_pattern = request.args.get("uri_pattern", "")
        lstripped_orderby = order_by.lstrip("-")
        updated_after = _safe_parse_datetime(request.args.get("updated_after"), allow_empty=True)
        updated_before = _safe_parse_datetime(request.args.get("updated_before"), allow_empty=True)

        # Check and clean up query parameters
        limit = min(50, limit)

        uri_pattern = uri_pattern[:4000]

        if lstripped_orderby not in allowed_attrs:
            return {
                "detail": (
                    f"Ordering with '{lstripped_orderby}' is disallowed or the attribute does not "
                    "exist on the model"
                )
            }, 400

        with create_session() as session:
            if lstripped_orderby == "uri":
                if order_by.startswith("-"):
                    order_by = (DatasetModel.uri.desc(),)
                else:
                    order_by = (DatasetModel.uri.asc(),)
            elif lstripped_orderby == "last_dataset_update":
                if order_by.startswith("-"):
                    order_by = (
                        func.max(DatasetEvent.timestamp).desc(),
                        DatasetModel.uri.asc(),
                    )
                    if session.bind.dialect.name == "postgresql":
                        order_by = (order_by[0].nulls_last(), *order_by[1:])
                else:
                    order_by = (
                        func.max(DatasetEvent.timestamp).asc(),
                        DatasetModel.uri.desc(),
                    )
                    if session.bind.dialect.name == "postgresql":
                        order_by = (order_by[0].nulls_first(), *order_by[1:])

            count_query = select(func.count(DatasetModel.id))

            has_event_filters = bool(updated_before or updated_after)

            query = (
                select(
                    DatasetModel.id,
                    DatasetModel.uri,
                    func.max(DatasetEvent.timestamp).label("last_dataset_update"),
                    func.sum(case((DatasetEvent.id.is_not(None), 1), else_=0)).label("total_updates"),
                )
                .join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id, isouter=not has_event_filters)
                .group_by(
                    DatasetModel.id,
                    DatasetModel.uri,
                )
                .order_by(*order_by)
            )

            if has_event_filters:
                count_query = count_query.join(DatasetEvent, DatasetEvent.dataset_id == DatasetModel.id)

            filters = [~DatasetModel.is_orphaned]
            if uri_pattern:
                filters.append(DatasetModel.uri.ilike(f"%{uri_pattern}%"))
            if updated_after:
                filters.append(DatasetEvent.timestamp >= updated_after)
            if updated_before:
                filters.append(DatasetEvent.timestamp <= updated_before)

            query = query.where(*filters).offset(offset).limit(limit)
            count_query = count_query.where(*filters)

            query = session.execute(query)
            datasets = [dict(dataset._mapping) for dataset in query]
            data = {"datasets": datasets, "total_entries": session.scalar(count_query)}

            return (
                htmlsafe_json_dumps(data, separators=(",", ":"), cls=utils_json.WebEncoder),
                {"Content-Type": "application/json; charset=utf-8"},
            )
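
    # Ordering example for the endpoint above: "?order_by=-last_dataset_update"
    # yields (illustratively) ORDER BY max(timestamp) DESC, uri ASC, with
    # NULLS LAST appended on PostgreSQL, since datasets that never had an event
    # produce a NULL aggregate and DESC ordering on PostgreSQL puts NULLs first
    # by default.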

    @expose("/robots.txt")
    @action_logging
    def robots(self):
        """
        Return a robots.txt file for blocking certain search engine crawlers.

        This mitigates some of the risk associated with exposing Airflow to the public
        internet; however, it does not address the real security risks associated with
        such a deployment.
        """
        return send_from_directory(get_airflow_app().static_folder, "robots.txt")

    @expose("/audit_log")
    def legacy_audit_log(self):
        """Redirect from url param."""
        return redirect(url_for("Airflow.audit_log", **sanitize_args(request.args)))

    @expose("/dags/<string:dag_id>/audit_log")
    def audit_log(self, dag_id: str):
        current_page = request.args.get("page")
        arg_sorting_key = request.args.get("sorting_key")
        arg_sorting_direction = request.args.get("sorting_direction")

        sort_args = {
            "offset": current_page,
            f"sort.{arg_sorting_key}": arg_sorting_direction,
            "limit": PAGE_SIZE,
        }
        kwargs = {
            **sanitize_args(sort_args),
            "dag_id": dag_id,
            "tab": "audit_log",
        }

        return redirect(url_for("Airflow.grid", **kwargs))

    @expose("/parseDagFile/<string:file_token>")
    def parse_dag(self, file_token: str):
        from airflow.api_connexion.endpoints.dag_parsing import reparse_dag_file

        with create_session() as session:
            response = reparse_dag_file(file_token=file_token, session=session)
        response_messages = {
            201: ["Reparsing request submitted successfully", "info"],
            401: ["Unauthenticated request", "error"],
            403: ["Permission Denied", "error"],
            404: ["DAG not found", "error"],
        }
        flash(response_messages[response.status_code][0], response_messages[response.status_code][1])
        redirect_url = get_safe_url(request.values.get("redirect_url"))
        return redirect(redirect_url)


class ConfigurationView(AirflowBaseView):
    """View to show Airflow Configurations."""

    default_view = "conf"

    class_permission_name = permissions.RESOURCE_CONFIG
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    @expose("/configuration")
    @auth.has_access_configuration("GET")
    def conf(self):
        """Show configuration."""
        raw = request.args.get("raw") == "true"
        title = "Airflow Configuration"
        expose_config = conf.get("webserver", "expose_config").lower()

        # TODO remove "if raw" usage in Airflow 3.0. Configuration can be fetched via the REST API.
        if raw:
            if expose_config == "non-sensitive-only":
                updater = configupdater.ConfigUpdater()
                updater.read(AIRFLOW_CONFIG)
                for sect, key in conf.sensitive_config_values:
                    if updater.has_option(sect, key):
                        updater[sect][key].value = "< hidden >"
                config = str(updater)
            elif expose_config in {"true", "t", "1"}:
                with open(AIRFLOW_CONFIG) as file:
                    config = file.read()
            else:
                config = (
                    "# Your Airflow administrator chose not to expose the configuration, "
                    "most likely for security reasons."
                )

            return Response(
                response=config,
                status=200,
                mimetype="application/text",
                headers={"Deprecation": "Endpoint will be removed in Airflow 3.0, use the REST API instead."},
            )

        if expose_config in {"non-sensitive-only", "true", "t", "1"}:
            display_sensitive = expose_config != "non-sensitive-only"

            table = [
                (section, key, str(value), source)
                for section, parameters in conf.as_dict(True, display_sensitive).items()
                for key, (value, source) in parameters.items()
            ]

            return self.render_template(
                template="airflow/config.html",
                title=title,
                table=table,
            )
        else:
            return self.render_template(
                "airflow/config.html",
                title=title,
                hide_config_msg=(
                    "Your Airflow administrator chose not to expose the configuration, "
                    "most likely for security reasons."
                ),
            )


class RedocView(AirflowBaseView):
    """Redoc Open API documentation."""

    default_view = "redoc"

    @expose("/redoc")
    def redoc(self):
        """Redoc API documentation."""
        openapi_spec_url = url_for("/api/v1./api/v1_openapi_yaml")
        return self.render_template("airflow/redoc.html", openapi_spec_url=openapi_spec_url)


######################################################################################
#                                    ModelViews
######################################################################################


class DagFilter(BaseFilter):
    """Filter using DagIDs."""

    def apply(self, query, func):
        if get_auth_manager().is_authorized_dag(method="GET", user=g.user):
            return query
        if get_auth_manager().is_authorized_dag(method="PUT", user=g.user):
            return query
        filter_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)
        return query.where(self.model.dag_id.in_(filter_dag_ids))


class AirflowModelView(ModelView):
    """
    Airflow Model View.

    Overrides `__getattribute__` to wrap REST methods with action_logger.
    """

    list_widget = AirflowModelListWidget
    page_size = PAGE_SIZE

    CustomSQLAInterface = wwwutils.CustomSQLAInterface

    def __getattribute__(self, attr):
        """
        Wrap action REST methods with `action_logging` wrapper.

        Overriding enables differentiating resource and generation of event name at the decorator level.

        if attr not in ["show", "list", "read", "get", "get_list"]:
            return action_logging(event="RESOURCE_NAME"."action_name")(attr)
        else:
            return attr
        """
        attribute = object.__getattribute__(self, attr)
        if (
            callable(attribute)
            and hasattr(attribute, "_permission_name")
            and attribute._permission_name in self.method_permission_name
        ):
            permission_str = self.method_permission_name[attribute._permission_name]
            if permission_str not in ["show", "list", "read", "get", "get_list"]:
                return action_logging(event=f"{self.route_base.strip('/')}.{permission_str}")(attribute)
        return attribute
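
    # Resulting event names are "<route_base>.<permission>" (illustrative): for
    # a view with route_base "/slamiss", a mutating action mapped to the
    # "delete" permission would be wrapped as
    #
    #     action_logging(event="slamiss.delete")(attribute)
    #
    # while read-only methods ("show", "list", "read", "get", "get_list") are
    # returned unwrapped.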

    @expose("/show/<pk>", methods=["GET"])
    @auth.has_access_with_pk
    def show(self, pk):
        """
        Show view.

        Same implementation as
        https://github.com/dpgaspar/Flask-AppBuilder/blob/1c3af9b665ed9a3daf36673fee3327d0abf43e5b/flask_appbuilder/views.py#L566

        Override it to use a custom ``has_access_with_pk`` decorator to take the resource into
        consideration for fine-grained access.
        """
        pk = self._deserialize_pk_if_composite(pk)
        widgets = self._show(pk)
        return self.render_template(
            self.show_template,
            pk=pk,
            title=self.show_title,
            widgets=widgets,
            related_views=self._related_views,
        )

    @expose("/edit/<pk>", methods=["GET", "POST"])
    @auth.has_access_with_pk
    def edit(self, pk):
        """
        Edit view.

        Same implementation as
        https://github.com/dpgaspar/Flask-AppBuilder/blob/1c3af9b665ed9a3daf36673fee3327d0abf43e5b/flask_appbuilder/views.py#L602

        Override it to use a custom ``has_access_with_pk`` decorator to take the resource into
        consideration for fine-grained access.
        """
        pk = self._deserialize_pk_if_composite(pk)
        widgets = self._edit(pk)
        if not widgets:
            return self.post_edit_redirect()
        else:
            return self.render_template(
                self.edit_template,
                title=self.edit_title,
                widgets=widgets,
                related_views=self._related_views,
            )

    @expose("/delete/<pk>", methods=["GET", "POST"])
    @auth.has_access_with_pk
    def delete(self, pk):
        """
        Delete view.

        Same implementation as
        https://github.com/dpgaspar/Flask-AppBuilder/blob/1c3af9b665ed9a3daf36673fee3327d0abf43e5b/flask_appbuilder/views.py#L623

        Override it to use a custom ``has_access_with_pk`` decorator to take the resource into
        consideration for fine-grained access.
        """
        # Maintains compatibility but refuses to delete on GET methods if CSRF is enabled
        if not self.is_get_mutation_allowed():
            self.update_redirect()
            logger.warning("CSRF is enabled and a delete using GET was invoked")
            flash(as_unicode(FLAMSG_ERR_SEC_ACCESS_DENIED), "danger")
            return self.post_delete_redirect()
        pk = self._deserialize_pk_if_composite(pk)
        self._delete(pk)
        return self.post_delete_redirect()

    @expose("/action_post", methods=["POST"])
    def action_post(self):
        """
        Handle multiple records selected from a list view.

        Same implementation as
        https://github.com/dpgaspar/Flask-AppBuilder/blob/2c5763371b81cd679d88b9971ba5d1fc4d71d54b/flask_appbuilder/views.py#L677

        The difference is that it no longer checks permissions with ``self.appbuilder.sm.has_access``;
        it executes the function without verifying permissions.
        Thus, each action needs to be annotated individually with ``@auth.has_access_*`` to check user
        permissions.
        """
        name = request.form["action"]
        pks = request.form.getlist("rowid")
        action = self.actions.get(name)
        items = [self.datamodel.get(self._deserialize_pk_if_composite(pk)) for pk in pks]
        return action.func(items)


class SlaMissModelView(AirflowModelView):
    """View to show SlaMiss table."""

    route_base = "/slamiss"

    datamodel = AirflowModelView.CustomSQLAInterface(SlaMiss)  # type: ignore

    class_permission_name = permissions.RESOURCE_SLA_MISS
    method_permission_name = {
        "list": "read",
        "action_muldelete": "delete",
        "action_mulnotificationsent": "edit",
        "action_mulnotificationsentfalse": "edit",
        "action_mulemailsent": "edit",
        "action_mulemailsentfalse": "edit",
    }

    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"]

    label_columns = {
        "execution_date": "Logical Date",
    }

    add_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"]
    edit_columns = ["dag_id", "task_id", "execution_date", "email_sent", "notification_sent", "timestamp"]
    search_columns = ["dag_id", "task_id", "email_sent", "notification_sent", "timestamp", "execution_date"]
    base_order = ("execution_date", "desc")
    base_filters = [["dag_id", DagFilter, list]]

    formatters_columns = {
        "task_id": wwwutils.task_instance_link,
        "execution_date": wwwutils.datetime_f("execution_date"),
        "timestamp": wwwutils.datetime_f("timestamp"),
        "dag_id": wwwutils.dag_link,
        "map_index": wwwutils.format_map_index,
    }

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_dag_entities("DELETE", DagAccessEntity.SLA_MISS)
    def action_muldelete(self, items):
        """Multiple delete action."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action(
        "mulnotificationsent",
        "Set notification sent to true",
        "Are you sure you want to set all these notifications to sent?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
    def action_mulnotificationsent(self, items: list[SlaMiss]):
        return self._set_notification_property(items, "notification_sent", True)

    @action(
        "mulnotificationsentfalse",
        "Set notification sent to false",
        "Are you sure you want to mark these SLA alerts as notification not sent yet?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
    def action_mulnotificationsentfalse(self, items: list[SlaMiss]):
        return self._set_notification_property(items, "notification_sent", False)

    @action(
        "mulemailsent",
        "Set email sent to true",
        "Are you sure you want to mark these SLA alerts as emails were sent?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
    def action_mulemailsent(self, items: list[SlaMiss]):
        return self._set_notification_property(items, "email_sent", True)

    @action(
        "mulemailsentfalse",
        "Set email sent to false",
        "Are you sure you want to mark these SLA alerts as emails not sent yet?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.SLA_MISS)
    def action_mulemailsentfalse(self, items: list[SlaMiss]):
        return self._set_notification_property(items, "email_sent", False)

    @provide_session
    def _set_notification_property(
        self,
        items: list[SlaMiss],
        attr: str,
        new_value: bool,
        session: Session = NEW_SESSION,
    ):
        try:
            count = 0
            for sla in items:
                count += 1
                setattr(sla, attr, new_value)
                session.merge(sla)
            session.commit()
            flash(f"{count} SLAMisses had {attr} set to {new_value}.")
        except Exception as ex:
            flash(str(ex), "error")
            flash("Failed to set state", "error")
        self.update_redirect()
        return redirect(self.get_default_url())


class XComModelView(AirflowModelView):
    """View to show records from XCom table."""

    route_base = "/xcom"

    list_title = "List XComs"

    datamodel = AirflowModelView.CustomSQLAInterface(XCom)

    class_permission_name = permissions.RESOURCE_XCOM
    method_permission_name = {
        "list": "read",
        "delete": "delete",
        "action_muldelete": "delete",
    }
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    search_columns = ["key", "value", "timestamp", "dag_id", "task_id", "run_id", "execution_date"]
    list_columns = ["key", "value", "timestamp", "dag_id", "task_id", "run_id", "map_index", "execution_date"]
    base_order = ("dag_run_id", "desc")

    order_columns = [
        "key",
        "value",
        "timestamp",
        "dag_id",
        "task_id",
        "run_id",
        "map_index",
        # "execution_date",  # execution_date sorting is not working and crashing the UI, disabled for now.
    ]

    base_filters = [["dag_id", DagFilter, list]]

    formatters_columns = {
        "task_id": wwwutils.task_instance_link,
        "timestamp": wwwutils.datetime_f("timestamp"),
        "dag_id": wwwutils.dag_link,
        "map_index": wwwutils.format_map_index,
        "execution_date": wwwutils.datetime_f("execution_date"),
    }

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_dag_entities("DELETE", DagAccessEntity.XCOM)
    def action_muldelete(self, items):
        """Multiple delete action."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    def pre_add(self, item):
        """Pre add hook."""
        item.execution_date = timezone.make_aware(item.execution_date)
        item.value = XCom.serialize_value(
            value=item.value,
            key=item.key,
            task_id=item.task_id,
            dag_id=item.dag_id,
            run_id=item.run_id,
            map_index=item.map_index,
        )

    def pre_update(self, item):
        """Pre update hook."""
        item.execution_date = timezone.make_aware(item.execution_date)
        item.value = XCom.serialize_value(
            value=item.value,
            key=item.key,
            task_id=item.task_id,
            dag_id=item.dag_id,
            run_id=item.run_id,
            map_index=item.map_index,
        )


# Used to store a dictionary of field behaviours used to dynamically change available
# fields in ConnectionForm based on the type of connection chosen.
# See airflow.hooks.base_hook.DiscoverableHook for details on how to customize your Hooks.
#
# Additionally, a list of connection types that support testing via the Airflow REST API is stored
# to dynamically enable/disable the Test Connection button.
#
# These field behaviours and testable connection types are rendered as scripts in the conn_create.html and
# conn_edit.html templates.
class ConnectionFormWidget(FormWidget):
    """Form widget used to display connection."""

    @cached_property
    def field_behaviours(self) -> str:
        return json.dumps(ProvidersManager().field_behaviours)

    @cached_property
    def testable_connection_types(self) -> list[str]:
        return [
            connection_type
            for connection_type, hook_info in ProvidersManager().hooks.items()
            if hook_info and hook_info.connection_testable
        ]
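

# ---------------------------------------------------------------------------
# Illustrative sketch: the JSON produced by ``field_behaviours`` above maps
# each connection type to the behaviour dict its hook declares (hidden fields,
# relabeling, placeholders), which the conn_create/conn_edit templates read to
# adapt the form per connection type. The concrete values below are
# hypothetical, not taken from any real provider.
# ---------------------------------------------------------------------------
def _example_field_behaviours() -> str:
    behaviours = {
        "postgres": {
            "hidden_fields": ["extra"],  # fields removed from the form
            "relabeling": {"schema": "Database"},  # field label overrides
            "placeholders": {"port": "5432"},  # input placeholders
        },
    }
    return json.dumps(behaviours)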


class ConnectionFormProxy:
    """
    A stand-in for the connection form class.

    Flask-Appbuilder model views only ever call the ``refresh()`` function on
    the form class, so this is the perfect place to make the form generation
    dynamic. See docstring of ``create_connection_form_class`` for rationales.
    """

    @staticmethod
    def refresh(obj=None):
        return create_connection_form_class().refresh(obj)
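

# ---------------------------------------------------------------------------
# Illustrative sketch (hypothetical caller, not Flask-AppBuilder internals):
# FAB builds form instances through ``form.refresh(obj)``, both for an empty
# Add form and for an Edit form prefilled from a model instance, which is why
# ``refresh`` is the single hook the proxy above needs to implement.
# ---------------------------------------------------------------------------
def _example_form_usage(obj=None):
    add_form = ConnectionFormProxy.refresh()  # empty form, as for /add
    edit_form = ConnectionFormProxy.refresh(obj)  # prefilled form, as for /edit
    return add_form, edit_form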


class ConnectionModelView(AirflowModelView):
    """View to show records from Connections table."""

    route_base = "/connection"

    datamodel = AirflowModelView.CustomSQLAInterface(Connection)  # type: ignore

    class_permission_name = permissions.RESOURCE_CONNECTION
    method_permission_name = {
        "add": "create",
        "list": "read",
        "edit": "edit",
        "delete": "delete",
        "action_muldelete": "delete",
        "action_mulduplicate": "create",
    }

    base_permissions = [
        permissions.ACTION_CAN_CREATE,
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "conn_id",
        "conn_type",
        "description",
        "host",
        "port",
        "is_encrypted",
        "is_extra_encrypted",
    ]

    # The real add_columns and edit_columns are dynamically generated at runtime
    # so we can delay calculating entries relying on providers to make webserver
    # start up faster.
    _add_columns = _edit_columns = [
        "conn_id",
        "conn_type",
        "description",
        "host",
        "schema",
        "login",
        "password",
        "port",
        "extra",
    ]

    # We will generate the actual ConnectionForm when it is actually needed,
    # i.e. when the web form views are displayed and submitted.
    add_form = edit_form = ConnectionFormProxy

    add_template = "airflow/conn_create.html"
    edit_template = "airflow/conn_edit.html"

    add_widget = ConnectionFormWidget
    edit_widget = ConnectionFormWidget

    base_order = ("conn_id", "asc")

    def _iter_extra_field_names_and_sensitivity(self) -> Iterator[tuple[str, str, bool]]:
        """
        Iterate through provider-backed connection fields.

        Note that this cannot be a property (including a cached property)
        because Flask-Appbuilder attempts to access all members on startup, and
        using a property would initialize the providers manager too eagerly.

        Returns an iterator of tuples of:

        * key
        * field_name
        * whether the field is sensitive
        """
        return (
            (k, v.field_name, v.is_sensitive) for k, v in ProvidersManager().connection_form_widgets.items()
        )

    @property
    def add_columns(self) -> list[str]:
        """
        A list of columns to show in the Add form.

        This dynamically calculates additional fields from providers and adds
        them to the backing list. The calculation is done exactly once (by
        checking that we are still referencing the class-level variable instead
        of an instance-level one), and only after we enter the request context
        (to skip superfluous checks done by Flask-Appbuilder on startup). A
        short sketch of this memoization pattern follows the class below.
        """
        if self._add_columns is type(self)._add_columns and has_request_context():
            self._add_columns = [
                *self._add_columns,
                *(k for k, _, _ in self._iter_extra_field_names_and_sensitivity()),
            ]
        return self._add_columns

    @property
    def edit_columns(self) -> list[str]:
        """
        A list of columns to show in the Edit form.

        This dynamically calculates additional fields from providers and adds
        them to the backing list. The calculation is done exactly once (by
        checking that we are still referencing the class-level variable instead
        of an instance-level one), and only after we enter the request context
        (to skip superfluous checks done by Flask-Appbuilder on startup).
        """
        if self._edit_columns is type(self)._edit_columns and has_request_context():
            self._edit_columns = [
                *self._edit_columns,
                *(k for k, _, _ in self._iter_extra_field_names_and_sensitivity()),
            ]
        return self._edit_columns

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_connection("DELETE")
    def action_muldelete(self, connections):
        """Multiple delete."""
        self.datamodel.delete_all(connections)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action(
        "mulduplicate",
        "Duplicate",
        "Are you sure you want to duplicate the selected connections?",
        single=False,
    )
    @provide_session
    @auth.has_access_connection("POST")
    @auth.has_access_connection("GET")
    def action_mulduplicate(self, connections, session: Session = NEW_SESSION):
        """Duplicate multiple connections."""
        for selected_conn in connections:
            new_conn_id = selected_conn.conn_id
            match = re2.search(r"_copy(\d+)$", selected_conn.conn_id)

            base_conn_id = selected_conn.conn_id
            if match:
                base_conn_id = base_conn_id.split("_copy")[0]

            potential_connection_ids = [f"{base_conn_id}_copy{i}" for i in range(1, 11)]

            query = session.scalars(
                select(Connection.conn_id).where(Connection.conn_id.in_(potential_connection_ids))
            )

            found_conn_id_set = set(query)

            possible_conn_id_iter = (
                connection_id
                for connection_id in potential_connection_ids
                if connection_id not in found_conn_id_set
            )
            try:
                new_conn_id = next(possible_conn_id_iter)
            except StopIteration:
                flash(
                    f"Connection {new_conn_id} can't be added because it already exists. "
                    f"Please rename the existing connections.",
                    "warning",
                )
            else:
                dup_conn = Connection(
                    new_conn_id,
                    selected_conn.conn_type,
                    selected_conn.description,
                    selected_conn.host,
                    selected_conn.login,
                    selected_conn.password,
                    selected_conn.schema,
                    selected_conn.port,
                    selected_conn.extra,
                )

                try:
                    session.add(dup_conn)
                    session.commit()
                    flash(f"Connection {new_conn_id} added successfully.", "success")
                except IntegrityError:
                    flash(
                        f"Connection {new_conn_id} can't be added. Integrity error, "
                        f"probably unique constraint.",
                        "warning",
                    )
                    session.rollback()

        self.update_redirect()
        return redirect(self.get_redirect())

    def process_form(self, form, is_created):
        """Process form data."""
        conn_id = form.data["conn_id"]
        conn_type = form.data["conn_type"]

        # The extra value is the combination of custom fields for this conn_type and the Extra field.
        # The extra form field with all extra values (including custom fields) is in the form being
        # processed, so we start with those values, and override them with anything in the custom fields.
        extra = {}

        extra_json = form.data.get("extra")

        if extra_json:
            try:
                extra.update(json.loads(extra_json))
            except (JSONDecodeError, TypeError):
                flash(
                    Markup(
                        "<p>The <em>Extra</em> connection field contained an invalid value for Conn ID: "
                        "<q>{conn_id}</q>.</p>"
                        "<p>If connection parameters need to be added to <em>Extra</em>, "
                        "please make sure they are in the form of a single, valid JSON object.</p><br>"
                        "The following <em>Extra</em> parameters were <b>not</b> added to the connection:<br>"
                        "{extra_json}"
                    ).format(conn_id=conn_id, extra_json=extra_json),
                    category="error",
                )
                del form.extra
        del extra_json

        for key, field_name, _ in self._iter_extra_field_names_and_sensitivity():
            if key in form.data and key.startswith("extra__"):
                conn_type_from_extra_field = key.split("__")[1]
                if conn_type_from_extra_field == conn_type:
                    value = form.data[key]
                    # Some extra fields have a default value of False so we need to explicitly check the
                    # value isn't an empty string.
                    if value != "":
                        extra[field_name] = value
                    elif field_name in extra:
                        del extra[field_name]

        if extra:
            sensitive_unchanged_keys = set()
            for key, value in extra.items():
                if value == SENSITIVE_FIELD_PLACEHOLDER:
                    sensitive_unchanged_keys.add(key)
            if sensitive_unchanged_keys:
                try:
                    conn = BaseHook.get_connection(conn_id)
                except AirflowNotFoundException:
                    conn = None
                for key in sensitive_unchanged_keys:
                    if conn and conn.extra_dejson.get(key):
                        extra[key] = conn.extra_dejson.get(key)
                    else:
                        del extra[key]
            form.extra.data = json.dumps(extra)

    def prefill_form(self, form, pk):
        """Prefill the form."""
        try:
            extra = form.data.get("extra")
            if extra is None:
                extra_dictionary = {}
            else:
                extra_dictionary = json.loads(extra)
        except JSONDecodeError:
            extra_dictionary = {}

        if not isinstance(extra_dictionary, dict):
            logger.warning("extra field for %s is not a dictionary", form.data.get("conn_id", "<unknown>"))
            return

        for field_key, field_name, is_sensitive in self._iter_extra_field_names_and_sensitivity():
            value = extra_dictionary.get(field_name, "")

            if not value:
                # check if connection `extra` json is using old prefixed field name style
                value = extra_dictionary.get(field_key, "")

            if value:
                field = getattr(form, field_key)
                field.data = value
            if is_sensitive and field_name in extra_dictionary:
                extra_dictionary[field_name] = SENSITIVE_FIELD_PLACEHOLDER
        # form.data is a property that builds the dictionary from fields so we have to modify the fields
        if extra_dictionary:
            form.extra.data = json.dumps(extra_dictionary)
        else:
            form.extra.data = None
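

# ---------------------------------------------------------------------------
# Illustrative sketch of the memoization trick used by the ``add_columns`` /
# ``edit_columns`` properties of ConnectionModelView above: as long as the
# instance attribute is still the *same object* as the class-level default,
# the expensive extension step has not run yet; assigning a new list to the
# instance both extends the columns and flips the check off for later calls.
# Hypothetical, Airflow-free version:
# ---------------------------------------------------------------------------
class _ExampleLazyColumns:
    _columns = ["a", "b"]  # class-level default, shared until first access

    def expensive_extra_columns(self):
        return ["c"]  # stands in for the provider-backed fields

    @property
    def columns(self):
        if self._columns is type(self)._columns:  # identity check, not equality
            self._columns = [*self._columns, *self.expensive_extra_columns()]
        return self._columns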


class PluginView(AirflowBaseView):
    """View to show Airflow Plugins."""

    default_view = "list"

    class_permission_name = permissions.RESOURCE_PLUGIN

    method_permission_name = {
        "list": "read",
    }

    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    plugins_attributes_to_dump = PLUGINS_ATTRIBUTES_TO_DUMP

    @expose("/plugin")
    @auth.has_access_view(AccessView.PLUGINS)
    def list(self):
        """List loaded plugins."""
        plugins_manager.ensure_plugins_loaded()
        plugins_manager.integrate_executor_plugins()
        plugins_manager.initialize_extra_operators_links_plugins()
        plugins_manager.initialize_web_ui_plugins()

        plugins = []
        for plugin_no, plugin in enumerate(plugins_manager.plugins, 1):
            plugin_data = {
                "plugin_no": plugin_no,
                "plugin_name": plugin.name,
                "attrs": {},
            }
            for attr_name in self.plugins_attributes_to_dump:
                attr_value = getattr(plugin, attr_name)
                plugin_data["attrs"][attr_name] = attr_value

            plugins.append(plugin_data)

        title = "Airflow Plugins"
        doc_url = get_docs_url("plugins.html")
        return self.render_template(
            "airflow/plugin.html",
            plugins=plugins,
            title=title,
            doc_url=doc_url,
        )


class ProviderView(AirflowBaseView):
    """View to show Airflow Providers."""

    default_view = "list"

    class_permission_name = permissions.RESOURCE_PROVIDER

    method_permission_name = {
        "list": "read",
    }

    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    @expose("/provider")
    @auth.has_access_view(AccessView.PROVIDERS)
    def list(self):
        """List providers."""
        providers_manager = ProvidersManager()

        providers = []
        for pi in providers_manager.providers.values():
            provider_info = pi.data
            provider_data = {
                "package_name": provider_info["package-name"],
                "description": self._clean_description(provider_info["description"]),
                "version": pi.version,
                "documentation_url": get_doc_url_for_provider(provider_info["package-name"], pi.version),
            }
            providers.append(provider_data)

        title = "Providers"
        doc_url = get_docs_url("apache-airflow-providers/index.html")
        return self.render_template(
            "airflow/providers.html",
            providers=providers,
            title=title,
            doc_url=doc_url,
        )

    def _clean_description(self, description):
        def _build_link(match_obj):
            text = match_obj.group(1)
            url = match_obj.group(2)
            # parsing the url to check if it's a valid url
            parsed_url = urlparse(url)
            if not (parsed_url.scheme == "http" or parsed_url.scheme == "https"):
                # returning the original raw text
                return escape(match_obj.group(0))
            return Markup(f'<a href="{url}" target="_blank" rel="noopener noreferrer">{text}</a>')

        cd = escape(description)
        cd = re2.sub(r"`(.*)[\s+]+&lt;(.*)&gt;`__", _build_link, cd)
        return Markup(cd)
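

# ---------------------------------------------------------------------------
# Illustrative sketch: ``_clean_description`` above first escapes the provider
# description, then rewrites RST-style links (``\`text <url>\`__``) whose URL
# is http(s) into anchor tags; anything else stays as escaped text. The
# input/output pair below is hypothetical.
# ---------------------------------------------------------------------------
def _example_clean_description() -> Markup:
    description = "`Apache Airflow <https://airflow.apache.org>`__ provider."
    # After escaping, the pattern matches ``&lt;...&gt;`` and yields roughly:
    # <a href="https://airflow.apache.org" ...>Apache Airflow</a> provider.
    return ProviderView()._clean_description(description)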


class PoolModelView(AirflowModelView):
    """View to show records from Pool table."""

    route_base = "/pool"

    list_template = "airflow/pool_list.html"

    datamodel = AirflowModelView.CustomSQLAInterface(models.Pool)  # type: ignore

    class_permission_name = permissions.RESOURCE_POOL
    method_permission_name = {
        "add": "create",
        "list": "read",
        "edit": "edit",
        "delete": "delete",
        "action_muldelete": "delete",
    }

    base_permissions = [
        permissions.ACTION_CAN_CREATE,
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "pool",
        "description",
        "slots",
        "running_slots",
        "queued_slots",
        "scheduled_slots",
        "deferred_slots",
    ]
    add_columns = ["pool", "slots", "description", "include_deferred"]
    edit_columns = ["pool", "slots", "description", "include_deferred"]

    # include_deferred is non-nullable, but as a checkbox in the resulting form we want to allow it unchecked
    include_deferred_field = BooleanField(
        validators=[validators.Optional()],
        description="Check to include deferred tasks when calculating open pool slots.",
    )
    edit_form_extra_fields = {"include_deferred": include_deferred_field}
    add_form_extra_fields = {"include_deferred": include_deferred_field}

    base_order = ("pool", "asc")

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_pool("DELETE")
    def action_muldelete(self, items):
        """Multiple delete."""
        if any(item.pool == models.Pool.DEFAULT_POOL_NAME for item in items):
            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", "error")
            self.update_redirect()
            return redirect(self.get_redirect())
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @expose("/delete/<pk>", methods=["GET", "POST"])
    @auth.has_access_with_pk
    def delete(self, pk):
        """Single delete."""
        if models.Pool.is_default_pool(pk):
            flash(f"{models.Pool.DEFAULT_POOL_NAME} cannot be deleted", "error")
            self.update_redirect()
            return redirect(self.get_redirect())
        return super().delete(pk)

    def pool_link(self):
        """Pool link rendering."""
        pool_id = self.get("pool")
        if pool_id is not None:
            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id)
            return Markup("<a href='{url}'>{pool_id}</a>").format(url=url, pool_id=pool_id)
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    def frunning_slots(self):
        """Format running slots rendering."""
        pool_id = self.get("pool")
        running_slots = self.get("running_slots")
        if pool_id is not None and running_slots is not None:
            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id, _flt_3_state="running")
            return Markup("<a href='{url}'>{running_slots}</a>").format(url=url, running_slots=running_slots)
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    def fqueued_slots(self):
        """Queued slots rendering."""
        pool_id = self.get("pool")
        queued_slots = self.get("queued_slots")
        if pool_id is not None and queued_slots is not None:
            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id, _flt_3_state="queued")
            return Markup("<a href='{url}'>{queued_slots}</a>").format(url=url, queued_slots=queued_slots)
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    def fscheduled_slots(self):
        """Scheduled slots rendering."""
        pool_id = self.get("pool")
        scheduled_slots = self.get("scheduled_slots")
        if pool_id is not None and scheduled_slots is not None:
            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id, _flt_3_state="scheduled")
            return Markup("<a href='{url}'>{scheduled_slots}</a>").format(
                url=url, scheduled_slots=scheduled_slots
            )
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    def fdeferred_slots(self):
        """Deferred slots rendering."""
        pool_id = self.get("pool")
        deferred_slots = self.get("deferred_slots")
        if pool_id is not None and deferred_slots is not None:
            url = url_for("TaskInstanceModelView.list", _flt_3_pool=pool_id, _flt_3_state="deferred")
            return Markup("<a href='{url}'>{deferred_slots}</a>").format(
                url=url, deferred_slots=deferred_slots
            )
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    formatters_columns = {
        "pool": pool_link,
        "running_slots": frunning_slots,
        "queued_slots": fqueued_slots,
        "scheduled_slots": fscheduled_slots,
        "deferred_slots": fdeferred_slots,
    }

    validators_columns = {"pool": [validators.DataRequired()], "slots": [validators.NumberRange(min=-1)]}


class VariableModelView(AirflowModelView):
    """View to show records from Variable table."""

    route_base = "/variable"

    list_template = "airflow/variable_list.html"
    edit_template = "airflow/variable_edit.html"
    show_template = "airflow/variable_show.html"

    show_widget = AirflowVariableShowWidget

    datamodel = AirflowModelView.CustomSQLAInterface(models.Variable)  # type: ignore

    class_permission_name = permissions.RESOURCE_VARIABLE
    method_permission_name = {
        "add": "create",
        "list": "read",
        "edit": "edit",
        "show": "read",
        "delete": "delete",
        "action_muldelete": "delete",
        "action_varexport": "read",
    }
    base_permissions = [
        permissions.ACTION_CAN_CREATE,
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = ["key", "val", "description", "is_encrypted"]
    add_columns = ["key", "val", "description"]
    edit_columns = ["key", "val", "description"]
    show_columns = ["key", "val", "description"]
    search_columns = ["key", "val"]

    base_order = ("key", "asc")

    def hidden_field_formatter(self):
        """Format hidden fields."""
        key = self.get("key")
        val = self.get("val")
        if secrets_masker.should_hide_value_for_key(key):
            return Markup("*" * 8)
        if val:
            return val
        else:
            return Markup('<span class="label label-danger">Invalid</span>')

    formatters_columns = {
        "val": hidden_field_formatter,
    }

    validators_columns = {"key": [validators.DataRequired()]}

    def prefill_form(self, form, request_id):
        if secrets_masker.should_hide_value_for_key(form.key.data):
            form.val.data = "*" * 8

    def prefill_show(self, item):
        if secrets_masker.should_hide_value_for_key(item.key):
            item.val = "*" * 8

    def _show(self, pk):
        pages = get_page_args()
        page_sizes = get_page_size_args()
        orders = get_order_args()

        item = self.datamodel.get(pk, self._base_filters)
        if not item:
            abort(404)
        self.prefill_show(item)
        widgets = self._get_show_widget(pk, item)
        self.update_redirect()

        return self._get_related_views_widgets(
            item, orders=orders, pages=pages, page_sizes=page_sizes, widgets=widgets
        )

    extra_args = {"can_create_variable": lambda: get_auth_manager().is_authorized_variable(method="POST")}

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_variable("DELETE")
    def action_muldelete(self, items):
        """Multiple delete."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("varexport", "Export", "", single=False)
    @auth.has_access_variable("GET")
    def action_varexport(self, items):
        """Export variables."""
        var_dict = {}
        decoder = json.JSONDecoder()
        for var in items:
            try:
                val = decoder.decode(var.val)
            except Exception:
                val = var.val
            var_dict[var.key] = val

        response = make_response(json.dumps(var_dict, sort_keys=True, indent=4))
        response.headers["Content-Disposition"] = "attachment; filename=variables.json"
        response.headers["Content-Type"] = "application/json; charset=utf-8"
        return response

    @expose("/varimport", methods=["POST"])
    @auth.has_access_variable("POST")
    @action_logging(event=f"{permissions.RESOURCE_VARIABLE.lower()}.varimport")
    @provide_session
    def varimport(self, session):
        """Import variables."""
        try:
            variable_dict = json.loads(request.files["file"].read())
            action_on_existing = request.form.get("action_if_exists", "overwrite").lower()
        except Exception:
            self.update_redirect()
            flash("Missing file or syntax error.", "error")
            return redirect(self.get_redirect())
        else:
            existing_keys = set()
            if action_on_existing != "overwrite":
                existing_keys = set(
                    session.scalars(select(models.Variable.key).where(models.Variable.key.in_(variable_dict)))
                )
            if action_on_existing == "fail" and existing_keys:
                failed_repr = ", ".join(repr(k) for k in sorted(existing_keys))
                flash(f"Failed. The variables with these keys already exist: {failed_repr}.")
                logger.error("Failed. The variables with these keys already exist: %s.", failed_repr)
                self.update_redirect()
                return redirect(self.get_redirect())
            skipped = set()
            suc_count = fail_count = 0
            for k, v in variable_dict.items():
                if action_on_existing == "skip" and k in existing_keys:
                    logger.warning("Variable: %s already exists, skipping.", k)
                    skipped.add(k)
                    continue
                try:
                    models.Variable.set(k, v, serialize_json=not isinstance(v, str))
                except Exception as exc:
                    logger.info("Variable import failed: %r", exc)
                    fail_count += 1
                else:
                    suc_count += 1
            flash(f"{suc_count} variable(s) successfully updated.")
            if fail_count:
                flash(f"{fail_count} variable(s) failed to be updated.", "error")
            if skipped:
                skipped_repr = ", ".join(repr(k) for k in sorted(skipped))
                flash(
                    f"The variables with these keys were skipped because they already exist: {skipped_repr}.",
                    "warning",
                )
            self.update_redirect()
            return redirect(self.get_redirect())
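

# ---------------------------------------------------------------------------
# Illustrative sketch: the payload ``varimport`` above accepts is a flat JSON
# object; non-string values are round-tripped with ``serialize_json``, and the
# ``action_if_exists`` form field chooses between overwrite (the default),
# fail, and skip for keys that already exist. The example file below is
# hypothetical.
# ---------------------------------------------------------------------------
def _example_variables_payload() -> str:
    return json.dumps(
        {
            "environment": "staging",  # stored as a plain string
            "retry_config": {"retries": 3},  # stored JSON-serialized
        },
        indent=4,
    )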


class JobModelView(AirflowModelView):
    """View to show records from Job table."""

    route_base = "/job"

    datamodel = AirflowModelView.CustomSQLAInterface(Job)  # type: ignore

    class_permission_name = permissions.RESOURCE_JOB
    method_permission_name = {
        "list": "read",
    }
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "id",
        "dag_id",
        "state",
        "job_type",
        "start_date",
        "end_date",
        "latest_heartbeat",
        "executor_class",
        "hostname",
        "unixname",
    ]
    search_columns = [
        "id",
        "dag_id",
        "state",
        "job_type",
        "start_date",
        "end_date",
        "latest_heartbeat",
        "executor_class",
        "hostname",
        "unixname",
    ]

    base_order = ("start_date", "desc")

    base_filters = [["dag_id", DagFilter, list]]

    formatters_columns = {
        "start_date": wwwutils.datetime_f("start_date"),
        "end_date": wwwutils.datetime_f("end_date"),
        "hostname": wwwutils.nobr_f("hostname"),
        "state": wwwutils.state_f,
        "latest_heartbeat": wwwutils.datetime_f("latest_heartbeat"),
    }


class DagRunModelView(AirflowModelView):
    """View to show records from DagRun table."""

    route_base = "/dagrun"

    datamodel = wwwutils.DagRunCustomSQLAInterface(models.DagRun)  # type: ignore

    class_permission_name = permissions.RESOURCE_DAG_RUN
    method_permission_name = {
        "delete": "delete",
        "edit": "edit",
        "list": "read",
        "action_clear": "edit",
        "action_muldelete": "delete",
        "action_set_queued": "edit",
        "action_set_running": "edit",
        "action_set_failed": "edit",
        "action_set_success": "edit",
    }
    base_permissions = [
        permissions.ACTION_CAN_CREATE,
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "state",
        "dag_id",
        "execution_date",
        "run_id",
        "run_type",
        "queued_at",
        "start_date",
        "end_date",
        "note",
        "external_trigger",
        "conf",
        "duration",
    ]
    search_columns = [
        "state",
        "dag_id",
        "execution_date",
        "run_id",
        "run_type",
        "start_date",
        "end_date",
        "note",
        "external_trigger",
    ]
    label_columns = {
        "execution_date": "Logical Date",
    }
    edit_columns = [
        "state",
        "dag_id",
        "execution_date",
        "start_date",
        "end_date",
        "run_id",
        "conf",
        "note",
    ]

    # duration is not a DB column; it's derived
    order_columns = [
        "state",
        "dag_id",
        "execution_date",
        "run_id",
        "run_type",
        "queued_at",
        "start_date",
        "end_date",
        # "note",  # todo: maybe figure out how to re-enable this
        "external_trigger",
        "conf",
    ]

    base_order = ("execution_date", "desc")

    base_filters = [["dag_id", DagFilter, list]]

    edit_form = DagRunEditForm

    def duration_f(self):
        """Duration calculation."""
        end_date = self.get("end_date")
        start_date = self.get("start_date")

        difference = "0s"
        if start_date and end_date:
            difference = td_format(end_date - start_date)

        return difference

    formatters_columns = {
        "execution_date": wwwutils.datetime_f("execution_date"),
        "state": wwwutils.state_f,
        "start_date": wwwutils.datetime_f("start_date"),
        "end_date": wwwutils.datetime_f("end_date"),
        "queued_at": wwwutils.datetime_f("queued_at"),
        "dag_id": wwwutils.dag_link,
        "run_id": wwwutils.dag_run_link,
        "conf": wwwutils.json_f("conf"),
        "duration": duration_f,
    }

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_dag_entities("DELETE", DagAccessEntity.RUN)
    @action_logging
    def action_muldelete(self, items: list[DagRun]):
        """Multiple delete."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("set_queued", "Set state to 'queued'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.RUN)
    @action_logging
    def action_set_queued(self, drs: list[DagRun]):
        """Set state to queued."""
        return self._set_dag_runs_to_active_state(drs, DagRunState.QUEUED)

    @action("set_running", "Set state to 'running'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.RUN)
    @action_logging
    def action_set_running(self, drs: list[DagRun]):
        """Set state to running."""
        return self._set_dag_runs_to_active_state(drs, DagRunState.RUNNING)

    @provide_session
    def _set_dag_runs_to_active_state(
        self,
        drs: list[DagRun],
        state: DagRunState,
        session: Session = NEW_SESSION,
    ):
        """Set dag runs to an active state; this routine only supports the Running and Queued states."""
        try:
            count = 0
            for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
                count += 1
                if state == DagRunState.RUNNING:
                    dr.start_date = timezone.utcnow()
                dr.state = state
            session.commit()
            flash(f"{count} dag runs were set to {state}.")
        except Exception as ex:
            flash(str(ex), "error")
            flash("Failed to set state", "error")
        return redirect(self.get_default_url())

    @action(
        "set_failed",
        "Set state to 'failed'",
        "All running task instances would also be marked as failed, are you sure?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.RUN)
    @provide_session
    @action_logging
    def action_set_failed(self, drs: list[DagRun], session: Session = NEW_SESSION):
        """Set state to failed."""
        try:
            count = 0
            altered_tis = []
            for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
                count += 1
                altered_tis += set_dag_run_state_to_failed(
                    dag=get_airflow_app().dag_bag.get_dag(dr.dag_id),
                    run_id=dr.run_id,
                    commit=True,
                    session=session,
                )
            altered_ti_count = len(altered_tis)
            flash(f"{count} dag runs and {altered_ti_count} task instances were set to failed")
        except Exception:
            flash("Failed to set state", "error")
        return redirect(self.get_default_url())

    @action(
        "set_success",
        "Set state to 'success'",
        "All task instances would also be marked as success, are you sure?",
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.RUN)
    @provide_session
    @action_logging
    def action_set_success(self, drs: list[DagRun], session: Session = NEW_SESSION):
        """Set state to success."""
        try:
            count = 0
            altered_tis = []
            for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
                count += 1
                altered_tis += set_dag_run_state_to_success(
                    dag=get_airflow_app().dag_bag.get_dag(dr.dag_id),
                    run_id=dr.run_id,
                    commit=True,
                    session=session,
                )
            altered_ti_count = len(altered_tis)
            flash(f"{count} dag runs and {altered_ti_count} task instances were set to success")
        except Exception:
            flash("Failed to set state", "error")
        return redirect(self.get_default_url())

    @action("clear", "Clear the state", "All task instances would be cleared, are you sure?", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.RUN)
    @provide_session
    @action_logging
    def action_clear(self, drs: list[DagRun], session: Session = NEW_SESSION):
        """Clear the state."""
        try:
            count = 0
            cleared_ti_count = 0
            dag_to_tis: dict[DAG, list[TaskInstance]] = {}
            for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dagrun.id for dagrun in drs))):
                count += 1
                dag = get_airflow_app().dag_bag.get_dag(dr.dag_id)
                tis_to_clear = dag_to_tis.setdefault(dag, [])
                tis_to_clear += dr.get_task_instances()

            for dag, tis in dag_to_tis.items():
                cleared_ti_count += len(tis)
                models.clear_task_instances(tis, session, dag=dag)

            flash(f"{count} dag runs and {cleared_ti_count} task instances were cleared")
        except Exception:
            flash("Failed to clear state", "error")
        return redirect(self.get_default_url())
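

# ---------------------------------------------------------------------------
# Illustrative sketch of the re-select pattern the DagRun actions above share:
# the rows handed to an action may come from a different session, so each
# action re-fetches them by primary key inside its own session before
# mutating. Hypothetical, generic version:
# ---------------------------------------------------------------------------
def _example_refetch_in_session(session: Session, drs: list[DagRun]) -> int:
    count = 0
    for dr in session.scalars(select(DagRun).where(DagRun.id.in_(dr_.id for dr_ in drs))):
        count += 1  # ``dr`` is now attached to ``session`` and safe to mutate
    return count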


class LogModelView(AirflowModelView):
    """View to show records from Log table."""

    route_base = "/log"

    datamodel = AirflowModelView.CustomSQLAInterface(Log)  # type: ignore

    class_permission_name = permissions.RESOURCE_AUDIT_LOG
    method_permission_name = {
        "list": "read",
    }
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "id",
        "dttm",
        "dag_id",
        "task_id",
        "run_id",
        "event",
        "execution_date",
        "owner",
        "owner_display_name",
        "extra",
    ]
    search_columns = [
        "dttm",
        "dag_id",
        "task_id",
        "run_id",
        "event",
        "execution_date",
        "owner",
        "owner_display_name",
        "extra",
    ]
    label_columns = {
        "execution_date": "Logical Date",
        "owner": "Owner ID",
        "owner_display_name": "Owner Name",
    }

    base_order = ("dttm", "desc")

    base_filters = [["dag_id", DagFilter, list]]

    formatters_columns = {
        "dttm": wwwutils.datetime_f("dttm"),
        "execution_date": wwwutils.datetime_f("execution_date"),
        "dag_id": wwwutils.dag_link,
    }


class TaskRescheduleModelView(AirflowModelView):
    """View to show records from Task Reschedule table."""

    route_base = "/taskreschedule"

    datamodel = AirflowModelView.CustomSQLAInterface(models.TaskReschedule)  # type: ignore
    related_views = [DagRunModelView]

    class_permission_name = permissions.RESOURCE_TASK_RESCHEDULE
    method_permission_name = {
        "list": "read",
    }
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "id",
        "dag_id",
        "run_id",
        "dag_run.execution_date",
        "task_id",
        "map_index",
        "try_number",
        "start_date",
        "end_date",
        "duration",
        "reschedule_date",
    ]

    label_columns = {
        "dag_run.execution_date": "Logical Date",
    }

    search_columns = [
        "dag_id",
        "task_id",
        "run_id",
        "execution_date",
        "start_date",
        "end_date",
        "reschedule_date",
    ]

    base_order = ("id", "desc")

    base_filters = [["dag_id", DagFilter, list]]

    def duration_f(self):
        """Duration calculation."""
        end_date = self.get("end_date")
        duration = self.get("duration")
        if end_date and duration:
            return td_format(datetime.timedelta(seconds=duration))
        return None

    formatters_columns = {
        "dag_id": wwwutils.dag_link,
        "task_id": wwwutils.task_instance_link,
        "start_date": wwwutils.datetime_f("start_date"),
        "end_date": wwwutils.datetime_f("end_date"),
        "dag_run.execution_date": wwwutils.datetime_f("dag_run.execution_date"),
        "reschedule_date": wwwutils.datetime_f("reschedule_date"),
        "duration": duration_f,
        "map_index": wwwutils.format_map_index,
    }


class TriggerModelView(AirflowModelView):
    """View to show records from Trigger table."""

    route_base = "/triggerview"

    datamodel = AirflowModelView.CustomSQLAInterface(models.Trigger)  # type: ignore

    class_permission_name = permissions.RESOURCE_TRIGGER
    method_permission_name = {
        "list": "read",
    }
    base_permissions = [
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    list_columns = [
        "id",
        "classpath",
        "created_date",
        "triggerer_id",
    ]
    search_columns = [
        "id",
        "classpath",
        "created_date",
        "triggerer_id",
    ]

    # base_order expects a (column, direction) pair
    base_order = ("created_date", "desc")

    formatters_columns = {
        "created_date": wwwutils.datetime_f("created_date"),
    }


class TaskInstanceModelView(AirflowModelView):
    """View to show records from TaskInstance table."""

    route_base = "/taskinstance"

    datamodel = AirflowModelView.CustomSQLAInterface(models.TaskInstance)  # type: ignore

    class_permission_name = permissions.RESOURCE_TASK_INSTANCE
    method_permission_name = {
        "list": "read",
        "delete": "delete",
        "action_clear": "edit",
        "action_clear_downstream": "edit",
        "action_muldelete": "delete",
        "action_set_failed": "edit",
        "action_set_success": "edit",
        "action_set_retry": "edit",
        "action_set_skipped": "edit",
    }
    base_permissions = [
        permissions.ACTION_CAN_CREATE,
        permissions.ACTION_CAN_READ,
        permissions.ACTION_CAN_EDIT,
        permissions.ACTION_CAN_DELETE,
        permissions.ACTION_CAN_ACCESS_MENU,
    ]

    page_size = PAGE_SIZE

    list_columns = [
        "state",
        "dag_id",
        "task_id",
        "run_id",
        "map_index",
        "dag_run.execution_date",
        "operator",
        "start_date",
        "end_date",
        "duration",
        "note",
        "job_id",
        "hostname",
        "unixname",
        "priority_weight",
        "queue",
        "queued_dttm",
        "prev_attempted_tries",
        "pool",
        "queued_by_job_id",
        "external_executor_id",
        "log_url",
    ]

    order_columns = [
        "state",
        "dag_id",
        "task_id",
        "run_id",
        "map_index",
        "dag_run.execution_date",
        "operator",
        "start_date",
        "end_date",
        "duration",
        # "note",  # TODO: Maybe figure out how to re-enable this.
        "job_id",
        "hostname",
        "unixname",
        "priority_weight",
        "queue",
        "queued_dttm",
        "pool",
        "queued_by_job_id",
    ]

    # todo: don't use prev_attempted_tries; just use try_number
    label_columns = {"dag_run.execution_date": "Logical Date", "prev_attempted_tries": "Try Number"}

    search_columns = [
        "state",
        "dag_id",
        "task_id",
        "run_id",
        "map_index",
        "rendered_map_index",
        "execution_date",
        "operator",
        "start_date",
        "end_date",
        "note",
        "hostname",
        "priority_weight",
        "queue",
        "queued_dttm",
        "try_number",
        "pool",
        "queued_by_job_id",
    ]

    edit_columns = [
        "dag_id",
        "task_id",
        "execution_date",
        "start_date",
        "end_date",
        "state",
        "note",
    ]

    add_exclude_columns = ["next_method", "next_kwargs", "trigger_id"]

    edit_form = TaskInstanceEditForm

    base_order = ("job_id", "asc")

    base_filters = [["dag_id", DagFilter, list]]

    def log_url_formatter(self):
        """Format log URL."""
        dag_id = self.get("dag_id")
        task_id = self.get("task_id")
        run_id = self.get("run_id")
        map_index = self.get("map_index", None)
        if map_index == -1:
            map_index = None

        url = url_for(
            "Airflow.grid",
            dag_id=dag_id,
            task_id=task_id,
            dag_run_id=run_id,
            map_index=map_index,
            tab="logs",
        )
        return Markup(
            '<a href="{log_url}"><span class="material-icons" aria-hidden="true">reorder</span></a>'
        ).format(log_url=url)

    def duration_f(self):
        """Format duration."""
        end_date = self.get("end_date")
        duration = self.get("duration")
        if end_date and duration:
            return td_format(datetime.timedelta(seconds=duration))
        return None

    formatters_columns = {
        "log_url": log_url_formatter,
        "task_id": wwwutils.task_instance_link,
        "run_id": wwwutils.dag_run_link,
        "map_index": wwwutils.format_map_index,
        "hostname": wwwutils.nobr_f("hostname"),
        "state": wwwutils.state_f,
        "dag_run.execution_date": wwwutils.datetime_f("dag_run.execution_date"),
        "start_date": wwwutils.datetime_f("start_date"),
        "end_date": wwwutils.datetime_f("end_date"),
        "queued_dttm": wwwutils.datetime_f("queued_dttm"),
        "dag_id": wwwutils.dag_link,
        "duration": duration_f,
    }

    def _clear_task_instances(
        self, task_instances: list[TaskInstance], session: Session, clear_downstream: bool = False
    ) -> tuple[int, int]:
        """
        Clear task instances, optionally including their downstream dependencies.

        :param task_instances: list of TIs to clear
        :param clear_downstream: should downstream task instances be cleared as well?
        :return: a tuple with:
            - count of cleared task instances actually selected by the user
            - count of downstream task instances that were additionally cleared
        """
        cleared_tis_count = 0
        cleared_downstream_tis_count = 0

        # Group TIs by dag id in order to call `get_dag` only once per dag; note that
        # `itertools.groupby` only groups consecutive items (see the sketch after this class).
        tis_grouped_by_dag_id = itertools.groupby(task_instances, lambda ti: ti.dag_id)

        for dag_id, dag_tis in tis_grouped_by_dag_id:
            dag = get_airflow_app().dag_bag.get_dag(dag_id)

            tis_to_clear = list(dag_tis)
            downstream_tis_to_clear = []

            if clear_downstream:
                tis_to_clear_grouped_by_dag_run = itertools.groupby(tis_to_clear, lambda ti: ti.dag_run)

                for dag_run, dag_run_tis in tis_to_clear_grouped_by_dag_run:
                    # Determine tasks that are downstream of the cleared TIs and fetch associated TIs.
                    # This has to be run for each dag run because the user may clear different TIs across runs.
                    task_ids_to_clear = [ti.task_id for ti in dag_run_tis]

                    partial_dag = dag.partial_subset(
                        task_ids_or_regex=task_ids_to_clear, include_downstream=True, include_upstream=False
                    )

                    downstream_task_ids_to_clear = [
                        task_id for task_id in partial_dag.task_dict if task_id not in task_ids_to_clear
                    ]

                    # dag.clear returns TIs when in dry run mode
                    downstream_tis_to_clear.extend(
                        dag.clear(
                            start_date=dag_run.execution_date,
                            end_date=dag_run.execution_date,
                            task_ids=downstream_task_ids_to_clear,
                            include_subdags=False,
                            include_parentdag=False,
                            session=session,
                            dry_run=True,
                        )
                    )

            # Once all TIs are fetched, perform the actual clearing
            models.clear_task_instances(tis=tis_to_clear + downstream_tis_to_clear, session=session, dag=dag)

            cleared_tis_count += len(tis_to_clear)
            cleared_downstream_tis_count += len(downstream_tis_to_clear)
        return cleared_tis_count, cleared_downstream_tis_count

    @action(
        "clear",
        lazy_gettext("Clear"),
        lazy_gettext(
            "Are you sure you want to clear the state of the selected task"
            " instance(s) and set their dagruns to the QUEUED state?"
        ),
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    @action_logging
    def action_clear(self, task_instances, session: Session = NEW_SESSION):
        """Clear an arbitrary number of task instances."""
        try:
            count, _ = self._clear_task_instances(
                task_instances=task_instances, session=session, clear_downstream=False
            )
            session.commit()
            flash(f"{count} task instance{'s have' if count > 1 else ' has'} been cleared")
        except Exception as e:
            flash(f'Failed to clear task instances: "{e}"', "error")
        self.update_redirect()
        return redirect(self.get_redirect())

    @action(
        "clear_downstream",
        lazy_gettext("Clear (including downstream tasks)"),
        lazy_gettext(
            "Are you sure you want to clear the state of the selected task"
            " instance(s) and all their downstream dependencies, and set their dagruns to the QUEUED state?"
        ),
        single=False,
    )
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @provide_session
    @action_logging
    def action_clear_downstream(self, task_instances, session: Session = NEW_SESSION):
        """Clear an arbitrary number of task instances, including downstream dependencies."""
        try:
            selected_ti_count, downstream_ti_count = self._clear_task_instances(
                task_instances=task_instances, session=session, clear_downstream=True
            )
            session.commit()
            flash(
                f"Cleared {selected_ti_count} selected task instance{'s' if selected_ti_count > 1 else ''} "
                f"and {downstream_ti_count} downstream dependencies"
            )
        except Exception as e:
            flash(f'Failed to clear task instances: "{e}"', "error")
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("muldelete", "Delete", "Are you sure you want to delete selected records?", single=False)
    @auth.has_access_dag_entities("DELETE", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def action_muldelete(self, items):
        """Multiple delete."""
        self.datamodel.delete_all(items)
        self.update_redirect()
        return redirect(self.get_redirect())

    @provide_session
    def set_task_instance_state(
        self,
        tis: Collection[TaskInstance],
        target_state: TaskInstanceState,
        session: Session = NEW_SESSION,
    ) -> None:
        """Set task instance state."""
        try:
            count = len(tis)
            for ti in tis:
                ti.set_state(target_state, session)
            session.commit()
            flash(f"{count} task instances were set to '{target_state}'")
        except Exception:
            flash("Failed to set state", "error")

    @action("set_failed", "Set state to 'failed'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def action_set_failed(self, tis):
        """Set state to 'failed'."""
        self.set_task_instance_state(tis, TaskInstanceState.FAILED)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("set_success", "Set state to 'success'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def action_set_success(self, tis):
        """Set state to 'success'."""
        self.set_task_instance_state(tis, TaskInstanceState.SUCCESS)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("set_retry", "Set state to 'up_for_retry'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def action_set_retry(self, tis):
        """Set state to 'up_for_retry'."""
        self.set_task_instance_state(tis, TaskInstanceState.UP_FOR_RETRY)
        self.update_redirect()
        return redirect(self.get_redirect())

    @action("set_skipped", "Set state to 'skipped'", "", single=False)
    @auth.has_access_dag_entities("PUT", DagAccessEntity.TASK_INSTANCE)
    @action_logging
    def action_set_skipped(self, tis):
        """Set state to 'skipped'."""
        self.set_task_instance_state(tis, TaskInstanceState.SKIPPED)
        self.update_redirect()
        return redirect(self.get_redirect())
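

# ---------------------------------------------------------------------------
# Illustrative sketch: ``_clear_task_instances`` in TaskInstanceModelView above
# relies on ``itertools.groupby``, which only groups *consecutive* items with
# equal keys, so its input is assumed to arrive ordered by ``dag_id``; sorting
# first, as below, is the defensive variant. Hypothetical, Airflow-free
# demonstration:
# ---------------------------------------------------------------------------
def _example_group_by_key():
    items = [("dag_b", 1), ("dag_a", 2), ("dag_b", 3)]
    # Without sorting, "dag_b" would appear as two separate groups.
    grouped = itertools.groupby(sorted(items, key=lambda t: t[0]), key=lambda t: t[0])
    return {key: [value for _, value in group] for key, group in grouped}
    # -> {"dag_a": [2], "dag_b": [1, 3]}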
  4869. class AutocompleteView(AirflowBaseView):
  4870. """View to provide autocomplete results."""

    @provide_session
    @expose("/dagmodel/autocomplete")
    def autocomplete(self, session: Session = NEW_SESSION):
        """Autocomplete."""
        query = unquote(request.args.get("query", ""))
        if not query:
            return flask.json.jsonify([])

        # Provide suggestions of dag_ids and owners
        dag_ids_query = select(
            sqla.literal("dag").label("type"),
            DagModel.dag_id.label("name"),
            DagModel._dag_display_property_value.label("dag_display_name"),
        ).where(
            ~DagModel.is_subdag,
            DagModel.is_active,
            or_(
                DagModel.dag_id.ilike(f"%{query}%"),
                DagModel._dag_display_property_value.ilike(f"%{query}%"),
            ),
        )

        owners_query = (
            select(
                sqla.literal("owner").label("type"),
                DagModel.owners.label("name"),
                sqla.literal(None).label("dag_display_name"),
            )
            .distinct()
            .where(~DagModel.is_subdag, DagModel.is_active, DagModel.owners.ilike(f"%{query}%"))
        )

        # Hide DAGs if not showing status: "all"
        status = flask_session.get(FILTER_STATUS_COOKIE)
        if status == "active":
            dag_ids_query = dag_ids_query.where(~DagModel.is_paused)
            owners_query = owners_query.where(~DagModel.is_paused)
        elif status == "paused":
            dag_ids_query = dag_ids_query.where(DagModel.is_paused)
            owners_query = owners_query.where(DagModel.is_paused)
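
        # Restrict both suggestion sources to the DAGs the current user is permitted to see.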
        filter_dag_ids = get_auth_manager().get_permitted_dag_ids(user=g.user)
        dag_ids_query = dag_ids_query.where(DagModel.dag_id.in_(filter_dag_ids))
        owners_query = owners_query.where(DagModel.dag_id.in_(filter_dag_ids))
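
        # Merge the two suggestion sources, sort alphabetically by name, and cap the payload at 10 rows.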
        payload = [
            row._asdict()
            for row in session.execute(dag_ids_query.union(owners_query).order_by("name").limit(10))
        ]
        return flask.json.jsonify(payload)


class DagDependenciesView(AirflowBaseView):
    """View to show dependencies between DAGs."""
    refresh_interval = datetime.timedelta(
        seconds=conf.getint(
            "webserver",
            "dag_dependencies_refresh_interval",
            fallback=conf.getint("scheduler", "dag_dir_list_interval"),
        )
    )
    last_refresh = timezone.utcnow() - refresh_interval
    nodes: list[dict[str, Any]] = []
    edges: list[dict[str, str]] = []
  4928. @expose("/dag-dependencies")
  4929. @auth.has_access_dag("GET", DagAccessEntity.DEPENDENCIES)
  4930. @gzipped
  4931. def list(self):
  4932. """Display DAG dependencies."""
  4933. title = "DAG Dependencies"
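
        # Build the graph on first access; afterwards, rebuild at most once per refresh
        # interval, and then only if the serialized DAG table has entries newer than the
        # cached graph.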
        if not self.nodes or not self.edges:
            self._calculate_graph()
            self.last_refresh = timezone.utcnow()
        elif timezone.utcnow() > self.last_refresh + self.refresh_interval:
            max_last_updated = SerializedDagModel.get_max_last_updated_datetime()
            if max_last_updated is None or max_last_updated > self.last_refresh:
                self._calculate_graph()
            self.last_refresh = timezone.utcnow()

        return self.render_template(
            "airflow/dag_dependencies.html",
            title=title,
            nodes=self.nodes,
            edges=self.edges,
            last_refresh=self.last_refresh,
            arrange=conf.get("webserver", "dag_orientation"),
            width=request.args.get("width", "100%"),
            height=request.args.get("height", "800"),
        )

    def _calculate_graph(self):
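        # Nodes are keyed by id so each DAG or dependency appears once; edges are
        # collected as (source, target) tuples in a set so duplicates collapse.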
        nodes_dict: dict[str, Any] = {}
        edge_tuples: set[tuple[str, str]] = set()

        for dag, dependencies in SerializedDagModel.get_dag_dependencies().items():
            dag_node_id = f"dag:{dag}"
            if dag_node_id not in nodes_dict:
                nodes_dict[dag_node_id] = node_dict(dag_node_id, dag, "dag")

            for dep in dependencies:
                if dep.node_id not in nodes_dict:
                    nodes_dict[dep.node_id] = node_dict(dep.node_id, dep.dependency_id, dep.dependency_type)
                edge_tuples.add((f"dag:{dep.source}", dep.node_id))
                edge_tuples.add((dep.node_id, f"dag:{dep.target}"))

        self.nodes = list(nodes_dict.values())
        self.edges = [{"u": u, "v": v} for u, v in edge_tuples]


def add_user_permissions_to_dag(sender, template, context, **extra):
    """
    Add `.can_edit`, `.can_trigger`, and `.can_delete` properties to DAG based on current user's permissions.

    Located in `views.py` rather than the DAG model to keep permissions logic out of the Airflow core.
    """
  4971. if "dag" not in context:
  4972. return
  4973. dag = context["dag"]
  4974. can_create_dag_run = get_auth_manager().is_authorized_dag(
  4975. method="POST", access_entity=DagAccessEntity.RUN, details=DagDetails(id=dag.dag_id)
  4976. )
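
    # Triggering requires both edit permission on the DAG itself and permission
    # to create DAG runs for it.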
    dag.can_edit = get_auth_manager().is_authorized_dag(method="PUT", details=DagDetails(id=dag.dag_id))
    dag.can_trigger = dag.can_edit and can_create_dag_run
    dag.can_delete = get_auth_manager().is_authorized_dag(method="DELETE", details=DagDetails(id=dag.dag_id))
    context["dag"] = dag


##############################################################################
#                                                                            #
#                             Development Views                              #
#                                                                            #
##############################################################################
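
# Guard decorator for the views below: respond with 404 unless the server is
# running with AIRFLOW_ENV=development, so dev-only endpoints never surface in
# a production deployment.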
def restrict_to_dev(f):
    def wrapper(*args, **kwargs):
        if os.environ.get("AIRFLOW_ENV", None) != "development":
            logger.error(
                "You can only access this view in development mode. Set AIRFLOW_ENV=development to view it."
            )
            return abort(404)
        return f(*args, **kwargs)

    return wrapper


class DevView(BaseView):
    """
    View to show Airflow Dev Endpoints.

    This view should only be accessible in development mode. You can enable development mode by setting
    `AIRFLOW_ENV=development` in your environment.

    :meta private:
    """

    route_base = "/dev"

    @expose("/coverage/<path:path>")
    @restrict_to_dev
    def coverage(self, path):
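        """Serve the HTML coverage report (htmlcov/ is coverage.py's default HTML output directory)."""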
        return send_from_directory(Path("htmlcov").resolve(), path)


class DocsView(BaseView):
    """
    View to show Airflow dev docs endpoints.

    This view should only be accessible in development mode. You can enable development mode by setting
    `AIRFLOW_ENV=development` in your environment.
    """

    route_base = "/docs"

    @expose("/")
    @expose("/<path:filename>")
    @restrict_to_dev
    def home(self, filename="index.html"):
        """Serve documentation from the build directory."""
        if filename != "index.html":
            return send_from_directory(Path("docs/_build/docs/").resolve(), filename)
        return send_from_directory(Path("docs/_build/").resolve(), filename)


# NOTE: Put this at the end of the file. Pylance is too clever and detects that
# before_render_template.connect() is declared as NoReturn, and marks everything
# after this line as unreachable code. It's technically correct based on the
# lint-time information, but that's not what actually happens at runtime.
before_render_template.connect(add_user_permissions_to_dag)