# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import annotations import ast import hashlib import logging import os import zipfile from io import TextIOWrapper from pathlib import Path from typing import Generator, NamedTuple, Pattern, Protocol, overload import re2 from pathspec.patterns import GitWildMatchPattern from airflow.configuration import conf from airflow.exceptions import RemovedInAirflow3Warning log = logging.getLogger(__name__) MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}" class _IgnoreRule(Protocol): """Interface for ignore rules for structural subtyping.""" @staticmethod def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None: """ Build an ignore rule from the supplied pattern. ``base_dir`` and ``definition_file`` should be absolute paths. """ @staticmethod def match(path: Path, rules: list[_IgnoreRule]) -> bool: """Match a candidate absolute path against a list of rules.""" class _RegexpIgnoreRule(NamedTuple): """Typed namedtuple with utility functions for regexp ignore rules.""" pattern: Pattern base_dir: Path @staticmethod def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None: """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid.""" try: return _RegexpIgnoreRule(re2.compile(pattern), base_dir) except re2.error as e: log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e) return None @staticmethod def match(path: Path, rules: list[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path.""" for rule in rules: if not isinstance(rule, _RegexpIgnoreRule): raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}") if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None: return True return False class _GlobIgnoreRule(NamedTuple): """Typed namedtuple with utility functions for glob ignore rules.""" pattern: Pattern raw_pattern: str include: bool | None = None relative_to: Path | None = None @staticmethod def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None: """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid.""" relative_to: Path | None = None if pattern.strip() == "/": # "/" doesn't match anything in gitignore log.warning("Ignoring no-op glob pattern '/' from %s", definition_file) return None if pattern.startswith("/") or "/" in pattern.rstrip("/"): # See https://git-scm.com/docs/gitignore # > If there is a separator at the beginning or middle (or both) of the pattern, then the # > pattern is relative to the directory level of the particular .gitignore file itself. # > Otherwise the pattern may also match at any level below the .gitignore level. relative_to = definition_file.parent ignore_pattern = GitWildMatchPattern(pattern) return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to) @staticmethod def match(path: Path, rules: list[_IgnoreRule]) -> bool: """Match a list of ignore rules against the supplied path.""" matched = False for r in rules: if not isinstance(r, _GlobIgnoreRule): raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}") rule: _GlobIgnoreRule = r # explicit typing to make mypy play nicely rel_path = str(path.relative_to(rule.relative_to) if rule.relative_to else path.name) if rule.raw_pattern.endswith("/") and path.is_dir(): # ensure the test path will potentially match a directory pattern if it is a directory rel_path += "/" if rule.include is not None and rule.pattern.match(rel_path) is not None: matched = rule.include return matched def TemporaryDirectory(*args, **kwargs): """Use `tempfile.TemporaryDirectory`, this function is deprecated.""" import warnings from tempfile import TemporaryDirectory as TmpDir warnings.warn( "This function is deprecated. Please use `tempfile.TemporaryDirectory`", RemovedInAirflow3Warning, stacklevel=2, ) return TmpDir(*args, **kwargs) def mkdirs(path, mode): """ Create the directory specified by path, creating intermediate directories as necessary. If directory already exists, this is a no-op. :param path: The directory to create :param mode: The mode to give to the directory e.g. 0o755, ignores umask """ import warnings warnings.warn( f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`", RemovedInAirflow3Warning, stacklevel=2, ) Path(path).mkdir(mode=mode, parents=True, exist_ok=True) ZIP_REGEX = re2.compile(rf"((.*\.zip){re2.escape(os.sep)})?(.*)") @overload def correct_maybe_zipped(fileloc: None) -> None: ... @overload def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ... def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path: """If the path contains a folder with a .zip suffix, treat it as a zip archive and return path.""" if not fileloc: return fileloc search_ = ZIP_REGEX.search(str(fileloc)) if not search_: return fileloc _, archive, _ = search_.groups() if archive and zipfile.is_zipfile(archive): return archive else: return fileloc def open_maybe_zipped(fileloc, mode="r"): """ Open the given file. If the path contains a folder with a .zip suffix, then the folder is treated as a zip archive, opening the file inside the archive. :return: a file object, as in `open`, or as in `ZipFile.open`. """ _, archive, filename = ZIP_REGEX.search(fileloc).groups() if archive and zipfile.is_zipfile(archive): return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename)) else: return open(fileloc, mode=mode) def _find_path_from_directory( base_dir_path: str | os.PathLike[str], ignore_file_name: str, ignore_rule_type: type[_IgnoreRule], ) -> Generator[str, None, None]: """ Recursively search the base path and return the list of file paths that should not be ignored. :param base_dir_path: the base path to be searched :param ignore_file_name: the file name containing regular expressions for files that should be ignored. :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface. :return: a generator of file paths which should not be ignored. """ # A Dict of patterns, keyed using resolved, absolute paths patterns_by_dir: dict[Path, list[_IgnoreRule]] = {} for root, dirs, files in os.walk(base_dir_path, followlinks=True): patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), []) ignore_file_path = Path(root) / ignore_file_name if ignore_file_path.is_file(): with open(ignore_file_path) as ifile: lines_no_comments = [re2.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")] # append new patterns and filter out "None" objects, which are invalid patterns patterns += [ p for p in [ ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path) for line in lines_no_comments if line ] if p is not None ] # evaluation order of patterns is important with negation # so that later patterns can override earlier patterns patterns = list(dict.fromkeys(patterns)) dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)] # explicit loop for infinite recursion detection since we are following symlinks in this walk for sd in dirs: dirpath = (Path(root) / sd).resolve() if dirpath in patterns_by_dir: raise RuntimeError( "Detected recursive loop when walking DAG directory " f"{base_dir_path}: {dirpath} has appeared more than once." ) patterns_by_dir.update({dirpath: patterns.copy()}) for file in files: if file != ignore_file_name: abs_file_path = Path(root) / file if not ignore_rule_type.match(abs_file_path, patterns): yield str(abs_file_path) def find_path_from_directory( base_dir_path: str | os.PathLike[str], ignore_file_name: str, ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"), ) -> Generator[str, None, None]: """ Recursively search the base path for a list of file paths that should not be ignored. :param base_dir_path: the base path to be searched :param ignore_file_name: the file name in which specifies the patterns of files/dirs to be ignored :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob :return: a generator of file paths. """ if ignore_file_syntax == "glob": return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule) elif ignore_file_syntax == "regexp" or not ignore_file_syntax: return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule) else: raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}") def list_py_file_paths( directory: str | os.PathLike[str] | None, safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True), include_examples: bool | None = None, ) -> list[str]: """ Traverse a directory and look for Python files. :param directory: the directory to traverse :param safe_mode: whether to use a heuristic to determine whether a file contains Airflow DAG definitions. If not provided, use the core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default to safe. :param include_examples: include example DAGs :return: a list of paths to Python files in the specified directory """ if include_examples is None: include_examples = conf.getboolean("core", "LOAD_EXAMPLES") file_paths: list[str] = [] if directory is None: file_paths = [] elif os.path.isfile(directory): file_paths = [str(directory)] elif os.path.isdir(directory): file_paths.extend(find_dag_file_paths(directory, safe_mode)) if include_examples: from airflow import example_dags example_dag_folder = next(iter(example_dags.__path__)) file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False)) return file_paths def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]: """Find file paths of all DAG files.""" file_paths = [] for file_path in find_path_from_directory(directory, ".airflowignore"): path = Path(file_path) try: if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)): if might_contain_dag(file_path, safe_mode): file_paths.append(file_path) except Exception: log.exception("Error while examining %s", file_path) return file_paths COMMENT_PATTERN = re2.compile(r"\s*#.*") def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool: """ Check whether a Python file contains Airflow DAGs. When safe_mode is off (with False value), this function always returns True. If might_contain_dag_callable isn't specified, it uses airflow default heuristic """ if not safe_mode: return True might_contain_dag_callable = conf.getimport( "core", "might_contain_dag_callable", fallback="airflow.utils.file.might_contain_dag_via_default_heuristic", ) return might_contain_dag_callable(file_path=file_path, zip_file=zip_file) def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool: """ Heuristic that guesses whether a Python file contains an Airflow DAG definition. :param file_path: Path to the file to be checked. :param zip_file: if passed, checks the archive. Otherwise, check local filesystem. :return: True, if file might contain DAGs. """ if zip_file: with zip_file.open(file_path) as current_file: content = current_file.read() else: if zipfile.is_zipfile(file_path): return True with open(file_path, "rb") as dag_file: content = dag_file.read() content = content.lower() return all(s in content for s in (b"dag", b"airflow")) def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]: for st in module.body: if isinstance(st, ast.Import): for n in st.names: yield n.name elif isinstance(st, ast.ImportFrom) and st.module is not None: yield st.module def iter_airflow_imports(file_path: str) -> Generator[str, None, None]: """Find Airflow modules imported in the given file.""" try: parsed = ast.parse(Path(file_path).read_bytes()) except Exception: return for m in _find_imported_modules(parsed): if m.startswith("airflow."): yield m def get_unique_dag_module_name(file_path: str) -> str: """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}.""" if isinstance(file_path, str): path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest() org_mod_name = re2.sub(r"[.-]", "_", Path(file_path).stem) return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name) raise ValueError("file_path should be a string to generate unique module name")