triggerer_job_runner.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
import logging
import logging.handlers  # imported explicitly so logging.handlers.QueueListener below is always available
import os
import signal
import sys
import threading
import time
import warnings
from collections import deque
from contextlib import suppress
from copy import copy
from queue import SimpleQueue
from typing import TYPE_CHECKING

from sqlalchemy import func, select

from airflow.configuration import conf
from airflow.jobs.base_job_runner import BaseJobRunner
from airflow.jobs.job import perform_heartbeat
from airflow.models.trigger import Trigger
from airflow.stats import Stats
from airflow.traces.tracer import Trace, span
from airflow.triggers.base import TriggerEvent
from airflow.typing_compat import TypedDict
from airflow.utils import timezone
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.log.trigger_handler import (
    DropTriggerLogsFilter,
    LocalQueueHandler,
    TriggererHandlerWrapper,
    TriggerMetadataFilter,
    ctx_indiv_trigger,
    ctx_task_instance,
    ctx_trigger_end,
    ctx_trigger_id,
)
from airflow.utils.module_loading import import_string
from airflow.utils.session import NEW_SESSION, provide_session
if TYPE_CHECKING:
    from sqlalchemy.orm import Session

    from airflow.jobs.job import Job
    from airflow.models import TaskInstance
    from airflow.triggers.base import BaseTrigger

HANDLER_SUPPORTS_TRIGGERER = False
"""
If this value is true, the root handler is configured to log individual trigger messages
so that they are visible in task logs.

:meta private:
"""

SEND_TRIGGER_END_MARKER = True
"""
If a handler natively supports triggers, it may want to disable sending the trigger end marker.

:meta private:
"""

logger = logging.getLogger(__name__)

DISABLE_WRAPPER = conf.getboolean("logging", "disable_trigger_handler_wrapper", fallback=False)
DISABLE_LISTENER = conf.getboolean("logging", "disable_trigger_handler_queue_listener", fallback=False)
def configure_trigger_log_handler():
    """
    Configure logging where each trigger logs to its own file and can be exposed via the airflow webserver.

    Generally speaking, we take the log handler configured for logger ``airflow.task``,
    wrap it with TriggererHandlerWrapper, and set it as the handler for the root logger.

    If there already is a handler configured for the root logger and it supports triggers,
    we wrap it instead.

    :meta private:
    """
    global HANDLER_SUPPORTS_TRIGGERER

    def should_wrap(handler):
        return handler.__dict__.get("trigger_should_wrap", False) or handler.__class__.__dict__.get(
            "trigger_should_wrap", False
        )

    def should_queue(handler):
        # Check the instance attribute first, then fall back to the class attribute;
        # queueing defaults to True when neither is set.
        val = handler.__dict__.get("trigger_should_queue")
        if val is None:
            val = handler.__class__.__dict__.get("trigger_should_queue", True)
        return val

    def send_trigger_end_marker(handler):
        val = handler.__dict__.get("trigger_send_end_marker", None)
        if val is not None:
            return val

        val = handler.__class__.__dict__.get("trigger_send_end_marker", None)
        if val is not None:
            return val
        return True

    def supports_triggerer(handler):
        return (
            should_wrap(handler)
            or handler.__dict__.get("trigger_supported", False)
            or handler.__class__.__dict__.get("trigger_supported", False)
        )

    def get_task_handler_from_logger(logger_):
        for h in logger_.handlers:
            if isinstance(h, FileTaskHandler) and not supports_triggerer(h):
                warnings.warn(
                    f"Handler {h.__class__.__name__} does not support "
                    "individual trigger logging. Please check the release notes "
                    "for your provider to see if a newer version supports "
                    "individual trigger logging.",
                    category=UserWarning,
                    stacklevel=3,
                )
            if supports_triggerer(h):
                return h

    def find_suitable_task_handler():
        # check root logger then check airflow.task to see if a handler
        # suitable for use with TriggererHandlerWrapper (has trigger_should_wrap
        # attr, likely inherits from FileTaskHandler)
        h = get_task_handler_from_logger(root_logger)
        if not h:
            # try to use handler configured from airflow task
            logger.debug("No task logger configured for root logger; trying `airflow.task`.")
            h = get_task_handler_from_logger(logging.getLogger("airflow.task"))
            if h:
                logger.debug("Using logging configuration from `airflow.task`")
        if not h:
            warnings.warn(
                "Could not find log handler suitable for individual trigger logging.",
                category=UserWarning,
                stacklevel=3,
            )
            return None
        return h

    def filter_trigger_logs_from_other_root_handlers(new_hdlr):
        # we add context vars to log records emitted for individual triggerer logging
        # we want these records to be processed by our special trigger handler wrapper
        # but not by any other handlers, so we filter out these messages from
        # other handlers by adding DropTriggerLogsFilter
        # we could consider only adding this filter to the default console logger
        # so as to leave other custom handlers alone
        for h in root_logger.handlers:
            if h is not new_hdlr:
                h.addFilter(DropTriggerLogsFilter())

    def add_handler_wrapper_to_root(base_handler):
        # first make sure we remove from root logger if it happens to be there
        # it could have come from root or airflow.task, but we only need
        # to make sure we remove from root, since messages will not flow
        # through airflow.task
        if base_handler in root_logger.handlers:
            root_logger.removeHandler(base_handler)

        logger.info("Setting up TriggererHandlerWrapper with handler %s", base_handler)
        h = TriggererHandlerWrapper(base_handler=base_handler, level=base_handler.level)
        # just extra cautious, checking if user manually configured it there
        if h not in root_logger.handlers:
            root_logger.addHandler(h)
        return h

    root_logger = logging.getLogger()
    task_handler = find_suitable_task_handler()
    if not task_handler:
        return None
    if TYPE_CHECKING:
        assert isinstance(task_handler, FileTaskHandler)
    if should_wrap(task_handler):
        trigger_handler = add_handler_wrapper_to_root(task_handler)
    else:
        trigger_handler = copy(task_handler)
        root_logger.addHandler(trigger_handler)
    filter_trigger_logs_from_other_root_handlers(trigger_handler)
    if send_trigger_end_marker(trigger_handler) is False:
        global SEND_TRIGGER_END_MARKER
        SEND_TRIGGER_END_MARKER = False
    HANDLER_SUPPORTS_TRIGGERER = True
    return should_queue(trigger_handler)
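
# Illustrative sketch (not part of Airflow's shipped code): a custom handler can
# opt in to the behaviors probed above by setting class attributes, which the
# helper functions read via ``__dict__`` lookups. ``MyTaskHandler`` is hypothetical:
#
#     class MyTaskHandler(FileTaskHandler):
#         trigger_should_wrap = True       # wrap with TriggererHandlerWrapper
#         trigger_should_queue = False     # emit() is non-blocking; skip the queue listener
#         trigger_send_end_marker = False  # handler closes per-trigger resources itself
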
def setup_queue_listener():
    """
    Route log messages to a queue and process them with QueueListener.

    Airflow task handlers make blocking I/O calls, so we replace the trigger
    log handlers with a LocalQueueHandler, which sends log records to a queue.
    Then we start a QueueListener in a thread, which is configured
    to consume the queue and pass the records to the handlers as
    originally configured. This keeps the handler I/O out of the
    async event loop.

    :meta private:
    """
    queue = SimpleQueue()
    root_logger = logging.getLogger()

    handlers: list[logging.Handler] = []

    queue_handler = LocalQueueHandler(queue)
    queue_handler.addFilter(TriggerMetadataFilter())

    root_logger.addHandler(queue_handler)
    for h in root_logger.handlers[:]:
        if h is not queue_handler and "pytest" not in h.__module__:
            root_logger.removeHandler(h)
            handlers.append(h)

    this_logger = logging.getLogger(__name__)
    if handlers:
        this_logger.info("Setting up logging queue listener with handlers %s", handlers)
        listener = logging.handlers.QueueListener(queue, *handlers, respect_handler_level=True)
        listener.start()
        return listener
    else:
        this_logger.warning("Unable to set up individual trigger logging")
        return None
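
# For reference, the stdlib pattern used above in miniature (standard library
# only; illustrative, not part of this module's API):
#
#     import logging
#     import logging.handlers
#     from queue import SimpleQueue
#
#     q = SimpleQueue()
#     logging.getLogger().addHandler(logging.handlers.QueueHandler(q))  # cheap, non-blocking
#     listener = logging.handlers.QueueListener(q, logging.FileHandler("out.log"))
#     listener.start()  # a background thread drains the queue and does the blocking I/O
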
class TriggererJobRunner(BaseJobRunner, LoggingMixin):
    """
    Run active triggers in asyncio and update their dependent tasks/DAGs once their events have fired.

    It runs as two threads:

    - The main thread does DB calls/check-ins
    - A subthread runs all the async code
    """

    job_type = "TriggererJob"

    def __init__(
        self,
        job: Job,
        capacity=None,
    ):
        super().__init__(job)
        if capacity is None:
            self.capacity = conf.getint("triggerer", "default_capacity", fallback=1000)
        elif isinstance(capacity, int) and capacity > 0:
            self.capacity = capacity
        else:
            raise ValueError(f"Capacity number {capacity} is invalid")

        self.health_check_threshold = conf.getint("triggerer", "triggerer_health_check_threshold")

        should_queue = True
        if DISABLE_WRAPPER:
            self.log.warning(
                "Skipping trigger log configuration; disabled by param "
                "`disable_trigger_handler_wrapper=True`."
            )
        else:
            should_queue = configure_trigger_log_handler()
        self.listener = None
        if DISABLE_LISTENER:
            self.log.warning(
                "Skipping trigger logger queue listener; disabled by param "
                "`disable_trigger_handler_queue_listener=True`."
            )
        elif should_queue is False:
            self.log.warning("Skipping trigger logger queue listener; disabled by handler setting.")
        else:
            self.listener = setup_queue_listener()
        # Set up runner async thread
        self.trigger_runner = TriggerRunner()

    @provide_session
    def heartbeat_callback(self, session: Session = NEW_SESSION) -> None:
        Stats.incr("triggerer_heartbeat", 1, 1)

    def register_signals(self) -> None:
        """Register signals that stop child processes."""
        signal.signal(signal.SIGINT, self._exit_gracefully)
        signal.signal(signal.SIGTERM, self._exit_gracefully)

    @classmethod
    @provide_session
    def is_needed(cls, session) -> bool:
        """
        Test if the triggerer job needs to be run (i.e., if there are triggers in the trigger table).

        This is used for the warning boxes in the UI.
        """
        return session.execute(select(func.count(Trigger.id))).scalar_one() > 0

    def on_kill(self):
        """
        Stop the trigger runner.

        Called when there is an external kill command (via the heartbeat mechanism, for example).
        """
        self.trigger_runner.stop = True

    def _kill_listener(self):
        if self.listener:
            for h in self.listener.handlers:
                h.close()
            self.listener.stop()

    def _exit_gracefully(self, signum, frame) -> None:
        """Clean up processor_agent to avoid leaving orphan processes."""
        # The first time, try to exit nicely
        if not self.trigger_runner.stop:
            self.log.info("Exiting gracefully upon receiving signal %s", signum)
            self.trigger_runner.stop = True
            self._kill_listener()
        else:
            self.log.warning("Forcing exit due to second exit signal %s", signum)
            sys.exit(os.EX_SOFTWARE)

    def _execute(self) -> int | None:
        self.log.info("Starting the triggerer")
        try:
            # set job_id so that it can be used in log file names
            self.trigger_runner.job_id = self.job.id

            # Kick off runner thread
            self.trigger_runner.start()
            # Start our own DB loop in the main thread
            self._run_trigger_loop()
        except Exception:
            self.log.exception("Exception when executing TriggererJobRunner._run_trigger_loop")
            raise
        finally:
            self.log.info("Waiting for triggers to clean up")
            # Tell the subthread to stop and then wait for it.
            # If the user interrupts/terms again, _graceful_exit will allow them
            # to force-kill here.
            self.trigger_runner.stop = True
            self.trigger_runner.join(30)
            self.log.info("Exited trigger loop")
        return None

    def _run_trigger_loop(self) -> None:
        """Run synchronously and handle all database reads/writes; the main-thread trigger loop."""
        while not self.trigger_runner.stop:
            if not self.trigger_runner.is_alive():
                self.log.error("Trigger runner thread has died! Exiting.")
                break
            with Trace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner") as span:
                # Clean out unused triggers
                if span.is_recording():
                    span.add_event(name="Trigger.clean_unused")
                Trigger.clean_unused()
                # Load/delete triggers
                if span.is_recording():
                    span.add_event(name="load_triggers")
                self.load_triggers()
                # Handle events
                if span.is_recording():
                    span.add_event(name="handle_events")
                self.handle_events()
                # Handle failed triggers
                if span.is_recording():
                    span.add_event(name="handle_failed_triggers")
                self.handle_failed_triggers()
                if span.is_recording():
                    span.add_event(name="perform_heartbeat")
                perform_heartbeat(
                    self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True
                )
                # Collect stats
                if span.is_recording():
                    span.add_event(name="emit_metrics")
                self.emit_metrics()
                # Idle sleep
                time.sleep(1)

    @span
    def load_triggers(self):
        """Query the database for the triggers we're supposed to be running and update the runner."""
        Trigger.assign_unassigned(self.job.id, self.capacity, self.health_check_threshold)
        ids = Trigger.ids_for_triggerer(self.job.id)
        self.trigger_runner.update_triggers(set(ids))

    @span
    def handle_events(self):
        """Dispatch outbound events to the Trigger model, which pushes them to the relevant task instances."""
        while self.trigger_runner.events:
            # Get the event and its trigger ID
            trigger_id, event = self.trigger_runner.events.popleft()
            # Tell the model to wake up its tasks
            Trigger.submit_event(trigger_id=trigger_id, event=event)
            # Emit stat event
            Stats.incr("triggers.succeeded")

    @span
    def handle_failed_triggers(self):
        """
        Handle "failed" triggers - ones that errored or exited before they sent an event.

        Task Instances that depend on them need failing.
        """
        while self.trigger_runner.failed_triggers:
            # Tell the model to fail this trigger's deps
            trigger_id, saved_exc = self.trigger_runner.failed_triggers.popleft()
            Trigger.submit_failure(trigger_id=trigger_id, exc=saved_exc)
            # Emit stat event
            Stats.incr("triggers.failed")

    @span
    def emit_metrics(self):
        Stats.gauge(f"triggers.running.{self.job.hostname}", len(self.trigger_runner.triggers))
        Stats.gauge(
            "triggers.running", len(self.trigger_runner.triggers), tags={"hostname": self.job.hostname}
        )

        span = Trace.get_current_span()
        span.set_attribute("trigger host", self.job.hostname)
        span.set_attribute("triggers running", len(self.trigger_runner.triggers))
class TriggerDetails(TypedDict):
    """Type class for the trigger details dictionary."""

    task: asyncio.Task
    name: str
    events: int
class TriggerRunner(threading.Thread, LoggingMixin):
    """
    Runtime environment for all triggers.

    Mainly runs inside its own thread, where it hands control off to an asyncio
    event loop, but is also sometimes interacted with from the main thread
    (where all the DB queries are done). All communication between threads is
    done via Deques.
    """

    # Maps trigger IDs to their running tasks and other info
    triggers: dict[int, TriggerDetails]

    # Cache for looking up triggers by classpath
    trigger_cache: dict[str, type[BaseTrigger]]

    # Inbound queue of new triggers
    to_create: deque[tuple[int, BaseTrigger]]

    # Inbound queue of deleted triggers
    to_cancel: deque[int]

    # Outbound queue of events
    events: deque[tuple[int, TriggerEvent]]

    # Outbound queue of failed triggers
    failed_triggers: deque[tuple[int, BaseException]]

    # Should-we-stop flag
    stop: bool = False

    def __init__(self):
        super().__init__()
        self.triggers = {}
        self.trigger_cache = {}
        self.to_create = deque()
        self.to_cancel = deque()
        self.events = deque()
        self.failed_triggers = deque()
        self.job_id = None

    def run(self):
        """Sync entrypoint - just run ``arun`` in an async event loop."""
        asyncio.run(self.arun())

    async def arun(self):
        """
        Run trigger addition/deletion/cleanup; the main (asynchronous) logic loop.

        Actual triggers run in their own separate coroutines.
        """
        watchdog = asyncio.create_task(self.block_watchdog())
        last_status = time.time()
        try:
            while not self.stop:
                # Run core logic
                await self.create_triggers()
                await self.cancel_triggers()
                await self.cleanup_finished_triggers()
                # Sleep for a bit
                await asyncio.sleep(1)
                # Every minute, log status
                if time.time() - last_status >= 60:
                    count = len(self.triggers)
                    self.log.info("%i triggers currently running", count)
                    last_status = time.time()
        except Exception:
            self.stop = True
            raise
        # Wait for watchdog to complete
        await watchdog

    async def create_triggers(self):
        """Drain the to_create queue and create all new triggers that have been requested in the DB."""
        while self.to_create:
            trigger_id, trigger_instance = self.to_create.popleft()
            if trigger_id not in self.triggers:
                ti: TaskInstance = trigger_instance.task_instance
                self.triggers[trigger_id] = {
                    "task": asyncio.create_task(self.run_trigger(trigger_id, trigger_instance)),
                    "name": f"{ti.dag_id}/{ti.run_id}/{ti.task_id}/{ti.map_index}/{ti.try_number} "
                    f"(ID {trigger_id})",
                    "events": 0,
                }
            else:
                self.log.warning("Trigger %s had insertion attempted twice", trigger_id)
            await asyncio.sleep(0)

    async def cancel_triggers(self):
        """
        Drain the to_cancel queue and ensure all triggers that are not in the DB are cancelled.

        This allows the cleanup job to delete them.
        """
        while self.to_cancel:
            trigger_id = self.to_cancel.popleft()
            if trigger_id in self.triggers:
                # We only delete if it did not exit already
                self.triggers[trigger_id]["task"].cancel()
            await asyncio.sleep(0)

    async def cleanup_finished_triggers(self):
        """
        Go through all trigger tasks (coroutines) and clean up entries for ones that have exited.

        Optionally warn users if the exit was not normal.
        """
        for trigger_id, details in list(self.triggers.items()):
            if details["task"].done():
                # Check to see if it exited for good reasons
                saved_exc = None
                try:
                    result = details["task"].result()
                except (asyncio.CancelledError, SystemExit, KeyboardInterrupt):
                    # These are "expected" exceptions and we stop processing here
                    # If we don't, then the system requesting a trigger be removed -
                    # which turns into CancelledError - results in a failure.
                    del self.triggers[trigger_id]
                    continue
                except BaseException as e:
                    # This is potentially bad, so log it.
                    self.log.exception("Trigger %s exited with error %s", details["name"], e)
                    saved_exc = e
                else:
                    # See if they foolishly returned a TriggerEvent
                    if isinstance(result, TriggerEvent):
                        self.log.error(
                            "Trigger %s returned a TriggerEvent rather than yielding it", details["name"]
                        )
                # See if this exited without sending an event, in which case
                # any task instances depending on it need to be failed
                if details["events"] == 0:
                    self.log.error(
                        "Trigger %s exited without sending an event. Dependent tasks will be failed.",
                        details["name"],
                    )
                    self.failed_triggers.append((trigger_id, saved_exc))
                del self.triggers[trigger_id]
            await asyncio.sleep(0)

    async def block_watchdog(self):
        """
        Watchdog loop that detects blocking (badly-written) triggers.

        Triggers should be well-behaved async coroutines and await whenever
        they need to wait; this loop tries to run every 100ms to see if
        there are badly-written triggers taking longer than that and blocking
        the event loop.

        Unfortunately, we can't tell what trigger is blocking things, but
        we can at least detect the top-level problem.
        """
        while not self.stop:
            last_run = time.monotonic()
            await asyncio.sleep(0.1)
            # We allow a generous amount of buffer room for now, since it might
            # be a busy event loop.
            time_elapsed = time.monotonic() - last_run
            if time_elapsed > 0.2:
                self.log.info(
                    "Triggerer's async thread was blocked for %.2f seconds, "
                    "likely by a badly-written trigger. Set PYTHONASYNCIODEBUG=1 "
                    "to get more information on overrunning coroutines.",
                    time_elapsed,
                )
                Stats.incr("triggers.blocked_main_thread")
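
    # Illustrative sketch (not part of Airflow's shipped code): the kind of
    # trigger the watchdog above catches, versus a well-behaved one.
    # ``time.sleep`` blocks the shared event loop; ``asyncio.sleep`` yields to it.
    #
    #     async def run(self):
    #         time.sleep(30)           # BAD: freezes every other trigger for 30s
    #         await asyncio.sleep(30)  # GOOD: other triggers keep running
    #         yield TriggerEvent(self.payload)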
    @staticmethod
    def set_individual_trigger_logging(trigger):
        """Configure trigger logging to allow individual files and stdout filtering."""
        # set logging context vars for routing to appropriate handler
        ctx_task_instance.set(trigger.task_instance)
        ctx_trigger_id.set(trigger.trigger_id)
        ctx_trigger_end.set(False)

        # mark that we're in the context of an individual trigger so log records can be filtered
        ctx_indiv_trigger.set(True)

    async def run_trigger(self, trigger_id, trigger):
        """Run a trigger (they are async generators) and push their events into our outbound event deque."""
        name = self.triggers[trigger_id]["name"]
        self.log.info("trigger %s starting", name)
        try:
            self.set_individual_trigger_logging(trigger)
            async for event in trigger.run():
                self.log.info("Trigger %s fired: %s", self.triggers[trigger_id]["name"], event)
                self.triggers[trigger_id]["events"] += 1
                self.events.append((trigger_id, event))
        except asyncio.CancelledError:
            if timeout := trigger.task_instance.trigger_timeout:
                timeout = timeout.replace(tzinfo=timezone.utc) if not timeout.tzinfo else timeout
                if timeout < timezone.utcnow():
                    self.log.error("Trigger cancelled due to timeout")
            raise
        finally:
            # CancelledError will get injected when we're stopped - which is
            # fine, the cleanup process will understand that, but we want to
            # allow triggers a chance to cleanup, either in that case or if
            # they exit cleanly. Exceptions from cleanup methods are ignored.
            with suppress(Exception):
                await trigger.cleanup()
            if SEND_TRIGGER_END_MARKER:
                self.mark_trigger_end(trigger)

            # unsetting ctx_indiv_trigger var restores stdout logging
            ctx_indiv_trigger.set(None)
            self.log.info("trigger %s completed", name)
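
    # Illustrative sketch (not part of Airflow's shipped code): the
    # async-generator shape that ``run_trigger`` consumes above; ``MyTrigger``
    # and its ``payload`` kwarg are hypothetical.
    #
    #     class MyTrigger(BaseTrigger):
    #         def __init__(self, payload):
    #             super().__init__()
    #             self.payload = payload
    #
    #         def serialize(self):
    #             return ("path.to.MyTrigger", {"payload": self.payload})
    #
    #         async def run(self):
    #             await asyncio.sleep(10)
    #             yield TriggerEvent(self.payload)  # yield the event; never return it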
    @staticmethod
    def mark_trigger_end(trigger):
        if not HANDLER_SUPPORTS_TRIGGERER:
            return
        ctx_trigger_end.set(True)
        # this is a special message required by TriggererHandlerWrapper
        # it tells the wrapper to close the handler for this trigger
        # we set level to 100 so that it will not be filtered by user logging settings
        # it is not emitted; see TriggererHandlerWrapper.handle method.
        trigger.log.log(level=100, msg="trigger end")

    def update_triggers(self, requested_trigger_ids: set[int]):
        """
        Request that we update what triggers we're running.

        Works out the differences - ones to add, and ones to remove - then
        adds them to the deques so the subthread can actually mutate the running
        trigger set.
        """
        # Note that `triggers` could be mutated by the other thread during this
        # line's execution, but we consider that safe, since there's a strict
        # add -> remove -> never again lifecycle this function is already
        # handling.
        running_trigger_ids = set(self.triggers.keys())
        known_trigger_ids = (
            running_trigger_ids.union(x[0] for x in self.events)
            .union(self.to_cancel)
            .union(x[0] for x in self.to_create)
            .union(trigger[0] for trigger in self.failed_triggers)
        )
        # Work out the two difference sets
        new_trigger_ids = requested_trigger_ids - known_trigger_ids
        cancel_trigger_ids = running_trigger_ids - requested_trigger_ids
        # Bulk-fetch new trigger records
        new_triggers = Trigger.bulk_fetch(new_trigger_ids)
        # Add in new triggers
        for new_id in new_trigger_ids:
            # Check it didn't vanish in the meantime
            if new_id not in new_triggers:
                self.log.warning("Trigger ID %s disappeared before we could start it", new_id)
                continue
            # Resolve trigger record into an actual class instance
            try:
                new_trigger_orm = new_triggers[new_id]
                trigger_class = self.get_trigger_by_classpath(new_trigger_orm.classpath)
            except BaseException as e:
                # Either the trigger code or the path to it is bad. Fail the trigger.
                self.failed_triggers.append((new_id, e))
                continue

            # If new_trigger_orm.task_instance is None, this means the TaskInstance
            # row was updated by either Trigger.submit_event or Trigger.submit_failure
            # and can happen when a single trigger Job is being run on multiple TriggerRunners
            # in a High-Availability setup.
            if new_trigger_orm.task_instance is None:
                self.log.info(
                    (
                        "TaskInstance for Trigger ID %s is None. It was likely updated by another trigger job. "
                        "Skipping trigger instantiation."
                    ),
                    new_id,
                )
                continue

            try:
                new_trigger_instance = trigger_class(**new_trigger_orm.kwargs)
            except TypeError as err:
                self.log.error("Trigger failed; message=%s", err)
                self.failed_triggers.append((new_id, err))
                continue
            self.set_trigger_logging_metadata(new_trigger_orm.task_instance, new_id, new_trigger_instance)
            self.to_create.append((new_id, new_trigger_instance))
        # Enqueue orphaned triggers for cancellation
        self.to_cancel.extend(cancel_trigger_ids)
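
    # Worked example for the differencing above (hypothetical IDs): if
    # requested = {1, 2, 3}, running = {2, 4}, and trigger 3 is already queued
    # in to_create, then known = {2, 3, 4}; so new = {1} is bulk-fetched and
    # queued for creation, while cancel = {4} (running but no longer requested)
    # is queued for cancellation.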
    def set_trigger_logging_metadata(self, ti: TaskInstance, trigger_id, trigger):
        """
        Set up logging for triggers.

        We want to ensure that each trigger logs to its own file and that the log messages are not
        propagated to parent loggers.

        :meta private:
        """
        if ti:  # can be None in tests
            ti.is_trigger_log_context = True
        trigger.task_instance = ti
        trigger.triggerer_job_id = self.job_id
        trigger.trigger_id = trigger_id

    def get_trigger_by_classpath(self, classpath: str) -> type[BaseTrigger]:
        """
        Get a trigger class by its classpath ("path.to.module.classname").

        Uses a cache dictionary to speed up lookups after the first time.
        """
        if classpath not in self.trigger_cache:
            self.trigger_cache[classpath] = import_string(classpath)
        return self.trigger_cache[classpath]
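
    # For example, a Trigger row whose classpath is
    # "airflow.triggers.temporal.DateTimeTrigger" resolves via import_string to
    # the DateTimeTrigger class on first lookup; later lookups for the same
    # classpath are served from self.trigger_cache without re-importing.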