12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """Processes DAGs."""
- from __future__ import annotations
- import enum
- import importlib
- import inspect
- import logging
- import multiprocessing
- import os
- import random
- import signal
- import sys
- import time
- import zipfile
- from collections import defaultdict, deque
- from datetime import datetime, timedelta
- from importlib import import_module
- from pathlib import Path
- from typing import TYPE_CHECKING, Any, Callable, Iterator, NamedTuple, cast
- from setproctitle import setproctitle
- from sqlalchemy import delete, select, update
- from tabulate import tabulate
- import airflow.models
- from airflow.api_internal.internal_api_call import internal_api_call
- from airflow.callbacks.callback_requests import CallbackRequest, SlaCallbackRequest
- from airflow.configuration import conf
- from airflow.dag_processing.processor import DagFileProcessorProcess
- from airflow.models.dag import DagModel
- from airflow.models.dagbag import DagPriorityParsingRequest
- from airflow.models.dagwarning import DagWarning
- from airflow.models.db_callback_request import DbCallbackRequest
- from airflow.models.errors import ParseImportError
- from airflow.models.serialized_dag import SerializedDagModel
- from airflow.secrets.cache import SecretCache
- from airflow.stats import Stats
- from airflow.traces.tracer import Trace, span
- from airflow.utils import timezone
- from airflow.utils.dates import datetime_to_nano
- from airflow.utils.file import list_py_file_paths, might_contain_dag
- from airflow.utils.log.logging_mixin import LoggingMixin
- from airflow.utils.mixins import MultiprocessingStartMethodMixin
- from airflow.utils.net import get_hostname
- from airflow.utils.process_utils import (
- kill_child_processes_by_pids,
- reap_process_group,
- set_new_process_group,
- )
- from airflow.utils.retries import retry_db_transaction
- from airflow.utils.session import NEW_SESSION, provide_session
- from airflow.utils.sqlalchemy import prohibit_commit, with_row_locks
- if TYPE_CHECKING:
- from multiprocessing.connection import Connection as MultiprocessingConnection
- from sqlalchemy.orm import Session
class DagParsingStat(NamedTuple):
    """
    Progress snapshot that the manager sends back to the agent after a parsing loop.

    :param done: True once the manager has reached its configured maximum number of runs.
    :param all_files_processed: True when every known file has a recorded finish time,
        i.e. has been processed at least once.
    """

    done: bool
    all_files_processed: bool
class DagFileStat(NamedTuple):
    """
    Statistics recorded for the latest processing of one DAG file.

    ``last_finish_time`` and ``last_duration`` are ``None`` until the file has been
    processed (compare ``DagFileProcessorManager.DEFAULT_FILE_STAT``).
    """

    # DAG count recorded for the file
    num_dags: int
    # import-error count recorded for the file
    import_errors: int
    # completion time of the most recent processing, or None if it never finished
    last_finish_time: datetime | None
    # duration of the most recent processing, or None
    last_duration: timedelta | None
    # how many times the file has been processed
    run_count: int
    # number of DB queries recorded for the most recent processing
    last_num_of_db_queries: int
class DagParsingSignal(enum.Enum):
    """Control messages the agent sends to the manager over their shared pipe."""

    # Ask the manager to run one parsing loop (used when running in sync mode).
    AGENT_RUN_ONCE = "agent_run_once"
    # Ask the manager to terminate its processors and leave the parsing loop.
    TERMINATE_MANAGER = "terminate_manager"
    # Ask the manager to end its processors and exit its process.
    END_MANAGER = "end_manager"
