#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
  18. from __future__ import annotations
  19. import ast
  20. import hashlib
  21. import logging
  22. import os
  23. import zipfile
  24. from io import TextIOWrapper
  25. from pathlib import Path
  26. from typing import Generator, NamedTuple, Pattern, Protocol, overload
  27. import re2
  28. from pathspec.patterns import GitWildMatchPattern
  29. from airflow.configuration import conf
  30. from airflow.exceptions import RemovedInAirflow3Warning
log = logging.getLogger(__name__)

# Template for the synthetic module name given to DAG files when they are imported,
# keyed by a hash of the file path so two files with the same stem do not collide.
MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"
class _IgnoreRule(Protocol):
    """Interface for ignore rules for structural subtyping."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied pattern.

        ``base_dir`` and ``definition_file`` should be absolute paths.
        Implementations return ``None`` when the pattern is invalid or a no-op.
        """

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a candidate absolute path against a list of rules."""
  44. class _RegexpIgnoreRule(NamedTuple):
  45. """Typed namedtuple with utility functions for regexp ignore rules."""
  46. pattern: Pattern
  47. base_dir: Path
  48. @staticmethod
  49. def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
  50. """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
  51. try:
  52. return _RegexpIgnoreRule(re2.compile(pattern), base_dir)
  53. except re2.error as e:
  54. log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
  55. return None
  56. @staticmethod
  57. def match(path: Path, rules: list[_IgnoreRule]) -> bool:
  58. """Match a list of ignore rules against the supplied path."""
  59. for rule in rules:
  60. if not isinstance(rule, _RegexpIgnoreRule):
  61. raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
  62. if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None:
  63. return True
  64. return False
  65. class _GlobIgnoreRule(NamedTuple):
  66. """Typed namedtuple with utility functions for glob ignore rules."""
  67. pattern: Pattern
  68. raw_pattern: str
  69. include: bool | None = None
  70. relative_to: Path | None = None
  71. @staticmethod
  72. def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
  73. """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
  74. relative_to: Path | None = None
  75. if pattern.strip() == "/":
  76. # "/" doesn't match anything in gitignore
  77. log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
  78. return None
  79. if pattern.startswith("/") or "/" in pattern.rstrip("/"):
  80. # See https://git-scm.com/docs/gitignore
  81. # > If there is a separator at the beginning or middle (or both) of the pattern, then the
  82. # > pattern is relative to the directory level of the particular .gitignore file itself.
  83. # > Otherwise the pattern may also match at any level below the .gitignore level.
  84. relative_to = definition_file.parent
  85. ignore_pattern = GitWildMatchPattern(pattern)
  86. return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to)
  87. @staticmethod
  88. def match(path: Path, rules: list[_IgnoreRule]) -> bool:
  89. """Match a list of ignore rules against the supplied path."""
  90. matched = False
  91. for r in rules:
  92. if not isinstance(r, _GlobIgnoreRule):
  93. raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}")
  94. rule: _GlobIgnoreRule = r # explicit typing to make mypy play nicely
  95. rel_path = str(path.relative_to(rule.relative_to) if rule.relative_to else path.name)
  96. if rule.raw_pattern.endswith("/") and path.is_dir():
  97. # ensure the test path will potentially match a directory pattern if it is a directory
  98. rel_path += "/"
  99. if rule.include is not None and rule.pattern.match(rel_path) is not None:
  100. matched = rule.include
  101. return matched
  102. def TemporaryDirectory(*args, **kwargs):
  103. """Use `tempfile.TemporaryDirectory`, this function is deprecated."""
  104. import warnings
  105. from tempfile import TemporaryDirectory as TmpDir
  106. warnings.warn(
  107. "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
  108. RemovedInAirflow3Warning,
  109. stacklevel=2,
  110. )
  111. return TmpDir(*args, **kwargs)
  112. def mkdirs(path, mode):
  113. """
  114. Create the directory specified by path, creating intermediate directories as necessary.
  115. If directory already exists, this is a no-op.
  116. :param path: The directory to create
  117. :param mode: The mode to give to the directory e.g. 0o755, ignores umask
  118. """
  119. import warnings
  120. warnings.warn(
  121. f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
  122. RemovedInAirflow3Warning,
  123. stacklevel=2,
  124. )
  125. Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
  126. ZIP_REGEX = re2.compile(rf"((.*\.zip){re2.escape(os.sep)})?(.*)")
  127. @overload
  128. def correct_maybe_zipped(fileloc: None) -> None: ...
  129. @overload
  130. def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...
  131. def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
  132. """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path."""
  133. if not fileloc:
  134. return fileloc
  135. search_ = ZIP_REGEX.search(str(fileloc))
  136. if not search_:
  137. return fileloc
  138. _, archive, _ = search_.groups()
  139. if archive and zipfile.is_zipfile(archive):
  140. return archive
  141. else:
  142. return fileloc
  143. def open_maybe_zipped(fileloc, mode="r"):
  144. """
  145. Open the given file.
  146. If the path contains a folder with a .zip suffix, then the folder
  147. is treated as a zip archive, opening the file inside the archive.
  148. :return: a file object, as in `open`, or as in `ZipFile.open`.
  149. """
  150. _, archive, filename = ZIP_REGEX.search(fileloc).groups()
  151. if archive and zipfile.is_zipfile(archive):
  152. return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
  153. else:
  154. return open(fileloc, mode=mode)
  155. def _find_path_from_directory(
  156. base_dir_path: str | os.PathLike[str],
  157. ignore_file_name: str,
  158. ignore_rule_type: type[_IgnoreRule],
  159. ) -> Generator[str, None, None]:
  160. """
  161. Recursively search the base path and return the list of file paths that should not be ignored.
  162. :param base_dir_path: the base path to be searched
  163. :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
  164. :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.
  165. :return: a generator of file paths which should not be ignored.
  166. """
  167. # A Dict of patterns, keyed using resolved, absolute paths
  168. patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
  169. for root, dirs, files in os.walk(base_dir_path, followlinks=True):
  170. patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])
  171. ignore_file_path = Path(root) / ignore_file_name
  172. if ignore_file_path.is_file():
  173. with open(ignore_file_path) as ifile:
  174. lines_no_comments = [re2.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
  175. # append new patterns and filter out "None" objects, which are invalid patterns
  176. patterns += [
  177. p
  178. for p in [
  179. ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
  180. for line in lines_no_comments
  181. if line
  182. ]
  183. if p is not None
  184. ]
  185. # evaluation order of patterns is important with negation
  186. # so that later patterns can override earlier patterns
  187. patterns = list(dict.fromkeys(patterns))
  188. dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]
  189. # explicit loop for infinite recursion detection since we are following symlinks in this walk
  190. for sd in dirs:
  191. dirpath = (Path(root) / sd).resolve()
  192. if dirpath in patterns_by_dir:
  193. raise RuntimeError(
  194. "Detected recursive loop when walking DAG directory "
  195. f"{base_dir_path}: {dirpath} has appeared more than once."
  196. )
  197. patterns_by_dir.update({dirpath: patterns.copy()})
  198. for file in files:
  199. if file != ignore_file_name:
  200. abs_file_path = Path(root) / file
  201. if not ignore_rule_type.match(abs_file_path, patterns):
  202. yield str(abs_file_path)
  203. def find_path_from_directory(
  204. base_dir_path: str | os.PathLike[str],
  205. ignore_file_name: str,
  206. ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
  207. ) -> Generator[str, None, None]:
  208. """
  209. Recursively search the base path for a list of file paths that should not be ignored.
  210. :param base_dir_path: the base path to be searched
  211. :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored
  212. :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob
  213. :return: a generator of file paths.
  214. """
  215. if ignore_file_syntax == "glob":
  216. return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
  217. elif ignore_file_syntax == "regexp" or not ignore_file_syntax:
  218. return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
  219. else:
  220. raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")
  221. def list_py_file_paths(
  222. directory: str | os.PathLike[str] | None,
  223. safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
  224. include_examples: bool | None = None,
  225. ) -> list[str]:
  226. """
  227. Traverse a directory and look for Python files.
  228. :param directory: the directory to traverse
  229. :param safe_mode: whether to use a heuristic to determine whether a file
  230. contains Airflow DAG definitions. If not provided, use the
  231. core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
  232. to safe.
  233. :param include_examples: include example DAGs
  234. :return: a list of paths to Python files in the specified directory
  235. """
  236. if include_examples is None:
  237. include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
  238. file_paths: list[str] = []
  239. if directory is None:
  240. file_paths = []
  241. elif os.path.isfile(directory):
  242. file_paths = [str(directory)]
  243. elif os.path.isdir(directory):
  244. file_paths.extend(find_dag_file_paths(directory, safe_mode))
  245. if include_examples:
  246. from airflow import example_dags
  247. example_dag_folder = next(iter(example_dags.__path__))
  248. file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
  249. return file_paths
  250. def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
  251. """Find file paths of all DAG files."""
  252. file_paths = []
  253. for file_path in find_path_from_directory(directory, ".airflowignore"):
  254. path = Path(file_path)
  255. try:
  256. if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
  257. if might_contain_dag(file_path, safe_mode):
  258. file_paths.append(file_path)
  259. except Exception:
  260. log.exception("Error while examining %s", file_path)
  261. return file_paths
  262. COMMENT_PATTERN = re2.compile(r"\s*#.*")
  263. def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
  264. """
  265. Check whether a Python file contains Airflow DAGs.
  266. When safe_mode is off (with False value), this function always returns True.
  267. If might_contain_dag_callable isn't specified, it uses airflow default heuristic
  268. """
  269. if not safe_mode:
  270. return True
  271. might_contain_dag_callable = conf.getimport(
  272. "core",
  273. "might_contain_dag_callable",
  274. fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
  275. )
  276. return might_contain_dag_callable(file_path=file_path, zip_file=zip_file)
  277. def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
  278. """
  279. Heuristic that guesses whether a Python file contains an Airflow DAG definition.
  280. :param file_path: Path to the file to be checked.
  281. :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
  282. :return: True, if file might contain DAGs.
  283. """
  284. if zip_file:
  285. with zip_file.open(file_path) as current_file:
  286. content = current_file.read()
  287. else:
  288. if zipfile.is_zipfile(file_path):
  289. return True
  290. with open(file_path, "rb") as dag_file:
  291. content = dag_file.read()
  292. content = content.lower()
  293. return all(s in content for s in (b"dag", b"airflow"))
  294. def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
  295. for st in module.body:
  296. if isinstance(st, ast.Import):
  297. for n in st.names:
  298. yield n.name
  299. elif isinstance(st, ast.ImportFrom) and st.module is not None:
  300. yield st.module
  301. def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
  302. """Find Airflow modules imported in the given file."""
  303. try:
  304. parsed = ast.parse(Path(file_path).read_bytes())
  305. except Exception:
  306. return
  307. for m in _find_imported_modules(parsed):
  308. if m.startswith("airflow."):
  309. yield m
  310. def get_unique_dag_module_name(file_path: str) -> str:
  311. """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
  312. if isinstance(file_path, str):
  313. path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
  314. org_mod_name = re2.sub(r"[.-]", "_", Path(file_path).stem)
  315. return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
  316. raise ValueError("file_path should be a string to generate unique module name")