serve_logs.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. """Serve logs process."""
  18. from __future__ import annotations
  19. import logging
  20. import os
  21. import socket
  22. from collections import namedtuple
  23. import gunicorn.app.base
  24. from flask import Flask, abort, request, send_from_directory
  25. from jwt.exceptions import (
  26. ExpiredSignatureError,
  27. ImmatureSignatureError,
  28. InvalidAudienceError,
  29. InvalidIssuedAtError,
  30. InvalidSignatureError,
  31. )
  32. from setproctitle import setproctitle
  33. from werkzeug.exceptions import HTTPException
  34. from airflow.configuration import conf
  35. from airflow.utils.docs import get_docs_url
  36. from airflow.utils.jwt_signer import JWTSigner
  37. from airflow.utils.module_loading import import_string
# Module-level logger, named after this module, used by everything below.
logger = logging.getLogger(__name__)
  39. def create_app():
  40. flask_app = Flask(__name__, static_folder=None)
  41. expiration_time_in_seconds = conf.getint("webserver", "log_request_clock_grace", fallback=30)
  42. log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
  43. log_config_class = conf.get("logging", "logging_config_class")
  44. if log_config_class:
  45. logger.info("Detected user-defined logging config. Attempting to load %s", log_config_class)
  46. try:
  47. logging_config = import_string(log_config_class)
  48. try:
  49. base_log_folder = logging_config["handlers"]["task"]["base_log_folder"]
  50. except KeyError:
  51. base_log_folder = None
  52. if base_log_folder is not None:
  53. log_directory = base_log_folder
  54. logger.info(
  55. "Successfully imported user-defined logging config. Flask App will serve log from %s",
  56. log_directory,
  57. )
  58. else:
  59. logger.warning(
  60. "User-defined logging config does not specify 'base_log_folder'. "
  61. "Flask App will use default log directory %s",
  62. base_log_folder,
  63. )
  64. except Exception as e:
  65. raise ImportError(f"Unable to load {log_config_class} due to error: {e}")
  66. signer = JWTSigner(
  67. secret_key=conf.get("webserver", "secret_key"),
  68. expiration_time_in_seconds=expiration_time_in_seconds,
  69. audience="task-instance-logs",
  70. )
  71. # Prevent direct access to the logs port
  72. @flask_app.before_request
  73. def validate_pre_signed_url():
  74. try:
  75. auth = request.headers.get("Authorization")
  76. if auth is None:
  77. logger.warning("The Authorization header is missing: %s.", request.headers)
  78. abort(403)
  79. payload = signer.verify_token(auth)
  80. token_filename = payload.get("filename")
  81. request_filename = request.view_args["filename"]
  82. if token_filename is None:
  83. logger.warning("The payload does not contain 'filename' key: %s.", payload)
  84. abort(403)
  85. if token_filename != request_filename:
  86. logger.warning(
  87. "The payload log_relative_path key is different than the one in token:"
  88. "Request path: %s. Token path: %s.",
  89. request_filename,
  90. token_filename,
  91. )
  92. abort(403)
  93. except HTTPException:
  94. raise
  95. except InvalidAudienceError:
  96. logger.warning("Invalid audience for the request", exc_info=True)
  97. abort(403)
  98. except InvalidSignatureError:
  99. logger.warning("The signature of the request was wrong", exc_info=True)
  100. abort(403)
  101. except ImmatureSignatureError:
  102. logger.warning("The signature of the request was sent from the future", exc_info=True)
  103. abort(403)
  104. except ExpiredSignatureError:
  105. logger.warning(
  106. "The signature of the request has expired. Make sure that all components "
  107. "in your system have synchronized clocks. "
  108. "See more at %s",
  109. get_docs_url("configurations-ref.html#secret-key"),
  110. exc_info=True,
  111. )
  112. abort(403)
  113. except InvalidIssuedAtError:
  114. logger.warning(
  115. "The request was issues in the future. Make sure that all components "
  116. "in your system have synchronized clocks. "
  117. "See more at %s",
  118. get_docs_url("configurations-ref.html#secret-key"),
  119. exc_info=True,
  120. )
  121. abort(403)
  122. except Exception:
  123. logger.warning("Unknown error", exc_info=True)
  124. abort(403)
  125. @flask_app.route("/log/<path:filename>")
  126. def serve_logs_view(filename):
  127. return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
  128. return flask_app
  129. GunicornOption = namedtuple("GunicornOption", ["key", "value"])
  130. class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
  131. """
  132. Standalone Gunicorn application/serve for usage with any WSGI-application.
  133. Code inspired by an example from the Gunicorn documentation.
  134. https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
  135. For details, about standalone gunicorn application, see:
  136. https://docs.gunicorn.org/en/stable/custom.html
  137. """
  138. def __init__(self, app, options=None):
  139. self.options = options or []
  140. self.application = app
  141. super().__init__()
  142. def load_config(self):
  143. for option in self.options:
  144. self.cfg.set(option.key.lower(), option.value)
  145. def load(self):
  146. return self.application
  147. def serve_logs(port=None):
  148. """Serve logs generated by Worker."""
  149. setproctitle("airflow serve-logs")
  150. wsgi_app = create_app()
  151. port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
  152. # If dual stack is available and IPV6_V6ONLY is not enabled on the socket
  153. # then when IPV6 is bound to it will also bind to IPV4 automatically
  154. if getattr(socket, "has_dualstack_ipv6", lambda: False)():
  155. bind_option = GunicornOption("bind", f"[::]:{port}")
  156. else:
  157. bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
  158. options = [bind_option, GunicornOption("workers", 2)]
  159. StandaloneGunicornApplication(wsgi_app, options).run()
# Start the log server when this module is executed directly as a script.
if __name__ == "__main__":
    serve_logs()