123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- """Serve logs process."""
- from __future__ import annotations
- import logging
- import os
- import socket
- from collections import namedtuple
- import gunicorn.app.base
- from flask import Flask, abort, request, send_from_directory
- from jwt.exceptions import (
- ExpiredSignatureError,
- ImmatureSignatureError,
- InvalidAudienceError,
- InvalidIssuedAtError,
- InvalidSignatureError,
- )
- from setproctitle import setproctitle
- from werkzeug.exceptions import HTTPException
- from airflow.configuration import conf
- from airflow.utils.docs import get_docs_url
- from airflow.utils.jwt_signer import JWTSigner
- from airflow.utils.module_loading import import_string
- logger = logging.getLogger(__name__)
- def create_app():
- flask_app = Flask(__name__, static_folder=None)
- expiration_time_in_seconds = conf.getint("webserver", "log_request_clock_grace", fallback=30)
- log_directory = os.path.expanduser(conf.get("logging", "BASE_LOG_FOLDER"))
- log_config_class = conf.get("logging", "logging_config_class")
- if log_config_class:
- logger.info("Detected user-defined logging config. Attempting to load %s", log_config_class)
- try:
- logging_config = import_string(log_config_class)
- try:
- base_log_folder = logging_config["handlers"]["task"]["base_log_folder"]
- except KeyError:
- base_log_folder = None
- if base_log_folder is not None:
- log_directory = base_log_folder
- logger.info(
- "Successfully imported user-defined logging config. Flask App will serve log from %s",
- log_directory,
- )
- else:
- logger.warning(
- "User-defined logging config does not specify 'base_log_folder'. "
- "Flask App will use default log directory %s",
- base_log_folder,
- )
- except Exception as e:
- raise ImportError(f"Unable to load {log_config_class} due to error: {e}")
- signer = JWTSigner(
- secret_key=conf.get("webserver", "secret_key"),
- expiration_time_in_seconds=expiration_time_in_seconds,
- audience="task-instance-logs",
- )
- # Prevent direct access to the logs port
- @flask_app.before_request
- def validate_pre_signed_url():
- try:
- auth = request.headers.get("Authorization")
- if auth is None:
- logger.warning("The Authorization header is missing: %s.", request.headers)
- abort(403)
- payload = signer.verify_token(auth)
- token_filename = payload.get("filename")
- request_filename = request.view_args["filename"]
- if token_filename is None:
- logger.warning("The payload does not contain 'filename' key: %s.", payload)
- abort(403)
- if token_filename != request_filename:
- logger.warning(
- "The payload log_relative_path key is different than the one in token:"
- "Request path: %s. Token path: %s.",
- request_filename,
- token_filename,
- )
- abort(403)
- except HTTPException:
- raise
- except InvalidAudienceError:
- logger.warning("Invalid audience for the request", exc_info=True)
- abort(403)
- except InvalidSignatureError:
- logger.warning("The signature of the request was wrong", exc_info=True)
- abort(403)
- except ImmatureSignatureError:
- logger.warning("The signature of the request was sent from the future", exc_info=True)
- abort(403)
- except ExpiredSignatureError:
- logger.warning(
- "The signature of the request has expired. Make sure that all components "
- "in your system have synchronized clocks. "
- "See more at %s",
- get_docs_url("configurations-ref.html#secret-key"),
- exc_info=True,
- )
- abort(403)
- except InvalidIssuedAtError:
- logger.warning(
- "The request was issues in the future. Make sure that all components "
- "in your system have synchronized clocks. "
- "See more at %s",
- get_docs_url("configurations-ref.html#secret-key"),
- exc_info=True,
- )
- abort(403)
- except Exception:
- logger.warning("Unknown error", exc_info=True)
- abort(403)
- @flask_app.route("/log/<path:filename>")
- def serve_logs_view(filename):
- return send_from_directory(log_directory, filename, mimetype="application/json", as_attachment=False)
- return flask_app
- GunicornOption = namedtuple("GunicornOption", ["key", "value"])
- class StandaloneGunicornApplication(gunicorn.app.base.BaseApplication):
- """
- Standalone Gunicorn application/serve for usage with any WSGI-application.
- Code inspired by an example from the Gunicorn documentation.
- https://github.com/benoitc/gunicorn/blob/cf55d2cec277f220ebd605989ce78ad1bb553c46/examples/standalone_app.py
- For details, about standalone gunicorn application, see:
- https://docs.gunicorn.org/en/stable/custom.html
- """
- def __init__(self, app, options=None):
- self.options = options or []
- self.application = app
- super().__init__()
- def load_config(self):
- for option in self.options:
- self.cfg.set(option.key.lower(), option.value)
- def load(self):
- return self.application
- def serve_logs(port=None):
- """Serve logs generated by Worker."""
- setproctitle("airflow serve-logs")
- wsgi_app = create_app()
- port = port or conf.getint("logging", "WORKER_LOG_SERVER_PORT")
- # If dual stack is available and IPV6_V6ONLY is not enabled on the socket
- # then when IPV6 is bound to it will also bind to IPV4 automatically
- if getattr(socket, "has_dualstack_ipv6", lambda: False)():
- bind_option = GunicornOption("bind", f"[::]:{port}")
- else:
- bind_option = GunicornOption("bind", f"0.0.0.0:{port}")
- options = [bind_option, GunicornOption("workers", 2)]
- StandaloneGunicornApplication(wsgi_app, options).run()
- if __name__ == "__main__":
- serve_logs()
|