init_views.py 13 KB


  1. # Licensed to the Apache Software Foundation (ASF) under one
  2. # or more contributor license agreements. See the NOTICE file
  3. # distributed with this work for additional information
  4. # regarding copyright ownership. The ASF licenses this file
  5. # to you under the Apache License, Version 2.0 (the
  6. # "License"); you may not use this file except in compliance
  7. # with the License. You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing,
  12. # software distributed under the License is distributed on an
  13. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  14. # KIND, either express or implied. See the License for the
  15. # specific language governing permissions and limitations
  16. # under the License.
  17. from __future__ import annotations
  18. import logging
  19. import warnings
  20. from functools import cached_property
  21. from pathlib import Path
  22. from typing import TYPE_CHECKING
  23. from connexion import FlaskApi, ProblemException, Resolver
  24. from connexion.decorators.validation import RequestBodyValidator
  25. from connexion.exceptions import BadRequestProblem
  26. from flask import request
  27. from werkzeug import Request
  28. from airflow.api_connexion.exceptions import common_error_handler
  29. from airflow.configuration import conf
  30. from airflow.exceptions import RemovedInAirflow3Warning
  31. from airflow.security import permissions
  32. from airflow.utils.yaml import safe_load
  33. from airflow.www.constants import SWAGGER_BUNDLE, SWAGGER_ENABLED
  34. from airflow.www.extensions.init_auth_manager import get_auth_manager
  35. if TYPE_CHECKING:
  36. from flask import Flask
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# This file lives at airflow/www/extensions/init_views.py, so the third
# ancestor of its path (``parents[2]``) is the ``airflow/`` package directory.
ROOT_APP_DIR = Path(__file__).parents[2].resolve()
  40. def init_flash_views(app):
  41. """Init main app view - redirect to FAB."""
  42. from airflow.www.blueprints import routes
  43. app.register_blueprint(routes)
  44. def init_appbuilder_views(app):
  45. """Initialize Web UI views."""
  46. from airflow.models import import_all_models
  47. import_all_models()
  48. from airflow.www import views
  49. appbuilder = app.appbuilder
  50. # Remove the session from scoped_session registry to avoid
  51. # reusing a session with a disconnected connection
  52. appbuilder.session.remove()
  53. appbuilder.add_view_no_menu(views.AutocompleteView())
  54. appbuilder.add_view_no_menu(views.Airflow())
  55. appbuilder.add_view(
  56. views.DagRunModelView,
  57. permissions.RESOURCE_DAG_RUN,
  58. category=permissions.RESOURCE_BROWSE_MENU,
  59. category_icon="fa-globe",
  60. )
  61. appbuilder.add_view(
  62. views.JobModelView, permissions.RESOURCE_JOB, category=permissions.RESOURCE_BROWSE_MENU
  63. )
  64. appbuilder.add_view(
  65. views.LogModelView, permissions.RESOURCE_AUDIT_LOG, category=permissions.RESOURCE_BROWSE_MENU
  66. )
  67. appbuilder.add_view(
  68. views.VariableModelView, permissions.RESOURCE_VARIABLE, category=permissions.RESOURCE_ADMIN_MENU
  69. )
  70. appbuilder.add_view(
  71. views.TaskInstanceModelView,
  72. permissions.RESOURCE_TASK_INSTANCE,
  73. category=permissions.RESOURCE_BROWSE_MENU,
  74. )
  75. appbuilder.add_view(
  76. views.TaskRescheduleModelView,
  77. permissions.RESOURCE_TASK_RESCHEDULE,
  78. category=permissions.RESOURCE_BROWSE_MENU,
  79. )
  80. appbuilder.add_view(
  81. views.TriggerModelView,
  82. permissions.RESOURCE_TRIGGER,
  83. category=permissions.RESOURCE_BROWSE_MENU,
  84. )
  85. appbuilder.add_view(
  86. views.ConfigurationView,
  87. permissions.RESOURCE_CONFIG,
  88. category=permissions.RESOURCE_ADMIN_MENU,
  89. category_icon="fa-user",
  90. )
  91. appbuilder.add_view(
  92. views.ConnectionModelView, permissions.RESOURCE_CONNECTION, category=permissions.RESOURCE_ADMIN_MENU
  93. )
  94. appbuilder.add_view(
  95. views.SlaMissModelView, permissions.RESOURCE_SLA_MISS, category=permissions.RESOURCE_BROWSE_MENU
  96. )
  97. appbuilder.add_view(
  98. views.PluginView, permissions.RESOURCE_PLUGIN, category=permissions.RESOURCE_ADMIN_MENU
  99. )
  100. appbuilder.add_view(
  101. views.ProviderView, permissions.RESOURCE_PROVIDER, category=permissions.RESOURCE_ADMIN_MENU
  102. )
  103. appbuilder.add_view(
  104. views.PoolModelView, permissions.RESOURCE_POOL, category=permissions.RESOURCE_ADMIN_MENU
  105. )
  106. appbuilder.add_view(
  107. views.XComModelView, permissions.RESOURCE_XCOM, category=permissions.RESOURCE_ADMIN_MENU
  108. )
  109. appbuilder.add_view(
  110. views.DagDependenciesView,
  111. permissions.RESOURCE_DAG_DEPENDENCIES,
  112. category=permissions.RESOURCE_BROWSE_MENU,
  113. )
  114. # add_view_no_menu to change item position.
  115. # I added link in extensions.init_appbuilder_links.init_appbuilder_links
  116. appbuilder.add_view_no_menu(views.RedocView)
  117. # Development views
  118. appbuilder.add_view_no_menu(views.DevView)
  119. appbuilder.add_view_no_menu(views.DocsView)
  120. def init_plugins(app):
  121. """Integrate Flask and FAB with plugins."""
  122. from airflow import plugins_manager
  123. plugins_manager.initialize_web_ui_plugins()
  124. appbuilder = app.appbuilder
  125. for view in plugins_manager.flask_appbuilder_views:
  126. name = view.get("name")
  127. if name:
  128. filtered_view_kwargs = {k: v for k, v in view.items() if k not in ["view"]}
  129. log.debug("Adding view %s with menu", name)
  130. baseview = view.get("view")
  131. if baseview:
  132. appbuilder.add_view(baseview, **filtered_view_kwargs)
  133. else:
  134. log.error("'view' key is missing for the named view: %s", name)
  135. else:
  136. # if 'name' key is missing, intent is to add view without menu
  137. log.debug("Adding view %s without menu", str(type(view["view"])))
  138. appbuilder.add_view_no_menu(view["view"])
  139. for menu_link in sorted(
  140. plugins_manager.flask_appbuilder_menu_links, key=lambda x: (x.get("category", ""), x["name"])
  141. ):
  142. log.debug("Adding menu link %s to %s", menu_link["name"], menu_link["href"])
  143. appbuilder.add_link(**menu_link)
  144. for blue_print in plugins_manager.flask_blueprints:
  145. log.debug("Adding blueprint %s:%s", blue_print["name"], blue_print["blueprint"].import_name)
  146. app.register_blueprint(blue_print["blueprint"])
  147. def init_error_handlers(app: Flask):
  148. """Add custom errors handlers."""
  149. from airflow.www import views
  150. app.register_error_handler(500, views.show_traceback)
  151. app.register_error_handler(404, views.not_found)
  152. def set_cors_headers_on_response(response):
  153. """Add response headers."""
  154. allow_headers = conf.get("api", "access_control_allow_headers")
  155. allow_methods = conf.get("api", "access_control_allow_methods")
  156. allow_origins = conf.get("api", "access_control_allow_origins")
  157. if allow_headers:
  158. response.headers["Access-Control-Allow-Headers"] = allow_headers
  159. if allow_methods:
  160. response.headers["Access-Control-Allow-Methods"] = allow_methods
  161. if allow_origins == "*":
  162. response.headers["Access-Control-Allow-Origin"] = "*"
  163. elif allow_origins:
  164. allowed_origins = allow_origins.split(" ")
  165. origin = request.environ.get("HTTP_ORIGIN", allowed_origins[0])
  166. if origin in allowed_origins:
  167. response.headers["Access-Control-Allow-Origin"] = origin
  168. return response
  169. def init_data_form_parameters():
  170. """
  171. Initialize custom values for data form parameters.
  172. This is a workaround for Flask versions prior to 3.1.0.
  173. In order to allow users customizing form data parameters, we need these two fields to be configurable.
  174. Starting from Flask 3.1.0 these two parameters can be configured through Flask config, but unfortunately,
  175. current version of flask supported in Airflow is way older. That's why this workaround was introduced.
  176. See https://flask.palletsprojects.com/en/stable/api/#flask.Request.max_form_memory_size
  177. # TODO: remove it when Flask upgraded to version 3.1.0 or higher.
  178. """
  179. Request.max_form_parts = conf.getint("webserver", "max_form_parts")
  180. Request.max_form_memory_size = conf.getint("webserver", "max_form_memory_size")
  181. class _LazyResolution:
  182. """
  183. OpenAPI endpoint that lazily resolves the function on first use.
  184. This is a stand-in replacement for ``connexion.Resolution`` that implements
  185. its public attributes ``function`` and ``operation_id``, but the function
  186. is only resolved when it is first accessed.
  187. """
  188. def __init__(self, resolve_func, operation_id):
  189. self._resolve_func = resolve_func
  190. self.operation_id = operation_id
  191. @cached_property
  192. def function(self):
  193. return self._resolve_func(self.operation_id)
  194. class _LazyResolver(Resolver):
  195. """
  196. OpenAPI endpoint resolver that loads lazily on first use.
  197. This re-implements ``connexion.Resolver.resolve()`` to not eagerly resolve
  198. the endpoint function (and thus avoid importing it in the process), but only
  199. return a placeholder that will be actually resolved when the contained
  200. function is accessed.
  201. """
  202. def resolve(self, operation):
  203. operation_id = self.resolve_operation_id(operation)
  204. return _LazyResolution(self.resolve_function_from_operation_id, operation_id)
  205. class _CustomErrorRequestBodyValidator(RequestBodyValidator):
  206. """
  207. Custom request body validator that overrides error messages.
  208. By default, Connextion emits a very generic *None is not of type 'object'*
  209. error when receiving an empty request body (with the view specifying the
  210. body as non-nullable). We overrides it to provide a more useful message.
  211. """
  212. def validate_schema(self, data, url):
  213. if not self.is_null_value_valid and data is None:
  214. raise BadRequestProblem(detail="Request body must not be empty")
  215. return super().validate_schema(data, url)
  216. base_paths: list[str] = [] # contains the list of base paths that have api endpoints
  217. def init_api_error_handlers(app: Flask) -> None:
  218. """Add error handlers for 404 and 405 errors for existing API paths."""
  219. from airflow.www import views
  220. @app.errorhandler(404)
  221. def _handle_api_not_found(ex):
  222. if any([request.path.startswith(p) for p in base_paths]):
  223. # 404 errors are never handled on the blueprint level
  224. # unless raised from a view func so actual 404 errors,
  225. # i.e. "no route for it" defined, need to be handled
  226. # here on the application level
  227. return common_error_handler(ex)
  228. else:
  229. return views.not_found(ex)
  230. @app.errorhandler(405)
  231. def _handle_method_not_allowed(ex):
  232. if any([request.path.startswith(p) for p in base_paths]):
  233. return common_error_handler(ex)
  234. else:
  235. return views.method_not_allowed(ex)
  236. app.register_error_handler(ProblemException, common_error_handler)
  237. def init_api_connexion(app: Flask) -> None:
  238. """Initialize Stable API."""
  239. base_path = "/api/v1"
  240. base_paths.append(base_path)
  241. with ROOT_APP_DIR.joinpath("api_connexion", "openapi", "v1.yaml").open() as f:
  242. specification = safe_load(f)
  243. api_bp = FlaskApi(
  244. specification=specification,
  245. resolver=_LazyResolver(),
  246. base_path=base_path,
  247. options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
  248. strict_validation=True,
  249. validate_responses=True,
  250. validator_map={"body": _CustomErrorRequestBodyValidator},
  251. ).blueprint
  252. api_bp.before_app_request(init_data_form_parameters)
  253. api_bp.after_request(set_cors_headers_on_response)
  254. app.register_blueprint(api_bp)
  255. app.extensions["csrf"].exempt(api_bp)
  256. def init_api_internal(app: Flask, standalone_api: bool = False) -> None:
  257. """Initialize Internal API."""
  258. if not standalone_api and not conf.getboolean("webserver", "run_internal_api", fallback=False):
  259. return
  260. base_paths.append("/internal_api/v1")
  261. with ROOT_APP_DIR.joinpath("api_internal", "openapi", "internal_api_v1.yaml").open() as f:
  262. specification = safe_load(f)
  263. api_bp = FlaskApi(
  264. specification=specification,
  265. base_path="/internal_api/v1",
  266. options={"swagger_ui": SWAGGER_ENABLED, "swagger_path": SWAGGER_BUNDLE.__fspath__()},
  267. strict_validation=True,
  268. validate_responses=True,
  269. ).blueprint
  270. api_bp.after_request(set_cors_headers_on_response)
  271. app.register_blueprint(api_bp)
  272. app.after_request_funcs.setdefault(api_bp.name, []).append(set_cors_headers_on_response)
  273. app.extensions["csrf"].exempt(api_bp)
  274. def init_api_experimental(app):
  275. """Initialize Experimental API."""
  276. if not conf.getboolean("api", "enable_experimental_api", fallback=False):
  277. return
  278. from airflow.www.api.experimental import endpoints
  279. warnings.warn(
  280. "The experimental REST API is deprecated. Please migrate to the stable REST API. "
  281. "Please note that the experimental API do not have access control. "
  282. "The authenticated user has full access.",
  283. RemovedInAirflow3Warning,
  284. stacklevel=2,
  285. )
  286. base_paths.append("/api/experimental")
  287. app.register_blueprint(endpoints.api_experimental, url_prefix="/api/experimental")
  288. app.extensions["csrf"].exempt(endpoints.api_experimental)
  289. def init_api_auth_provider(app):
  290. """Initialize the API offered by the auth manager."""
  291. auth_mgr = get_auth_manager()
  292. blueprint = auth_mgr.get_api_endpoints()
  293. if blueprint:
  294. base_paths.append(blueprint.url_prefix)
  295. app.register_blueprint(blueprint)
  296. app.extensions["csrf"].exempt(blueprint)