class DagFileProcessorAgent(LoggingMixin, MultiprocessingStartMethodMixin):
    """
    Agent for DAG file processing.

    It is responsible for all DAG parsing related jobs in the scheduler process.
    Mainly it spins up a DagFileProcessorManager in a subprocess, collects DAG
    parsing results from it, and exchanges signals / DAG parsing stats with it
    over a multiprocessing pipe.

    This class runs in the main ``airflow scheduler`` process.

    :param dag_directory: Directory where DAG definitions are kept. All
        files in file_paths should be under this directory
    :param max_runs: The number of times to parse and schedule each file. -1
        for unlimited.
    :param processor_timeout: How long to wait before timing out a DAG file processor.
        Also the longest the agent waits for a DagParsingStat before treating the
        manager as stalled and restarting it.
    :param dag_ids: if specified, only schedule tasks with these DAG IDs
    :param pickle_dags: whether to pickle DAGs.
    :param async_mode: Whether to start agent in async mode
    """

    def __init__(
        self,
        dag_directory: os.PathLike,
        max_runs: int,
        processor_timeout: timedelta,
        dag_ids: list[str] | None,
        pickle_dags: bool,
        async_mode: bool,
    ):
        super().__init__()
        self._dag_directory: os.PathLike = dag_directory
        self._max_runs = max_runs
        self._processor_timeout = processor_timeout
        self._dag_ids = dag_ids
        self._pickle_dags = pickle_dags
        self._async_mode = async_mode
        # Map from file path to the processor (not referenced by the methods of this class)
        self._processors: dict[str, DagFileProcessorProcess] = {}
        # The DagFileProcessorManager subprocess; set by start()
        self._process: multiprocessing.process.BaseProcess | None = None
        # Mirrors DagParsingStat.done from the most recent stat received from the manager
        self._done: bool = False
        # Initialized as true so we do not deactivate w/o any actual DAG parsing.
        self._all_files_processed = True
        # Agent end of the pipe used to exchange signals, callbacks and stats with the manager;
        # set by start()
        self._parent_signal_conn: MultiprocessingConnection | None = None
        # Monotonic time the last DagParsingStat arrived; used to detect a stalled manager
        self._last_parsing_stat_received_at: float = time.monotonic()

    def start(self) -> None:
        """
        Launch the DagFileProcessorManager subprocess and start the DAG parsing loop in it.

        A fresh pipe is created on every call, so this is also how a dead or stalled
        manager is replaced (see ``_heartbeat_manager``).
        """
        context = self._get_multiprocessing_context()
        # Reset the stall clock so a freshly launched manager is not immediately considered stalled.
        self._last_parsing_stat_received_at = time.monotonic()

        self._parent_signal_conn, child_signal_conn = context.Pipe()
        process = context.Process(
            target=type(self)._run_processor_manager,
            args=(
                self._dag_directory,
                self._max_runs,
                self._processor_timeout,
                child_signal_conn,
                self._dag_ids,
                self._pickle_dags,
                self._async_mode,
            ),
        )
        self._process = process

        process.start()

        self.log.info("Launched DagFileProcessorManager with pid: %s", process.pid)

    def run_single_parsing_loop(self) -> None:
        """
        Send agent heartbeat signal to the manager, requesting that it runs one processing "loop".

        Should only be used when launched DAG file processor manager in sync mode.

        Call wait_until_finished to ensure that any launched processors have finished before continuing.

        :raises ValueError: if ``start`` has not been called yet.
        """
        if not self._parent_signal_conn or not self._process:
            raise ValueError("Process not started.")
        if not self._process.is_alive():
            return

        try:
            self._parent_signal_conn.send(DagParsingSignal.AGENT_RUN_ONCE)
        except ConnectionError:
            # If the manager died because of an error, we will notice and restart it
            # when heartbeat() calls _heartbeat_manager.
            pass

    def get_callbacks_pipe(self) -> MultiprocessingConnection:
        """
        Return the pipe for sending Callbacks to DagProcessorManager.

        :raises ValueError: if ``start`` has not been called yet.
        """
        if not self._parent_signal_conn:
            raise ValueError("Process not started.")
        return self._parent_signal_conn

    def wait_until_finished(self) -> None:
        """
        Block until the manager reports the end of the current parsing loop (sync mode only).

        Incoming messages are processed as they arrive. The wait ends on the first
        DagParsingStat, or when the pipe reports EOF / a connection error because the
        manager went away.

        :raises ValueError: if ``start`` has not been called yet.
        :raises RuntimeError: if called in async mode.
        """
        if not self._parent_signal_conn:
            raise ValueError("Process not started.")
        if self._async_mode:
            raise RuntimeError("wait_until_finished should only be called in sync_mode")
        while self._parent_signal_conn.poll(timeout=None):
            try:
                result = self._parent_signal_conn.recv()
            except (EOFError, ConnectionError):
                # Same errors heartbeat() tolerates: the manager went away. A dead manager is
                # restarted by heartbeat() -> _heartbeat_manager rather than crashing here.
                return

            self._process_message(result)

            if isinstance(result, DagParsingStat):
                # In sync mode (which is the only time we call this function) we don't send this message from
                # the Manager until all the running processors have finished
                return

    @staticmethod
    def _run_processor_manager(
        dag_directory: os.PathLike,
        max_runs: int,
        processor_timeout: timedelta,
        signal_conn: MultiprocessingConnection,
        dag_ids: list[str] | None,
        pickle_dags: bool,
        async_mode: bool,
    ) -> None:
        """Entry point of the manager subprocess: build a DagFileProcessorManager and run it."""
        # Make this process start as a new process group - that makes it easy
        # to kill all sub-process of this at the OS-level, rather than having
        # to iterate the child processes
        set_new_process_group()
        span = Trace.get_current_span()
        span.set_attribute("dag_directory", str(dag_directory))
        span.set_attribute("dag_ids", str(dag_ids))
        setproctitle("airflow scheduler -- DagFileProcessorManager")
        reload_configuration_for_dag_processing()
        processor_manager = DagFileProcessorManager(
            dag_directory=dag_directory,
            max_runs=max_runs,
            processor_timeout=processor_timeout,
            dag_ids=dag_ids,
            pickle_dags=pickle_dags,
            signal_conn=signal_conn,
            async_mode=async_mode,
        )
        processor_manager.start()

    def heartbeat(self) -> None:
        """
        Check if the DagFileProcessorManager process is alive, and process any pending messages.

        :raises ValueError: if ``start`` has not been called yet.
        """
        if not self._parent_signal_conn:
            raise ValueError("Process not started.")
        # Receive any pending messages before checking if the process has exited.
        while self._parent_signal_conn.poll(timeout=0.01):
            try:
                result = self._parent_signal_conn.recv()
            except (EOFError, ConnectionError):
                break
            self._process_message(result)

        # If it died unexpectedly restart the manager process
        self._heartbeat_manager()

    def _process_message(self, message: Any) -> None:
        """
        Apply one message received from the manager.

        :raises RuntimeError: for anything other than a DagParsingStat.
        """
        span = Trace.get_current_span()
        self.log.debug("Received message of type %s", type(message).__name__)
        if isinstance(message, DagParsingStat):
            span.set_attribute("all_files_processed", str(message.all_files_processed))
            self._sync_metadata(message)
        else:
            raise RuntimeError(f"Unexpected message received of type {type(message).__name__}")

    def _heartbeat_manager(self) -> None:
        """
        Heartbeat DAG file processor and restart it if we are not done.

        The manager is relaunched when it has exited, or when no DagParsingStat has
        arrived for longer than ``processor_timeout`` (it is then reaped first).
        """
        if not self._parent_signal_conn:
            raise ValueError("Process not started.")
        if self._process and not self._process.is_alive():
            self._process.join(timeout=0)
            if not self.done:
                self.log.warning(
                    "DagFileProcessorManager (PID=%d) exited with exit code %d - re-launching",
                    self._process.pid,
                    self._process.exitcode,
                )
                # start() resets the stall clock, so the stall check below will not fire right away.
                self.start()

        if self.done:
            return

        parsing_stat_age = time.monotonic() - self._last_parsing_stat_received_at
        if parsing_stat_age > self._processor_timeout.total_seconds():
            Stats.incr("dag_processing.manager_stalls")
            self.log.error(
                "DagFileProcessorManager (PID=%d) last sent a heartbeat %.2f seconds ago! Restarting it",
                self._process.pid,
                parsing_stat_age,
            )
            reap_process_group(self._process.pid, logger=self.log)
            self.start()

    def _sync_metadata(self, stat: DagParsingStat) -> None:
        """Sync metadata from stat queue and only keep the latest stat."""
        self._done = stat.done
        self._all_files_processed = stat.all_files_processed
        self._last_parsing_stat_received_at = time.monotonic()

    @property
    def done(self) -> bool:
        """Whether the DagFileProcessorManager finished."""
        return self._done

    @property
    def all_files_processed(self) -> bool:
        """Whether all files been processed at least once."""
        return self._all_files_processed

    def terminate(self) -> None:
        """
        Send termination signal to DAG parsing processor manager to terminate all DAG file processors.

        A connection error while sending (e.g. the manager already went away) is ignored.
        """
        if self._process and self._process.is_alive():
            self.log.info("Sending termination message to manager.")
            try:
                self._parent_signal_conn.send(DagParsingSignal.TERMINATE_MANAGER)
            except ConnectionError:
                pass

    def end(self) -> None:
        """Terminate (and then kill) the manager process launched, then close the agent's pipe end."""
        if not self._process:
            self.log.warning("Ending without manager process.")
            return
        # Give the Manager some time to cleanly shut down, but not too long, as
        # it's better to finish sooner than wait for (non-critical) work to
        # finish
        self._process.join(timeout=1.0)
        reap_process_group(self._process.pid, logger=self.log)
        self._parent_signal_conn.close()
