| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434 |
- #
- # Licensed to the Apache Software Foundation (ASF) under one
- # or more contributor license agreements. See the NOTICE file
- # distributed with this work for additional information
- # regarding copyright ownership. The ASF licenses this file
- # to you under the Apache License, Version 2.0 (the
- # "License"); you may not use this file except in compliance
- # with the License. You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing,
- # software distributed under the License is distributed on an
- # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- # KIND, either express or implied. See the License for the
- # specific language governing permissions and limitations
- # under the License.
- from __future__ import annotations
- import logging
- from functools import wraps
- from typing import TYPE_CHECKING, Callable, TypeVar, cast
- from flask import Blueprint, current_app, g, jsonify, request, url_for
- from airflow import models
- from airflow.api.common import delete_dag as delete, trigger_dag as trigger
- from airflow.api.common.experimental import pool as pool_api
- from airflow.api.common.experimental.get_code import get_code
- from airflow.api.common.experimental.get_dag_run_state import get_dag_run_state
- from airflow.api.common.experimental.get_dag_runs import get_dag_runs
- from airflow.api.common.experimental.get_lineage import get_lineage as get_lineage_api
- from airflow.api.common.experimental.get_task import get_task
- from airflow.api.common.experimental.get_task_instance import get_task_instance
- from airflow.exceptions import AirflowException
- from airflow.utils import timezone
- from airflow.utils.docs import get_docs_url
- from airflow.utils.strings import to_boolean
- from airflow.version import version
- if TYPE_CHECKING:
- from flask import Response
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Type variable bound to ``Callable``; ``requires_authentication`` uses it to
# cast its wrapper back to the type of the callable it was given.
T = TypeVar("T", bound=Callable)
def requires_authentication(function: T):
    """
    Route every call of ``function`` through the configured API auth backend.

    The backend is looked up at call time, not at decoration time: the first
    entry of ``current_app.api_auth`` is asked to wrap ``function`` and the
    resulting callable is invoked with the caller's arguments.

    :param function: the callable to protect
    :return: a wrapper carrying ``function``'s metadata, cast to ``function``'s type
    """

    def _call_through_backend(*args, **kwargs):
        backend = current_app.api_auth[0]
        guarded = backend.requires_authentication(function)
        return guarded(*args, **kwargs)

    return cast(T, wraps(function)(_call_through_backend))
# Flask blueprint on which the experimental API routes in this module are registered.
api_experimental = Blueprint("api_experimental", __name__)
def add_deprecation_headers(response: Response):
    """
    Mark a response as coming from a deprecated API.

    Sets ``Deprecation: true`` and adds a ``Link`` entry with ``rel="deprecation"``
    that points at the migration guide. An existing ``Link`` value is kept and the
    new entry is appended to it after a comma.

    .. seealso:: IETF proposal for the header field
        `here <https://datatracker.ietf.org/doc/draft-dalal-deprecation-header/>`_.

    :param response: the outgoing response; its headers are modified in place
    :return: the same ``response`` object
    """
    headers = response.headers
    headers["Deprecation"] = "true"

    guide_url = get_docs_url("upgrading-to-2.html#migration-guide-from-experimental-api-to-stable-api-v1")
    link_entry = f'<{guide_url}>; rel="deprecation"; type="text/html"'

    # NOTE(review): membership is tested with lowercase "link" while the value is
    # read and written as "Link" - presumably header names are matched
    # case-insensitively; confirm.
    if "link" not in headers:
        headers["Link"] = link_entry
    else:
        headers["Link"] = headers["Link"] + f", {link_entry}"

    return response


