http.py 20 KB


  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. from typing import TYPE_CHECKING, Any, Callable
  20. from urllib.parse import urlparse
  21. import aiohttp
  22. import tenacity
  23. from aiohttp import ClientResponseError
  24. from asgiref.sync import sync_to_async
  25. from requests import PreparedRequest, Request, Response, Session
  26. from requests.auth import HTTPBasicAuth
  27. from requests.exceptions import ConnectionError, HTTPError
  28. from requests.models import DEFAULT_REDIRECT_LIMIT
  29. from requests_toolbelt.adapters.socket_options import TCPKeepAliveAdapter
  30. from airflow.exceptions import AirflowException
  31. from airflow.hooks.base import BaseHook
  32. from airflow.providers.http.exceptions import HttpErrorException, HttpMethodException
  33. if TYPE_CHECKING:
  34. from aiohttp.client_reqrep import ClientResponse
  35. from requests.adapters import HTTPAdapter
  36. from airflow.models import Connection
  37. def _url_from_endpoint(base_url: str | None, endpoint: str | None) -> str:
  38. """Combine base url with endpoint."""
  39. if base_url and not base_url.endswith("/") and endpoint and not endpoint.startswith("/"):
  40. return f"{base_url}/{endpoint}"
  41. return (base_url or "") + (endpoint or "")
  42. def _process_extra_options_from_connection(conn: Connection, extra_options: dict[str, Any]) -> dict:
  43. extra = conn.extra_dejson
  44. stream = extra.pop("stream", None)
  45. cert = extra.pop("cert", None)
  46. proxies = extra.pop("proxies", extra.pop("proxy", None))
  47. timeout = extra.pop("timeout", None)
  48. verify_ssl = extra.pop("verify", extra.pop("verify_ssl", None))
  49. allow_redirects = extra.pop("allow_redirects", None)
  50. max_redirects = extra.pop("max_redirects", None)
  51. trust_env = extra.pop("trust_env", None)
  52. check_response = extra.pop("check_response", None)
  53. if stream is not None and "stream" not in extra_options:
  54. extra_options["stream"] = stream
  55. if cert is not None and "cert" not in extra_options:
  56. extra_options["cert"] = cert
  57. if proxies is not None and "proxy" not in extra_options:
  58. extra_options["proxy"] = proxies
  59. if timeout is not None and "timeout" not in extra_options:
  60. extra_options["timeout"] = timeout
  61. if verify_ssl is not None and "verify_ssl" not in extra_options:
  62. extra_options["verify_ssl"] = verify_ssl
  63. if allow_redirects is not None and "allow_redirects" not in extra_options:
  64. extra_options["allow_redirects"] = allow_redirects
  65. if max_redirects is not None and "max_redirects" not in extra_options:
  66. extra_options["max_redirects"] = max_redirects
  67. if trust_env is not None and "trust_env" not in extra_options:
  68. extra_options["trust_env"] = trust_env
  69. if check_response is not None and "check_response" not in extra_options:
  70. extra_options["check_response"] = check_response
  71. return extra
  72. class HttpHook(BaseHook):
  73. """
  74. Interact with HTTP servers.
  75. :param method: the API method to be called
  76. :param http_conn_id: :ref:`http connection<howto/connection:http>` that has the base
  77. API url i.e https://www.google.com/ and optional authentication credentials. Default
  78. headers can also be specified in the Extra field in json format.
  79. :param auth_type: The auth type for the service
  80. :param adapter: An optional instance of `requests.adapters.HTTPAdapter` to mount for the session.
  81. :param tcp_keep_alive: Enable TCP Keep Alive for the connection.
  82. :param tcp_keep_alive_idle: The TCP Keep Alive Idle parameter (corresponds to ``socket.TCP_KEEPIDLE``).
  83. :param tcp_keep_alive_count: The TCP Keep Alive count parameter (corresponds to ``socket.TCP_KEEPCNT``)
  84. :param tcp_keep_alive_interval: The TCP Keep Alive interval parameter (corresponds to
  85. ``socket.TCP_KEEPINTVL``)
  86. :param auth_args: extra arguments used to initialize the auth_type if different than default HTTPBasicAuth
  87. """
  88. conn_name_attr = "http_conn_id"
  89. default_conn_name = "http_default"
  90. conn_type = "http"
  91. hook_name = "HTTP"
  92. default_host = ""
  93. default_headers: dict[str, str] = {}
  94. def __init__(
  95. self,
  96. method: str = "POST",
  97. http_conn_id: str = default_conn_name,
  98. auth_type: Any = None,
  99. tcp_keep_alive: bool = True,
  100. tcp_keep_alive_idle: int = 120,
  101. tcp_keep_alive_count: int = 20,
  102. tcp_keep_alive_interval: int = 30,
  103. adapter: HTTPAdapter | None = None,
  104. ) -> None:
  105. super().__init__()
  106. self.http_conn_id = http_conn_id
  107. self.method = method.upper()
  108. self.base_url: str = ""
  109. self._retry_obj: Callable[..., Any]
  110. self._auth_type: Any = auth_type
  111. # If no adapter is provided, use TCPKeepAliveAdapter (default behavior)
  112. self.adapter = adapter
  113. if tcp_keep_alive and adapter is None:
  114. self.keep_alive_adapter = TCPKeepAliveAdapter(
  115. idle=tcp_keep_alive_idle,
  116. count=tcp_keep_alive_count,
  117. interval=tcp_keep_alive_interval,
  118. )
  119. else:
  120. self.keep_alive_adapter = None
  121. @property
  122. def auth_type(self):
  123. return self._auth_type or HTTPBasicAuth
  124. @auth_type.setter
  125. def auth_type(self, v):
  126. self._auth_type = v
  127. # headers may be passed through directly or in the "extra" field in the connection
  128. # definition
  129. def get_conn(
  130. self, headers: dict[Any, Any] | None = None, extra_options: dict[str, Any] | None = None
  131. ) -> Session:
  132. """
  133. Create a Requests HTTP session.
  134. :param headers: Additional headers to be passed through as a dictionary.
  135. :param extra_options: additional options to be used when executing the request
  136. :return: A configured requests.Session object.
  137. """
  138. session = Session()
  139. connection = self.get_connection(self.http_conn_id)
  140. self._set_base_url(connection)
  141. session = self._configure_session_from_auth(session, connection)
  142. if connection.extra:
  143. session = self._configure_session_from_extra(session, connection, extra_options)
  144. session = self._configure_session_from_mount_adapters(session)
  145. if self.default_headers:
  146. session.headers.update(self.default_headers)
  147. if headers:
  148. session.headers.update(headers)
  149. return session
  150. def _set_base_url(self, connection: Connection) -> None:
  151. host = connection.host or self.default_host
  152. schema = connection.schema or "http"
  153. # RFC 3986 (https://www.rfc-editor.org/rfc/rfc3986.html#page-16)
  154. if "://" in host:
  155. self.base_url = host
  156. else:
  157. self.base_url = f"{schema}://{host}" if host else f"{schema}://"
  158. if connection.port:
  159. self.base_url = f"{self.base_url}:{connection.port}"
  160. parsed = urlparse(self.base_url)
  161. if not parsed.scheme:
  162. raise ValueError(f"Invalid base URL: Missing scheme in {self.base_url}")
  163. def _configure_session_from_auth(self, session: Session, connection: Connection) -> Session:
  164. session.auth = self._extract_auth(connection)
  165. return session
  166. def _extract_auth(self, connection: Connection) -> Any | None:
  167. if connection.login:
  168. return self.auth_type(connection.login, connection.password)
  169. elif self._auth_type:
  170. return self.auth_type()
  171. return None
  172. def _configure_session_from_extra(
  173. self, session: Session, connection: Connection, extra_options: dict[str, Any] | None = None
  174. ) -> Session:
  175. if extra_options is None:
  176. extra_options = {}
  177. headers = _process_extra_options_from_connection(connection, extra_options)
  178. session.proxies = extra_options.pop("proxies", extra_options.pop("proxy", {}))
  179. session.stream = extra_options.pop("stream", False)
  180. session.verify = extra_options.pop("verify", extra_options.pop("verify_ssl", True))
  181. session.cert = extra_options.pop("cert", None)
  182. session.max_redirects = extra_options.pop("max_redirects", DEFAULT_REDIRECT_LIMIT)
  183. session.trust_env = extra_options.pop("trust_env", True)
  184. try:
  185. session.headers.update(headers)
  186. except TypeError:
  187. self.log.warning("Connection to %s has invalid extra field.", connection.host)
  188. return session
  189. def _configure_session_from_mount_adapters(self, session: Session) -> Session:
  190. scheme = urlparse(self.base_url).scheme
  191. if not scheme:
  192. raise ValueError(
  193. f"Cannot mount adapters: {self.base_url} does not include a valid scheme (http or https)."
  194. )
  195. if self.adapter:
  196. session.mount(f"{scheme}://", self.adapter)
  197. elif self.keep_alive_adapter:
  198. session.mount("http://", self.keep_alive_adapter)
  199. session.mount("https://", self.keep_alive_adapter)
  200. return session
  201. def run(
  202. self,
  203. endpoint: str | None = None,
  204. data: dict[str, Any] | str | None = None,
  205. headers: dict[str, Any] | None = None,
  206. extra_options: dict[str, Any] | None = None,
  207. **request_kwargs: Any,
  208. ) -> Any:
  209. r"""
  210. Perform the request.
  211. :param endpoint: the endpoint to be called i.e. resource/v1/query?
  212. :param data: payload to be uploaded or request parameters
  213. :param headers: additional headers to be passed through as a dictionary
  214. :param extra_options: additional options to be used when executing the request
  215. i.e. {'check_response': False} to avoid checking raising exceptions on non
  216. 2XX or 3XX status codes
  217. :param request_kwargs: Additional kwargs to pass when creating a request.
  218. For example, ``run(json=obj)`` is passed as ``requests.Request(json=obj)``
  219. """
  220. extra_options = extra_options or {}
  221. session = self.get_conn(headers, extra_options)
  222. url = self.url_from_endpoint(endpoint)
  223. if self.method == "GET":
  224. # GET uses params
  225. req = Request(self.method, url, params=data, headers=headers, **request_kwargs)
  226. elif self.method == "HEAD":
  227. # HEAD doesn't use params
  228. req = Request(self.method, url, headers=headers, **request_kwargs)
  229. else:
  230. # Others use data
  231. req = Request(self.method, url, data=data, headers=headers, **request_kwargs)
  232. prepped_request = session.prepare_request(req)
  233. self.log.debug("Sending '%s' to url: %s", self.method, url)
  234. return self.run_and_check(session, prepped_request, extra_options)
  235. def check_response(self, response: Response) -> None:
  236. """
  237. Check the status code and raise on failure.
  238. :param response: A requests response object.
  239. :raise AirflowException: If the response contains a status code not
  240. in the 2xx and 3xx range.
  241. """
  242. try:
  243. response.raise_for_status()
  244. except HTTPError:
  245. self.log.error("HTTP error: %s", response.reason)
  246. self.log.error(response.text)
  247. raise AirflowException(str(response.status_code) + ":" + response.reason)
  248. def run_and_check(
  249. self,
  250. session: Session,
  251. prepped_request: PreparedRequest,
  252. extra_options: dict[Any, Any],
  253. ) -> Any:
  254. """
  255. Grab extra options, actually run the request, and check the result.
  256. :param session: the session to be used to execute the request
  257. :param prepped_request: the prepared request generated in run()
  258. :param extra_options: additional options to be used when executing the request
  259. i.e. ``{'check_response': False}`` to avoid checking raising exceptions on non 2XX
  260. or 3XX status codes
  261. """
  262. extra_options = extra_options or {}
  263. settings = session.merge_environment_settings(
  264. prepped_request.url,
  265. proxies=extra_options.get("proxies", {}),
  266. stream=extra_options.get("stream", False),
  267. verify=extra_options.get("verify"),
  268. cert=extra_options.get("cert"),
  269. )
  270. # Send the request.
  271. send_kwargs: dict[str, Any] = {
  272. "timeout": extra_options.get("timeout"),
  273. "allow_redirects": extra_options.get("allow_redirects", True),
  274. }
  275. send_kwargs.update(settings)
  276. try:
  277. response = session.send(prepped_request, **send_kwargs)
  278. if extra_options.get("check_response", True):
  279. self.check_response(response)
  280. return response
  281. except ConnectionError as ex:
  282. self.log.warning("%s Tenacity will retry to execute the operation", ex)
  283. raise ex
  284. def run_with_advanced_retry(self, _retry_args: dict[Any, Any], *args: Any, **kwargs: Any) -> Any:
  285. """
  286. Run the hook with retry.
  287. This is useful for connectors which might be disturbed by intermittent
  288. issues and should not instantly fail.
  289. :param _retry_args: Arguments which define the retry behaviour.
  290. See Tenacity documentation at https://github.com/jd/tenacity
  291. .. code-block:: python
  292. hook = HttpHook(http_conn_id="my_conn", method="GET")
  293. retry_args = dict(
  294. wait=tenacity.wait_exponential(),
  295. stop=tenacity.stop_after_attempt(10),
  296. retry=tenacity.retry_if_exception_type(Exception),
  297. )
  298. hook.run_with_advanced_retry(endpoint="v1/test", _retry_args=retry_args)
  299. """
  300. self._retry_obj = tenacity.Retrying(**_retry_args)
  301. # TODO: remove ignore type when https://github.com/jd/tenacity/issues/428 is resolved
  302. return self._retry_obj(self.run, *args, **kwargs) # type: ignore
  303. def url_from_endpoint(self, endpoint: str | None) -> str:
  304. """Combine base url with endpoint."""
  305. return _url_from_endpoint(base_url=self.base_url, endpoint=endpoint)
  306. def test_connection(self):
  307. """Test HTTP Connection."""
  308. try:
  309. self.run()
  310. return True, "Connection successfully tested"
  311. except Exception as e:
  312. return False, str(e)
  313. class HttpAsyncHook(BaseHook):
  314. """
  315. Interact with HTTP servers asynchronously.
  316. :param method: the API method to be called
  317. :param http_conn_id: http connection id that has the base
  318. API url i.e https://www.google.com/ and optional authentication credentials. Default
  319. headers can also be specified in the Extra field in json format.
  320. :param auth_type: The auth type for the service
  321. """
  322. conn_name_attr = "http_conn_id"
  323. default_conn_name = "http_default"
  324. conn_type = "http"
  325. hook_name = "HTTP"
  326. def __init__(
  327. self,
  328. method: str = "POST",
  329. http_conn_id: str = default_conn_name,
  330. auth_type: Any = aiohttp.BasicAuth,
  331. retry_limit: int = 3,
  332. retry_delay: float = 1.0,
  333. ) -> None:
  334. self.http_conn_id = http_conn_id
  335. self.method = method.upper()
  336. self.base_url: str = ""
  337. self._retry_obj: Callable[..., Any]
  338. self.auth_type: Any = auth_type
  339. if retry_limit < 1:
  340. raise ValueError("Retry limit must be greater than equal to 1")
  341. self.retry_limit = retry_limit
  342. self.retry_delay = retry_delay
  343. async def run(
  344. self,
  345. session: aiohttp.ClientSession,
  346. endpoint: str | None = None,
  347. data: dict[str, Any] | str | None = None,
  348. json: dict[str, Any] | str | None = None,
  349. headers: dict[str, Any] | None = None,
  350. extra_options: dict[str, Any] | None = None,
  351. ) -> ClientResponse:
  352. """
  353. Perform an asynchronous HTTP request call.
  354. :param endpoint: Endpoint to be called, i.e. ``resource/v1/query?``.
  355. :param data: Payload to be uploaded or request parameters.
  356. :param json: Payload to be uploaded as JSON.
  357. :param headers: Additional headers to be passed through as a dict.
  358. :param extra_options: Additional kwargs to pass when creating a request.
  359. For example, ``run(json=obj)`` is passed as
  360. ``aiohttp.ClientSession().get(json=obj)``.
  361. """
  362. extra_options = extra_options or {}
  363. # headers may be passed through directly or in the "extra" field in the connection
  364. # definition
  365. _headers = {}
  366. auth = None
  367. if self.http_conn_id:
  368. conn = await sync_to_async(self.get_connection)(self.http_conn_id)
  369. if conn.host and "://" in conn.host:
  370. self.base_url = conn.host
  371. else:
  372. # schema defaults to HTTP
  373. schema = conn.schema if conn.schema else "http"
  374. host = conn.host if conn.host else ""
  375. self.base_url = schema + "://" + host
  376. if conn.port:
  377. self.base_url += f":{conn.port}"
  378. if conn.login:
  379. auth = self.auth_type(conn.login, conn.password)
  380. if conn.extra:
  381. extra = _process_extra_options_from_connection(conn=conn, extra_options=extra_options)
  382. try:
  383. _headers.update(extra)
  384. except TypeError:
  385. self.log.warning("Connection to %s has invalid extra field.", conn.host)
  386. if headers:
  387. _headers.update(headers)
  388. url = _url_from_endpoint(self.base_url, endpoint)
  389. if self.method == "GET":
  390. request_func = session.get
  391. elif self.method == "POST":
  392. request_func = session.post
  393. elif self.method == "PATCH":
  394. request_func = session.patch
  395. elif self.method == "HEAD":
  396. request_func = session.head
  397. elif self.method == "PUT":
  398. request_func = session.put
  399. elif self.method == "DELETE":
  400. request_func = session.delete
  401. elif self.method == "OPTIONS":
  402. request_func = session.options
  403. else:
  404. raise HttpMethodException(f"Unexpected HTTP Method: {self.method}")
  405. for attempt in range(1, 1 + self.retry_limit):
  406. response = await request_func(
  407. url,
  408. params=data if self.method == "GET" else None,
  409. data=data if self.method in ("POST", "PUT", "PATCH") else None,
  410. json=json,
  411. headers=_headers,
  412. auth=auth,
  413. **extra_options,
  414. )
  415. try:
  416. response.raise_for_status()
  417. except ClientResponseError as e:
  418. self.log.warning(
  419. "[Try %d of %d] Request to %s failed.",
  420. attempt,
  421. self.retry_limit,
  422. url,
  423. )
  424. if not self._retryable_error_async(e) or attempt == self.retry_limit:
  425. self.log.exception("HTTP error with status: %s", e.status)
  426. # In this case, the user probably made a mistake.
  427. # Don't retry.
  428. raise HttpErrorException(f"{e.status}:{e.message}")
  429. else:
  430. return response
  431. raise NotImplementedError # should not reach this, but makes mypy happy
  432. def _retryable_error_async(self, exception: ClientResponseError) -> bool:
  433. """
  434. Determine whether an exception may successful on a subsequent attempt.
  435. It considers the following to be retryable:
  436. - requests_exceptions.ConnectionError
  437. - requests_exceptions.Timeout
  438. - anything with a status code >= 500
  439. Most retryable errors are covered by status code >= 500.
  440. """
  441. if exception.status == 429:
  442. # don't retry for too Many Requests
  443. return False
  444. if exception.status == 413:
  445. # don't retry for payload Too Large
  446. return False
  447. return exception.status >= 500