- class DagFileProcessorManager(LoggingMixin):
- """
- Manage processes responsible for parsing DAGs.
- Given a list of DAG definition files, this kicks off several processors
- in parallel to process them and put the results to a multiprocessing.Queue
- for DagFileProcessorAgent to harvest. The parallelism is limited and as the
- processors finish, more are launched. The files are processed over and
- over again, but no more often than the specified interval.
- :param dag_directory: Directory where DAG definitions are kept. All
- files in file_paths should be under this directory
- :param max_runs: The number of times to parse and schedule each file. -1
- for unlimited.
- :param processor_timeout: How long to wait before timing out a DAG file processor
- :param signal_conn: connection to communicate signal with processor agent.
- :param dag_ids: if specified, only schedule tasks with these DAG IDs
- :param pickle_dags: whether to pickle DAGs.
- :param async_mode: whether to start the manager in async mode
- """
- DEFAULT_FILE_STAT = DagFileStat(
- num_dags=0,
- import_errors=0,
- last_finish_time=None,
- last_duration=None,
- run_count=0,
- last_num_of_db_queries=0,
- )
- def __init__(
- self,
- dag_directory: os.PathLike[str],
- max_runs: int,
- processor_timeout: timedelta,
- dag_ids: list[str] | None,
- pickle_dags: bool,
- signal_conn: MultiprocessingConnection | None = None,
- async_mode: bool = True,
- ):
- super().__init__()
- # known files; this will be updated every `dag_dir_list_interval` and stuff added/removed accordingly
- self._file_paths: list[str] = []
- self._file_path_queue: deque[str] = deque()
- self._max_runs = max_runs
- # signal_conn is None for dag_processor_standalone mode.
- self._direct_scheduler_conn = signal_conn
- self._pickle_dags = pickle_dags
- self._dag_ids = dag_ids
- self._async_mode = async_mode
- self._parsing_start_time: float | None = None
- self._dag_directory = dag_directory
- # Set the signal conn in to non-blocking mode, so that attempting to
- # send when the buffer is full errors, rather than hangs for-ever
- # attempting to send (this is to avoid deadlocks!)
- #
- # Don't do this in sync_mode, as we _need_ the DagParsingStat sent to
- # continue the scheduler
- if self._async_mode and self._direct_scheduler_conn is not None:
- os.set_blocking(self._direct_scheduler_conn.fileno(), False)
- self.standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
- self._parallelism = conf.getint("scheduler", "parsing_processes")
- if (
- conf.get_mandatory_value("database", "sql_alchemy_conn").startswith("sqlite")
- and self._parallelism > 1
- ):
- self.log.warning(
- "Because we cannot use more than 1 thread (parsing_processes = "
- "%d) when using sqlite. So we set parallelism to 1.",
- self._parallelism,
- )
- self._parallelism = 1
- # Parse and schedule each file no faster than this interval.
- self._file_process_interval = conf.getint("scheduler", "min_file_process_interval")
- # How often to print out DAG file processing stats to the log. Default to
- # 30 seconds.
- self.print_stats_interval = conf.getint("scheduler", "print_stats_interval")
- # Map from file path to the processor
- self._processors: dict[str, DagFileProcessorProcess] = {}
- self._num_run = 0
- # Map from file path to stats about the file
- self._file_stats: dict[str, DagFileStat] = {}
- # Last time that the DAG dir was traversed to look for files
- self.last_dag_dir_refresh_time = timezone.make_aware(datetime.fromtimestamp(0))
- # Last time stats were printed
- self.last_stat_print_time = 0
- # Last time we cleaned up DAGs which are no longer in files
- self.last_deactivate_stale_dags_time = timezone.make_aware(datetime.fromtimestamp(0))
- # How often to check for DAGs which are no longer in files
- self.parsing_cleanup_interval = conf.getint("scheduler", "parsing_cleanup_interval")
- # How long to wait for a DAG to be reparsed after its file has been parsed before disabling
- self.stale_dag_threshold = conf.getint("scheduler", "stale_dag_threshold")
- # How long to wait before timing out a process to parse a DAG file
- self._processor_timeout = processor_timeout
- # How often to scan the DAGs directory for new files. Default to 5 minutes.
- self.dag_dir_list_interval = conf.getint("scheduler", "dag_dir_list_interval")
- # Mapping file name and callbacks requests
- self._callback_to_execute: dict[str, list[CallbackRequest]] = defaultdict(list)
- self._log = logging.getLogger("airflow.processor_manager")
- self.waitables: dict[Any, MultiprocessingConnection | DagFileProcessorProcess] = (
- {
- self._direct_scheduler_conn: self._direct_scheduler_conn,
- }
- if self._direct_scheduler_conn is not None
- else {}
- )
- self.heartbeat: Callable[[], None] = lambda: None
- def register_exit_signals(self):
- """Register signals that stop child processes."""
- signal.signal(signal.SIGINT, self._exit_gracefully)
- signal.signal(signal.SIGTERM, self._exit_gracefully)
- # So that we ignore the debug dump signal, making it easier to send
- signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- def _exit_gracefully(self, signum, frame):
- """Clean up DAG file processors to avoid leaving orphan processes."""
- self.log.info("Exiting gracefully upon receiving signal %s", signum)
- self.log.debug("Current Stacktrace is: %s", "\n".join(map(str, inspect.stack())))
- self.terminate()
- self.end()
- self.log.debug("Finished terminating DAG processors.")
- sys.exit(os.EX_OK)
- def start(self):
- """
- Use multiple processes to parse and generate tasks for the DAGs in parallel.
- By processing them in separate processes, we can get parallelism and isolation
- from potentially harmful user code.
- """
- self.register_exit_signals()
- set_new_process_group()
- self.log.info("Processing files using up to %s processes at a time ", self._parallelism)
- self.log.info("Process each file at most once every %s seconds", self._file_process_interval)
- self.log.info(
- "Checking for new files in %s every %s seconds", self._dag_directory, self.dag_dir_list_interval
- )
- return self._run_parsing_loop()
- def _scan_stale_dags(self):
- """Scan at fix internal DAGs which are no longer present in files."""
- now = timezone.utcnow()
- elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
- if elapsed_time_since_refresh > self.parsing_cleanup_interval:
- last_parsed = {
- fp: self.get_last_finish_time(fp) for fp in self.file_paths if self.get_last_finish_time(fp)
- }
- DagFileProcessorManager.deactivate_stale_dags(
- last_parsed=last_parsed,
- dag_directory=self.get_dag_directory(),
- stale_dag_threshold=self.stale_dag_threshold,
- )
- self.last_deactivate_stale_dags_time = timezone.utcnow()
- @classmethod
- @internal_api_call
- @provide_session
- def deactivate_stale_dags(
- cls,
- last_parsed: dict[str, datetime | None],
- dag_directory: str,
- stale_dag_threshold: int,
- session: Session = NEW_SESSION,
- ):
- """
- Detect DAGs which are no longer present in files.
- Deactivate them and remove them in the serialized_dag table.
- """
- to_deactivate = set()
- query = select(DagModel.dag_id, DagModel.fileloc, DagModel.last_parsed_time).where(DagModel.is_active)
- standalone_dag_processor = conf.getboolean("scheduler", "standalone_dag_processor")
- if standalone_dag_processor:
- query = query.where(DagModel.processor_subdir == dag_directory)
- dags_parsed = session.execute(query)
- for dag in dags_parsed:
- # The largest valid difference between a DagFileStat's last_finished_time and a DAG's
- # last_parsed_time is the processor_timeout. Longer than that indicates that the DAG is
- # no longer present in the file. We have a stale_dag_threshold configured to prevent a
- # significant delay in deactivation of stale dags when a large timeout is configured
- if (
- dag.fileloc in last_parsed
- and (dag.last_parsed_time + timedelta(seconds=stale_dag_threshold)) < last_parsed[dag.fileloc]
- ):
- cls.logger().info("DAG %s is missing and will be deactivated.", dag.dag_id)
- to_deactivate.add(dag.dag_id)
- if to_deactivate:
- deactivated_dagmodel = session.execute(
- update(DagModel)
- .where(DagModel.dag_id.in_(to_deactivate))
- .values(is_active=False)
- .execution_options(synchronize_session="fetch")
- )
- deactivated = deactivated_dagmodel.rowcount
- if deactivated:
- cls.logger().info("Deactivated %i DAGs which are no longer present in file.", deactivated)
- for dag_id in to_deactivate:
- SerializedDagModel.remove_dag(dag_id)
- cls.logger().info("Deleted DAG %s in serialized_dag table", dag_id)
- def _run_parsing_loop(self):
- # In sync mode we want timeout=None -- wait forever until a message is received
- if self._async_mode:
- poll_time = 0.0
- else:
- poll_time = None
- self._refresh_dag_dir()
- self.prepare_file_path_queue()
- max_callbacks_per_loop = conf.getint("scheduler", "max_callbacks_per_loop")
- if self._async_mode:
- # If we're in async mode, we can start up straight away. If we're
- # in sync mode we need to be told to start a "loop"
- self.start_new_processes()
- while True:
- with Trace.start_span(span_name="dag_parsing_loop", component="DagFileProcessorManager") as span:
- loop_start_time = time.monotonic()
- ready = multiprocessing.connection.wait(self.waitables.keys(), timeout=poll_time)
- if span.is_recording():
- span.add_event(name="heartbeat")
- self.heartbeat()
- if self._direct_scheduler_conn is not None and self._direct_scheduler_conn in ready:
- agent_signal = self._direct_scheduler_conn.recv()
- self.log.debug("Received %s signal from DagFileProcessorAgent", agent_signal)
- if agent_signal == DagParsingSignal.TERMINATE_MANAGER:
- if span.is_recording():
- span.add_event(name="terminate")
- self.terminate()
- break
- elif agent_signal == DagParsingSignal.END_MANAGER:
- if span.is_recording():
- span.add_event(name="end")
- self.end()
- sys.exit(os.EX_OK)
- elif agent_signal == DagParsingSignal.AGENT_RUN_ONCE:
- # continue the loop to parse dags
- pass
- elif isinstance(agent_signal, CallbackRequest):
- self._add_callback_to_queue(agent_signal)
- else:
- raise ValueError(f"Invalid message {type(agent_signal)}")
- if not ready and not self._async_mode:
- # In "sync" mode we don't want to parse the DAGs until we
- # are told to (as that would open another connection to the
- # SQLite DB which isn't a good practice
- # This shouldn't happen, as in sync mode poll should block for
- # ever. Lets be defensive about that.
- self.log.warning(
- "wait() unexpectedly returned nothing ready after infinite timeout (%r)!", poll_time
- )
- continue
- for sentinel in ready:
- if sentinel is not self._direct_scheduler_conn:
- processor = self.waitables.get(sentinel)
- if processor:
- self._collect_results_from_processor(processor)
- self.waitables.pop(sentinel)
- self._processors.pop(processor.file_path)
- if self.standalone_dag_processor:
- for callback in DagFileProcessorManager._fetch_callbacks(
- max_callbacks_per_loop, self.standalone_dag_processor, self.get_dag_directory()
- ):
- self._add_callback_to_queue(callback)
- self._scan_stale_dags()
- DagWarning.purge_inactive_dag_warnings()
- refreshed_dag_dir = self._refresh_dag_dir()
- if span.is_recording():
- span.add_event(name="_kill_timed_out_processors")
- self._kill_timed_out_processors()
- # Generate more file paths to process if we processed all the files already. Note for this
- # to clear down, we must have cleared all files found from scanning the dags dir _and_ have
- # cleared all files added as a result of callbacks
- if not self._file_path_queue:
- self.emit_metrics()
- if span.is_recording():
- span.add_event(name="prepare_file_path_queue")
- self.prepare_file_path_queue()
- # if new files found in dag dir, add them
- elif refreshed_dag_dir:
- if span.is_recording():
- span.add_event(name="add_new_file_path_to_queue")
- self.add_new_file_path_to_queue()
- self._refresh_requested_filelocs()
- if span.is_recording():
- span.add_event(name="start_new_processes")
- self.start_new_processes()
- # Update number of loop iteration.
- self._num_run += 1
- if not self._async_mode:
- self.log.debug("Waiting for processors to finish since we're using sqlite")
- # Wait until the running DAG processors are finished before
- # sending a DagParsingStat message back. This means the Agent
- # can tell we've got to the end of this iteration when it sees
- # this type of message
- self.wait_until_finished()
- # Collect anything else that has finished, but don't kick off any more processors
- if span.is_recording():
- span.add_event(name="collect_results")
- self.collect_results()
- if span.is_recording():
- span.add_event(name="print_stat")
- self._print_stat()
- all_files_processed = all(self.get_last_finish_time(x) is not None for x in self.file_paths)
- max_runs_reached = self.max_runs_reached()
- try:
- if self._direct_scheduler_conn:
- self._direct_scheduler_conn.send(
- DagParsingStat(
- max_runs_reached,
- all_files_processed,
- )
- )
- except BlockingIOError:
- # Try again next time around the loop!
- # It is better to fail, than it is deadlock. This should
- # "almost never happen" since the DagParsingStat object is
- # small, and in async mode this stat is not actually _required_
- # for normal operation (It only drives "max runs")
- self.log.debug("BlockingIOError received trying to send DagParsingStat, ignoring")
- if max_runs_reached:
- self.log.info(
- "Exiting dag parsing loop as all files have been processed %s times", self._max_runs
- )
- if span.is_recording():
- span.add_event(
- name="info",
- attributes={
- "message": "Exiting dag parsing loop as all files have been processed {self._max_runs} times"
- },
- )
- break
- if self._async_mode:
- loop_duration = time.monotonic() - loop_start_time
- if loop_duration < 1:
- poll_time = 1 - loop_duration
- else:
- poll_time = 0.0
- @classmethod
- @internal_api_call
- @provide_session
- def _fetch_callbacks(
- cls,
- max_callbacks: int,
- standalone_dag_processor: bool,
- dag_directory: str,
- session: Session = NEW_SESSION,
- ) -> list[CallbackRequest]:
- return cls._fetch_callbacks_with_retries(
- max_callbacks, standalone_dag_processor, dag_directory, session
- )
- @classmethod
- @retry_db_transaction
- def _fetch_callbacks_with_retries(
- cls, max_callbacks: int, standalone_dag_processor: bool, dag_directory: str, session: Session
- ) -> list[CallbackRequest]:
- """Fetch callbacks from database and add them to the internal queue for execution."""
- cls.logger().debug("Fetching callbacks from the database.")
- callback_queue: list[CallbackRequest] = []
- with prohibit_commit(session) as guard:
- query = select(DbCallbackRequest)
- if standalone_dag_processor:
- query = query.where(
- DbCallbackRequest.processor_subdir == dag_directory,
- )
- query = query.order_by(DbCallbackRequest.priority_weight.asc()).limit(max_callbacks)
- query = with_row_locks(query, of=DbCallbackRequest, session=session, skip_locked=True)
- callbacks = session.scalars(query)
- for callback in callbacks:
- try:
- callback_queue.append(callback.get_callback_request())
- session.delete(callback)
- except Exception as e:
- cls.logger().warning("Error adding callback for execution: %s, %s", callback, e)
- guard.commit()
- return callback_queue
def _add_callback_to_queue(self, request: CallbackRequest):
    """
    Store a callback request so it runs the next time its file is processed.

    SLA callbacks are only stored (and deduplicated); the file is deliberately not queued.
    Any other callback is stored and its file is moved to the front of the file path queue.

    :param request: the callback request to queue
    """
    filepath = request.full_filepath

    # requests are sent by dag processors. SLAs exist per-dag, but can be generated once per SLA-enabled
    # task in the dag. If treated like other callbacks, SLAs can cause feedback where a SLA arrives,
    # goes to the front of the queue, gets processed, triggers more SLAs from the same DAG, which go to
    # the front of the queue, and we never get round to picking stuff off the back of the queue
    if isinstance(request, SlaCallbackRequest):
        pending_for_file = self._callback_to_execute[filepath]
        if request in pending_for_file:
            self.log.debug("Skipping already queued SlaCallbackRequest")
            return

        # not already queued, queue the callback
        # do NOT add the file of this SLA to self._file_path_queue. SLAs can arrive so rapidly that
        # they keep adding to the file queue and never letting it drain. This in turn prevents us from
        # ever rescanning the dags folder for changes to existing dags. We simply store the callback, and
        # periodically, when self._file_path_queue is drained, we rescan and re-queue all DAG files.
        # The SLAs will be picked up then. It means a delay in reacting to the SLAs (as controlled by the
        # min_file_process_interval config) but stops SLAs from DoS'ing the queue.
        self.log.debug("Queuing SlaCallbackRequest for %s", request.dag_id)
        pending_for_file.append(request)
        Stats.incr("dag_processing.sla_callback_count")
        return

    # Other callbacks have a higher priority over DAG Run scheduling, so those callbacks gazump, even if
    # already in the file path queue
    self.log.debug("Queuing %s CallbackRequest: %s", type(request).__name__, request)
    self._callback_to_execute[filepath].append(request)
    if filepath in self._file_path_queue:
        # The file is about to be queued at the front to run this callback, so drop any
        # existing entries for it rather than parsing it twice.
        self._file_path_queue = deque(queued for queued in self._file_path_queue if queued != filepath)
    self._add_paths_to_queue([filepath], True)
    Stats.incr("dag_processing.other_callback_count")