# This API is deprecated. We do not care too much about typing here
api_experimental.after_request(add_deprecation_headers)  # type: ignore[arg-type]
@api_experimental.route("/dags/<string:dag_id>/dag_runs", methods=["POST"])
@requires_authentication
def trigger_dag(dag_id):
    """
    Trigger a new dag run for a Dag, configured by the JSON request body.

    Body keys read here, all optional: ``run_id``, ``conf`` (must be a dict when
    present), ``execution_date`` (a date string; ``null`` is treated like a missing
    key) and ``replace_microseconds``. When ``replace_microseconds`` is not given it
    defaults to ``True`` only if no execution date was supplied.

    :param dag_id: String identifier of the DAG to trigger
    :return: JSON with ``message``, ``execution_date`` and ``run_id`` on success;
        a JSON ``error`` with status 400 for a non-dict ``conf`` or an unparseable
        date; a JSON ``error`` with the exception's status code when triggering
        raises ``AirflowException``
    """
    payload = request.get_json(force=True)

    run_id = payload["run_id"] if "run_id" in payload else None

    conf_given = "conf" in payload
    conf = payload["conf"] if conf_given else None
    if conf_given and not isinstance(conf, dict):
        error_message = "Dag Run conf must be a dictionary object, other types are not supported"
        log.error(error_message)
        bad_conf = jsonify({"error": error_message})
        bad_conf.status_code = 400
        return bad_conf

    # Keep the raw value around so the error message can echo what the client sent.
    raw_date = payload["execution_date"] if "execution_date" in payload else None
    execution_date = None
    if raw_date is not None:
        # Convert string datetime into actual datetime
        try:
            execution_date = timezone.parse(raw_date)
        except ValueError:
            log.error("Given execution date could not be identified as a date.")
            error_message = (
                f"Given execution date, {raw_date}, could not be identified as a date. "
                "Example date format: 2015-11-16T14:34:15+00:00"
            )
            bad_date = jsonify({"error": error_message})
            bad_date.status_code = 400
            return bad_date

    if "replace_microseconds" in payload:
        replace_microseconds = to_boolean(payload["replace_microseconds"])
    else:
        replace_microseconds = execution_date is None

    try:
        dag_run = trigger.trigger_dag(dag_id, run_id, conf, execution_date, replace_microseconds)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    if getattr(g, "user", None):
        log.info("User %s created %s", g.user, dag_run)

    return jsonify(
        message=f"Created {dag_run}",
        execution_date=dag_run.execution_date.isoformat(),
        run_id=dag_run.run_id,
    )
@api_experimental.route("/dags/<string:dag_id>", methods=["DELETE"])
@requires_authentication
def delete_dag(dag_id):
    """
    Delete the DB records related to the specified Dag.

    :param dag_id: String identifier of the DAG to delete
    :return: JSON with a ``message`` and the ``count`` reported by the delete call,
        or a JSON ``error`` with the exception's status code on ``AirflowException``
    """
    try:
        removed = delete.delete_dag(dag_id)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure
    else:
        return jsonify(message=f"Removed {removed} record(s)", count=removed)
