dag_command.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
  17. """Dag sub-commands."""
  18. from __future__ import annotations
  19. import ast
  20. import errno
  21. import json
  22. import logging
  23. import operator
  24. import signal
  25. import subprocess
  26. import sys
  27. import warnings
  28. from typing import TYPE_CHECKING
  29. import re2
  30. from sqlalchemy import delete, select
  31. from airflow import settings
  32. from airflow.api.client import get_current_api_client
  33. from airflow.api_connexion.schemas.dag_schema import dag_schema
  34. from airflow.cli.simple_table import AirflowConsole
  35. from airflow.configuration import conf
  36. from airflow.exceptions import AirflowException, RemovedInAirflow3Warning
  37. from airflow.jobs.job import Job
  38. from airflow.models import DagBag, DagModel, DagRun, TaskInstance
  39. from airflow.models.dag import DAG
  40. from airflow.models.serialized_dag import SerializedDagModel
  41. from airflow.utils import cli as cli_utils, timezone
  42. from airflow.utils.cli import get_dag, get_dags, process_subdir, sigint_handler, suppress_logs_and_warning
  43. from airflow.utils.dag_parsing_context import _airflow_parsing_context_manager
  44. from airflow.utils.dot_renderer import render_dag, render_dag_dependencies
  45. from airflow.utils.helpers import ask_yesno
  46. from airflow.utils.providers_configuration_loader import providers_configuration_loaded
  47. from airflow.utils.session import NEW_SESSION, create_session, provide_session
  48. from airflow.utils.state import DagRunState
  49. if TYPE_CHECKING:
  50. from graphviz.dot import Dot
  51. from sqlalchemy.orm import Session
  52. from airflow.timetables.base import DataInterval
  53. log = logging.getLogger(__name__)


def _run_dag_backfill(dags: list[DAG], args) -> None:
    # If only one of the two dates is passed, use it for both start and end.
    args.end_date = args.end_date or args.start_date
    args.start_date = args.start_date or args.end_date
    run_conf = None
    if args.conf:
        run_conf = json.loads(args.conf)

    for dag in dags:
        if args.task_regex:
            dag = dag.partial_subset(
                task_ids_or_regex=args.task_regex, include_upstream=not args.ignore_dependencies
            )
            if not dag.task_dict:
                raise AirflowException(
                    f"There are no tasks that match '{args.task_regex}' regex. Nothing to run, exiting..."
                )

        if args.dry_run:
            print(f"Dry run of DAG {dag.dag_id} on {args.start_date}")
            dagrun_infos = dag.iter_dagrun_infos_between(earliest=args.start_date, latest=args.end_date)
            for dagrun_info in dagrun_infos:
                dr = DagRun(
                    dag.dag_id,
                    execution_date=dagrun_info.logical_date,
                    data_interval=dagrun_info.data_interval,
                )
                for task in dag.tasks:
                    print(f"Task {task.task_id} located in DAG {dag.dag_id}")
                    ti = TaskInstance(task, run_id=None)
                    ti.dag_run = dr
                    ti.dry_run()
        else:
            if args.reset_dagruns:
                DAG.clear_dags(
                    [dag],
                    start_date=args.start_date,
                    end_date=args.end_date,
                    confirm_prompt=not args.yes,
                    include_subdags=True,
                    dag_run_state=DagRunState.QUEUED,
                )

            try:
                dag.run(
                    start_date=args.start_date,
                    end_date=args.end_date,
                    mark_success=args.mark_success,
                    local=args.local,
                    donot_pickle=(args.donot_pickle or conf.getboolean("core", "donot_pickle")),
                    ignore_first_depends_on_past=args.ignore_first_depends_on_past,
                    ignore_task_deps=args.ignore_dependencies,
                    pool=args.pool,
                    delay_on_limit_secs=args.delay_on_limit,
                    verbose=args.verbose,
                    conf=run_conf,
                    rerun_failed_tasks=args.rerun_failed_tasks,
                    run_backwards=args.run_backwards,
                    continue_on_failures=args.continue_on_failures,
                    disable_retry=args.disable_retry,
                )
            except ValueError as vr:
                print(str(vr))
                sys.exit(1)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_backfill(args, dag: list[DAG] | DAG | None = None) -> None:
    """Create backfill job or dry run for a DAG or list of DAGs using regex."""
    logging.basicConfig(level=settings.LOGGING_LEVEL, format=settings.SIMPLE_LOG_FORMAT)
    signal.signal(signal.SIGTERM, sigint_handler)

    if args.ignore_first_depends_on_past:
        warnings.warn(
            "--ignore-first-depends-on-past is deprecated as the value is always set to True",
            category=RemovedInAirflow3Warning,
            stacklevel=4,
        )
    args.ignore_first_depends_on_past = True

    if not args.treat_dag_id_as_regex and args.treat_dag_as_regex:
        warnings.warn(
            "--treat-dag-as-regex is deprecated, use --treat-dag-id-as-regex instead",
            category=RemovedInAirflow3Warning,
            stacklevel=4,
        )
        args.treat_dag_id_as_regex = args.treat_dag_as_regex

    if not args.start_date and not args.end_date:
        raise AirflowException("Provide a start_date and/or end_date")

    if not dag:
        dags = get_dags(args.subdir, dag_id=args.dag_id, use_regex=args.treat_dag_id_as_regex)
    elif isinstance(dag, list):
        dags = dag
    else:
        dags = [dag]
    del dag

    dags.sort(key=lambda d: d.dag_id)
    _run_dag_backfill(dags, args)
    if len(dags) > 1:
        log.info("All of the backfills are done.")
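
# Illustrative usage (dag id, dates, and flags are placeholders, not a prescription):
#   airflow dags backfill example_dag -s 2024-01-01 -e 2024-01-07 --reset-dagruns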


@cli_utils.action_cli
@providers_configuration_loaded
def dag_trigger(args) -> None:
    """Create a dag run for the specified dag."""
    api_client = get_current_api_client()
    try:
        message = api_client.trigger_dag(
            dag_id=args.dag_id,
            run_id=args.run_id,
            conf=args.conf,
            execution_date=args.exec_date,
            replace_microseconds=args.replace_microseconds,
        )
        AirflowConsole().print_as(
            data=[message] if message is not None else [],
            output=args.output,
        )
    except OSError as err:
        raise AirflowException(err)
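
# Illustrative usage (run id and conf are optional; conf must be valid JSON):
#   airflow dags trigger example_dag --conf '{"param": "value"}'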


@cli_utils.action_cli
@providers_configuration_loaded
def dag_delete(args) -> None:
    """Delete all DB records related to the specified dag."""
    api_client = get_current_api_client()
    if (
        args.yes
        or input("This will drop all existing records related to the specified DAG. Proceed? (y/n)").upper()
        == "Y"
    ):
        try:
            message = api_client.delete_dag(dag_id=args.dag_id)
            print(message)
        except OSError as err:
            raise AirflowException(err)
    else:
        print("Cancelled")


@cli_utils.action_cli
@providers_configuration_loaded
def dag_pause(args) -> None:
    """Pause a DAG."""
    set_is_paused(True, args)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_unpause(args) -> None:
    """Unpause a DAG."""
    set_is_paused(False, args)


@providers_configuration_loaded
def set_is_paused(is_paused: bool, args) -> None:
    """Set is_paused for DAG by a given dag_id."""
    should_apply = True
    with create_session() as session:
        query = select(DagModel)

        if args.treat_dag_id_as_regex:
            query = query.where(DagModel.dag_id.regexp_match(args.dag_id))
        else:
            query = query.where(DagModel.dag_id == args.dag_id)

        # Only act on DAGs whose pause state would actually change.
        query = query.where(DagModel.is_paused != is_paused)
        matched_dags = session.scalars(query).all()

        if not matched_dags:
            print(f"No {'un' if is_paused else ''}paused DAGs were found")
            return

        if not args.yes and args.treat_dag_id_as_regex:
            dags_ids = [dag.dag_id for dag in matched_dags]
            question = (
                f"You are about to {'un' if not is_paused else ''}pause {len(dags_ids)} DAGs:\n"
                f"{','.join(dags_ids)}"
                f"\n\nAre you sure? [y/n]"
            )
            should_apply = ask_yesno(question)

        if should_apply:
            for dag_model in matched_dags:
                dag_model.set_is_paused(is_paused=is_paused)

            AirflowConsole().print_as(
                data=[{"dag_id": dag.dag_id, "is_paused": not dag.get_is_paused()} for dag in matched_dags],
                output=args.output,
            )
        else:
            print("Operation cancelled by user")


@providers_configuration_loaded
def dag_dependencies_show(args) -> None:
    """Display DAG dependencies, save to file or show as imgcat image."""
    deduplicated_dag_dependencies = {
        dag_id: list(set(dag_dependencies))
        for dag_id, dag_dependencies in SerializedDagModel.get_dag_dependencies().items()
    }
    dot = render_dag_dependencies(deduplicated_dag_dependencies)
    filename = args.save
    imgcat = args.imgcat
    if filename and imgcat:
        raise SystemExit(
            "Options --save and --imgcat are mutually exclusive. "
            "Please remove one option to execute the command.",
        )
    elif filename:
        _save_dot_to_file(dot, filename)
    elif imgcat:
        _display_dot_via_imgcat(dot)
    else:
        print(dot.source)


@providers_configuration_loaded
def dag_show(args) -> None:
    """Display a DAG or save its graphical representation to a file."""
    dag = get_dag(args.subdir, args.dag_id)
    dot = render_dag(dag)
    filename = args.save
    imgcat = args.imgcat
    if filename and imgcat:
        raise SystemExit(
            "Options --save and --imgcat are mutually exclusive. "
            "Please remove one option to execute the command.",
        )
    elif filename:
        _save_dot_to_file(dot, filename)
    elif imgcat:
        _display_dot_via_imgcat(dot)
    else:
        print(dot.source)
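
# Illustrative usage (--save and --imgcat are mutually exclusive, per the check above):
#   airflow dags show example_dag --save example_dag.png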


def _display_dot_via_imgcat(dot: Dot) -> None:
    data = dot.pipe(format="png")
    try:
        with subprocess.Popen("imgcat", stdout=subprocess.PIPE, stdin=subprocess.PIPE) as proc:
            out, err = proc.communicate(data)
            if out:
                print(out.decode("utf-8"))
            if err:
                print(err.decode("utf-8"))
    except OSError as e:
        if e.errno == errno.ENOENT:
            raise SystemExit("Failed to execute. Make sure the imgcat executable is on your system's PATH.")
        else:
            raise


def _save_dot_to_file(dot: Dot, filename: str) -> None:
    # The file extension selects the Graphviz render format (e.g. "png", "svg").
    filename_without_ext, _, ext = filename.rpartition(".")
    dot.render(filename=filename_without_ext, format=ext, cleanup=True)
    print(f"File {filename} saved")


def _get_dagbag_dag_details(dag: DAG) -> dict:
    """Return a dagbag dag details dict."""
    return {
        "dag_id": dag.dag_id,
        "dag_display_name": dag.dag_display_name,
        "root_dag_id": dag.parent_dag.dag_id if dag.parent_dag else None,
        "is_paused": dag.get_is_paused(),
        "is_active": dag.get_is_active(),
        "is_subdag": dag.is_subdag,
        "last_parsed_time": None,
        "last_pickled": None,
        "last_expired": None,
        "scheduler_lock": None,
        "pickle_id": dag.pickle_id,
        "default_view": dag.default_view,
        "fileloc": dag.fileloc,
        "file_token": None,
        "owners": dag.owner,
        "description": dag.description,
        "schedule_interval": dag.schedule_interval,
        "timetable_description": dag.timetable.description,
        "tags": dag.tags,
        "max_active_tasks": dag.max_active_tasks,
        "max_active_runs": dag.max_active_runs,
        "max_consecutive_failed_dag_runs": dag.max_consecutive_failed_dag_runs,
        "has_task_concurrency_limits": any(
            t.max_active_tis_per_dag is not None or t.max_active_tis_per_dagrun is not None for t in dag.tasks
        ),
        "has_import_errors": False,
        "next_dagrun": None,
        "next_dagrun_data_interval_start": None,
        "next_dagrun_data_interval_end": None,
        "next_dagrun_create_after": None,
    }


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def dag_state(args, session: Session = NEW_SESSION) -> None:
    """
    Return the state (and conf if it exists) of a DagRun at the command line.

    >>> airflow dags state tutorial 2015-01-01T00:00:00.000000
    running
    >>> airflow dags state a_dag_with_conf_passed 2015-01-01T00:00:00.000000
    failed, {"name": "bob", "age": "42"}
    """
    dag = DagModel.get_dagmodel(args.dag_id, session=session)

    if not dag:
        raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
    dr = session.scalar(select(DagRun).filter_by(dag_id=args.dag_id, execution_date=args.execution_date))
    out = dr.state if dr else None
    conf_out = ""
    if out and dr.conf:
        conf_out = ", " + json.dumps(dr.conf)
    print(str(out) + conf_out)


@cli_utils.action_cli
@providers_configuration_loaded
def dag_next_execution(args) -> None:
    """
    Return the next execution datetime of a DAG at the command line.

    >>> airflow dags next-execution tutorial
    2018-08-31 10:38:00
    """
    dag = get_dag(args.subdir, args.dag_id)

    with create_session() as session:
        last_parsed_dag: DagModel = session.scalars(
            select(DagModel).where(DagModel.dag_id == dag.dag_id)
        ).one()

        if last_parsed_dag.get_is_paused():
            print("[INFO] Please be reminded this DAG is PAUSED now.", file=sys.stderr)

        def print_execution_interval(interval: DataInterval | None):
            if interval is None:
                print(
                    "[WARN] No following schedule can be found. "
                    "This DAG may have schedule interval '@once' or `None`.",
                    file=sys.stderr,
                )
                print(None)
                return
            print(interval.start.isoformat())

        next_interval = dag.get_next_data_interval(last_parsed_dag)
        print_execution_interval(next_interval)

        for _ in range(1, args.num_executions):
            next_info = dag.next_dagrun_info(next_interval, restricted=False)
            next_interval = None if next_info is None else next_info.data_interval
            print_execution_interval(next_interval)
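
# Illustrative usage (prints the next N schedule points; long-flag spelling assumed
# from args.num_executions above):
#   airflow dags next-execution tutorial --num-executions 3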


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_list_dags(args, session=NEW_SESSION) -> None:
    """Display dags with or without stats at the command line."""
    cols = args.columns if args.columns else []
    invalid_cols = [c for c in cols if c not in dag_schema.fields]
    valid_cols = [c for c in cols if c in dag_schema.fields]
    if invalid_cols:
        from rich import print as rich_print

        rich_print(
            f"[red][bold]Error:[/bold] Ignoring the following invalid columns: {invalid_cols}. "
            f"List of valid columns: {list(dag_schema.fields.keys())}",
            file=sys.stderr,
        )

    dagbag = DagBag(process_subdir(args.subdir))
    if dagbag.import_errors:
        from rich import print as rich_print

        rich_print(
            "[red][bold]Error:[/bold] Failed to load all files. "
            "For details, run `airflow dags list-import-errors`",
            file=sys.stderr,
        )

    def get_dag_detail(dag: DAG) -> dict:
        dag_model = DagModel.get_dagmodel(dag.dag_id, session=session)
        if dag_model:
            dag_detail = dag_schema.dump(dag_model)
        else:
            dag_detail = _get_dagbag_dag_details(dag)
        return {col: dag_detail[col] for col in valid_cols}

    AirflowConsole().print_as(
        data=sorted(dagbag.dags.values(), key=operator.attrgetter("dag_id")),
        output=args.output,
        mapper=get_dag_detail,
    )
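
# Illustrative usage:
#   airflow dags list --output json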


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_details(args, session=NEW_SESSION):
    """Get DAG details given a DAG id."""
    dag = DagModel.get_dagmodel(args.dag_id, session=session)
    if not dag:
        raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
    dag_detail = dag_schema.dump(dag)

    if args.output in ["table", "plain"]:
        data = [{"property_name": key, "property_value": value} for key, value in dag_detail.items()]
    else:
        data = [dag_detail]

    AirflowConsole().print_as(
        data=data,
        output=args.output,
    )


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
def dag_list_import_errors(args) -> None:
    """Display dags with import errors on the command line."""
    dagbag = DagBag(process_subdir(args.subdir))
    data = []
    for filename, errors in dagbag.import_errors.items():
        data.append({"filepath": filename, "error": errors})
    AirflowConsole().print_as(
        data=data,
        output=args.output,
    )
    if data:
        sys.exit(1)


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
def dag_report(args) -> None:
    """Display dagbag stats at the command line."""
    dagbag = DagBag(process_subdir(args.subdir))
    AirflowConsole().print_as(
        data=dagbag.dagbag_stats,
        output=args.output,
        mapper=lambda x: {
            "file": x.file,
            "duration": x.duration,
            "dag_num": x.dag_num,
            "task_num": x.task_num,
            "dags": sorted(ast.literal_eval(x.dags)),
        },
    )


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_list_jobs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
    """List latest n jobs."""
    queries = []
    if dag:
        args.dag_id = dag.dag_id
    if args.dag_id:
        dag = DagModel.get_dagmodel(args.dag_id, session=session)

        if not dag:
            raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")
        queries.append(Job.dag_id == args.dag_id)

    if args.state:
        queries.append(Job.state == args.state)

    fields = ["dag_id", "state", "job_type", "start_date", "end_date"]
    all_jobs_iter = session.scalars(
        select(Job).where(*queries).order_by(Job.start_date.desc()).limit(args.limit)
    )
    all_jobs = [{f: str(getattr(job, f)) for f in fields} for job in all_jobs_iter]

    AirflowConsole().print_as(
        data=all_jobs,
        output=args.output,
    )
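
# Illustrative usage (all filters optional; flag spellings assumed from the args used above):
#   airflow dags list-jobs -d example_dag --state success --limit 10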


@cli_utils.action_cli
@suppress_logs_and_warning
@providers_configuration_loaded
@provide_session
def dag_list_dag_runs(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
    """List dag runs for a given DAG."""
    if dag:
        args.dag_id = dag.dag_id
    else:
        dag = DagModel.get_dagmodel(args.dag_id, session=session)

        if not dag:
            raise SystemExit(f"DAG: {args.dag_id} does not exist in 'dag' table")

    state = args.state.lower() if args.state else None
    dag_runs = DagRun.find(
        dag_id=args.dag_id,
        state=state,
        no_backfills=args.no_backfill,
        execution_start_date=args.start_date,
        execution_end_date=args.end_date,
        session=session,
    )

    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    AirflowConsole().print_as(
        data=dag_runs,
        output=args.output,
        mapper=lambda dr: {
            "dag_id": dr.dag_id,
            "run_id": dr.run_id,
            "state": dr.state,
            "execution_date": dr.execution_date.isoformat(),
            "start_date": dr.start_date.isoformat() if dr.start_date else "",
            "end_date": dr.end_date.isoformat() if dr.end_date else "",
        },
    )
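
# Illustrative usage (dates bound the execution_date range; flags assumed from the args used above):
#   airflow dags list-runs -d example_dag --state failed --start-date 2024-01-01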


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def dag_test(args, dag: DAG | None = None, session: Session = NEW_SESSION) -> None:
    """Execute a single DagRun for a given DAG and execution date."""
    run_conf = None
    if args.conf:
        try:
            run_conf = json.loads(args.conf)
        except ValueError as e:
            raise SystemExit(f"Configuration {args.conf!r} is not valid JSON. Error: {e}")
    execution_date = args.execution_date or timezone.utcnow()
    use_executor = args.use_executor
    mark_success_pattern = (
        re2.compile(args.mark_success_pattern) if args.mark_success_pattern is not None else None
    )

    with _airflow_parsing_context_manager(dag_id=args.dag_id):
        dag = dag or get_dag(subdir=args.subdir, dag_id=args.dag_id)

    dr: DagRun = dag.test(
        execution_date=execution_date,
        run_conf=run_conf,
        use_executor=use_executor,
        mark_success_pattern=mark_success_pattern,
        session=session,
    )
    show_dagrun = args.show_dagrun
    imgcat = args.imgcat_dagrun
    filename = args.save_dagrun
    if show_dagrun or imgcat or filename:
        tis = session.scalars(
            select(TaskInstance).where(
                TaskInstance.dag_id == args.dag_id,
                TaskInstance.execution_date == execution_date,
            )
        ).all()

        dot_graph = render_dag(dag, tis=tis)
        print()
        if filename:
            _save_dot_to_file(dot_graph, filename)
        if imgcat:
            _display_dot_via_imgcat(dot_graph)
        if show_dagrun:
            print(dot_graph.source)

    if dr and dr.state == DagRunState.FAILED:
        raise SystemExit("DagRun failed")


@cli_utils.action_cli
@providers_configuration_loaded
@provide_session
def dag_reserialize(args, session: Session = NEW_SESSION) -> None:
    """Reserialize DAGs: drop all serialized DAG records, then re-sync from the DagBag unless --clear-only is set."""
    session.execute(delete(SerializedDagModel).execution_options(synchronize_session=False))

    if not args.clear_only:
        dagbag = DagBag(process_subdir(args.subdir))
        dagbag.sync_to_db(session=session)
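
# Illustrative usage (--clear-only drops serialized DAGs without re-writing them):
#   airflow dags reserialize --clear-only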