def _refresh_requested_filelocs(self) -> None:
    """
    Move files requested for priority parsing to the front of the file path queue.

    The requests are read (and consumed) via ``_get_priority_filelocs``. A file already in
    the queue has its first occurrence removed before being pushed to the front.
    """
    for requested in DagFileProcessorManager._get_priority_filelocs():
        if requested in self._file_path_queue:
            self._file_path_queue.remove(requested)
        self._file_path_queue.appendleft(requested)
@classmethod
@internal_api_call
@provide_session
def _get_priority_filelocs(cls, session: Session = NEW_SESSION):
    """
    Return the file locations of all pending priority-parsing requests.

    Every ``DagPriorityParsingRequest`` row read is deleted in ``session``. When no session is
    passed, the commit presumably happens in ``provide_session`` (TODO confirm).

    :param session: ORM session
    :return: list of requested file locations, in query order
    """
    priority_filelocs: list[str] = []
    for parsing_request in session.scalars(select(DagPriorityParsingRequest)):
        priority_filelocs.append(parsing_request.fileloc)
        session.delete(parsing_request)
    return priority_filelocs
def _refresh_dag_dir(self) -> bool:
    """
    Refresh file paths from dag dir if we haven't done it for too long.

    When more than ``dag_dir_list_interval`` seconds have passed since the last listing:

    - re-list ``self._dag_directory`` and update the tracked file paths;
    - clear import errors for files that are gone (failures are logged, not raised);
    - pass the surviving DAG file locations (expanded into ZIP members) to
      ``SerializedDagModel.remove_deleted_dags``, ``DagModel.deactivate_deleted_dags`` and
      ``DagCode.remove_deleted_code``.

    :return: True if the directory was re-listed, False if the interval has not elapsed yet.
    """
    now = timezone.utcnow()
    elapsed_time_since_refresh = (now - self.last_dag_dir_refresh_time).total_seconds()
    if elapsed_time_since_refresh > self.dag_dir_list_interval:
        # Build up a list of Python files that could contain DAGs
        self.log.info("Searching for files in %s", self._dag_directory)
        self._file_paths = list_py_file_paths(self._dag_directory)
        self.last_dag_dir_refresh_time = now
        self.log.info("There are %s files in %s", len(self._file_paths), self._dag_directory)
        self.set_file_paths(self._file_paths)

        try:
            self.log.debug("Removing old import errors")
            DagFileProcessorManager.clear_nonexistent_import_errors(
                file_paths=self._file_paths, processor_subdir=self.get_dag_directory()
            )
        except Exception:
            self.log.exception("Error removing old import errors")

        def _iter_dag_filelocs(fileloc: str) -> Iterator[str]:
            """
            Get "full" paths to DAGs if inside ZIP files.

            This is the format used by the remove/delete functions.
            """
            if fileloc.endswith(".py") or not zipfile.is_zipfile(fileloc):
                yield fileloc
                return
            try:
                with zipfile.ZipFile(fileloc) as z:
                    for info in z.infolist():
                        if might_contain_dag(info.filename, True, z):
                            yield os.path.join(fileloc, info.filename)
            except zipfile.BadZipFile:
                # One placeholder for the one argument: a mismatched count makes logging
                # fail to format the record instead of reporting the bad ZIP file.
                self.log.exception("There was an error accessing ZIP file %s", fileloc)

        dag_filelocs = {full_loc for path in self._file_paths for full_loc in _iter_dag_filelocs(path)}

        from airflow.models.dagcode import DagCode

        SerializedDagModel.remove_deleted_dags(
            alive_dag_filelocs=dag_filelocs,
            processor_subdir=self.get_dag_directory(),
        )
        DagModel.deactivate_deleted_dags(
            dag_filelocs,
            processor_subdir=self.get_dag_directory(),
        )
        DagCode.remove_deleted_code(
            dag_filelocs,
            processor_subdir=self.get_dag_directory(),
        )

        return True
    return False
