logging_mixin.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import abc
  20. import enum
  21. import logging
  22. import sys
  23. from io import IOBase
  24. from logging import Handler, StreamHandler
  25. from typing import IO, TYPE_CHECKING, Any, Optional, TypeVar, cast
  26. import re2
  27. if TYPE_CHECKING:
  28. from logging import Logger
# Compiled pattern for 7-bit C1 ANSI escape sequences: ESC (0x1B), one character in "@".."_",
# then any number of characters in "0".."?", any number of characters in " ".."/",
# and a single final character in "@".."~".
ANSI_ESCAPE = re2.compile(r"\x1B[@-_][0-?]*[ -/]*[@-~]")
  31. # Private: A sentinel objects
  32. class SetContextPropagate(enum.Enum):
  33. """
  34. Sentinel objects for log propagation contexts.
  35. :meta private:
  36. """
  37. # If a `set_context` function wants to _keep_ propagation set on its logger it needs to return this
  38. # special value.
  39. MAINTAIN_PROPAGATE = object()
  40. # Don't use this one anymore!
  41. DISABLE_PROPAGATE = object()
  42. def __getattr__(name):
  43. if name in ("DISABLE_PROPOGATE", "DISABLE_PROPAGATE"):
  44. # Compat for spelling on off chance someone is using this directly
  45. # And old object that isn't needed anymore
  46. return SetContextPropagate.DISABLE_PROPAGATE
  47. raise AttributeError(f"module {__name__} has no attribute {name}")
  48. def remove_escape_codes(text: str) -> str:
  49. """Remove ANSI escapes codes from string; used to remove "colors" from log messages."""
  50. return ANSI_ESCAPE.sub("", text)
  51. _T = TypeVar("_T")
  52. class LoggingMixin:
  53. """Convenience super-class to have a logger configured with the class name."""
  54. _log: logging.Logger | None = None
  55. # Parent logger used by this class. It should match one of the loggers defined in the
  56. # `logging_config_class`. By default, this attribute is used to create the final name of the logger, and
  57. # will prefix the `_logger_name` with a separating dot.
  58. _log_config_logger_name: Optional[str] = None # noqa: UP007
  59. _logger_name: Optional[str] = None # noqa: UP007
  60. def __init__(self, context=None):
  61. self._set_context(context)
  62. @staticmethod
  63. def _create_logger_name(
  64. logged_class: type[_T],
  65. log_config_logger_name: str | None = None,
  66. class_logger_name: str | None = None,
  67. ) -> str:
  68. """
  69. Generate a logger name for the given `logged_class`.
  70. By default, this function returns the `class_logger_name` as logger name. If it is not provided,
  71. the {class.__module__}.{class.__name__} is returned instead. When a `parent_logger_name` is provided,
  72. it will prefix the logger name with a separating dot.
  73. """
  74. logger_name: str = (
  75. class_logger_name
  76. if class_logger_name is not None
  77. else f"{logged_class.__module__}.{logged_class.__name__}"
  78. )
  79. if log_config_logger_name:
  80. return f"{log_config_logger_name}.{logger_name}" if logger_name else log_config_logger_name
  81. return logger_name
  82. @classmethod
  83. def _get_log(cls, obj: Any, clazz: type[_T]) -> Logger:
  84. if obj._log is None:
  85. logger_name: str = cls._create_logger_name(
  86. logged_class=clazz,
  87. log_config_logger_name=obj._log_config_logger_name,
  88. class_logger_name=obj._logger_name,
  89. )
  90. obj._log = logging.getLogger(logger_name)
  91. return obj._log
  92. @classmethod
  93. def logger(cls) -> Logger:
  94. """Return a logger."""
  95. return LoggingMixin._get_log(cls, cls)
  96. @property
  97. def log(self) -> Logger:
  98. """Return a logger."""
  99. return LoggingMixin._get_log(self, self.__class__)
  100. def _set_context(self, context):
  101. if context is not None:
  102. set_context(self.log, context)
  103. class ExternalLoggingMixin:
  104. """Define a log handler based on an external service (e.g. ELK, StackDriver)."""
  105. @property
  106. @abc.abstractmethod
  107. def log_name(self) -> str:
  108. """Return log name."""
  109. @abc.abstractmethod
  110. def get_external_log_url(self, task_instance, try_number) -> str:
  111. """Return the URL for log visualization in the external service."""
  112. @property
  113. @abc.abstractmethod
  114. def supports_external_link(self) -> bool:
  115. """Return whether handler is able to support external links."""
  116. # We have to ignore typing errors here because Python I/O classes are a mess, and they do not
  117. # have the same type hierarchy defined as the `typing.IO` - they violate Liskov Substitution Principle
  118. # While it is ok to make your class derive from IOBase (and its good thing to do as they provide
  119. # base implementation for IO-implementing classes, it's impossible to make them work with
  120. # IO generics (and apparently it has not even been intended)
  121. # See more: https://giters.com/python/typeshed/issues/6077
  122. class StreamLogWriter(IOBase, IO[str]): # type: ignore[misc]
  123. """
  124. Allows to redirect stdout and stderr to logger.
  125. :param log: The log level method to write to, ie. log.debug, log.warning
  126. """
  127. encoding: None = None
  128. def __init__(self, logger, level):
  129. self.logger = logger
  130. self.level = level
  131. self._buffer = ""
  132. def close(self):
  133. """
  134. Provide close method, for compatibility with the io.IOBase interface.
  135. This is a no-op method.
  136. """
  137. @property
  138. def closed(self):
  139. """
  140. Return False to indicate that the stream is not closed.
  141. Streams will be open for the duration of Airflow's lifecycle.
  142. For compatibility with the io.IOBase interface.
  143. """
  144. return False
  145. def _propagate_log(self, message):
  146. """Propagate message removing escape codes."""
  147. self.logger.log(self.level, remove_escape_codes(message))
  148. def write(self, message):
  149. """
  150. Do whatever it takes to actually log the specified logging record.
  151. :param message: message to log
  152. """
  153. if not message.endswith("\n"):
  154. self._buffer += message
  155. else:
  156. self._buffer += message.rstrip()
  157. self.flush()
  158. def flush(self):
  159. """Ensure all logging output has been flushed."""
  160. buf = self._buffer
  161. if buf:
  162. self._buffer = ""
  163. self._propagate_log(buf)
  164. def isatty(self):
  165. """
  166. Return False to indicate the fd is not connected to a tty(-like) device.
  167. For compatibility reasons.
  168. """
  169. return False
  170. class RedirectStdHandler(StreamHandler):
  171. """
  172. Custom StreamHandler that uses current sys.stderr/stdout as the stream for logging.
  173. This class is like a StreamHandler using sys.stderr/stdout, but uses
  174. whatever sys.stderr/stdout is currently set to rather than the value of
  175. sys.stderr/stdout at handler construction time, except when running a
  176. task in a kubernetes executor pod.
  177. """
  178. def __init__(self, stream):
  179. if not isinstance(stream, str):
  180. raise TypeError(
  181. "Cannot use file like objects. Use 'stdout' or 'stderr' as a str and without 'ext://'."
  182. )
  183. self._use_stderr = True
  184. if "stdout" in stream:
  185. self._use_stderr = False
  186. self._orig_stream = sys.stdout
  187. else:
  188. self._orig_stream = sys.stderr
  189. # StreamHandler tries to set self.stream
  190. Handler.__init__(self)
  191. @property
  192. def stream(self):
  193. """Returns current stream."""
  194. from airflow.settings import IS_EXECUTOR_CONTAINER, IS_K8S_EXECUTOR_POD
  195. if IS_K8S_EXECUTOR_POD or IS_EXECUTOR_CONTAINER:
  196. return self._orig_stream
  197. if self._use_stderr:
  198. return sys.stderr
  199. return sys.stdout
  200. def set_context(logger, value):
  201. """
  202. Walk the tree of loggers and try to set the context for each handler.
  203. :param logger: logger
  204. :param value: value to set
  205. """
  206. while logger:
  207. orig_propagate = logger.propagate
  208. for handler in logger.handlers:
  209. # Not all handlers need to have context passed in so we ignore
  210. # the error when handlers do not have set_context defined.
  211. # Don't use getatrr so we have type checking. And we don't care if handler is actually a
  212. # FileTaskHandler, it just needs to have a set_context function!
  213. if hasattr(handler, "set_context"):
  214. from airflow.utils.log.file_task_handler import FileTaskHandler
  215. flag = cast(FileTaskHandler, handler).set_context(value)
  216. # By default we disable propagate once we have configured the logger, unless that handler
  217. # explicitly asks us to keep it on.
  218. if flag is not SetContextPropagate.MAINTAIN_PROPAGATE:
  219. logger.propagate = False
  220. if orig_propagate is True:
  221. # If we were set to propagate before we turned if off, then keep passing set_context up
  222. logger = logger.parent
  223. else:
  224. break