json_client.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. """JSON API Client."""
  19. from __future__ import annotations
  20. from urllib.parse import urljoin
  21. from airflow.api.client import api_client
  22. class Client(api_client.Client):
  23. """
  24. Json API client implementation.
  25. This client is used to interact with a Json API server and perform various actions
  26. such as triggering DAG runs,deleting DAGs, interacting with pools, and getting lineage information.
  27. """
  28. def _request(self, url: str, json=None, method: str = "GET") -> dict:
  29. """
  30. Make a request to the Json API server.
  31. :param url: The URL to send the request to.
  32. :param method: The HTTP method to use (e.g. "GET", "POST", "DELETE").
  33. :param json: A dictionary containing JSON data to send in the request body.
  34. :return: A dictionary containing the JSON response from the server.
  35. :raises OSError: If the server returns an error status.
  36. """
  37. params = {
  38. "url": url,
  39. }
  40. if json is not None:
  41. params["json"] = json
  42. resp = getattr(self._session, method.lower())(**params)
  43. if resp.is_error:
  44. # It is justified here because there might be many resp types.
  45. try:
  46. data = resp.json()
  47. except Exception:
  48. data = {}
  49. raise OSError(data.get("error", "Server error"))
  50. return resp.json()
  51. def trigger_dag(self, dag_id, run_id=None, conf=None, execution_date=None, replace_microseconds=True):
  52. """
  53. Trigger a DAG run.
  54. :param dag_id: The ID of the DAG to trigger.
  55. :param run_id: The ID of the DAG run to create. If not provided, a default ID will be generated.
  56. :param conf: A dictionary containing configuration data to pass to the DAG run.
  57. :param execution_date: The execution date for the DAG run, in the format "YYYY-MM-DDTHH:MM:SS".
  58. :param replace_microseconds: Whether to replace microseconds in the execution date with zeros.
  59. :return: A message indicating the status of the DAG run trigger.
  60. """
  61. endpoint = f"/api/experimental/dags/{dag_id}/dag_runs"
  62. url = urljoin(self._api_base_url, endpoint)
  63. data = {
  64. "run_id": run_id,
  65. "conf": conf,
  66. "execution_date": execution_date,
  67. "replace_microseconds": replace_microseconds,
  68. }
  69. return self._request(url, method="POST", json=data)["message"]
  70. def delete_dag(self, dag_id: str):
  71. """
  72. Delete a DAG.
  73. :param dag_id: The ID of the DAG to delete.
  74. :return: A message indicating the status of the DAG delete operation.
  75. """
  76. endpoint = f"/api/experimental/dags/{dag_id}/delete_dag"
  77. url = urljoin(self._api_base_url, endpoint)
  78. data = self._request(url, method="DELETE")
  79. return data["message"]
  80. def get_pool(self, name: str):
  81. """
  82. Get information about a specific pool.
  83. :param name: The name of the pool to retrieve information for.
  84. :return: A tuple containing the name of the pool, the number of
  85. slots in the pool, and a description of the pool.
  86. """
  87. endpoint = f"/api/experimental/pools/{name}"
  88. url = urljoin(self._api_base_url, endpoint)
  89. pool = self._request(url)
  90. return pool["pool"], pool["slots"], pool["description"]
  91. def get_pools(self):
  92. """
  93. Get a list of all pools.
  94. :return: A list of tuples, each containing the name of a pool,
  95. the number of slots in the pool, and a description of the pool.
  96. """
  97. endpoint = "/api/experimental/pools"
  98. url = urljoin(self._api_base_url, endpoint)
  99. pools = self._request(url)
  100. return [(p["pool"], p["slots"], p["description"]) for p in pools]
  101. def create_pool(self, name: str, slots: int, description: str, include_deferred: bool):
  102. """
  103. Create a new pool.
  104. :param name: The name of the pool to create.
  105. :param slots: The number of slots in the pool.
  106. :param description: A description of the pool.
  107. :param include_deferred: include deferred tasks in pool calculations
  108. :return: A tuple containing the name of the pool, the number of slots in the pool,
  109. a description of the pool and the include_deferred flag.
  110. """
  111. endpoint = "/api/experimental/pools"
  112. data = {
  113. "name": name,
  114. "slots": slots,
  115. "description": description,
  116. "include_deferred": include_deferred,
  117. }
  118. response = self._request(urljoin(self._api_base_url, endpoint), method="POST", json=data)
  119. return response["pool"], response["slots"], response["description"], response["include_deferred"]
  120. def delete_pool(self, name: str):
  121. """
  122. Delete a pool.
  123. :param name: The name of the pool to delete.
  124. :return: A tuple containing the name of the pool, the number
  125. of slots in the pool, and a description of the pool.
  126. """
  127. endpoint = f"/api/experimental/pools/{name}"
  128. url = urljoin(self._api_base_url, endpoint)
  129. pool = self._request(url, method="DELETE")
  130. return pool["pool"], pool["slots"], pool["description"]
  131. def get_lineage(self, dag_id: str, execution_date: str):
  132. """
  133. Get the lineage of a DAG run.
  134. :param dag_id: The ID of the DAG.
  135. :param execution_date: The execution date of the DAG run, in the format "YYYY-MM-DDTHH:MM:SS".
  136. :return: A message indicating the status of the lineage request.
  137. """
  138. endpoint = f"/api/experimental/lineage/{dag_id}/{execution_date}"
  139. url = urljoin(self._api_base_url, endpoint)
  140. data = self._request(url, method="GET")
  141. return data["message"]