def _print_stat(self):
    """
    Log file-processing stats at most once every ``print_stats_interval`` seconds.

    A non-positive ``print_stats_interval`` disables the output entirely.
    """
    interval = self.print_stats_interval
    if not 0 < interval < time.monotonic() - self.last_stat_print_time:
        return
    if self._file_paths:
        self._log_file_processing_stats(self._file_paths)
    self.last_stat_print_time = time.monotonic()
@staticmethod
@internal_api_call
@provide_session
def clear_nonexistent_import_errors(
    file_paths: list[str] | None, processor_subdir: str | None, session=NEW_SESSION
):
    """
    Clear import errors for files that no longer exist.

    Only import errors recorded for ``processor_subdir`` are considered, so a processor never
    clears errors that belong to another DAG directory. When ``file_paths`` is empty or None,
    every import error of that subdir is removed.

    :param file_paths: list of paths to DAG definition files that still exist
    :param processor_subdir: the DAG directory whose import errors should be cleaned up
    :param session: session for ORM operations
    """
    # Scope by subdir unconditionally; previously the subdir filter was applied only when
    # file_paths was non-empty, so an empty DAG folder wiped import errors of every subdir.
    query = delete(ParseImportError).where(ParseImportError.processor_subdir == processor_subdir)
    if file_paths:
        query = query.where(~ParseImportError.filename.in_(file_paths))

    session.execute(query.execution_options(synchronize_session="fetch"))
    session.commit()
def _log_file_processing_stats(self, known_file_paths):
    """
    Print out stats about how files are getting processed.

    Also emits, per file, the ``dag_processing.last_run.seconds_ago.<file>`` gauge (when the file
    has finished before) and the ``dag_processing.last_num_of_db_queries.<file>`` gauge (when a
    query count is known).

    :param known_file_paths: a list of file paths that may contain Airflow
        DAG definitions
    :return: None
    """
    # File Path: Path to the file containing the DAG definition
    # PID: PID associated with the process that's processing the file. May
    # be empty.
    # Runtime: If the process is currently running, how long it's been
    # running for in seconds.
    # # DAGs / # Errors: DAG count and import error count from the last run.
    # Last Runtime: If the process ran before, how long did it take to
    # finish in seconds
    # Last Run: When the file finished processing in the previous run.
    # Last # of DB Queries: The number of queries performed to the
    # Airflow database during last parsing of the file.
    headers = [
        "File Path",
        "PID",
        "Runtime",
        "# DAGs",
        "# Errors",
        "Last Runtime",
        "Last Run",
        "Last # of DB Queries",
    ]

    rows = []
    now = timezone.utcnow()
    for file_path in known_file_paths:
        last_runtime = self.get_last_runtime(file_path)
        num_dags = self.get_last_dag_count(file_path)
        num_errors = self.get_last_error_count(file_path)
        file_name = Path(file_path).stem
        processor_pid = self.get_pid(file_path)
        processor_start_time = self.get_start_time(file_path)
        runtime = (now - processor_start_time) if processor_start_time else None
        last_run = self.get_last_finish_time(file_path)
        if last_run:
            seconds_ago = (now - last_run).total_seconds()
            Stats.gauge(f"dag_processing.last_run.seconds_ago.{file_name}", seconds_ago)

        last_num_of_db_queries = self.get_last_num_of_db_queries(file_path)
        # get_last_num_of_db_queries() returns None for files without a stats entry;
        # don't send a gauge without a value for them.
        if last_num_of_db_queries is not None:
            Stats.gauge(f"dag_processing.last_num_of_db_queries.{file_name}", last_num_of_db_queries)

        rows.append(
            (
                file_path,
                processor_pid,
                runtime,
                num_dags,
                num_errors,
                last_runtime,
                last_run,
                last_num_of_db_queries,
            )
        )

    # Sort by longest last runtime. (Can't sort None values in python3)
    rows.sort(key=lambda x: x[5] or 0.0, reverse=True)

    formatted_rows = [
        (
            file_path,
            pid,
            f"{runtime.total_seconds():.2f}s" if runtime else None,
            num_dags,
            num_errors,
            f"{last_runtime:.2f}s" if last_runtime else None,
            last_run.strftime("%Y-%m-%dT%H:%M:%S") if last_run else None,
            last_num_of_db_queries,
        )
        for (
            file_path,
            pid,
            runtime,
            num_dags,
            num_errors,
            last_runtime,
            last_run,
            last_num_of_db_queries,
        ) in rows
    ]
    log_str = (
        "\n"
        + "=" * 80
        + "\n"
        + "DAG File Processing Stats\n\n"
        + tabulate(formatted_rows, headers=headers)
        + "\n"
        + "=" * 80
    )

    self.log.info(log_str)
