# log_reader.py
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
  17. from __future__ import annotations
  18. import logging
  19. import time
  20. from functools import cached_property
  21. from typing import TYPE_CHECKING, Iterator
  22. from airflow.configuration import conf
  23. from airflow.utils.helpers import render_log_filename
  24. from airflow.utils.log.logging_mixin import ExternalLoggingMixin
  25. from airflow.utils.session import NEW_SESSION, provide_session
  26. from airflow.utils.state import TaskInstanceState
  27. if TYPE_CHECKING:
  28. from sqlalchemy.orm.session import Session
  29. from airflow.models.taskinstance import TaskInstance
  30. class TaskLogReader:
  31. """Task log reader."""
  32. STREAM_LOOP_SLEEP_SECONDS = 1
  33. """Time to sleep between loops while waiting for more logs"""
  34. def read_log_chunks(
  35. self, ti: TaskInstance, try_number: int | None, metadata
  36. ) -> tuple[list[tuple[tuple[str, str]]], dict[str, str]]:
  37. """
  38. Read chunks of Task Instance logs.
  39. :param ti: The taskInstance
  40. :param try_number: If provided, logs for the given try will be returned.
  41. Otherwise, logs from all attempts are returned.
  42. :param metadata: A dictionary containing information about how to read the task log
  43. The following is an example of how to use this method to read log:
  44. .. code-block:: python
  45. logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  46. logs = logs[0] if try_number is not None else logs
  47. where task_log_reader is an instance of TaskLogReader. The metadata will always
  48. contain information about the task log which can enable you read logs to the
  49. end.
  50. """
  51. logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  52. metadata = metadatas[0]
  53. return logs, metadata
  54. def read_log_stream(self, ti: TaskInstance, try_number: int | None, metadata: dict) -> Iterator[str]:
  55. """
  56. Continuously read log to the end.
  57. :param ti: The Task Instance
  58. :param try_number: the task try number
  59. :param metadata: A dictionary containing information about how to read the task log
  60. """
  61. if try_number is None:
  62. next_try = ti.next_try_number
  63. try_numbers = list(range(1, next_try))
  64. else:
  65. try_numbers = [try_number]
  66. for current_try_number in try_numbers:
  67. metadata.pop("end_of_log", None)
  68. metadata.pop("max_offset", None)
  69. metadata.pop("offset", None)
  70. metadata.pop("log_pos", None)
  71. while True:
  72. logs, metadata = self.read_log_chunks(ti, current_try_number, metadata)
  73. for host, log in logs[0]:
  74. yield "\n".join([host or "", log]) + "\n"
  75. if "end_of_log" not in metadata or (
  76. not metadata["end_of_log"]
  77. and ti.state not in (TaskInstanceState.RUNNING, TaskInstanceState.DEFERRED)
  78. ):
  79. if not logs[0]:
  80. # we did not receive any logs in this loop
  81. # sleeping to conserve resources / limit requests on external services
  82. time.sleep(self.STREAM_LOOP_SLEEP_SECONDS)
  83. else:
  84. break
  85. @cached_property
  86. def log_handler(self):
  87. """Get the log handler which is configured to read logs."""
  88. task_log_reader = conf.get("logging", "task_log_reader")
  89. def handlers():
  90. """
  91. Yield all handlers first from airflow.task logger then root logger.
  92. Depending on whether we're in a running task, it could be in either of these locations.
  93. """
  94. yield from logging.getLogger("airflow.task").handlers
  95. yield from logging.getLogger().handlers
  96. return next((h for h in handlers() if h.name == task_log_reader), None)
  97. @property
  98. def supports_read(self):
  99. """Checks if a read operation is supported by a current log handler."""
  100. return hasattr(self.log_handler, "read")
  101. @property
  102. def supports_external_link(self) -> bool:
  103. """Check if the logging handler supports external links (e.g. to Elasticsearch, Stackdriver, etc)."""
  104. if not isinstance(self.log_handler, ExternalLoggingMixin):
  105. return False
  106. return self.log_handler.supports_external_link
  107. @provide_session
  108. def render_log_filename(
  109. self,
  110. ti: TaskInstance,
  111. try_number: int | None = None,
  112. *,
  113. session: Session = NEW_SESSION,
  114. ) -> str:
  115. """
  116. Render the log attachment filename.
  117. :param ti: The task instance
  118. :param try_number: The task try number
  119. """
  120. dagrun = ti.get_dagrun(session=session)
  121. attachment_filename = render_log_filename(
  122. ti=ti,
  123. try_number="all" if try_number is None else try_number,
  124. filename_template=dagrun.get_log_template(session=session).filename,
  125. )
  126. return attachment_filename