#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import hashlib
import importlib
import importlib.machinery
import importlib.util
import os
import sys
import textwrap
import traceback
import warnings
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from typing import TYPE_CHECKING, NamedTuple

from sqlalchemy import (
    Column,
    String,
)
from sqlalchemy.exc import OperationalError
from tabulate import tabulate

from airflow import settings
from airflow.configuration import conf
from airflow.exceptions import (
    AirflowClusterPolicyError,
    AirflowClusterPolicySkipDag,
    AirflowClusterPolicyViolation,
    AirflowDagCycleException,
    AirflowDagDuplicatedIdException,
    AirflowException,
    AirflowTaskTimeout,
    RemovedInAirflow3Warning,
)
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import Base
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.dag_cycle_tester import check_cycle
from airflow.utils.docs import get_docs_url
from airflow.utils.file import (
    correct_maybe_zipped,
    get_unique_dag_module_name,
    list_py_file_paths,
    might_contain_dag,
)
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.retries import MAX_DB_RETRIES, run_with_db_retries
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.timeout import timeout
from airflow.utils.types import NOTSET
from airflow.utils.warnings import capture_with_reraise

if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.models.dag import DAG
    from airflow.utils.types import ArgNotSet


class FileLoadStat(NamedTuple):
    """
    Information about a single file.

    :param file: Loaded file.
    :param duration: Time spent processing the file.
    :param dag_num: Total number of DAGs loaded in this file.
    :param task_num: Total number of Tasks loaded in this file.
    :param dags: DAGs names loaded in this file.
    :param warning_num: Total number of warnings captured from processing this file.
    """

    file: str
    duration: timedelta
    dag_num: int
    task_num: int
    dags: str
    warning_num: int
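
# Illustrative sketch (not part of the library): FileLoadStat is a plain NamedTuple,
# so a stats entry for a hypothetical DAG file could look like:
#
#   FileLoadStat(
#       file="/example_dags/my_dag.py",   # hypothetical path relative to the DAGs folder
#       duration=timedelta(seconds=0.42),
#       dag_num=1,
#       task_num=3,
#       dags="['my_dag']",
#       warning_num=0,
#   )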


class DagBag(LoggingMixin):
    """
    A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings.

    Some possible settings are the database to use as a backend and what executor
    to use to fire off tasks. This makes it easier to run distinct environments
    for, say, production and development, tests, or for different teams or security
    profiles. What would have been system-level settings are now dagbag level so
    that one system can run multiple, independent settings sets.

    :param dag_folder: the folder to scan to find DAGs
    :param include_examples: whether to include the examples that ship
        with airflow or not
    :param safe_mode: when ``False``, scans all python modules for dags.
        When ``True`` uses heuristics (files containing ``DAG`` and ``airflow`` strings)
        to filter python modules to scan for dags.
    :param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
        If ``False`` DAGs are read from python files.
    :param store_serialized_dags: deprecated parameter, same effect as ``read_dags_from_db``
    :param load_op_links: Should the extra operator link be loaded via plugins when
        de-serializing the DAG? This flag is set to False in Scheduler so that Extra Operator links
        are not loaded to not run User code in Scheduler.
    :param collect_dags: when True, collects dags during class initialization.
    """

    def __init__(
        self,
        dag_folder: str | Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        read_dags_from_db: bool = False,
        store_serialized_dags: bool | None = None,
        load_op_links: bool = True,
        collect_dags: bool = True,
    ):
        # Avoid circular import
        super().__init__()

        include_examples = (
            include_examples
            if isinstance(include_examples, bool)
            else conf.getboolean("core", "LOAD_EXAMPLES")
        )
        safe_mode = (
            safe_mode if isinstance(safe_mode, bool) else conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE")
        )

        if store_serialized_dags:
            warnings.warn(
                "The store_serialized_dags parameter has been deprecated. "
                "You should pass the read_dags_from_db parameter.",
                RemovedInAirflow3Warning,
                stacklevel=2,
            )
            read_dags_from_db = store_serialized_dags

        dag_folder = dag_folder or settings.DAGS_FOLDER
        self.dag_folder = dag_folder
        self.dags: dict[str, DAG] = {}
        # the file's last modified timestamp when we last read it
        self.file_last_changed: dict[str, datetime] = {}
        self.import_errors: dict[str, str] = {}
        self.captured_warnings: dict[str, tuple[str, ...]] = {}
        self.has_logged = False
        self.read_dags_from_db = read_dags_from_db
        # Only used by read_dags_from_db=True
        self.dags_last_fetched: dict[str, datetime] = {}
        # Only used by SchedulerJob to compare the dag_hash to identify change in DAGs
        self.dags_hash: dict[str, str] = {}

        self.dagbag_import_error_tracebacks = conf.getboolean("core", "dagbag_import_error_tracebacks")
        self.dagbag_import_error_traceback_depth = conf.getint("core", "dagbag_import_error_traceback_depth")
        if collect_dags:
            self.collect_dags(
                dag_folder=dag_folder,
                include_examples=include_examples,
                safe_mode=safe_mode,
            )
        # Should the extra operator link be loaded via plugins?
        # This flag is set to False in Scheduler so that Extra Operator links are not loaded
        self.load_op_links = load_op_links

    def size(self) -> int:
        """:return: the number of DAGs contained in this dagbag"""
        return len(self.dags)

    @property
    def store_serialized_dags(self) -> bool:
        """Whether to read dags from DB."""
        warnings.warn(
            "The store_serialized_dags property has been deprecated. Use read_dags_from_db instead.",
            RemovedInAirflow3Warning,
            stacklevel=2,
        )
        return self.read_dags_from_db

    @property
    def dag_ids(self) -> list[str]:
        """
        Get DAG ids.

        :return: a list of DAG IDs in this bag
        """
        return list(self.dags)

    @provide_session
    def get_dag(self, dag_id, session: Session = None):
        """
        Get the DAG out of the dictionary, and refreshes it if expired.

        :param dag_id: DAG ID
        """
        # Avoid circular import
        from airflow.models.dag import DagModel

        if self.read_dags_from_db:
            # Import here so that serialized dag is only imported when serialization is enabled
            from airflow.models.serialized_dag import SerializedDagModel

            if dag_id not in self.dags:
                # Load from DB if not (yet) in the bag
                self._add_dag_from_db(dag_id=dag_id, session=session)
                return self.dags.get(dag_id)

            # If DAG is in the DagBag, check the following
            # 1. if time has come to check if DAG is updated (controlled by min_serialized_dag_fetch_secs)
            # 2. check the last_updated and hash columns in SerializedDag table to see if
            #    Serialized DAG is updated
            # 3. if (2) is yes, fetch the Serialized DAG.
            # 4. if (2) returns None (i.e. Serialized DAG is deleted), remove dag from dagbag
            #    if it exists and return None.
            min_serialized_dag_fetch_secs = timedelta(seconds=settings.MIN_SERIALIZED_DAG_FETCH_INTERVAL)
            if (
                dag_id in self.dags_last_fetched
                and timezone.utcnow() > self.dags_last_fetched[dag_id] + min_serialized_dag_fetch_secs
            ):
                sd_latest_version_and_updated_datetime = (
                    SerializedDagModel.get_latest_version_hash_and_updated_datetime(
                        dag_id=dag_id, session=session
                    )
                )
                if not sd_latest_version_and_updated_datetime:
                    self.log.warning("Serialized DAG %s no longer exists", dag_id)
                    del self.dags[dag_id]
                    del self.dags_last_fetched[dag_id]
                    del self.dags_hash[dag_id]
                    return None

                sd_latest_version, sd_last_updated_datetime = sd_latest_version_and_updated_datetime

                if (
                    sd_last_updated_datetime > self.dags_last_fetched[dag_id]
                    or sd_latest_version != self.dags_hash[dag_id]
                ):
                    self._add_dag_from_db(dag_id=dag_id, session=session)

            return self.dags.get(dag_id)

        # If asking for a known subdag, we want to refresh the parent
        dag = None
        root_dag_id = dag_id
        if dag_id in self.dags:
            dag = self.dags[dag_id]
            if dag.parent_dag:
                root_dag_id = dag.parent_dag.dag_id

        # If DAG Model is absent, we can't check last_expired property. Is the DAG not yet synchronized?
        orm_dag = DagModel.get_current(root_dag_id, session=session)
        if not orm_dag:
            return self.dags.get(dag_id)

        # If the dag corresponding to root_dag_id is absent or expired
        is_missing = root_dag_id not in self.dags
        is_expired = orm_dag.last_expired and dag and dag.last_loaded < orm_dag.last_expired
        if is_expired:
            # Remove associated dags so we can re-add them.
            self.dags = {
                key: dag
                for key, dag in self.dags.items()
                if root_dag_id != key and not (dag.parent_dag and root_dag_id == dag.parent_dag.dag_id)
            }
        if is_missing or is_expired:
            # Reprocess source file.
            found_dags = self.process_file(
                filepath=correct_maybe_zipped(orm_dag.fileloc), only_if_updated=False
            )

            # If the source file no longer exports `dag_id`, delete it from self.dags
            if found_dags and dag_id in [found_dag.dag_id for found_dag in found_dags]:
                return self.dags[dag_id]
            elif dag_id in self.dags:
                del self.dags[dag_id]
        return self.dags.get(dag_id)
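
    # Illustrative sketch: with read_dags_from_db=True, get_dag() serves the serialized copy
    # and only re-queries the serialized_dag table after MIN_SERIALIZED_DAG_FETCH_INTERVAL
    # has elapsed since the last fetch (the dag id below is hypothetical):
    #
    #   bag = DagBag(read_dags_from_db=True)
    #   dag = bag.get_dag("my_dag_id")  # fetched from the DB on first access, then cached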

    def _add_dag_from_db(self, dag_id: str, session: Session):
        """Add DAG to DagBag from DB."""
        from airflow.models.serialized_dag import SerializedDagModel

        row = SerializedDagModel.get(dag_id, session)
        if not row:
            return None

        row.load_op_links = self.load_op_links
        dag = row.dag
        for subdag in dag.subdags:
            self.dags[subdag.dag_id] = subdag
        self.dags[dag.dag_id] = dag
        self.dags_last_fetched[dag.dag_id] = timezone.utcnow()
        self.dags_hash[dag.dag_id] = row.dag_hash

    def process_file(self, filepath, only_if_updated=True, safe_mode=True):
        """Given a path to a python module or zip file, import the module and look for dag objects within."""
        from airflow.models.dag import DagContext

        # if the source file no longer exists in the DB or in the filesystem,
        # return an empty list
        # todo: raise exception?
        if filepath is None or not os.path.isfile(filepath):
            return []

        try:
            # This failed before in what may have been a git sync
            # race condition
            file_last_changed_on_disk = datetime.fromtimestamp(os.path.getmtime(filepath))
            if (
                only_if_updated
                and filepath in self.file_last_changed
                and file_last_changed_on_disk == self.file_last_changed[filepath]
            ):
                return []
        except Exception as e:
            self.log.exception(e)
            return []

        # Ensure we don't pick up anything else we didn't mean to
        DagContext.autoregistered_dags.clear()

        self.captured_warnings.pop(filepath, None)
        with capture_with_reraise() as captured_warnings:
            if filepath.endswith(".py") or not zipfile.is_zipfile(filepath):
                mods = self._load_modules_from_file(filepath, safe_mode)
            else:
                mods = self._load_modules_from_zip(filepath, safe_mode)

        if captured_warnings:
            formatted_warnings = []
            for msg in captured_warnings:
                category = msg.category.__name__
                if (module := msg.category.__module__) != "builtins":
                    category = f"{module}.{category}"
                formatted_warnings.append(f"{msg.filename}:{msg.lineno}: {category}: {msg.message}")
            self.captured_warnings[filepath] = tuple(formatted_warnings)

        found_dags = self._process_modules(filepath, mods, file_last_changed_on_disk)

        self.file_last_changed[filepath] = file_last_changed_on_disk

        return found_dags
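
    # Illustrative sketch: process_file() can be pointed at a single module to see which
    # DAG objects it exports (the path below is hypothetical):
    #
    #   bag = DagBag(collect_dags=False)
    #   found = bag.process_file("/path/to/dags/my_dag.py", only_if_updated=False)
    #   print([d.dag_id for d in found])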

    def _load_modules_from_file(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        if not might_contain_dag(filepath, safe_mode):
            # Don't want to spam user with skip messages
            if not self.has_logged:
                self.has_logged = True
                self.log.info("File %s assumed to contain no DAGs. Skipping.", filepath)
            return []

        self.log.debug("Importing %s", filepath)
        mod_name = get_unique_dag_module_name(filepath)

        if mod_name in sys.modules:
            del sys.modules[mod_name]

        DagContext.current_autoregister_module_name = mod_name

        def parse(mod_name, filepath):
            try:
                loader = importlib.machinery.SourceFileLoader(mod_name, filepath)
                spec = importlib.util.spec_from_loader(mod_name, loader)
                new_module = importlib.util.module_from_spec(spec)
                sys.modules[spec.name] = new_module
                loader.exec_module(new_module)
                return [new_module]
            except (Exception, AirflowTaskTimeout) as e:
                DagContext.autoregistered_dags.clear()
                self.log.exception("Failed to import: %s", filepath)
                if self.dagbag_import_error_tracebacks:
                    self.import_errors[filepath] = traceback.format_exc(
                        limit=-self.dagbag_import_error_traceback_depth
                    )
                else:
                    self.import_errors[filepath] = str(e)
                return []

        dagbag_import_timeout = settings.get_dagbag_import_timeout(filepath)

        if not isinstance(dagbag_import_timeout, (int, float)):
            raise TypeError(
                f"Value ({dagbag_import_timeout}) from get_dagbag_import_timeout must be int or float"
            )

        if dagbag_import_timeout <= 0:  # no parsing timeout
            return parse(mod_name, filepath)

        timeout_msg = (
            f"DagBag import timeout for {filepath} after {dagbag_import_timeout}s.\n"
            "Please take a look at these docs to improve your DAG import time:\n"
            f"* {get_docs_url('best-practices.html#top-level-python-code')}\n"
            f"* {get_docs_url('best-practices.html#reducing-dag-complexity')}"
        )
        with timeout(dagbag_import_timeout, error_message=timeout_msg):
            return parse(mod_name, filepath)
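
    # Note: settings.get_dagbag_import_timeout(filepath) can be overridden by users via
    # airflow_local_settings. An illustrative override (defined there, not here) might
    # give a hypothetical "slow" DAG file extra time to import:
    #
    #   def get_dagbag_import_timeout(dag_file_path: str) -> int | float:
    #       return 90.0 if dag_file_path.endswith("slow_dag.py") else 30.0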

    def _load_modules_from_zip(self, filepath, safe_mode):
        from airflow.models.dag import DagContext

        mods = []
        with zipfile.ZipFile(filepath) as current_zip_file:
            for zip_info in current_zip_file.infolist():
                zip_path = Path(zip_info.filename)
                if zip_path.suffix not in [".py", ".pyc"] or len(zip_path.parts) > 1:
                    continue

                if zip_path.stem == "__init__":
                    self.log.warning("Found %s at root of %s", zip_path.name, filepath)

                self.log.debug("Reading %s from %s", zip_info.filename, filepath)

                if not might_contain_dag(zip_info.filename, safe_mode, current_zip_file):
                    # todo: create ignore list
                    # Don't want to spam user with skip messages
                    if not self.has_logged:
                        self.has_logged = True
                        self.log.info(
                            "File %s:%s assumed to contain no DAGs. Skipping.", filepath, zip_info.filename
                        )
                    continue

                mod_name = zip_path.stem
                if mod_name in sys.modules:
                    del sys.modules[mod_name]

                DagContext.current_autoregister_module_name = mod_name
                try:
                    sys.path.insert(0, filepath)
                    current_module = importlib.import_module(mod_name)
                    mods.append(current_module)
                except Exception as e:
                    DagContext.autoregistered_dags.clear()
                    fileloc = os.path.join(filepath, zip_info.filename)
                    self.log.exception("Failed to import: %s", fileloc)
                    if self.dagbag_import_error_tracebacks:
                        self.import_errors[fileloc] = traceback.format_exc(
                            limit=-self.dagbag_import_error_traceback_depth
                        )
                    else:
                        self.import_errors[fileloc] = str(e)
                finally:
                    if sys.path[0] == filepath:
                        del sys.path[0]
        return mods

    def _process_modules(self, filepath, mods, file_last_changed_on_disk):
        from airflow.models.dag import DAG, DagContext  # Avoid circular import

        top_level_dags = {(o, m) for m in mods for o in m.__dict__.values() if isinstance(o, DAG)}

        top_level_dags.update(DagContext.autoregistered_dags)

        DagContext.current_autoregister_module_name = None
        DagContext.autoregistered_dags.clear()

        found_dags = []

        for dag, mod in top_level_dags:
            dag.fileloc = mod.__file__
            try:
                dag.validate()
                self.bag_dag(dag=dag, root_dag=dag)
            except AirflowClusterPolicySkipDag:
                pass
            except Exception as e:
                self.log.exception("Failed to bag_dag: %s", dag.fileloc)
                self.import_errors[dag.fileloc] = f"{type(e).__name__}: {e}"
                self.file_last_changed[dag.fileloc] = file_last_changed_on_disk
            else:
                found_dags.append(dag)
                found_dags += dag.subdags

        return found_dags

    def bag_dag(self, dag, root_dag):
        """
        Add the DAG into the bag, recurses into sub dags.

        :raises: AirflowDagCycleException if a cycle is detected in this dag or its subdags.
        :raises: AirflowDagDuplicatedIdException if this dag or its subdags already exists in the bag.
        """
        self._bag_dag(dag=dag, root_dag=root_dag, recursive=True)

    def _bag_dag(self, *, dag, root_dag, recursive):
        """
        Actual implementation of bagging a dag.

        The only purpose of this is to avoid exposing ``recursive`` in ``bag_dag()``,
        intended to only be used by the ``_bag_dag()`` implementation.
        """
        check_cycle(dag)  # throws if a task cycle is found

        dag.resolve_template_files()
        dag.last_loaded = timezone.utcnow()

        try:
            # Check policies
            settings.dag_policy(dag)

            for task in dag.tasks:
                # The listeners are not supported when ending a task via a trigger on asynchronous operators.
                if getattr(task, "end_from_trigger", False) and get_listener_manager().has_listeners:
                    raise AirflowException(
                        "Listeners are not supported with end_from_trigger=True for deferrable operators. "
                        f"Task {task.task_id} in DAG {dag.dag_id} has end_from_trigger=True with listeners "
                        "from plugins. Set end_from_trigger=False to use listeners."
                    )

                settings.task_policy(task)
        except (AirflowClusterPolicyViolation, AirflowClusterPolicySkipDag):
            raise
        except Exception as e:
            self.log.exception(e)
            raise AirflowClusterPolicyError(e)

        subdags = dag.subdags

        try:
            # DAG.subdags automatically performs DFS search, so we don't recurse
            # into further _bag_dag() calls.
            if recursive:
                for subdag in subdags:
                    subdag.fileloc = dag.fileloc
                    subdag.parent_dag = dag
                    self._bag_dag(dag=subdag, root_dag=root_dag, recursive=False)

            prev_dag = self.dags.get(dag.dag_id)
            if prev_dag and prev_dag.fileloc != dag.fileloc:
                raise AirflowDagDuplicatedIdException(
                    dag_id=dag.dag_id,
                    incoming=dag.fileloc,
                    existing=self.dags[dag.dag_id].fileloc,
                )
            self.dags[dag.dag_id] = dag
            self.log.debug("Loaded DAG %s", dag)
        except (AirflowDagCycleException, AirflowDagDuplicatedIdException):
            # There was an error in bagging the dag. Remove it from the list of dags
            self.log.exception("Exception bagging dag: %s", dag.dag_id)
            # Only necessary at the root level since DAG.subdags automatically
            # performs DFS to search through all subdags
            if recursive:
                for subdag in subdags:
                    if subdag.dag_id in self.dags:
                        del self.dags[subdag.dag_id]
            raise
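
    # Note: dag_policy/task_policy above are cluster policies resolved via airflow_local_settings.
    # An illustrative policy (defined there, not here) could reject DAGs without tags by raising
    # AirflowClusterPolicyViolation, which _bag_dag re-raises as an import error for the file:
    #
    #   def dag_policy(dag):
    #       if not dag.tags:
    #           raise AirflowClusterPolicyViolation(f"DAG {dag.dag_id} has no tags")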

    def collect_dags(
        self,
        dag_folder: str | Path | None = None,
        only_if_updated: bool = True,
        include_examples: bool = conf.getboolean("core", "LOAD_EXAMPLES"),
        safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE"),
    ):
        """
        Look for python modules in a given path, import them, and add them to the dagbag collection.

        Note that if a ``.airflowignore`` file is found while processing
        the directory, it will behave much like a ``.gitignore``,
        ignoring files that match any of the patterns specified
        in the file.

        **Note**: The patterns in ``.airflowignore`` are interpreted as either
        un-anchored regexes or gitignore-like glob expressions, depending on
        the ``DAG_IGNORE_FILE_SYNTAX`` configuration parameter.
        """
        if self.read_dags_from_db:
            return

        self.log.info("Filling up the DagBag from %s", dag_folder)
        dag_folder = dag_folder or self.dag_folder
        # Used to store stats around DagBag processing
        stats = []

        # Ensure dag_folder is a str -- it may have been a pathlib.Path
        dag_folder = correct_maybe_zipped(str(dag_folder))
        for filepath in list_py_file_paths(
            dag_folder,
            safe_mode=safe_mode,
            include_examples=include_examples,
        ):
            try:
                file_parse_start_dttm = timezone.utcnow()
                found_dags = self.process_file(filepath, only_if_updated=only_if_updated, safe_mode=safe_mode)

                file_parse_end_dttm = timezone.utcnow()
                stats.append(
                    FileLoadStat(
                        file=filepath.replace(settings.DAGS_FOLDER, ""),
                        duration=file_parse_end_dttm - file_parse_start_dttm,
                        dag_num=len(found_dags),
                        task_num=sum(len(dag.tasks) for dag in found_dags),
                        dags=str([dag.dag_id for dag in found_dags]),
                        warning_num=len(self.captured_warnings.get(filepath, [])),
                    )
                )
            except Exception as e:
                self.log.exception(e)

        self.dagbag_stats = sorted(stats, key=lambda x: x.duration, reverse=True)

    def collect_dags_from_db(self):
        """Collect DAGs from database."""
        from airflow.models.serialized_dag import SerializedDagModel

        with Stats.timer("collect_db_dags"):
            self.log.info("Filling up the DagBag from database")

            # The dagbag contains all rows in serialized_dag table. Deleted DAGs are deleted
            # from the table by the scheduler job.
            self.dags = SerializedDagModel.read_all_dags()

            # Adds subdags.
            # DAG post-processing steps such as self.bag_dag and croniter are not needed as
            # they are done by scheduler before serialization.
            subdags = {}
            for dag in self.dags.values():
                for subdag in dag.subdags:
                    subdags[subdag.dag_id] = subdag
            self.dags.update(subdags)

    def dagbag_report(self):
        """Print a report around DagBag loading stats."""
        stats = self.dagbag_stats
        dag_folder = self.dag_folder
        duration = sum((o.duration for o in stats), timedelta()).total_seconds()
        dag_num = sum(o.dag_num for o in stats)
        task_num = sum(o.task_num for o in stats)
        table = tabulate(stats, headers="keys")

        report = textwrap.dedent(
            f"""\n
        -------------------------------------------------------------------
        DagBag loading stats for {dag_folder}
        -------------------------------------------------------------------
        Number of DAGs: {dag_num}
        Total task number: {task_num}
        DagBag parsing time: {duration}\n{table}
        """
        )
        return report

    @classmethod
    @provide_session
    def _sync_to_db(
        cls,
        dags: dict[str, DAG],
        processor_subdir: str | None = None,
        session: Session = NEW_SESSION,
    ):
        """Save attributes about a list of DAGs to the DB."""
        # To avoid circular import - airflow.models.dagbag -> airflow.models.dag -> airflow.models.dagbag
        from airflow.models.dag import DAG
        from airflow.models.serialized_dag import SerializedDagModel

        log = cls.logger()

        def _serialize_dag_capturing_errors(dag, session, processor_subdir):
            """
            Try to serialize the dag to the DB, but make a note of any errors.

            We can't place them directly in import_errors, as this may be retried, and work the next time
            """
            if dag.is_subdag:
                return []
            try:
                # We can't use bulk_write_to_db as we want to capture each error individually
                dag_was_updated = SerializedDagModel.write_dag(
                    dag,
                    min_update_interval=settings.MIN_SERIALIZED_DAG_UPDATE_INTERVAL,
                    session=session,
                    processor_subdir=processor_subdir,
                )
                if dag_was_updated:
                    DagBag._sync_perm_for_dag(dag, session=session)
                return []
            except OperationalError:
                raise
            except Exception:
                log.exception("Failed to write serialized DAG: %s", dag.fileloc)
                dagbag_import_error_traceback_depth = conf.getint(
                    "core", "dagbag_import_error_traceback_depth"
                )
                return [(dag.fileloc, traceback.format_exc(limit=-dagbag_import_error_traceback_depth))]

        # Retry 'DAG.bulk_write_to_db' & 'SerializedDagModel.bulk_sync_to_db' in case
        # of any Operational Errors
        # In case of failures, provide_session handles rollback
        import_errors = {}
        for attempt in run_with_db_retries(logger=log):
            with attempt:
                serialize_errors = []
                log.debug(
                    "Running dagbag.sync_to_db with retries. Try %d of %d",
                    attempt.retry_state.attempt_number,
                    MAX_DB_RETRIES,
                )
                log.debug("Calling the DAG.bulk_sync_to_db method")
                try:
                    # Write Serialized DAGs to DB, capturing errors
                    for dag in dags.values():
                        serialize_errors.extend(
                            _serialize_dag_capturing_errors(dag, session, processor_subdir)
                        )
                    DAG.bulk_write_to_db(dags.values(), processor_subdir=processor_subdir, session=session)
                except OperationalError:
                    session.rollback()
                    raise
                # Only now we are "complete" do we update import_errors - don't want to record errors from
                # previous failed attempts
                import_errors.update(dict(serialize_errors))

        return import_errors

    @provide_session
    def sync_to_db(self, processor_subdir: str | None = None, session: Session = NEW_SESSION):
        import_errors = DagBag._sync_to_db(dags=self.dags, processor_subdir=processor_subdir, session=session)
        self.import_errors.update(import_errors)

    @classmethod
    @provide_session
    def _sync_perm_for_dag(cls, dag: DAG, session: Session = NEW_SESSION):
        """Sync DAG specific permissions."""
        root_dag_id = dag.parent_dag.dag_id if dag.parent_dag else dag.dag_id

        cls.logger().debug("Syncing DAG permissions: %s to the DB", root_dag_id)
        from airflow.www.security_appless import ApplessAirflowSecurityManager

        security_manager = ApplessAirflowSecurityManager(session=session)
        security_manager.sync_perm_for_dag(root_dag_id, dag.access_control)


def generate_md5_hash(context):
    fileloc = context.get_current_parameters()["fileloc"]
    return hashlib.md5(fileloc.encode()).hexdigest()
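

# Illustrative sketch: generate_md5_hash derives the row's primary key from its fileloc
# at insert/update time; e.g. hashlib.md5(b"/path/to/dag.py").hexdigest() (hypothetical
# path) yields a stable 32-character hex id, which fits the String(32) `id` column below.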


class DagPriorityParsingRequest(Base):
    """Model to store the dag parsing requests that will be prioritized when parsing files."""

    __tablename__ = "dag_priority_parsing_request"

    # Adding a unique constraint to fileloc results in the creation of an index and we have a limitation
    # on the size of the string we can use in the index for MySQL DB. We also have to keep the fileloc
    # size consistent with other tables. This is a workaround to enforce the unique constraint.
    id = Column(String(32), primary_key=True, default=generate_md5_hash, onupdate=generate_md5_hash)

    # The location of the file containing the DAG object
    # Note: Do not depend on fileloc pointing to a file; in the case of a
    # packaged DAG, it will point to the subpath of the DAG within the
    # associated zip.
    fileloc = Column(String(2000), nullable=False)

    def __init__(self, fileloc: str) -> None:
        super().__init__()
        self.fileloc = fileloc

    def __repr__(self) -> str:
        return f"<DagPriorityParsingRequest: fileloc={self.fileloc}>"
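

# Illustrative usage sketch (assumes an active SQLAlchemy session; the path is hypothetical):
#
#   from airflow.utils.session import create_session
#
#   with create_session() as session:
#       session.add(DagPriorityParsingRequest(fileloc="/path/to/dags/my_dag.py"))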