ftp.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. #
  2. # Licensed to the Apache Software Foundation (ASF) under one
  3. # or more contributor license agreements. See the NOTICE file
  4. # distributed with this work for additional information
  5. # regarding copyright ownership. The ASF licenses this file
  6. # to you under the Apache License, Version 2.0 (the
  7. # "License"); you may not use this file except in compliance
  8. # with the License. You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing,
  13. # software distributed under the License is distributed on an
  14. # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  15. # KIND, either express or implied. See the License for the
  16. # specific language governing permissions and limitations
  17. # under the License.
  18. from __future__ import annotations
  19. import datetime
  20. import ftplib # nosec: B402
  21. import logging
  22. from typing import Any, Callable
  23. from airflow.hooks.base import BaseHook
  # Module-level logger named after this module; FTPHook.get_conn reports its connection attempts here.
  logger: logging.Logger = logging.getLogger(name=__name__)
  class FTPHook(BaseHook):
      """
      Interact with FTP.

      Errors that may occur throughout are not handled here and should be
      handled downstream.

      You can specify mode for data transfers in the extra field of your
      connection as ``{"passive": "true"}``. The strings ``"false"``, ``"0"``,
      ``"no"``, ``"off"`` and ``""`` (case-insensitive) select active mode;
      passive mode is used when the key is absent.

      :param ftp_conn_id: The :ref:`ftp connection id <howto/connection:ftp>`
          reference.
      """

      conn_name_attr = "ftp_conn_id"
      default_conn_name = "ftp_default"
      conn_type = "ftp"
      hook_name = "FTP"

      def __init__(self, ftp_conn_id: str = default_conn_name) -> None:
          super().__init__()
          self.ftp_conn_id = ftp_conn_id
          # Cached connection: created lazily by get_conn(), cleared by close_conn().
          self.conn: ftplib.FTP | None = None

      def __enter__(self):
          return self

      def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
          # Only close a connection that was actually opened inside the ``with`` block.
          if self.conn is not None:
              self.close_conn()

      @staticmethod
      def _parse_passive(value: Any) -> bool:
          """
          Interpret the ``passive`` connection extra as a boolean.

          Non-string values use normal truthiness. Strings are compared against
          false-like spellings, because a string such as ``"false"`` is truthy.
          """
          if isinstance(value, str):
              return value.strip().lower() not in ("false", "0", "no", "off", "")
          return bool(value)

      def get_conn(self) -> ftplib.FTP:
          """
          Return an FTP connection object, creating and caching it on first use.

          The object is cached only after connecting/logging in succeeded. A failed
          attempt closes its socket, re-raises, and can be retried by calling this
          method again.
          """
          if self.conn is None:
              params = self.get_connection(self.ftp_conn_id)
              pasv = self._parse_passive(params.extra_dejson.get("passive", True))
              conn = ftplib.FTP()  # nosec: B321
              try:
                  if params.host:
                      port = params.port if params.port is not None else ftplib.FTP_PORT
                      logger.info("Connecting via FTP to %s:%d", params.host, port)
                      conn.connect(params.host, port)
                      if params.login:
                          conn.login(params.login, params.password)
                  conn.set_pasv(pasv)
              except Exception:
                  # Do not leak a half-open socket, and do not cache a broken object.
                  conn.close()
                  raise
              self.conn = conn
          return self.conn

      def close_conn(self):
          """
          Close the connection; an error will occur if the connection was never opened.

          The cached connection is cleared even when ``QUIT`` fails, so a later
          ``get_conn()`` opens a fresh connection instead of reusing a dead one.
          """
          conn = self.conn
          try:
              conn.quit()
          finally:
              self.conn = None
              if conn is not None:
                  # ftplib's quit() only closes the socket after a successful QUIT;
                  # close() is safe to call again afterwards.
                  conn.close()

      def describe_directory(self, path: str) -> dict:
          """
          Return a dictionary of {filename: {attributes}} for all files on a remote system which supports MLSD.

          :param path: full path to the remote directory
          """
          conn = self.get_conn()
          return dict(conn.mlsd(path))

      def list_directory(self, path: str) -> list[str]:
          """
          Return a list of files on the remote system.

          :param path: full path to the remote directory to list
          """
          conn = self.get_conn()
          return conn.nlst(path)

      def create_directory(self, path: str) -> None:
          """
          Create a directory on the remote system.

          :param path: full path to the remote directory to create
          """
          conn = self.get_conn()
          conn.mkd(path)

      def delete_directory(self, path: str) -> None:
          """
          Delete a directory on the remote system.

          :param path: full path to the remote directory to delete
          """
          conn = self.get_conn()
          conn.rmd(path)

      def retrieve_file(
          self,
          remote_full_path: str,
          local_full_path_or_buffer: Any,
          callback: Callable | None = None,
          block_size: int = 8192,
      ) -> None:
          """
          Transfer the remote file to a local location.

          If local_full_path_or_buffer is a string path, the file will be put
          at that location; if it is a file-like buffer, the file will
          be written to the buffer but not closed.

          :param remote_full_path: full path to the remote file
          :param local_full_path_or_buffer: full path to the local file or a
              file-like buffer. Ignored when ``callback`` is given.
          :param callback: callback which is called each time a block of data
              is read. If you do not use a callback, these blocks will be written
              to the file or buffer passed in. If you do pass in a callback, note
              that writing to a file or buffer will need to be handled inside the
              callback.
              [default: output_handle.write()]
          :param block_size: file is transferred in chunks of default size 8192
              or as set by user

          .. code-block:: python

              hook = FTPHook(ftp_conn_id="my_conn")

              remote_path = "/path/to/remote/file"
              local_path = "/path/to/local/file"


              # with a custom callback (in this case displaying progress on each read)
              def download_with_progress():
                  total_downloaded = 0
                  total_file_size = hook.get_size(remote_path)

                  with open(local_path, "wb") as output_handle:

                      def write_to_file_with_progress(data):
                          nonlocal total_downloaded
                          total_downloaded += len(data)
                          output_handle.write(data)
                          percent_progress = (total_downloaded / total_file_size) * 100
                          print(f"Percent Downloaded: {percent_progress}%")

                      hook.retrieve_file(remote_path, None, callback=write_to_file_with_progress)


              # without a custom callback data is written to the local_path
              hook.retrieve_file(remote_path, local_path)
          """
          conn = self.get_conn()
          # Only a file this method opened itself is closed afterwards;
          # caller-supplied buffers are left open.
          opened_handle = None
          # without a callback, default to writing to a user-provided file or
          # file-like buffer
          if not callback:
              if isinstance(local_full_path_or_buffer, str):
                  opened_handle = open(local_full_path_or_buffer, "wb")
                  callback = opened_handle.write
              else:
                  callback = local_full_path_or_buffer.write
          try:
              self.log.info("Retrieving file from FTP: %s", remote_full_path)
              conn.retrbinary(f"RETR {remote_full_path}", callback, block_size)
              self.log.info("Finished retrieving file from FTP: %s", remote_full_path)
          finally:
              if opened_handle is not None:
                  opened_handle.close()

      def store_file(
          self, remote_full_path: str, local_full_path_or_buffer: Any, block_size: int = 8192
      ) -> None:
          """
          Transfer a local file to the remote location.

          If local_full_path_or_buffer is a string path, the file will be read
          from that location; if it is a file-like buffer, the file will
          be read from the buffer but not closed.

          :param remote_full_path: full path to the remote file
          :param local_full_path_or_buffer: full path to the local file or a
              file-like buffer
          :param block_size: file is transferred in chunks of default size 8192
              or as set by user
          """
          conn = self.get_conn()
          if isinstance(local_full_path_or_buffer, str):
              # The file we open is closed even if the upload fails.
              with open(local_full_path_or_buffer, "rb") as input_handle:
                  conn.storbinary(f"STOR {remote_full_path}", input_handle, block_size)
          else:
              conn.storbinary(f"STOR {remote_full_path}", local_full_path_or_buffer, block_size)

      def delete_file(self, path: str) -> None:
          """
          Remove a file on the FTP Server.

          :param path: full path to the remote file
          """
          conn = self.get_conn()
          conn.delete(path)

      def rename(self, from_name: str, to_name: str) -> str:
          """
          Rename a file.

          :param from_name: rename file from name
          :param to_name: rename file to name
          :return: the server's reply to the rename command
          """
          conn = self.get_conn()
          return conn.rename(from_name, to_name)

      def get_mod_time(self, path: str) -> datetime.datetime:
          """
          Return a datetime object representing the last time the file was modified.

          The ``MDTM`` reply's first four characters (status code and separator)
          are skipped. The remainder is parsed as ``%Y%m%d%H%M%S`` with optional
          fractional seconds. The result is a naive datetime.

          :param path: remote file path
          """
          conn = self.get_conn()
          ftp_mdtm = conn.sendcmd("MDTM " + path)
          time_val = ftp_mdtm[4:]
          # time_val optionally has microseconds
          try:
              return datetime.datetime.strptime(time_val, "%Y%m%d%H%M%S.%f")
          except ValueError:
              return datetime.datetime.strptime(time_val, "%Y%m%d%H%M%S")

      def get_size(self, path: str) -> int | None:
          """
          Return the size of a file (in bytes).

          :param path: remote file path
          :return: the size, ``0`` for an empty file, or ``None`` if the server
              did not report a size
          """
          conn = self.get_conn()
          size = conn.size(path)
          # Compare against None explicitly: a zero-byte file must report 0, not None.
          return int(size) if size is not None else None

      def test_connection(self) -> tuple[bool, str]:
          """Test the FTP connection by requesting the current working directory (PWD)."""
          try:
              conn = self.get_conn()
              conn.pwd()
              return True, "Connection successfully tested"
          except Exception as e:
              return False, str(e)
  class FTPSHook(FTPHook):
      """Interact with FTPS."""

      def get_conn(self) -> ftplib.FTP:
          """
          Return an FTPS connection object, creating and caching it on first use.

          An ``ssl.create_default_context()`` context is used, so server
          certificates are validated by default. The port is passed to this
          connection only. The class-level ``ftplib.FTP_TLS.port`` is not modified,
          so one hook's port cannot leak into other FTP_TLS connections in the
          process. The object is cached only after connecting/logging in
          succeeded, so a failed attempt can be retried.
          """
          import ssl

          if self.conn is None:
              params = self.get_connection(self.ftp_conn_id)
              pasv = params.extra_dejson.get("passive", True)
              # A string such as "false" is truthy; treat false-like spellings as active mode.
              if isinstance(pasv, str):
                  pasv = pasv.strip().lower() not in ("false", "0", "no", "off", "")
              # Construct FTP_TLS instance with SSL context to allow certificates to be validated by default
              context = ssl.create_default_context()
              conn = ftplib.FTP_TLS(context=context)  # nosec: B321
              try:
                  if params.host:
                      # Without a configured port, use the standard FTP port (FTP_TLS's own default).
                      conn.connect(params.host, params.port or ftplib.FTP_PORT)
                  if params.login:
                      # Per the ftplib docs, FTP_TLS.login() secures the control channel before sending credentials.
                      conn.login(params.login, params.password)
                  conn.set_pasv(pasv)
              except Exception:
                  # Do not leak a half-open socket, and do not cache a broken object.
                  conn.close()
                  raise
              self.conn = conn
          return self.conn