trigger_handler.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import asyncio
  19. import logging
  20. from contextlib import suppress
  21. from contextvars import ContextVar
  22. from copy import copy
  23. from logging.handlers import QueueHandler
  24. from typing import TYPE_CHECKING
  25. if TYPE_CHECKING:
  26. from airflow.utils.log.file_task_handler import FileTaskHandler
  27. ctx_task_instance: ContextVar = ContextVar("task_instance")
  28. ctx_trigger_id: ContextVar = ContextVar("trigger_id")
  29. ctx_trigger_end: ContextVar = ContextVar("trigger_end")
  30. ctx_indiv_trigger: ContextVar = ContextVar("__individual_trigger")
  31. class TriggerMetadataFilter(logging.Filter):
  32. """
  33. Injects TI key, triggerer job_id, and trigger_id into the log record.
  34. :meta private:
  35. """
  36. def filter(self, record):
  37. for var in (
  38. ctx_task_instance,
  39. ctx_trigger_id,
  40. ctx_trigger_end,
  41. ctx_indiv_trigger,
  42. ):
  43. val = var.get(None)
  44. if val is not None:
  45. setattr(record, var.name, val)
  46. return True
  47. class DropTriggerLogsFilter(logging.Filter):
  48. """
  49. If record has attr with name ctx_indiv_trigger, filter the record.
  50. The purpose here is to prevent trigger logs from going to stdout
  51. in the trigger service.
  52. :meta private:
  53. """
  54. def filter(self, record):
  55. return getattr(record, ctx_indiv_trigger.name, None) is None
  56. class TriggererHandlerWrapper(logging.Handler):
  57. """
  58. Wrap inheritors of FileTaskHandler and direct log messages to them based on trigger_id.
  59. :meta private:
  60. """
  61. trigger_should_queue = True
  62. def __init__(self, base_handler: FileTaskHandler, level=logging.NOTSET):
  63. super().__init__(level=level)
  64. self.base_handler: FileTaskHandler = base_handler
  65. self.handlers: dict[int, FileTaskHandler] = {}
  66. def _make_handler(self, ti):
  67. h = copy(self.base_handler)
  68. h.set_context(ti=ti)
  69. return h
  70. def _get_or_create_handler(self, trigger_id, ti):
  71. if trigger_id not in self.handlers:
  72. self.handlers[trigger_id] = self._make_handler(ti)
  73. return self.handlers[trigger_id]
  74. def emit(self, record):
  75. h = self._get_or_create_handler(record.trigger_id, record.task_instance)
  76. h.emit(record)
  77. def handle(self, record):
  78. if not getattr(record, ctx_indiv_trigger.name, None):
  79. return False
  80. if record.trigger_end:
  81. self.close_one(record.trigger_id)
  82. return False
  83. emit = self.filter(record)
  84. if emit:
  85. self.emit(record)
  86. return emit
  87. def close_one(self, trigger_id):
  88. h = self.handlers.get(trigger_id)
  89. if h:
  90. h.close()
  91. with suppress(KeyError): # race condition between `handle` and `close`
  92. del self.handlers[trigger_id]
  93. def flush(self):
  94. for h in self.handlers.values():
  95. h.flush()
  96. def close(self):
  97. for trigger_id in list(self.handlers.keys()):
  98. self.close_one(trigger_id)
  99. class LocalQueueHandler(QueueHandler):
  100. """
  101. Send messages to queue.
  102. :meta private:
  103. """
  104. def emit(self, record: logging.LogRecord) -> None:
  105. # There is no need to call `prepare` because queue is in same process.
  106. try:
  107. self.enqueue(record)
  108. except asyncio.CancelledError:
  109. raise
  110. except Exception:
  111. self.handleError(record)