def get_pid(self, file_path) -> int | None:
    """
    Return the PID of the processor currently handling ``file_path``.

    :param file_path: the path to the file that's being processed.
    :return: the PID, or None when no processor is running for that file.
    """
    try:
        processor = self._processors[file_path]
    except KeyError:
        return None
    return processor.pid
def get_all_pids(self) -> list[int]:
    """
    Return the PIDs of all running processors.

    :return: a list of PIDs, in the iteration order of ``self._processors``
    """
    return [running.pid for running in self._processors.values()]
def get_last_runtime(self, file_path) -> float | None:
    """
    Return how long the last processing run of ``file_path`` took.

    :param file_path: the path to the file that was processed
    :return: the duration in seconds, or None when there is no stat or the recorded
        duration is empty (None or zero).
    """
    stat = self._file_stats.get(file_path)
    if not stat or not stat.last_duration:
        return None
    return stat.last_duration.total_seconds()
def get_last_dag_count(self, file_path) -> int | None:
    """
    Return how many DAGs the last run loaded from ``file_path``.

    :param file_path: the path to the file that was processed
    :return: the DAG count, or None if the file has no recorded stats.
    """
    stat = self._file_stats.get(file_path)
    if not stat:
        return None
    return stat.num_dags
def get_last_error_count(self, file_path) -> int | None:
    """
    Return the import error count recorded for the last run of ``file_path``.

    :param file_path: the path to the file that was processed
    :return: the import error count, or None if the file has no recorded stats.
    """
    stat = self._file_stats.get(file_path)
    if stat:
        return stat.import_errors
    return None
def get_last_num_of_db_queries(self, file_path) -> int | None:
    """
    Return the number of Airflow DB queries made while last parsing ``file_path``.

    :param file_path: the path to the file that was processed
    :return: the query count, or None if the file has no recorded stats.
    """
    recorded = self._file_stats.get(file_path)
    if not recorded:
        return None
    return recorded.last_num_of_db_queries
def get_last_finish_time(self, file_path) -> datetime | None:
    """
    Return when processing of ``file_path`` last completed.

    :param file_path: the path to the file that was processed
    :return: the recorded finish time, or None if the file has no recorded stats.
    """
    recorded = self._file_stats.get(file_path)
    if recorded:
        return recorded.last_finish_time
    return None
def get_start_time(self, file_path) -> datetime | None:
    """
    Return when the processor currently handling ``file_path`` was started.

    :param file_path: the path to the file that's being processed
    :return: the processor's start time, or None when no processor is running for that file.
    """
    try:
        running = self._processors[file_path]
    except KeyError:
        return None
    return running.start_time
def get_run_count(self, file_path) -> int:
    """
    Return how many times ``file_path`` has been parsed.

    :param file_path: the path to the file that's being processed.
    :return: the recorded run count, or 0 if the file has no recorded stats.
    """
    recorded = self._file_stats.get(file_path)
    if not recorded:
        return 0
    return recorded.run_count
def get_dag_directory(self) -> str:
    """Return the DAG directory as a string; a ``Path`` is resolved to an absolute path first."""
    directory = self._dag_directory
    if not isinstance(directory, Path):
        return str(directory)
    return str(directory.resolve())
def set_file_paths(self, new_file_paths):
    """
    Update this with a new set of paths to DAG definition files.

    Anything that refers to a path not in ``new_file_paths`` is dropped:

    - queued paths and pending callbacks for that path are removed;
    - running processors for it are terminated and removed from ``self._processors``
      and ``self.waitables``;
    - its stats are removed.

    :param new_file_paths: list of paths to DAG definition files
    :return: None
    """
    self._file_paths = new_file_paths
    # Membership is tested for every queued path, callback, processor and stat below; a set keeps
    # this linear in the number of files instead of quadratic.
    known_paths = set(new_file_paths)

    # clean up the queues; remove anything queued which is no longer in the list, including callbacks
    self._file_path_queue = deque(x for x in self._file_path_queue if x in known_paths)
    Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))

    callback_paths_to_del = [x for x in self._callback_to_execute if x not in known_paths]
    for path_to_del in callback_paths_to_del:
        del self._callback_to_execute[path_to_del]

    # Stop processors that are working on deleted files
    filtered_processors = {}
    for file_path, processor in self._processors.items():
        if file_path in known_paths:
            filtered_processors[file_path] = processor
        else:
            self.log.warning("Stopping processor for %s", file_path)
            Stats.decr("dag_processing.processes", tags={"file_path": file_path, "action": "stop"})
            processor.terminate()
            # The processor is no longer tracked in self._processors, so stop waiting on it as well
            # (as _kill_timed_out_processors does); otherwise collect_results() would pick up its
            # sentinel and fail popping the missing self._processors entry.
            self.waitables.pop(processor.waitable_handle, None)
            # The stat may already be gone: prepare_file_path_queue() drops the stats of files that
            # vanished from disk before calling this method.
            self._file_stats.pop(file_path, None)

    to_remove = set(self._file_stats).difference(known_paths)
    for key in to_remove:
        # Remove the stats for any dag files that don't exist anymore
        del self._file_stats[key]

    self._processors = filtered_processors
def wait_until_finished(self):
    """Block until every tracked processor reports ``done``, polling each one every 0.1 seconds."""
    for proc in self._processors.values():
        while True:
            if proc.done:
                break
            time.sleep(0.1)
def _collect_results_from_processor(self, processor) -> None:
    """
    Record the outcome of a finished processor and emit its metrics and trace span.

    A new ``DagFileStat`` (with the run count incremented) is stored for the processor's file.
    When the processor produced no result, the exit code is logged as an error and the run is
    recorded with ``import_errors=-1`` and zero DAGs / DB queries.

    :param processor: the finished processor whose result should be collected
    """
    self.log.debug("Processor for %s finished", processor.file_path)
    Stats.decr("dag_processing.processes", tags={"file_path": processor.file_path, "action": "finish"})
    last_finish_time = timezone.utcnow()

    if processor.result is not None:
        num_dags, count_import_errors, last_num_of_db_queries = processor.result
    else:
        self.log.error(
            "Processor for %s exited with return code %s.", processor.file_path, processor.exit_code
        )
        # -1 is recorded only in this branch, i.e. when the processor returned no result
        count_import_errors = -1
        num_dags = 0
        last_num_of_db_queries = 0

    last_duration = last_finish_time - processor.start_time
    stat = DagFileStat(
        num_dags=num_dags,
        import_errors=count_import_errors,
        last_finish_time=last_finish_time,
        last_duration=last_duration,
        run_count=self.get_run_count(processor.file_path) + 1,
        last_num_of_db_queries=last_num_of_db_queries,
    )
    self._file_stats[processor.file_path] = stat
    file_name = Path(processor.file_path).stem

    # crude exposure of instrumentation code which may need to be furnished
    span = Trace.get_tracer("DagFileProcessorManager").start_span(
        "dag_processing", start_time=datetime_to_nano(processor.start_time)
    )
    span.set_attribute("file_path", processor.file_path)
    # Report the run count just recorded. The stat has already been stored, so
    # get_run_count() + 1 here would count this run twice.
    span.set_attribute("run_count", stat.run_count)

    if processor.result is None:
        span.set_attribute("error", True)
        span.set_attribute("processor.exit_code", processor.exit_code)
    else:
        span.set_attribute("num_dags", num_dags)
        span.set_attribute("import_errors", count_import_errors)
        if count_import_errors > 0:
            span.set_attribute("error", True)

    span.end(end_time=datetime_to_nano(last_finish_time))

    Stats.timing(f"dag_processing.last_duration.{file_name}", last_duration)
    Stats.timing("dag_processing.last_duration", last_duration, tags={"file_name": file_name})
