# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import importlib
import logging
import os
import signal
import threading
import time
import zipfile
from contextlib import contextmanager, redirect_stderr, redirect_stdout, suppress
from dataclasses import dataclass
from datetime import timedelta
from typing import TYPE_CHECKING, Generator, Iterable, Iterator

from setproctitle import setproctitle
from sqlalchemy import delete, event, func, or_, select

from airflow import settings
from airflow.api_internal.internal_api_call import InternalApiConfig, internal_api_call
from airflow.callbacks.callback_requests import (
    DagCallbackRequest,
    SlaCallbackRequest,
    TaskCallbackRequest,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowException, TaskNotFound
from airflow.listeners.listener import get_listener_manager
from airflow.models import SlaMiss
from airflow.models.dag import DAG, DagModel
from airflow.models.dagbag import DagBag
from airflow.models.dagrun import DagRun as DR
from airflow.models.dagwarning import DagWarning, DagWarningType
from airflow.models.errors import ParseImportError
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstance as TI, _run_finished_callback
from airflow.stats import Stats
from airflow.utils import timezone
from airflow.utils.email import get_email_address_list, send_email
from airflow.utils.file import iter_airflow_imports, might_contain_dag
from airflow.utils.log.logging_mixin import LoggingMixin, StreamLogWriter, set_context
from airflow.utils.mixins import MultiprocessingStartMethodMixin
from airflow.utils.session import NEW_SESSION, provide_session
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
    import multiprocessing
    from datetime import datetime
    from multiprocessing.connection import Connection as MultiprocessingConnection

    from sqlalchemy.orm.session import Session

    from airflow.callbacks.callback_requests import CallbackRequest
    from airflow.models.operator import Operator


@dataclass
class _QueryCounter:
    queries_number: int = 0

    def inc(self):
        self.queries_number += 1


@contextmanager
def count_queries(session: Session) -> Generator[_QueryCounter, None, None]:
    # Yielding a mutable counter object lets callers read the up-to-date count
    # after the context manager has exited.
    counter: _QueryCounter = _QueryCounter()

    @event.listens_for(session, "do_orm_execute")
    def _count_db_queries(orm_execute_state):
        nonlocal counter
        counter.inc()

    yield counter
    event.remove(session, "do_orm_execute", _count_db_queries)
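
# Example (illustrative only): the counter yielded by count_queries() keeps
# updating while the block is open, so read ``queries_number`` after the ORM
# work has run:
#
#     with count_queries(session) as counter:
#         session.scalars(select(DagModel)).all()
#     print(counter.queries_number)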


class DagFileProcessorProcess(LoggingMixin, MultiprocessingStartMethodMixin):
    """
    Runs DAG processing in a separate process using DagFileProcessor.

    :param file_path: a Python file containing Airflow DAG definitions
    :param pickle_dags: whether to serialize the DAG objects to the DB
    :param dag_ids: If specified, only look at these DAG IDs
    :param callback_requests: failure callbacks to execute
    """

    # Counter that increments every time an instance of this class is created
    class_creation_counter = 0

    def __init__(
        self,
        file_path: str,
        pickle_dags: bool,
        dag_ids: list[str] | None,
        dag_directory: str,
        callback_requests: list[CallbackRequest],
    ):
        super().__init__()
        self._file_path = file_path
        self._pickle_dags = pickle_dags
        self._dag_ids = dag_ids
        self._dag_directory = dag_directory
        self._callback_requests = callback_requests

        # The process that was launched to process the given file.
        self._process: multiprocessing.process.BaseProcess | None = None
        # The result of DagFileProcessor.process_file(file_path).
        self._result: tuple[int, int, int] | None = None
        # Whether the process is done running.
        self._done = False
        # When the process started.
        self._start_time: datetime | None = None
        # This ID is used to uniquely name the process / thread that's launched
        # by this processor instance
        self._instance_id = DagFileProcessorProcess.class_creation_counter

        self._parent_channel: MultiprocessingConnection | None = None
        DagFileProcessorProcess.class_creation_counter += 1

    @property
    def file_path(self) -> str:
        return self._file_path

    @staticmethod
    def _run_file_processor(
        result_channel: MultiprocessingConnection,
        parent_channel: MultiprocessingConnection,
        file_path: str,
        pickle_dags: bool,
        dag_ids: list[str] | None,
        thread_name: str,
        dag_directory: str,
        callback_requests: list[CallbackRequest],
    ) -> None:
        """
        Process the given file.

        :param result_channel: the connection to use for passing back the result
        :param parent_channel: the parent end of the channel to close in the child
        :param file_path: the file to process
        :param pickle_dags: whether to pickle the DAGs found in the file and
            save them to the DB
        :param dag_ids: if specified, only examine DAG IDs that are
            in this list
        :param thread_name: the name to use for the process that is launched
        :param callback_requests: failure callbacks to execute
        """
        # This helper runs in the newly created process
        log: logging.Logger = logging.getLogger("airflow.processor")

        # Since we share all open FDs from the parent, we need to close the parent side of the pipe here in
        # the child, else it won't get closed properly until we exit.
        parent_channel.close()
        del parent_channel

        set_context(log, file_path)
        setproctitle(f"airflow scheduler - DagFileProcessor {file_path}")

        def _handle_dag_file_processing():
            # Re-configure the ORM engine as there are issues with multiple processes
            settings.configure_orm()

            # Change the thread name to differentiate log lines. This is
            # really a separate process, but changing the name of the
            # process doesn't work, so changing the thread name instead.
            threading.current_thread().name = thread_name

            log.info("Started process (PID=%s) to work on %s", os.getpid(), file_path)
            dag_file_processor = DagFileProcessor(dag_ids=dag_ids, dag_directory=dag_directory, log=log)
            result: tuple[int, int, int] = dag_file_processor.process_file(
                file_path=file_path,
                pickle_dags=pickle_dags,
                callback_requests=callback_requests,
            )
            result_channel.send(result)

        try:
            DAG_PROCESSOR_LOG_TARGET = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")
            if DAG_PROCESSOR_LOG_TARGET == "stdout":
                with Stats.timer() as timer:
                    _handle_dag_file_processing()
            else:
                # The following line ensures that stdout goes to the same destination as the logs. If stdout
                # gets sent to logs and logs are sent to stdout, this leads to an infinite loop. This
                # necessitates this conditional based on the value of DAG_PROCESSOR_LOG_TARGET.
                with redirect_stdout(StreamLogWriter(log, logging.INFO)), redirect_stderr(
                    StreamLogWriter(log, logging.WARNING)
                ), Stats.timer() as timer:
                    _handle_dag_file_processing()
            log.info("Processing %s took %.3f seconds", file_path, timer.duration)
        except Exception:
            # Log exceptions through the logging framework.
            log.exception("Got an exception! Propagating...")
            raise
        finally:
            # We re-initialized the ORM within this Process above so we need to
            # tear it down manually here
            settings.dispose_orm()
            result_channel.close()
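
    # NOTE: start() below wires a one-way Pipe between parent and child: the child
    # sends the process_file() result over ``result_channel`` and the parent reads
    # it in the ``done``/``result`` properties. Each side closes the end it does not
    # own so that EOF (and therefore process completion or failure) is detected.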
    def start(self) -> None:
        """Launch the process and start processing the DAG."""
        if conf.getboolean("scheduler", "parsing_pre_import_modules", fallback=True):
            # Read the file to pre-import airflow modules used.
            # This prevents them from being re-imported from zero in each "processing" process
            # and saves CPU time and memory.
            zip_file_paths = []
            if zipfile.is_zipfile(self.file_path):
                try:
                    with zipfile.ZipFile(self.file_path) as z:
                        zip_file_paths.extend(
                            [
                                os.path.join(self.file_path, info.filename)
                                for info in z.infolist()
                                if might_contain_dag(info.filename, True, z)
                            ]
                        )
                except zipfile.BadZipFile as err:
                    self.log.error("There was an error accessing %s: %s", self.file_path, err)
            if zip_file_paths:
                self.import_modules(zip_file_paths)
            else:
                self.import_modules(self.file_path)
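
        # The multiprocessing start method (fork/spawn) is resolved by
        # MultiprocessingStartMethodMixin -- typically the configured
        # ``[core] mp_start_method`` when set, otherwise the platform default.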
        context = self._get_multiprocessing_context()

        _parent_channel, _child_channel = context.Pipe(duplex=False)
        process = context.Process(
            target=type(self)._run_file_processor,
            args=(
                _child_channel,
                _parent_channel,
                self.file_path,
                self._pickle_dags,
                self._dag_ids,
                f"DagFileProcessor{self._instance_id}",
                self._dag_directory,
                self._callback_requests,
            ),
            name=f"DagFileProcessor{self._instance_id}-Process",
        )
        self._process = process
        self._start_time = timezone.utcnow()
        process.start()

        # Close the child side of the pipe now the subprocess has started -- otherwise this would prevent it
        # from closing in some cases
        _child_channel.close()
        del _child_channel

        # Don't store it on self until after we've started the child process - we don't want to keep it from
        # getting GCd/closed
        self._parent_channel = _parent_channel

    def kill(self) -> None:
        """Kill the process launched to process the file, and ensure consistent state."""
        if self._process is None:
            raise AirflowException("Tried to kill before starting!")
        self._kill_process()

    def terminate(self, sigkill: bool = False) -> None:
        """
        Terminate (and then kill) the process launched to process the file.

        :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
        """
        if self._process is None or self._parent_channel is None:
            raise AirflowException("Tried to call terminate before starting!")
        self._process.terminate()
        # Arbitrarily wait 5s for the process to die
        with suppress(TimeoutError):
            self._process._popen.wait(5)  # type: ignore
        if sigkill:
            self._kill_process()
        self._parent_channel.close()

    def _kill_process(self) -> None:
        if self._process is None:
            raise AirflowException("Tried to kill process before starting!")
        if self._process.is_alive() and self._process.pid:
            self.log.warning("Killing DAGFileProcessorProcess (PID=%d)", self._process.pid)
            os.kill(self._process.pid, signal.SIGKILL)

            # Reap the spawned zombie. We active wait, because in Python 3.9 `waitpid` might lead to an
            # exception, due to change in Python standard library and possibility of race condition
            # see https://bugs.python.org/issue42558
            while self._process._popen.poll() is None:  # type: ignore
                time.sleep(0.001)
        if self._parent_channel:
            self._parent_channel.close()

    @property
    def pid(self) -> int:
        """PID of the process launched to process the given file."""
        if self._process is None or self._process.pid is None:
            raise AirflowException("Tried to get PID before starting!")
        return self._process.pid

    @property
    def exit_code(self) -> int | None:
        """
        After the process is finished, this can be called to get the return code.

        :return: the exit code of the process
        """
        if self._process is None:
            raise AirflowException("Tried to get exit code before starting!")
        if not self._done:
            raise AirflowException("Tried to call retcode before process was finished!")
        return self._process.exitcode

    @property
    def done(self) -> bool:
        """
        Check if the process launched to process this file is done.

        :return: whether the process is finished running
        """
        if self._process is None or self._parent_channel is None:
            raise AirflowException("Tried to see if it's done before starting!")
        if self._done:
            return True

        if self._parent_channel.poll():
            try:
                self._result = self._parent_channel.recv()
                self._done = True
                self.log.debug("Waiting for %s", self._process)
                self._process.join()
                self._parent_channel.close()
                return True
            except EOFError:
                # If we get an EOFError, it means the child end of the pipe has been closed. This only happens
                # in the finally block. But due to a possible race condition, the process may have not yet
                # terminated (it could be doing cleanup/python shutdown still). So we kill it here after a
                # "suitable" timeout.
                self._done = True
                # Arbitrary timeout -- error/race condition only, so this doesn't need to be tunable.
                self._process.join(timeout=5)
                if self._process.is_alive():
                    # Didn't shut down cleanly - kill it
                    self._kill_process()

        if not self._process.is_alive():
            self._done = True
            self.log.debug("Waiting for %s", self._process)
            self._process.join()
            self._parent_channel.close()
            return True

        return False

    @property
    def result(self) -> tuple[int, int, int] | None:
        """Result of running ``DagFileProcessor.process_file()``."""
        if not self.done:
            raise AirflowException("Tried to get the result before it's done!")
        return self._result

    @property
    def start_time(self) -> datetime:
        """Time when this started to process the file."""
        if self._start_time is None:
            raise AirflowException("Tried to get start time before it started!")
        return self._start_time

    @property
    def waitable_handle(self):
        return self._process.sentinel

    def import_modules(self, file_path: str | Iterable[str]):
        def _import_modules(filepath):
            for module in iter_airflow_imports(filepath):
                try:
                    importlib.import_module(module)
                except Exception as e:
                    # only log as warning because an error here is not preventing anything from working, and
                    # if it's serious, it's going to be surfaced to the user when the dag is actually parsed.
                    self.log.warning(
                        "Error when trying to pre-import module '%s' found in %s: %s",
                        module,
                        file_path,
                        e,
                    )

        if isinstance(file_path, str):
            _import_modules(file_path)
        elif isinstance(file_path, Iterable):
            for path in file_path:
                _import_modules(path)
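

# Typical lifecycle of DagFileProcessorProcess as driven by the DAG file
# processing manager (illustrative sketch only, not executed here):
#
#     processor = DagFileProcessorProcess(
#         file_path, pickle_dags=False, dag_ids=None,
#         dag_directory="/path/to/dags", callback_requests=[],
#     )
#     processor.start()
#     while not processor.done:
#         time.sleep(0.1)
#     dags_found, import_errors, db_queries = processor.result or (0, 0, 0)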


class DagFileProcessor(LoggingMixin):
    """
    Process a Python file containing Airflow DAGs.

    This includes:

    1. Execute the file and look for DAG objects in the namespace.
    2. Execute any Callbacks if passed to DagFileProcessor.process_file
    3. Serialize the DAGs and save it to DB (or update existing record in the DB).
    4. Pickle the DAG and save it to the DB (if necessary).
    5. Record any errors importing the file into ORM

    Returns a tuple of 'number of dags found', 'the count of import errors', and
    'the number of DB queries performed'.

    :param dag_ids: If specified, only look at these DAG IDs
    :param log: Logger used during DAG file processing
    """

    UNIT_TEST_MODE: bool = conf.getboolean("core", "UNIT_TEST_MODE")

    def __init__(self, dag_ids: list[str] | None, dag_directory: str, log: logging.Logger):
        super().__init__()
        self.dag_ids = dag_ids
        self._log = log
        self._dag_directory = dag_directory
        self.dag_warnings: set[tuple[str, str]] = set()
        self._last_num_of_db_queries = 0

    @classmethod
    @internal_api_call
    @provide_session
    def manage_slas(cls, dag_folder, dag_id: str, session: Session = NEW_SESSION) -> None:
        """
        Find all tasks that have SLAs defined, and send alert emails when needed.

        New SLA misses are also recorded in the database.

        We are assuming that the scheduler runs often, so we only check for
        tasks that should have succeeded in the past hour.
        """
        dagbag = DagFileProcessor._get_dagbag(dag_folder)
        dag = dagbag.get_dag(dag_id)
        cls.logger().info("Running SLA Checks for %s", dag.dag_id)
        if not any(isinstance(ti.sla, timedelta) for ti in dag.tasks):
            cls.logger().info("Skipping SLA check for %s because no tasks in DAG have SLAs", dag)
            return
        qry = (
            select(TI.task_id, func.max(DR.execution_date).label("max_ti"))
            .join(TI.dag_run)
            .where(TI.dag_id == dag.dag_id)
            .where(or_(TI.state == TaskInstanceState.SUCCESS, TI.state == TaskInstanceState.SKIPPED))
            .where(TI.task_id.in_(dag.task_ids))
            .group_by(TI.task_id)
            .subquery("sq")
        )
        # get recorded SlaMiss
        recorded_slas_query = set(
            session.execute(
                select(SlaMiss.dag_id, SlaMiss.task_id, SlaMiss.execution_date).where(
                    SlaMiss.dag_id == dag.dag_id, SlaMiss.task_id.in_(dag.task_ids)
                )
            )
        )
        max_tis: Iterator[TI] = session.scalars(
            select(TI)
            .join(TI.dag_run)
            .where(TI.dag_id == dag.dag_id, TI.task_id == qry.c.task_id, DR.execution_date == qry.c.max_ti)
        )

        ts = timezone.utcnow()

        for ti in max_tis:
            task = dag.get_task(ti.task_id)
            if not task.sla:
                continue

            if not isinstance(task.sla, timedelta):
                raise TypeError(
                    f"SLA is expected to be timedelta object, got "
                    f"{type(task.sla)} in {task.dag_id}:{task.task_id}"
                )

            sla_misses = []
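            # Walk forward through the DAG's schedule points starting from this task
            # instance's data interval; every logical date whose SLA deadline has
            # already passed (and has no recorded SlaMiss yet) becomes a new miss.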
            next_info = dag.next_dagrun_info(dag.get_run_data_interval(ti.dag_run), restricted=False)
            while next_info and next_info.logical_date < ts:
                next_info = dag.next_dagrun_info(next_info.data_interval, restricted=False)

                if next_info is None:
                    break

                if (ti.dag_id, ti.task_id, next_info.logical_date) in recorded_slas_query:
                    continue

                if next_info.logical_date + task.sla < ts:
                    sla_miss = SlaMiss(
                        task_id=ti.task_id,
                        dag_id=ti.dag_id,
                        execution_date=next_info.logical_date,
                        timestamp=ts,
                    )
                    sla_misses.append(sla_miss)
                    Stats.incr("sla_missed", tags={"dag_id": ti.dag_id, "task_id": ti.task_id})
            if sla_misses:
                session.add_all(sla_misses)
        session.commit()
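
        # Second phase: fetch every SLA miss for this DAG that has not yet been
        # notified and fire the configured notifications (sla_miss_callback and/or
        # alert emails), then mark the rows as notified.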
        slas: list[SlaMiss] = session.scalars(
            select(SlaMiss).where(~SlaMiss.notification_sent, SlaMiss.dag_id == dag.dag_id)
        ).all()
        if slas:
            sla_dates: list[datetime] = [sla.execution_date for sla in slas]
            fetched_tis: list[TI] = session.scalars(
                select(TI).where(
                    TI.dag_id == dag.dag_id,
                    TI.execution_date.in_(sla_dates),
                    TI.state != TaskInstanceState.SUCCESS,
                )
            ).all()
            blocking_tis: list[TI] = []
            for ti in fetched_tis:
                if ti.task_id in dag.task_ids:
                    ti.task = dag.get_task(ti.task_id)
                    blocking_tis.append(ti)
                else:
                    session.delete(ti)
                    session.commit()

            task_list = "\n".join(sla.task_id + " on " + sla.execution_date.isoformat() for sla in slas)
            blocking_task_list = "\n".join(
                ti.task_id + " on " + ti.execution_date.isoformat() for ti in blocking_tis
            )
            # Track whether email or any alert notification sent
            # We consider email or the alert callback as notifications
            email_sent = False
            notification_sent = False
            if dag.sla_miss_callback:
                # Execute the alert callback
                callbacks = (
                    dag.sla_miss_callback
                    if isinstance(dag.sla_miss_callback, list)
                    else [dag.sla_miss_callback]
                )
                for callback in callbacks:
                    cls.logger().info("Calling SLA miss callback %s", callback)
                    try:
                        callback(dag, task_list, blocking_task_list, slas, blocking_tis)
                        notification_sent = True
                    except Exception:
                        Stats.incr(
                            "sla_callback_notification_failure",
                            tags={
                                "dag_id": dag.dag_id,
                                "func_name": callback.__name__,
                            },
                        )
                        cls.logger().exception(
                            "Could not call sla_miss_callback(%s) for DAG %s",
                            callback.__name__,
                            dag.dag_id,
                        )
            email_content = f"""\
Here's a list of tasks that missed their SLAs:
<pre><code>{task_list}\n</code></pre>
Blocking tasks:
<pre><code>{blocking_task_list}</code></pre>
Airflow Webserver URL: {conf.get(section='webserver', key='base_url')}
"""

            tasks_missed_sla = []
            for sla in slas:
                try:
                    task = dag.get_task(sla.task_id)
                except TaskNotFound:
                    # task already deleted from DAG, skip it
                    cls.logger().warning(
                        "Task %s doesn't exist in DAG anymore, skipping SLA miss notification.", sla.task_id
                    )
                else:
                    tasks_missed_sla.append(task)

            emails: set[str] = set()
            for task in tasks_missed_sla:
                if task.email:
                    if isinstance(task.email, str):
                        emails.update(get_email_address_list(task.email))
                    elif isinstance(task.email, (list, tuple)):
                        emails.update(task.email)
            if emails:
                try:
                    send_email(emails, f"[airflow] SLA miss on DAG={dag.dag_id}", email_content)
                    email_sent = True
                    notification_sent = True
                except Exception:
                    Stats.incr("sla_email_notification_failure", tags={"dag_id": dag.dag_id})
                    cls.logger().exception(
                        "Could not send SLA Miss email notification for DAG %s", dag.dag_id
                    )
            # If we sent any notification, update the sla_miss table
            if notification_sent:
                for sla in slas:
                    sla.email_sent = email_sent
                    sla.notification_sent = True
                    session.merge(sla)
            session.commit()

    @staticmethod
    @internal_api_call
    @provide_session
    def update_import_errors(
        file_last_changed: dict[str, datetime],
        import_errors: dict[str, str],
        processor_subdir: str | None,
        session: Session = NEW_SESSION,
    ) -> None:
        """
        Update any import errors to be displayed in the UI.

        For the DAGs in the given DagBag, record any associated import errors and clear
        errors for files that no longer have them. These are usually displayed through the
        Airflow UI so that users know that there are issues parsing DAGs.

        :param file_last_changed: Dictionary containing the last changed time of the files
        :param import_errors: Dictionary containing the import errors
        :param session: session for ORM operations
        """
        files_without_error = file_last_changed - import_errors.keys()

        # Clear the errors of the processed files
        # that no longer have errors
        for dagbag_file in files_without_error:
            session.execute(
                delete(ParseImportError)
                .where(ParseImportError.filename.startswith(dagbag_file))
                .execution_options(synchronize_session="fetch")
            )

        # files that still have errors
        existing_import_error_files = [x.filename for x in session.query(ParseImportError.filename).all()]

        # Add the errors of the processed files
        for filename, stacktrace in import_errors.items():
            if filename in existing_import_error_files:
                session.query(ParseImportError).filter(ParseImportError.filename == filename).update(
                    {"filename": filename, "timestamp": timezone.utcnow(), "stacktrace": stacktrace},
                    synchronize_session="fetch",
                )
                # sending notification when an existing dag import error occurs
                get_listener_manager().hook.on_existing_dag_import_error(
                    filename=filename, stacktrace=stacktrace
                )
            else:
                session.add(
                    ParseImportError(
                        filename=filename,
                        timestamp=timezone.utcnow(),
                        stacktrace=stacktrace,
                        processor_subdir=processor_subdir,
                    )
                )
                # sending notification when a new dag import error occurs
                get_listener_manager().hook.on_new_dag_import_error(filename=filename, stacktrace=stacktrace)
            (
                session.query(DagModel)
                .filter(DagModel.fileloc == filename)
                .update({"has_import_errors": True}, synchronize_session="fetch")
            )

        session.commit()
        session.flush()

    @classmethod
    def update_dag_warnings(cls, *, dagbag: DagBag) -> None:
        """Validate DAG pool usage and record a DagWarning for any task that references a non-existent pool."""

        def get_pools(dag) -> dict[str, set[str]]:
            return {dag.dag_id: {task.pool for task in dag.tasks}}

        pool_dict: dict[str, set[str]] = {}
        for dag in dagbag.dags.values():
            pool_dict.update(get_pools(dag))
            for subdag in dag.subdags:
                pool_dict.update(get_pools(subdag))
        dag_ids = {dag.dag_id for dag in dagbag.dags.values()}
        return DagFileProcessor._validate_task_pools_and_update_dag_warnings(pool_dict, dag_ids)

    @classmethod
    @internal_api_call
    @provide_session
    def _validate_task_pools_and_update_dag_warnings(
        cls, pool_dict: dict[str, set[str]], dag_ids: set[str], session: Session = NEW_SESSION
    ) -> None:
        from airflow.models.pool import Pool

        all_pools = {p.pool for p in Pool.get_pools(session)}
        warnings: set[DagWarning] = set()
        for dag_id, dag_pools in pool_dict.items():
            nonexistent_pools = dag_pools - all_pools
            if nonexistent_pools:
                warnings.add(
                    DagWarning(
                        dag_id,
                        DagWarningType.NONEXISTENT_POOL,
                        f"Dag '{dag_id}' references non-existent pools: {sorted(nonexistent_pools)!r}",
                    )
                )
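
        # Reconcile with what is already stored for these DAGs: warnings that no
        # longer apply are deleted, and the freshly computed ones are upserted.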
        stored_warnings = set(session.query(DagWarning).filter(DagWarning.dag_id.in_(dag_ids)).all())
        for warning_to_delete in stored_warnings - warnings:
            session.delete(warning_to_delete)
        for warning_to_add in warnings:
            session.merge(warning_to_add)
        session.flush()
        session.commit()

    @classmethod
    @internal_api_call
    @provide_session
    def execute_callbacks(
        cls,
        dagbag: DagBag,
        callback_requests: list[CallbackRequest],
        unit_test_mode: bool,
        session: Session = NEW_SESSION,
    ) -> None:
        """
        Execute on failure callbacks.

        These objects can come from SchedulerJobRunner or from DagProcessorJobRunner.

        :param dagbag: Dag Bag of dags
        :param callback_requests: failure callbacks to execute
        :param session: DB session.
        """
        for request in callback_requests:
            cls.logger().debug("Processing Callback Request: %s", request)
            try:
                if isinstance(request, TaskCallbackRequest):
                    cls._execute_task_callbacks(dagbag, request, unit_test_mode, session=session)
                elif isinstance(request, SlaCallbackRequest):
                    if InternalApiConfig.get_use_internal_api():
                        cls.logger().warning(
                            "SlaCallbacks are not supported when the Internal API is enabled"
                        )
                    else:
                        DagFileProcessor.manage_slas(dagbag.dag_folder, request.dag_id, session=session)
                elif isinstance(request, DagCallbackRequest):
                    cls._execute_dag_callbacks(dagbag, request, session=session)
            except Exception:
                cls.logger().exception(
                    "Error executing %s callback for file: %s",
                    request.__class__.__name__,
                    request.full_filepath,
                )
        session.flush()
        session.commit()

    @classmethod
    @internal_api_call
    @provide_session
    def execute_callbacks_without_dag(
        cls, callback_requests: list[CallbackRequest], unit_test_mode: bool, session: Session = NEW_SESSION
    ) -> None:
        """
        Execute what callbacks we can as "best effort" when the dag cannot be found/had parse errors.

        This is important so that tasks that failed when there is a parse
        error don't get stuck in a queued state.
        """
        for request in callback_requests:
            cls.logger().debug("Processing Callback Request: %s", request)
            if isinstance(request, TaskCallbackRequest):
                cls._execute_task_callbacks(None, request, unit_test_mode, session)
            else:
                cls.logger().info(
                    "Not executing %s callback for file %s as there was a dag parse error",
                    request.__class__.__name__,
                    request.full_filepath,
                )
        session.flush()
        session.commit()

    @classmethod
    def _execute_dag_callbacks(cls, dagbag: DagBag, request: DagCallbackRequest, session: Session):
        dag = dagbag.dags[request.dag_id]
        callbacks, context = DAG.fetch_callback(
            dag=dag,
            dag_run_id=request.run_id,
            success=not request.is_failure_callback,
            reason=request.msg,
            session=session,
        ) or (None, None)

        if callbacks and context:
            DAG.execute_callback(callbacks, context, dag.dag_id)

    @classmethod
    @internal_api_call
    @provide_session
    def _execute_task_callbacks(
        cls, dagbag: DagBag | None, request: TaskCallbackRequest, unit_test_mode: bool, session: Session
    ) -> None:
        """
        Execute the task callbacks.

        :param dagbag: the DagBag to use to get the task instance
        :param request: the task callback request
        :param session: the session to use
        """
        try:
            callback_type = TaskInstanceState(request.task_callback_type)
        except ValueError:
            callback_type = None
        is_remote = callback_type in (TaskInstanceState.SUCCESS, TaskInstanceState.FAILED)

        # Previously we ignored any request besides failures. Now, if given a callback type
        # directly, we respect it and execute it. Additionally, because in this scenario the
        # callback is submitted remotely, we assume there is no need to mess with state; we
        # simply run the callback.
        if not is_remote and not request.is_failure_callback:
            return

        simple_ti = request.simple_task_instance
        ti = TaskInstance.get_task_instance(
            dag_id=simple_ti.dag_id,
            run_id=simple_ti.run_id,
            task_id=simple_ti.task_id,
            map_index=simple_ti.map_index,
            session=session,
        )

        if not ti:
            return

        task: Operator | None = None

        if dagbag and simple_ti.dag_id in dagbag.dags:
            dag = dagbag.dags[simple_ti.dag_id]
            if simple_ti.task_id in dag.task_ids:
                task = dag.get_task(simple_ti.task_id)
        else:
            # We don't have the _real_ dag here (perhaps it had a parse error?) but we still want to run
            # `handle_failure` so that the state of the TI gets progressed.
            #
            # Since handle_failure _really_ wants a task, we do our best effort to give it one
            task = SerializedDagModel.get_serialized_dag(
                dag_id=simple_ti.dag_id, task_id=simple_ti.task_id, session=session
            )

        if task:
            ti.refresh_from_task(task)

        if callback_type is TaskInstanceState.SUCCESS:
            context = ti.get_template_context(session=session)
            if TYPE_CHECKING:
                assert ti.task
            callbacks = ti.task.on_success_callback
            _run_finished_callback(callbacks=callbacks, context=context)
            cls.logger().info("Executed callback for %s in state %s", ti, ti.state)
        elif not is_remote or callback_type is TaskInstanceState.FAILED:
            ti.handle_failure(error=request.msg, test_mode=unit_test_mode, session=session)
            cls.logger().info("Executed callback for %s in state %s", ti, ti.state)
        session.flush()

    @classmethod
    def _get_dagbag(cls, file_path: str):
        try:
            return DagBag(file_path, include_examples=False)
        except Exception:
            cls.logger().exception("Failed at reloading the DAG file %s", file_path)
            Stats.incr("dag_file_refresh_error", tags={"file_path": file_path})
            raise

    @provide_session
    def process_file(
        self,
        file_path: str,
        callback_requests: list[CallbackRequest],
        pickle_dags: bool = False,
        session: Session = NEW_SESSION,
    ) -> tuple[int, int, int]:
        """
        Process a Python file containing Airflow DAGs.

        This includes:

        1. Execute the file and look for DAG objects in the namespace.
        2. Execute any Callbacks if passed to this method.
        3. Serialize the DAGs and save it to DB (or update existing record in the DB).
        4. Pickle the DAG and save it to the DB (if necessary).
        5. Mark any DAGs which are no longer present as inactive
        6. Record any errors importing the file into ORM

        :param file_path: the path to the Python file that should be executed
        :param callback_requests: failure callbacks to execute
        :param pickle_dags: whether to serialize the DAGs found in the file and
            save them to the db
        :return: number of dags found, count of import errors, last number of db queries
        """
        self.log.info("Processing file %s for tasks to queue", file_path)

        with count_queries(session) as query_counter:
            try:
                dagbag = DagFileProcessor._get_dagbag(file_path)
            except Exception:
                self.log.exception("Failed at reloading the DAG file %s", file_path)
                Stats.incr("dag_file_refresh_error", 1, 1, tags={"file_path": file_path})
                return 0, 0, self._cache_last_num_of_db_queries(query_counter)

            if dagbag.dags:
                self.log.info("DAG(s) %s retrieved from %s", ", ".join(map(repr, dagbag.dags)), file_path)
            else:
                self.log.warning("No viable dags retrieved from %s", file_path)
                DagFileProcessor.update_import_errors(
                    file_last_changed=dagbag.file_last_changed,
                    import_errors=dagbag.import_errors,
                    processor_subdir=self._dag_directory,
                )
                if callback_requests:
                    # If there were callback requests for this file but there was a
                    # parse error we still need to progress the state of TIs,
                    # otherwise they might be stuck in queued/running for ever!
                    DagFileProcessor.execute_callbacks_without_dag(callback_requests, self.UNIT_TEST_MODE)
                return 0, len(dagbag.import_errors), self._cache_last_num_of_db_queries(query_counter)

            self.execute_callbacks(dagbag, callback_requests, self.UNIT_TEST_MODE)

            serialize_errors = DagFileProcessor.save_dag_to_db(
                dags=dagbag.dags,
                dag_directory=self._dag_directory,
                pickle_dags=pickle_dags,
            )

            dagbag.import_errors.update(dict(serialize_errors))

            # Record import errors into the ORM
            try:
                DagFileProcessor.update_import_errors(
                    file_last_changed=dagbag.file_last_changed,
                    import_errors=dagbag.import_errors,
                    processor_subdir=self._dag_directory,
                )
            except Exception:
                self.log.exception("Error logging import errors!")

            # Record DAG warnings in the metadatabase.
            try:
                self.update_dag_warnings(dagbag=dagbag)
            except Exception:
                self.log.exception("Error logging DAG warnings.")

        return len(dagbag.dags), len(dagbag.import_errors), self._cache_last_num_of_db_queries(query_counter)

    def _cache_last_num_of_db_queries(self, query_counter: _QueryCounter | None = None):
        if query_counter:
            self._last_num_of_db_queries = query_counter.queries_number
        return self._last_num_of_db_queries
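
    # save_dag_to_db() below writes the parsed DAGs to the metadata database via
    # DagBag._sync_to_db and, when DAG pickling is enabled, additionally pickles
    # only the DAGs that are not paused.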
    @staticmethod
    @internal_api_call
    @provide_session
    def save_dag_to_db(
        dags: dict[str, DAG],
        dag_directory: str,
        pickle_dags: bool = False,
        session=NEW_SESSION,
    ):
        import_errors = DagBag._sync_to_db(dags=dags, processor_subdir=dag_directory, session=session)
        session.commit()

        dag_ids = list(dags)

        if pickle_dags:
            paused_dag_ids = DagModel.get_paused_dag_ids(dag_ids=dag_ids)

            unpaused_dags: list[DAG] = [dag for dag_id, dag in dags.items() if dag_id not in paused_dag_ids]

            for dag in unpaused_dags:
                dag.pickle(session)

        return import_errors