#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import ast
import hashlib
import logging
import os
import zipfile
from io import TextIOWrapper
from pathlib import Path
from typing import Generator, NamedTuple, Pattern, Protocol, overload

import re2
from pathspec.patterns import GitWildMatchPattern

from airflow.configuration import conf
from airflow.exceptions import RemovedInAirflow3Warning

log = logging.getLogger(__name__)

MODIFIED_DAG_MODULE_NAME = "unusual_prefix_{path_hash}_{module_name}"


class _IgnoreRule(Protocol):
    """Interface for ignore rules for structural subtyping."""

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """
        Build an ignore rule from the supplied pattern.

        ``base_dir`` and ``definition_file`` should be absolute paths.
        """

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a candidate absolute path against a list of rules."""


class _RegexpIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for regexp ignore rules."""

    pattern: Pattern
    base_dir: Path

    @staticmethod
    def compile(pattern: str, base_dir: Path, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied regexp pattern and log a useful warning if it is invalid."""
        try:
            return _RegexpIgnoreRule(re2.compile(pattern), base_dir)
        except re2.error as e:
            log.warning("Ignoring invalid regex '%s' from %s: %s", pattern, definition_file, e)
            return None

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path."""
        for rule in rules:
            if not isinstance(rule, _RegexpIgnoreRule):
                raise ValueError(f"_RegexpIgnoreRule cannot match rules of type: {type(rule)}")
            if rule.pattern.search(str(path.relative_to(rule.base_dir))) is not None:
                return True
        return False


class _GlobIgnoreRule(NamedTuple):
    """Typed namedtuple with utility functions for glob ignore rules."""

    pattern: Pattern
    raw_pattern: str
    include: bool | None = None
    relative_to: Path | None = None

    @staticmethod
    def compile(pattern: str, _, definition_file: Path) -> _IgnoreRule | None:
        """Build an ignore rule from the supplied glob pattern and log a useful warning if it is invalid."""
        relative_to: Path | None = None
        if pattern.strip() == "/":
            # "/" doesn't match anything in gitignore
            log.warning("Ignoring no-op glob pattern '/' from %s", definition_file)
            return None
        if pattern.startswith("/") or "/" in pattern.rstrip("/"):
            # See https://git-scm.com/docs/gitignore
            # > If there is a separator at the beginning or middle (or both) of the pattern, then the
            # > pattern is relative to the directory level of the particular .gitignore file itself.
            # > Otherwise the pattern may also match at any level below the .gitignore level.
            relative_to = definition_file.parent
        ignore_pattern = GitWildMatchPattern(pattern)
        return _GlobIgnoreRule(ignore_pattern.regex, pattern, ignore_pattern.include, relative_to)

    @staticmethod
    def match(path: Path, rules: list[_IgnoreRule]) -> bool:
        """Match a list of ignore rules against the supplied path."""
        matched = False
        for r in rules:
            if not isinstance(r, _GlobIgnoreRule):
                raise ValueError(f"_GlobIgnoreRule cannot match rules of type: {type(r)}")
            rule: _GlobIgnoreRule = r  # explicit typing to make mypy play nicely
            rel_path = str(path.relative_to(rule.relative_to) if rule.relative_to else path.name)
            if rule.raw_pattern.endswith("/") and path.is_dir():
                # ensure the test path will potentially match a directory pattern if it is a directory
                rel_path += "/"
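            # gitignore semantics: evaluation falls through all rules and the last
            # match wins, so a later negated pattern ("!foo", include=False) can
            # re-include a path that an earlier pattern excluded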
            if rule.include is not None and rule.pattern.match(rel_path) is not None:
                matched = rule.include
        return matched


def TemporaryDirectory(*args, **kwargs):
    """Use `tempfile.TemporaryDirectory`; this function is deprecated."""
    import warnings
    from tempfile import TemporaryDirectory as TmpDir

    warnings.warn(
        "This function is deprecated. Please use `tempfile.TemporaryDirectory`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    return TmpDir(*args, **kwargs)


def mkdirs(path, mode):
    """
    Create the directory specified by path, creating intermediate directories as necessary.

    If the directory already exists, this is a no-op.

    :param path: The directory to create
    :param mode: The mode to give to the directory, e.g. 0o755; ignores umask
    """
    import warnings

    warnings.warn(
        f"This function is deprecated. Please use `pathlib.Path({path}).mkdir`",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    Path(path).mkdir(mode=mode, parents=True, exist_ok=True)
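

# Splits a path around an enclosing ".zip" archive: group 2 captures the path to
# the archive (if any) and group 3 the path inside it, e.g.
# "/dags/archive.zip/my_dag.py" -> ("/dags/archive.zip", "my_dag.py").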
ZIP_REGEX = re2.compile(rf"((.*\.zip){re2.escape(os.sep)})?(.*)")


@overload
def correct_maybe_zipped(fileloc: None) -> None: ...


@overload
def correct_maybe_zipped(fileloc: str | Path) -> str | Path: ...


def correct_maybe_zipped(fileloc: None | str | Path) -> None | str | Path:
    """If the path contains a folder with a .zip suffix, treat it as a zip archive and return the archive path."""
    if not fileloc:
        return fileloc
    search_ = ZIP_REGEX.search(str(fileloc))
    if not search_:
        return fileloc
    _, archive, _ = search_.groups()
    if archive and zipfile.is_zipfile(archive):
        return archive
    else:
        return fileloc
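

# Illustrative behavior, assuming /dags/archive.zip is a valid zip archive:
#   correct_maybe_zipped("/dags/archive.zip/my_dag.py")  -> "/dags/archive.zip"
#   correct_maybe_zipped("/dags/my_dag.py")              -> "/dags/my_dag.py"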


def open_maybe_zipped(fileloc, mode="r"):
    """
    Open the given file.

    If the path contains a folder with a .zip suffix, then the folder
    is treated as a zip archive, opening the file inside the archive.

    :return: a file object, as in `open`, or as in `ZipFile.open`.
    """
    _, archive, filename = ZIP_REGEX.search(fileloc).groups()
    if archive and zipfile.is_zipfile(archive):
        return TextIOWrapper(zipfile.ZipFile(archive, mode=mode).open(filename))
    else:
        return open(fileloc, mode=mode)
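

# For example, open_maybe_zipped("/dags/archive.zip/my_dag.py") yields a text
# wrapper over the my_dag.py member inside the archive, while a plain path
# falls through to the built-in open().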


def _find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_rule_type: type[_IgnoreRule],
) -> Generator[str, None, None]:
    """
    Recursively search the base path and return the list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name containing regular expressions for files that should be ignored.
    :param ignore_rule_type: the concrete class for ignore rules, which implements the _IgnoreRule interface.

    :return: a generator of file paths which should not be ignored.
    """
    # A dict of patterns, keyed using resolved, absolute paths
    patterns_by_dir: dict[Path, list[_IgnoreRule]] = {}
    for root, dirs, files in os.walk(base_dir_path, followlinks=True):
        patterns: list[_IgnoreRule] = patterns_by_dir.get(Path(root).resolve(), [])
        ignore_file_path = Path(root) / ignore_file_name
        if ignore_file_path.is_file():
            with open(ignore_file_path) as ifile:
                lines_no_comments = [re2.sub(r"\s*#.*", "", line) for line in ifile.read().split("\n")]
                # append new patterns and filter out "None" objects, which are invalid patterns
                patterns += [
                    p
                    for p in [
                        ignore_rule_type.compile(line, Path(base_dir_path), ignore_file_path)
                        for line in lines_no_comments
                        if line
                    ]
                    if p is not None
                ]
                # evaluation order of patterns is important with negation
                # so that later patterns can override earlier patterns
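                # dict.fromkeys preserves insertion order, so deduplication keeps
                # the first occurrence of each rule while dropping repeats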
                patterns = list(dict.fromkeys(patterns))

        dirs[:] = [subdir for subdir in dirs if not ignore_rule_type.match(Path(root) / subdir, patterns)]

        # explicit loop for infinite recursion detection since we are following symlinks in this walk
        for sd in dirs:
            dirpath = (Path(root) / sd).resolve()
            if dirpath in patterns_by_dir:
                raise RuntimeError(
                    "Detected recursive loop when walking DAG directory "
                    f"{base_dir_path}: {dirpath} has appeared more than once."
                )
            patterns_by_dir.update({dirpath: patterns.copy()})

        for file in files:
            if file != ignore_file_name:
                abs_file_path = Path(root) / file
                if not ignore_rule_type.match(abs_file_path, patterns):
                    yield str(abs_file_path)


def find_path_from_directory(
    base_dir_path: str | os.PathLike[str],
    ignore_file_name: str,
    ignore_file_syntax: str = conf.get_mandatory_value("core", "DAG_IGNORE_FILE_SYNTAX", fallback="regexp"),
) -> Generator[str, None, None]:
    """
    Recursively search the base path for a list of file paths that should not be ignored.

    :param base_dir_path: the base path to be searched
    :param ignore_file_name: the file name that specifies the patterns of files/dirs to be ignored
    :param ignore_file_syntax: the syntax of patterns in the ignore file: regexp or glob

    :return: a generator of file paths.
    """
    if ignore_file_syntax == "glob":
        return _find_path_from_directory(base_dir_path, ignore_file_name, _GlobIgnoreRule)
    elif ignore_file_syntax == "regexp" or not ignore_file_syntax:
        return _find_path_from_directory(base_dir_path, ignore_file_name, _RegexpIgnoreRule)
    else:
        raise ValueError(f"Unsupported ignore_file_syntax: {ignore_file_syntax}")


def list_py_file_paths(
    directory: str | os.PathLike[str] | None,
    safe_mode: bool = conf.getboolean("core", "DAG_DISCOVERY_SAFE_MODE", fallback=True),
    include_examples: bool | None = None,
) -> list[str]:
    """
    Traverse a directory and look for Python files.

    :param directory: the directory to traverse
    :param safe_mode: whether to use a heuristic to determine whether a file
        contains Airflow DAG definitions. If not provided, use the
        core.DAG_DISCOVERY_SAFE_MODE configuration setting. If not set, default
        to safe.
    :param include_examples: include example DAGs
    :return: a list of paths to Python files in the specified directory
    """
    if include_examples is None:
        include_examples = conf.getboolean("core", "LOAD_EXAMPLES")
    file_paths: list[str] = []
    if directory is None:
        file_paths = []
    elif os.path.isfile(directory):
        file_paths = [str(directory)]
    elif os.path.isdir(directory):
        file_paths.extend(find_dag_file_paths(directory, safe_mode))
    if include_examples:
        from airflow import example_dags

        example_dag_folder = next(iter(example_dags.__path__))
        file_paths.extend(list_py_file_paths(example_dag_folder, safe_mode, include_examples=False))
    return file_paths
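

# For example, list_py_file_paths("/dags", safe_mode=True, include_examples=False)
# returns the .py files (and zip archives) under /dags that pass the DAG
# heuristic, honoring any .airflowignore rules along the way.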


def find_dag_file_paths(directory: str | os.PathLike[str], safe_mode: bool) -> list[str]:
    """Find file paths of all DAG files."""
    file_paths = []
    for file_path in find_path_from_directory(directory, ".airflowignore"):
        path = Path(file_path)
        try:
            if path.is_file() and (path.suffix == ".py" or zipfile.is_zipfile(path)):
                if might_contain_dag(file_path, safe_mode):
                    file_paths.append(file_path)
        except Exception:
            log.exception("Error while examining %s", file_path)
    return file_paths


COMMENT_PATTERN = re2.compile(r"\s*#.*")


def might_contain_dag(file_path: str, safe_mode: bool, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Check whether a Python file contains Airflow DAGs.

    When safe_mode is off (False), this function always returns True.

    If ``might_contain_dag_callable`` isn't set in the config, the default
    Airflow heuristic is used.
    """
    if not safe_mode:
        return True
    might_contain_dag_callable = conf.getimport(
        "core",
        "might_contain_dag_callable",
        fallback="airflow.utils.file.might_contain_dag_via_default_heuristic",
    )
    return might_contain_dag_callable(file_path=file_path, zip_file=zip_file)


def might_contain_dag_via_default_heuristic(file_path: str, zip_file: zipfile.ZipFile | None = None) -> bool:
    """
    Heuristic that guesses whether a Python file contains an Airflow DAG definition.

    :param file_path: Path to the file to be checked.
    :param zip_file: if passed, checks the archive. Otherwise, check local filesystem.
    :return: True, if file might contain DAGs.
    """
    if zip_file:
        with zip_file.open(file_path) as current_file:
            content = current_file.read()
    else:
        if zipfile.is_zipfile(file_path):
            return True
        with open(file_path, "rb") as dag_file:
            content = dag_file.read()
    content = content.lower()
    return all(s in content for s in (b"dag", b"airflow"))
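

# The heuristic is a case-insensitive substring check: a file containing
# "from airflow import DAG" passes, because its lower-cased bytes contain both
# b"airflow" and b"dag"; a file mentioning neither word is skipped.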


def _find_imported_modules(module: ast.Module) -> Generator[str, None, None]:
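    # only top-level statements are scanned, so imports nested inside functions
    # or conditionals are not reported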
    for st in module.body:
        if isinstance(st, ast.Import):
            for n in st.names:
                yield n.name
        elif isinstance(st, ast.ImportFrom) and st.module is not None:
            yield st.module


def iter_airflow_imports(file_path: str) -> Generator[str, None, None]:
    """Find Airflow modules imported in the given file."""
    try:
        parsed = ast.parse(Path(file_path).read_bytes())
    except Exception:
        return
    for m in _find_imported_modules(parsed):
        if m.startswith("airflow."):
            yield m


def get_unique_dag_module_name(file_path: str) -> str:
    """Return a unique module name in the format unusual_prefix_{sha1 of module's file path}_{original module name}."""
    if isinstance(file_path, str):
        path_hash = hashlib.sha1(file_path.encode("utf-8")).hexdigest()
        org_mod_name = re2.sub(r"[.-]", "_", Path(file_path).stem)
        return MODIFIED_DAG_MODULE_NAME.format(path_hash=path_hash, module_name=org_mod_name)
    raise ValueError("file_path should be a string to generate unique module name")