def collect_results(self) -> None:
    """
    Collect results from every DAG processor whose handle is ready, without blocking.

    The direct scheduler connection is excluded from the wait set. Each finished processor is
    removed from ``self.waitables`` and ``self._processors`` before its result is recorded.
    """
    scheduler_conn = self._direct_scheduler_conn
    ready_handles = multiprocessing.connection.wait(self.waitables.keys() - [scheduler_conn], timeout=0)

    for handle in ready_handles:
        if handle is scheduler_conn:
            continue
        finished = cast(DagFileProcessorProcess, self.waitables[handle])
        self.waitables.pop(finished.waitable_handle)
        self._processors.pop(finished.file_path)
        self._collect_results_from_processor(finished)

    self.log.debug("%s/%s DAG parsing processes running", len(self._processors), self._parallelism)
    self.log.debug("%s file paths queued for processing", len(self._file_path_queue))
@staticmethod
def _create_process(file_path, pickle_dags, dag_ids, dag_directory, callback_requests):
    """Construct a ``DagFileProcessorProcess`` for one DAG file; this does not start it."""
    processor_kwargs = {
        "file_path": file_path,
        "pickle_dags": pickle_dags,
        "dag_ids": dag_ids,
        "dag_directory": dag_directory,
        "callback_requests": callback_requests,
    }
    return DagFileProcessorProcess(**processor_kwargs)
@span
def start_new_processes(self):
    """
    Start processors for queued files while parallelism slots are free.

    Queued paths that already have a running processor are dropped from the queue. Each new
    processor receives (and consumes) the pending callbacks for its file.
    """
    # initialize cache to mutualize calls to Variable.get in DAGs
    # needs to be done before this process is forked to create the DAG parsing processes.
    SecretCache.init()

    while self._file_path_queue and len(self._processors) < self._parallelism:
        next_path = self._file_path_queue.popleft()
        # Stop creating duplicate processor i.e. processor with the same filepath
        if next_path in self._processors:
            continue

        pending_callbacks = self._callback_to_execute[next_path]
        processor = self._create_process(
            next_path,
            self._pickle_dags,
            self._dag_ids,
            self.get_dag_directory(),
            pending_callbacks,
        )
        del self._callback_to_execute[next_path]

        Stats.incr("dag_processing.processes", tags={"file_path": next_path, "action": "start"})
        current_span = Trace.get_current_span()
        current_span.set_attribute("category", "processing")
        processor.start()
        self.log.debug("Started a process (PID: %s) to generate tasks for %s", processor.pid, next_path)
        if current_span.is_recording():
            current_span.add_event(
                name="dag_processing processor started",
                attributes={"file_path": next_path, "pid": processor.pid},
            )

        self._processors[next_path] = processor
        self.waitables[processor.waitable_handle] = processor
        Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))
@span
def add_new_file_path_to_queue(self):
    """
    Put newly discovered files at the front of the parsing queue.

    A path is considered new when it is in ``self.file_paths`` but has no entry in
    ``self._file_stats``. It is given ``DEFAULT_FILE_STAT``, so later calls skip it while
    that entry exists.
    """
    for candidate in self.file_paths:
        if candidate in self._file_stats:
            continue
        # We found new file after refreshing dir. add to parsing queue at start
        self.log.info("Adding new file %s to parsing queue", candidate)
        self._file_stats[candidate] = DagFileProcessorManager.DEFAULT_FILE_STAT
        self._file_path_queue.appendleft(candidate)
        current_span = Trace.get_current_span()
        if current_span.is_recording():
            current_span.add_event(
                name="adding new file to parsing queue", attributes={"file_path": candidate}
            )
def prepare_file_path_queue(self):
    """
    Scan dags dir to generate more file paths to process.

    Orders ``self._file_paths`` by ``[scheduler] file_parsing_sort_mode``: "modified_time"
    (newest first), "alphabetical", or "random_seeded_by_host"; any other value keeps the
    current order. Every path that is not currently being processed, not recently processed
    and not at the ``max_runs`` limit is appended to the back of ``self._file_path_queue``.

    Note this method is only called when the file path queue is empty
    """
    # Start of a parsing "loop"; emit_metrics() reports the time elapsed since this point.
    self._parsing_start_time = time.perf_counter()
    # If the file path is already being processed, or if a file was
    # processed recently, wait until the next batch
    file_paths_in_progress = set(self._processors)
    now = timezone.utcnow()

    # Sort the file paths by the parsing order mode
    list_mode = conf.get("scheduler", "file_parsing_sort_mode")

    files_with_mtime = {}
    file_paths = []
    is_mtime_mode = list_mode == "modified_time"

    file_paths_recently_processed = []
    file_paths_to_stop_watching = set()
    for file_path in self._file_paths:
        if is_mtime_mode:
            try:
                files_with_mtime[file_path] = os.path.getmtime(file_path)
            except FileNotFoundError:
                # The file disappeared since the directory was listed: forget its stats and
                # collect it so it is removed from the watched paths after this loop.
                self.log.warning("Skipping processing of missing file: %s", file_path)
                self._file_stats.pop(file_path, None)
                file_paths_to_stop_watching.add(file_path)
                continue
            file_modified_time = datetime.fromtimestamp(files_with_mtime[file_path], tz=timezone.utc)
        else:
            file_paths.append(file_path)
            file_modified_time = None

        # Find file paths that were recently processed to exclude them
        # from being added to file_path_queue
        # unless they were modified recently and parsing mode is "modified_time"
        # in which case we don't honor "self._file_process_interval" (min_file_process_interval)
        last_finish_time = self.get_last_finish_time(file_path)
        if (
            last_finish_time is not None
            and (now - last_finish_time).total_seconds() < self._file_process_interval
            and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))
        ):
            file_paths_recently_processed.append(file_path)

    # Sort file paths via last modified time
    if is_mtime_mode:
        # In this mode file_paths was not filled in the loop; it is built here from the files
        # whose mtime could be read, most recently modified first.
        file_paths = sorted(files_with_mtime, key=files_with_mtime.get, reverse=True)
    elif list_mode == "alphabetical":
        file_paths.sort()
    elif list_mode == "random_seeded_by_host":
        # Shuffle the list seeded by hostname so multiple schedulers can work on different
        # set of files. Since we set the seed, the sort order will remain same per host
        random.Random(get_hostname()).shuffle(file_paths)

    if file_paths_to_stop_watching:
        # Drop the vanished files from everything the manager tracks (queue, callbacks,
        # processors, stats) via set_file_paths().
        self.set_file_paths(
            [path for path in self._file_paths if path not in file_paths_to_stop_watching]
        )

    files_paths_at_run_limit = [
        file_path for file_path, stat in self._file_stats.items() if stat.run_count == self._max_runs
    ]

    file_paths_to_exclude = file_paths_in_progress.union(
        file_paths_recently_processed,
        files_paths_at_run_limit,
    )

    # Do not convert the following list to set as set does not preserve the order
    # and we need to maintain the order of file_paths for `[scheduler] file_parsing_sort_mode`
    files_paths_to_queue = [
        file_path for file_path in file_paths if file_path not in file_paths_to_exclude
    ]

    if self.log.isEnabledFor(logging.DEBUG):
        for processor in self._processors.values():
            self.log.debug(
                "File path %s is still being processed (started: %s)",
                processor.file_path,
                processor.start_time.isoformat(),
            )

        self.log.debug(
            "Queuing the following files for processing:\n\t%s", "\n\t".join(files_paths_to_queue)
        )

    # Give each queued path a stats entry without overwriting an existing one.
    for file_path in files_paths_to_queue:
        self._file_stats.setdefault(file_path, DagFileProcessorManager.DEFAULT_FILE_STAT)
    self._add_paths_to_queue(files_paths_to_queue, False)
    Stats.incr("dag_processing.file_path_queue_update_count")