@api_experimental.route("/dags/<string:dag_id>/dag_runs", methods=["GET"])
@requires_authentication
def dag_runs(dag_id):
    """
    List the Dag Runs of one DAG, optionally filtered by state.

    :query param state: a query string parameter '?state=queued|running|success...'
    :param dag_id: String identifier of a DAG
    :return: List of DAG runs of a DAG with requested state,
        or all runs if the state is not specified; a JSON ``error`` with
        status 400 when the lookup raises ``AirflowException``
    """
    # Reading the query string is kept outside the try: only the lookup is guarded.
    state = request.args.get("state")
    try:
        runs = get_dag_runs(dag_id, state)
    except AirflowException as err:
        log.info(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = 400
        return failure

    return jsonify(runs)
@api_experimental.route("/test", methods=["GET"])
@requires_authentication
def test():
    """
    Authentication smoke test.

    :return: JSON ``{"status": "OK"}`` once the request got past authentication
    """
    body = {"status": "OK"}
    return jsonify(body)
@api_experimental.route("/info", methods=["GET"])
@requires_authentication
def info():
    """
    Report the Airflow version.

    :return: JSON with a single ``version`` key
    """
    body = {"version": version}
    return jsonify(body)
@api_experimental.route("/dags/<string:dag_id>/code", methods=["GET"])
@requires_authentication
def get_dag_code(dag_id):
    """
    Return python code of a given dag_id.

    :param dag_id: String identifier of a DAG
    :return: whatever ``get_code`` returns for the DAG, passed through unchanged,
        or a JSON ``error`` with the exception's status code on ``AirflowException``
    """
    try:
        source = get_code(dag_id)
    except AirflowException as err:
        log.info(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return source
@api_experimental.route("/dags/<string:dag_id>/tasks/<string:task_id>", methods=["GET"])
@requires_authentication
def task_info(dag_id, task_id):
    """
    Return a JSON with a task's public instance variables.

    :param dag_id: String identifier of a DAG
    :param task_id: String identifier of a task in that DAG
    :return: JSON object mapping each instance attribute whose name does not start
        with ``_`` to its ``str()`` value, or a JSON ``error`` with the exception's
        status code on ``AirflowException``
    """
    try:
        task = get_task(dag_id, task_id)
    except AirflowException as err:
        log.info(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    # Collect the public attributes, stringified before they are JSONified.
    public_fields = {}
    for attr_name, attr_value in vars(task).items():
        if attr_name.startswith("_"):
            continue
        public_fields[attr_name] = str(attr_value)
    return jsonify(public_fields)
# ToDo: Shouldn't this be a PUT method?
@api_experimental.route("/dags/<string:dag_id>/paused/<string:paused>", methods=["GET"])
@requires_authentication
def dag_paused(dag_id, paused):
    """
    (Un)pause a dag.

    :param dag_id: String identifier of a DAG
    :param paused: the literal string ``"true"`` pauses the DAG; any other value unpauses it
    :return: JSON ``{"response": "ok"}`` on success, or a JSON ``error`` with
        status 404 when no DAG model exists for ``dag_id``
    """
    dag_model = models.DagModel.get_dagmodel(dag_id)
    if dag_model is None:
        # An unknown dag_id yields no model; answer with a client error instead of
        # letting the attribute access below fail with an unhandled AttributeError.
        error_message = f"Dag id {dag_id} not found"
        log.error(error_message)
        response = jsonify(error=error_message)
        response.status_code = 404
        return response

    dag_model.set_is_paused(is_paused=paused == "true")
    return jsonify({"response": "ok"})
@api_experimental.route("/dags/<string:dag_id>/paused", methods=["GET"])
@requires_authentication
def dag_is_paused(dag_id):
    """
    Get paused state of a dag.

    :param dag_id: String identifier of a DAG
    :return: JSON ``{"is_paused": <value>}``, or a JSON ``error`` with status 404
        when no DAG model exists for ``dag_id``
    """
    dag_model = models.DagModel.get_dagmodel(dag_id)
    if dag_model is None:
        # An unknown dag_id yields no model; answer with a client error instead of
        # letting the attribute access below fail with an unhandled AttributeError.
        error_message = f"Dag id {dag_id} not found"
        log.error(error_message)
        response = jsonify(error=error_message)
        response.status_code = 404
        return response

    return jsonify({"is_paused": dag_model.is_paused})
@api_experimental.route(
    "/dags/<string:dag_id>/dag_runs/<string:execution_date>/tasks/<string:task_id>", methods=["GET"]
)
@requires_authentication
def task_instance_info(dag_id, execution_date, task_id):
    """
    Return a JSON with a task instance's public instance variables.

    The format for the exec_date is expected to be
    "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
    of course need to have been encoded for URL in the request.

    :param dag_id: String identifier of a DAG
    :param execution_date: date string taken from the URL
    :param task_id: String identifier of a task in that DAG
    :return: JSON object mapping each instance attribute whose name does not start
        with ``_`` to its ``str()`` value; a JSON ``error`` with status 400 for an
        unparseable date, or with the exception's status code on ``AirflowException``
    """
    # Convert string datetime into actual datetime
    try:
        parsed_date = timezone.parse(execution_date)
    except ValueError:
        log.error("Given execution date could not be identified as a date.")
        error_message = (
            f"Given execution date, {execution_date}, could not be identified as a date. "
            "Example date format: 2015-11-16T14:34:15+00:00"
        )
        bad_date = jsonify({"error": error_message})
        bad_date.status_code = 400
        return bad_date

    try:
        task_instance = get_task_instance(dag_id, task_id, parsed_date)
    except AirflowException as err:
        log.info(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    # Collect the public attributes, stringified before they are JSONified.
    public_fields = {}
    for attr_name, attr_value in vars(task_instance).items():
        if attr_name.startswith("_"):
            continue
        public_fields[attr_name] = str(attr_value)
    return jsonify(public_fields)
@api_experimental.route("/dags/<string:dag_id>/dag_runs/<string:execution_date>", methods=["GET"])
@requires_authentication
def dag_run_status(dag_id, execution_date):
    """
    Return a JSON with the state information of one dag run.

    The format for the exec_date is expected to be
    "YYYY-mm-DDTHH:MM:SS", for example: "2016-11-16T11:34:15". This will
    of course need to have been encoded for URL in the request.

    :param dag_id: String identifier of a DAG
    :param execution_date: date string taken from the URL
    :return: the JSONified result of ``get_dag_run_state``; a JSON ``error`` with
        status 400 for an unparseable date, or with the exception's status code
        on ``AirflowException``
    """
    # Convert string datetime into actual datetime
    try:
        parsed_date = timezone.parse(execution_date)
    except ValueError:
        log.error("Given execution date could not be identified as a date.")
        error_message = (
            f"Given execution date, {execution_date}, could not be identified as a date. "
            "Example date format: 2015-11-16T14:34:15+00:00"
        )
        bad_date = jsonify({"error": error_message})
        bad_date.status_code = 400
        return bad_date

    try:
        run_state = get_dag_run_state(dag_id, parsed_date)
    except AirflowException as err:
        log.info(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return jsonify(run_state)
@api_experimental.route("/latest_runs", methods=["GET"])
@requires_authentication
def latest_dag_runs():
    """
    Return the latest DagRun for each DAG formatted for the UI.

    Runs without an ``execution_date`` are left out. Each remaining run becomes a
    dict with ``dag_id``, ``execution_date`` (ISO format), ``start_date`` (ISO
    format, or ``""`` when the run has no start date) and ``dag_run_url``.

    :return: JSON object with the list of run dicts under ``items``
    """
    from airflow.models import DagRun

    def _as_item(run):
        # One entry of the payload for a single dag run.
        started = run.start_date
        return {
            "dag_id": run.dag_id,
            "execution_date": run.execution_date.isoformat(),
            "start_date": started.isoformat() if started else "",
            "dag_run_url": url_for("Airflow.graph", dag_id=run.dag_id, execution_date=run.execution_date),
        }

    items = [_as_item(run) for run in DagRun.get_latest_runs() if run.execution_date]
    return jsonify(items=items)  # old flask versions don't support jsonifying arrays
@api_experimental.route("/pools/<string:name>", methods=["GET"])
@requires_authentication
def get_pool(name):
    """
    Get pool by a given name.

    :param name: name of the pool, taken from the URL
    :return: the JSONified ``to_json()`` of the pool, or a JSON ``error`` with the
        exception's status code on ``AirflowException``
    """
    try:
        found = pool_api.get_pool(name=name)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return jsonify(found.to_json())
@api_experimental.route("/pools", methods=["GET"])
@requires_authentication
def get_pools():
    """
    Get all pools.

    :return: JSON list with the ``to_json()`` of every pool, or a JSON ``error``
        with the exception's status code on ``AirflowException``
    """
    try:
        all_pools = pool_api.get_pools()
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    serialized = []
    for pool in all_pools:
        serialized.append(pool.to_json())
    return jsonify(serialized)
@api_experimental.route("/pools", methods=["POST"])
@requires_authentication
def create_pool():
    """
    Create a pool from the JSON request body.

    The parsed body is passed to ``pool_api.create_pool`` as keyword arguments.

    :return: the JSONified ``to_json()`` of the created pool, or a JSON ``error``
        with the exception's status code on ``AirflowException``
    """
    pool_kwargs = request.get_json(force=True)
    try:
        created = pool_api.create_pool(**pool_kwargs)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return jsonify(created.to_json())
@api_experimental.route("/pools/<string:name>", methods=["DELETE"])
@requires_authentication
def delete_pool(name):
    """
    Delete pool.

    :param name: name of the pool, taken from the URL
    :return: the JSONified ``to_json()`` of the pool returned by the delete call,
        or a JSON ``error`` with the exception's status code on ``AirflowException``
    """
    try:
        removed = pool_api.delete_pool(name=name)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return jsonify(removed.to_json())
@api_experimental.route("/lineage/<string:dag_id>/<string:execution_date>", methods=["GET"])
@requires_authentication
def get_lineage(dag_id: str, execution_date: str):
    """
    Get Lineage details for a DagRun.

    :param dag_id: String identifier of a DAG
    :param execution_date: date string taken from the URL
    :return: the JSONified lineage; a JSON ``error`` with status 400 for an
        unparseable date, or with the exception's status code on ``AirflowException``
    """
    # Convert string datetime into actual datetime
    try:
        parsed_date = timezone.parse(execution_date)
    except ValueError:
        log.error("Given execution date could not be identified as a date.")
        error_message = (
            f"Given execution date, {execution_date}, could not be identified as a date. "
            "Example date format: 2015-11-16T14:34:15+00:00"
        )
        bad_date = jsonify({"error": error_message})
        bad_date.status_code = 400
        return bad_date

    try:
        lineage_details = get_lineage_api(dag_id=dag_id, execution_date=parsed_date)
    except AirflowException as err:
        log.error(err)
        failure = jsonify(error=f"{err}")
        failure.status_code = err.status_code
        return failure

    return jsonify(lineage_details)
|