endpoints.py 14 KB


  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import logging
  20. from functools import wraps
  21. from typing import TYPE_CHECKING, Callable, TypeVar, cast
  22. from flask import Blueprint, current_app, g, jsonify, request, url_for
  23. from airflow import models
  24. from airflow.api.common import delete_dag as delete, trigger_dag as trigger
  25. from airflow.api.common.experimental import pool as pool_api
  26. from airflow.api.common.experimental.get_code import get_code
  27. from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
  28. from airflow.api.common.experimental.get_dag_runs import get_dag_runs
  29. from airflow.api.common.experimental.get_lineage import get_lineage as get_lineage_api
  30. from airflow.api.common.experimental.get_task import get_task
  31. from airflow.api.common.experimental.get_task_instance import get_task_instance
  32. from airflow.exceptions import AirflowException
  33. from airflow.utils import timezone
  34. from airflow.utils.docs import get_docs_url
  35. from airflow.utils.strings import to_boolean
  36. from airflow.version import version
  37. if TYPE_CHECKING:
  38. from flask import Response
  39. log = logging.getLogger(__name__)
  40. T = TypeVar("T", bound=Callable)
  41. def requires_authentication(function: T):
  42. """Mark a function as requiring authentication."""
  43. @wraps(function)
  44. def decorated(*args, **kwargs):
  45. auth = current_app.api_auth[0]
  46. return auth.requires_authentication(function)(*args, **kwargs)
  47. return cast(T, decorated)
  48. api_experimental = Blueprint("api_experimental", __name__)
  49. def add_deprecation_headers(response: Response):
  50. """
  51. Add Deprecation HTTP Header Field.
  52. .. seealso:: IETF proposal for the header field
  53. `here <https://datatracker.ietf.org/doc/draft-dalal-deprecation-header/>`_.
  54. """
  55. response.headers["Deprecation"] = "true"
  56. doc_url = get_docs_url("upgrading-to-2.html#migration-guide-from-experimental-api-to-stable-api-v1")
  57. deprecation_link = f'<{doc_url}>; rel="deprecation"; type="text/html"'
  58. if "link" in response.headers:
  59. response.headers["Link"] += f", {deprecation_link}"
  60. else:
  61. response.headers["Link"] = f"{deprecation_link}"
  62. return response
  63. # This API is deprecated. We do not care too much about typing here
  64. api_experimental.after_request(add_deprecation_headers) # type: ignore[arg-type]
  65. @api_experimental.route("/dags/<string:dag_id>/dag_runs", methods=["POST"])
  66. @requires_authentication
  67. def trigger_dag(dag_id):
  68. """Trigger a new dag run for a Dag with an execution date of now unless specified in the data."""
  69. data = request.get_json(force=True)
  70. run_id = None
  71. if "run_id" in data:
  72. run_id = data["run_id"]
  73. conf = None
  74. if "conf" in data:
  75. conf = data["conf"]
  76. if not isinstance(conf, dict):
  77. error_message = "Dag Run conf must be a dictionary object, other types are not supported"
  78. log.error(error_message)
  79. response = jsonify({"error": error_message})
  80. response.status_code = 400
  81. return response
  82. execution_date = None
  83. if "execution_date" in data and data["execution_date"] is not None:
  84. execution_date = data["execution_date"]
  85. # Convert string datetime into actual datetime
  86. try:
  87. execution_date = timezone.parse(execution_date)
  88. except ValueError:
  89. log.error("Given execution date could not be identified as a date.")
  90. error_message = (
  91. f"Given execution date, {execution_date}, could not be identified as a date. "
  92. f"Example date format: 2015-11-16T14:34:15+00:00"
  93. )
  94. response = jsonify({"error": error_message})
  95. response.status_code = 400
  96. return response
  97. replace_microseconds = execution_date is None
  98. if "replace_microseconds" in data:
  99. replace_microseconds = to_boolean(data["replace_microseconds"])
  100. try:
  101. dr = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
  102. except AirflowException as err:
  103. log.error(err)
  104. response = jsonify(error=f"{err}")
  105. response.status_code = err.status_code
  106. return response
  107. if getattr(g, "user", None):
  108. log.info("User %s created %s", g.user, dr)
  109. response = jsonify(
  110. message=f"Created {dr}", execution_date=dr.execution_date.isoformat(), run_id=dr.run_id
  111. )
  112. return response
  113. @api_experimental.route("/dags/<string:dag_id>", methods=["DELETE"])
  114. @requires_authentication
  115. def delete_dag(dag_id):
  116. """Delete all DB records related to the specified Dag."""
  117. try:
  118. count = delete.delete_dag(dag_id)
  119. except AirflowException as err:
  120. log.error(err)
  121. response = jsonify(error=f"{err}")
  122. response.status_code = err.status_code
  123. return response
  124. return jsonify(message=f"Removed {count} record(s)", count=count)
  125. @api_experimental.route("/dags/<string:dag_id>/dag_runs", methods=["GET"])
  126. @requires_authentication
  127. def dag_runs(dag_id):
  128. """
  129. Return a list of Dag Runs for a specific DAG ID.
  130. :query param state: a query string parameter '?state=queued|running|success...'
  131. :param dag_id: String identifier of a DAG
  132. :return: List of DAG runs of a DAG with requested state,
  133. or all runs if the state is not specified
  134. """
  135. try:
  136. state = request.args.get("state")
  137. dagruns = get_dag_runs(dag_id, state)
  138. except AirflowException as err:
  139. log.info(err)
  140. response = jsonify(error=f"{err}")
  141. response.status_code = 400
  142. return response
  143. return jsonify(dagruns)
  144. @api_experimental.route("/test", methods=["GET"])
  145. @requires_authentication
  146. def test():
  147. """Test endpoint to check authentication."""
  148. return jsonify(status="OK")
  149. @api_experimental.route("/info", methods=["GET"])
  150. @requires_authentication
  151. def info():
  152. """Get Airflow Version."""
  153. return jsonify(version=version)
  154. @api_experimental.route("/dags/<string:dag_id>/code", methods=["GET"])
  155. @requires_authentication
  156. def get_dag_code(dag_id):
  157. """Return python code of a given dag_id."""
  158. try:
  159. return get_code(dag_id)
  160. except AirflowException as err:
  161. log.info(err)
  162. response = jsonify(error=f"{err}")
  163. response.status_code = err.status_code
  164. return response
  165. @api_experimental.route("/dags/<string:dag_id>/tasks/<string:task_id>", methods=["GET"])
  166. @requires_authentication
  167. def task_info(dag_id, task_id):
  168. """Return a JSON with a task's public instance variables."""
  169. try:
  170. t_info = get_task(dag_id, task_id)
  171. except AirflowException as err:
  172. log.info(err)
  173. response = jsonify(error=f"{err}")
  174. response.status_code = err.status_code
  175. return response
  176. # JSONify and return.
  177. fields = {k: str(v) for k, v in vars(t_info).items() if not k.startswith("_")}
  178. return jsonify(fields)
  179. # ToDo: Shouldn't this be a PUT method?
  180. @api_experimental.route("/dags/<string:dag_id>/paused/<string:paused>", methods=["GET"])
  181. @requires_authentication
  182. def dag_paused(dag_id, paused):
  183. """(Un)pause a dag."""
  184. is_paused = bool(paused == "true")
  185. models.DagModel.get_dagmodel(dag_id).set_is_paused(
  186. is_paused=is_paused,
  187. )
  188. return jsonify({"response": "ok"})
  189. @api_experimental.route("/dags/<string:dag_id>/paused", methods=["GET"])
  190. @requires_authentication
  191. def dag_is_paused(dag_id):
  192. """Get paused state of a dag."""
  193. is_paused = models.DagModel.get_dagmodel(dag_id).is_paused
  194. return jsonify({"is_paused": is_paused})
  195. @api_experimental.route(
  196. "/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>", methods=["GET"]
  197. )
  198. @requires_authentication
  199. def task_instance_info(dag_id, execution_date, task_id):
  200. """
  201. Return a JSON with a task instance's public instance variables.
  202. The format for the exec_date is expected to be
  203. "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
  204. of course need to have been encoded for URL in the request.
  205. """
  206. # Convert string datetime into actual datetime
  207. try:
  208. execution_date = timezone.parse(execution_date)
  209. except ValueError:
  210. log.error("Given execution date could not be identified as a date.")
  211. error_message = (
  212. f"Given execution date, {execution_date}, could not be identified as a date. "
  213. f"Example date format: 2015-11-16T14:34:15+00:00"
  214. )
  215. response = jsonify({"error": error_message})
  216. response.status_code = 400
  217. return response
  218. try:
  219. ti_info = get_task_instance(dag_id, task_id, execution_date)
  220. except AirflowException as err:
  221. log.info(err)
  222. response = jsonify(error=f"{err}")
  223. response.status_code = err.status_code
  224. return response
  225. # JSONify and return.
  226. fields = {k: str(v) for k, v in vars(ti_info).items() if not k.startswith("_")}
  227. return jsonify(fields)
  228. @api_experimental.route("/dags/<string:dag_id>/dag_runs/<string:execution_date>", methods=["GET"])
  229. @requires_authentication
  230. def dag_run_status(dag_id, execution_date):
  231. """
  232. Return a JSON with a dag_run's public instance variables.
  233. The format for the exec_date is expected to be
  234. "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
  235. of course need to have been encoded for URL in the request.
  236. """
  237. # Convert string datetime into actual datetime
  238. try:
  239. execution_date = timezone.parse(execution_date)
  240. except ValueError:
  241. log.error("Given execution date could not be identified as a date.")
  242. error_message = (
  243. f"Given execution date, {execution_date}, could not be identified as a date. "
  244. f"Example date format: 2015-11-16T14:34:15+00:00"
  245. )
  246. response = jsonify({"error": error_message})
  247. response.status_code = 400
  248. return response
  249. try:
  250. dr_info = get_dag_run_state(dag_id, execution_date)
  251. except AirflowException as err:
  252. log.info(err)
  253. response = jsonify(error=f"{err}")
  254. response.status_code = err.status_code
  255. return response
  256. return jsonify(dr_info)
  257. @api_experimental.route("/latest_runs", methods=["GET"])
  258. @requires_authentication
  259. def latest_dag_runs():
  260. """Return the latest DagRun for each DAG formatted for the UI."""
  261. from airflow.models import DagRun
  262. dagruns = DagRun.get_latest_runs()
  263. payload = []
  264. for dagrun in dagruns:
  265. if dagrun.execution_date:
  266. payload.append(
  267. {
  268. "dag_id": dagrun.dag_id,
  269. "execution_date": dagrun.execution_date.isoformat(),
  270. "start_date": ((dagrun.start_date or "") and dagrun.start_date.isoformat()),
  271. "dag_run_url": url_for(
  272. "Airflow.graph", dag_id=dagrun.dag_id, execution_date=dagrun.execution_date
  273. ),
  274. }
  275. )
  276. return jsonify(items=payload) # old flask versions don't support jsonifying arrays
  277. @api_experimental.route("/pools/<string:name>", methods=["GET"])
  278. @requires_authentication
  279. def get_pool(name):
  280. """Get pool by a given name."""
  281. try:
  282. pool = pool_api.get_pool(name=name)
  283. except AirflowException as err:
  284. log.error(err)
  285. response = jsonify(error=f"{err}")
  286. response.status_code = err.status_code
  287. return response
  288. else:
  289. return jsonify(pool.to_json())
  290. @api_experimental.route("/pools", methods=["GET"])
  291. @requires_authentication
  292. def get_pools():
  293. """Get all pools."""
  294. try:
  295. pools = pool_api.get_pools()
  296. except AirflowException as err:
  297. log.error(err)
  298. response = jsonify(error=f"{err}")
  299. response.status_code = err.status_code
  300. return response
  301. else:
  302. return jsonify([p.to_json() for p in pools])
  303. @api_experimental.route("/pools", methods=["POST"])
  304. @requires_authentication
  305. def create_pool():
  306. """Create a pool."""
  307. params = request.get_json(force=True)
  308. try:
  309. pool = pool_api.create_pool(**params)
  310. except AirflowException as err:
  311. log.error(err)
  312. response = jsonify(error=f"{err}")
  313. response.status_code = err.status_code
  314. return response
  315. else:
  316. return jsonify(pool.to_json())
  317. @api_experimental.route("/pools/<string:name>", methods=["DELETE"])
  318. @requires_authentication
  319. def delete_pool(name):
  320. """Delete pool."""
  321. try:
  322. pool = pool_api.delete_pool(name=name)
  323. except AirflowException as err:
  324. log.error(err)
  325. response = jsonify(error=f"{err}")
  326. response.status_code = err.status_code
  327. return response
  328. else:
  329. return jsonify(pool.to_json())
  330. @api_experimental.route("/lineage/<string:dag_id>/<string:execution_date>", methods=["GET"])
  331. @requires_authentication
  332. def get_lineage(dag_id: str, execution_date: str):
  333. """Get Lineage details for a DagRun."""
  334. # Convert string datetime into actual datetime
  335. try:
  336. execution_dt = timezone.parse(execution_date)
  337. except ValueError:
  338. log.error("Given execution date could not be identified as a date.")
  339. error_message = (
  340. f"Given execution date, {execution_date}, could not be identified as a date. "
  341. f"Example date format: 2015-11-16T14:34:15+00:00"
  342. )
  343. response = jsonify({"error": error_message})
  344. response.status_code = 400
  345. return response
  346. try:
  347. lineage = get_lineage_api(dag_id=dag_id, execution_date=execution_dt)
  348. except AirflowException as err:
  349. log.error(err)
  350. response = jsonify(error=f"{err}")
  351. response.status_code = err.status_code
  352. return response
  353. else:
  354. return jsonify(lineage)