def _kill_timed_out_processors(self):
    """
    Kill any file processor that has run longer than ``self._processor_timeout``.

    Each killed processor is removed from ``self.waitables`` and ``self._processors``. Its file
    gets a new stat entry recording one import error, no DAGs and an incremented run count.
    """
    now = timezone.utcnow()
    timed_out_paths = []
    for path, proc in self._processors.items():
        elapsed = now - proc.start_time
        if elapsed <= self._processor_timeout:
            continue

        self.log.error(
            "Processor for %s with PID %s started at %s has timed out, killing it.",
            path,
            proc.pid,
            proc.start_time.isoformat(),
        )
        Stats.decr("dag_processing.processes", tags={"file_path": path, "action": "timeout"})
        Stats.incr("dag_processing.processor_timeouts", tags={"file_path": path})
        # Deprecated; may be removed in a future Airflow release.
        Stats.incr("dag_file_processor_timeouts")
        proc.kill()

        current_span = Trace.get_current_span()
        current_span.set_attribute("category", "processing")
        if current_span.is_recording():
            current_span.add_event(
                name="dag processing killed processor",
                attributes={"file_path": path, "action": "timeout"},
            )

        # Clean up processor references; the self._processors entry is removed after the loop
        # because the dict must not change size while it is being iterated.
        self.waitables.pop(proc.waitable_handle)
        timed_out_paths.append(path)

        self._file_stats[proc.file_path] = DagFileStat(
            num_dags=0,
            import_errors=1,
            last_finish_time=now,
            last_duration=elapsed,
            run_count=self.get_run_count(path) + 1,
            last_num_of_db_queries=0,
        )

    for path in timed_out_paths:
        self._processors.pop(path)
def _add_paths_to_queue(self, file_paths_to_enqueue: list[str], add_at_front: bool):
    """
    Add file paths to the back or front of the file queue, unless they are already present.

    Paths keep the order in which they were given, at either end of the queue. A path that
    appears more than once in ``file_paths_to_enqueue`` is added only once.

    :param file_paths_to_enqueue: file paths to add
    :param add_at_front: add to the head of the queue when True, otherwise to the tail
    """
    already_queued = set(self._file_path_queue)
    # dict.fromkeys de-duplicates while preserving the given order
    new_file_paths = [p for p in dict.fromkeys(file_paths_to_enqueue) if p not in already_queued]
    if add_at_front:
        # deque.extendleft() inserts items one by one, which reverses them; reverse first so the
        # given order is kept at the head of the queue.
        self._file_path_queue.extendleft(reversed(new_file_paths))
    else:
        self._file_path_queue.extend(new_file_paths)
    Stats.gauge("dag_processing.file_path_queue_size", len(self._file_path_queue))
def max_runs_reached(self):
    """
    Tell whether parsing has hit the ``max_runs`` limit.

    :return: False when runs are unlimited (``max_runs == -1``). Otherwise True only if every
        file stat has ``run_count >= max_runs`` and the loop itself ran at least ``max_runs`` times.
    """
    if self._max_runs == -1:  # Unlimited runs.
        return False
    limit = self._max_runs
    every_file_done = all(stat.run_count >= limit for stat in self._file_stats.values())
    return every_file_done and self._num_run >= limit
def terminate(self):
    """Terminate every running processor, decrementing the ``dag_processing.processes`` metric for each."""
    for proc in self._processors.values():
        metric_tags = {"file_path": proc.file_path, "action": "terminate"}
        Stats.decr("dag_processing.processes", tags=metric_tags)
        proc.terminate()
def end(self):
    """Kill all child processes on exit so none are left orphaned; a no-op when nothing is running."""
    pids = self.get_all_pids()
    if not pids:
        return
    kill_child_processes_by_pids(pids)
def emit_metrics(self):
    """
    Emit metrics about dag parsing summary.

    Called once per parsing "loop", i.e. after all files have been parsed. Reports the total
    parse time since ``self._parsing_start_time``, the DAG bag size and the total import error
    count, both as gauges and as attributes of an ``emit_metrics`` span.
    """
    with Trace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as metrics_span:
        parse_time = time.perf_counter() - self._parsing_start_time
        file_stats = self._file_stats.values()
        dag_bag_size = sum(stat.num_dags for stat in file_stats)
        import_error_total = sum(stat.import_errors for stat in file_stats)

        Stats.gauge("dag_processing.total_parse_time", parse_time)
        Stats.gauge("dagbag_size", dag_bag_size)
        Stats.gauge("dag_processing.import_errors", import_error_total)

        metrics_span.set_attribute("total_parse_time", parse_time)
        metrics_span.set_attribute("dag_bag_size", dag_bag_size)
        metrics_span.set_attribute("import_errors", import_error_total)
@property
def file_paths(self):
    """The DAG file paths currently tracked by the manager (``self._file_paths``)."""
    tracked = self._file_paths
    return tracked
def reload_configuration_for_dag_processing():
    """
    Reload logging configuration and ``airflow.settings`` for the DAG processing process.

    ``CONFIG_PROCESSOR_MANAGER_LOGGER`` is set only while the reload runs.
    ``AIRFLOW__LOGGING__COLORED_CONSOLE_LOG`` stays set to ``"False"`` afterwards.
    """
    # Reload configurations and settings to avoid collision with parent process.
    # Because this process may need custom configurations that cannot be shared,
    # e.g. RotatingFileHandler. And it can cause connection corruption if we
    # do not recreate the SQLA connection pool.
    os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"] = "True"
    os.environ["AIRFLOW__LOGGING__COLORED_CONSOLE_LOG"] = "False"
    # Replicating the behavior of how logging module was loaded
    # in logging_config.py

    # TODO: This reloading should be removed when we fix our logging behaviour
    # In case of "spawn" method of starting processes for multiprocessing, reinitializing of the
    # SQLAlchemy engine causes extremely unexpected behaviour of messing with objects already loaded
    # in a parent process (likely via resources shared in memory by the ORM libraries).
    # This caused flaky tests in our CI for many months and has been discovered while
    # iterating on https://github.com/apache/airflow/pull/19860
    # The issue that describes the problem and possible remediation is
    # at https://github.com/apache/airflow/issues/19934

    # Reload the module that holds the configured logging class (LOGGING_CLASS_PATH without its
    # last dotted component), then airflow.settings itself, then re-run settings initialization.
    # NOTE(review): there is no try/finally, so if any step below raises,
    # CONFIG_PROCESSOR_MANAGER_LOGGER is left set in the environment.
    importlib.reload(import_module(airflow.settings.LOGGING_CLASS_PATH.rsplit(".", 1)[0]))  # type: ignore
    importlib.reload(airflow.settings)
    airflow.settings.initialize()

    del os.environ["CONFIG_PROCESSOR_MANAGER_LOGGER"]
|