ftp.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395
  1. import os
  2. import sys
  3. import uuid
  4. import warnings
  5. from ftplib import FTP, FTP_TLS, Error, error_perm
  6. from typing import Any
  7. from ..spec import AbstractBufferedFile, AbstractFileSystem
  8. from ..utils import infer_storage_options, isfilelike
  class FTPFileSystem(AbstractFileSystem):
      """A filesystem over classic FTP"""

      root_marker = "/"
      cachable = False
      protocol = "ftp"

      def __init__(
          self,
          host,
          port=21,
          username=None,
          password=None,
          acct=None,
          block_size=None,
          tempdir=None,
          timeout=30,
          encoding="utf-8",
          tls=False,
          **kwargs,
      ):
          """
          You can use _get_kwargs_from_urls to get some kwargs from
          a reasonable FTP url.

          Authentication will be anonymous if username/password are not
          given.

          Parameters
          ----------
          host: str
              The remote server name/ip to connect to
          port: int
              Port to connect with
          username: str or None
              If authenticating, the user's identifier
          password: str or None
              User's password on the server, if authenticating
          acct: str or None
              Some servers also need an "account" string for auth
          block_size: int or None
              If given, the read-ahead or write buffer size; otherwise 2**16.
          tempdir: str
              Directory on remote to put temporary files when in a
              transaction; defaults to "/tmp".
          timeout: int
              Timeout of the ftp connection in seconds
          encoding: str
              Encoding to use for directories and filenames in FTP connection
          tls: bool
              Use FTP-TLS, by default False
          """
          super().__init__(**kwargs)
          self.host = host
          self.port = port
          self.tempdir = tempdir or "/tmp"
          self.cred = username or "", password or "", acct or ""
          self.timeout = timeout
          self.encoding = encoding
          self.blocksize = block_size if block_size is not None else 2**16
          self.tls = tls
          # _connect also enables TLS data-channel protection when tls=True.
          self._connect()

      def _connect(self):
          """(Re)create the control connection, log in and apply settings.

          This runs from ``__init__`` and again when a transfer leaves the
          connection unusable (e.g. after a failed abort). Therefore every
          per-connection setting must be applied here, not only at
          construction time.
          """
          ftp_cls = FTP_TLS if self.tls else FTP
          if sys.version_info >= (3, 9):
              self.ftp = ftp_cls(timeout=self.timeout, encoding=self.encoding)
          elif self.encoding:
              warnings.warn("`encoding` not supported for python<3.9, ignoring")
              self.ftp = ftp_cls(timeout=self.timeout)
          else:
              self.ftp = ftp_cls(timeout=self.timeout)
          self.ftp.connect(self.host, self.port)
          self.ftp.login(*self.cred)
          if self.tls:
              # PROT P is per-connection state. Without re-issuing it here,
              # a reconnect would leave the data channel unprotected.
              self.ftp.prot_p()

      @classmethod
      def _strip_protocol(cls, path):
          """Return ``path`` as an absolute remote path without protocol/host."""
          return "/" + infer_storage_options(path)["path"].lstrip("/").rstrip("/")

      @staticmethod
      def _get_kwargs_from_urls(urlpath):
          """Extract constructor kwargs (host, port, user...) from an FTP URL."""
          out = infer_storage_options(urlpath)
          out.pop("path", None)
          out.pop("protocol", None)
          return out

      def ls(self, path, detail=True, **kwargs):
          """List ``path``.

          Uses MLSD, falling back to parsing LIST output when the server
          refuses MLSD. Directory listings are cached in ``self.dircache``.
          If ``path`` is a file, a single entry for that file is returned.

          Returns a list of detail dicts, or sorted names if ``detail`` is
          False. Raises FileNotFoundError if the path cannot be listed or
          found.
          """
          path = self._strip_protocol(path)
          out = []
          if path not in self.dircache:
              try:
                  try:
                      out = [
                          (fn, details)
                          for (fn, details) in self.ftp.mlsd(path)
                          if fn not in [".", ".."]
                          and details["type"] not in ["pdir", "cdir"]
                      ]
                  except error_perm:
                      out = _mlsd2(self.ftp, path)  # Not platform independent
                  for fn, details in out:
                      details["name"] = "/".join(
                          ["" if path == "/" else path, fn.lstrip("/")]
                      )
                      if details["type"] == "file":
                          details["size"] = int(details["size"])
                      else:
                          details["size"] = 0
                      if details["type"] == "dir":
                          details["type"] = "directory"
                  self.dircache[path] = out
              except Error:
                  # Listing failed: maybe ``path`` is a file, not a directory.
                  try:
                      info = self.info(path)
                      if info["type"] == "file":
                          out = [(path, info)]
                  except (Error, IndexError) as exc:
                      raise FileNotFoundError(path) from exc
          files = self.dircache.get(path, out)
          if not detail:
              return sorted([fn for fn, details in files])
          return [details for fn, details in files]

      def info(self, path, **kwargs):
          """Return the detail dict for ``path`` by listing its parent.

          Raises FileNotFoundError if no entry with that name exists.
          """
          path = self._strip_protocol(path)
          if path == "/":
              # special case, since this dir has no real entry
              return {"name": "/", "size": 0, "type": "directory"}
          files = self.ls(self._parent(path).lstrip("/"), True)
          try:
              out = next(f for f in files if f["name"] == path)
          except StopIteration as exc:
              raise FileNotFoundError(path) from exc
          return out

      def get_file(self, rpath, lpath, **kwargs):
          """Download remote ``rpath`` into local path or file-like ``lpath``.

          A remote directory only creates the local directory (if missing).
          A local file opened here is always closed, even if the transfer
          fails. A caller-supplied file-like object is left open.
          """
          if self.isdir(rpath):
              if not os.path.exists(lpath):
                  os.mkdir(lpath)
              return
          owns_file = not isfilelike(lpath)
          outfile = open(lpath, "wb") if owns_file else lpath
          try:
              self.ftp.retrbinary(
                  f"RETR {rpath}",
                  blocksize=self.blocksize,
                  callback=outfile.write,
              )
          finally:
              if owns_file:
                  outfile.close()

      def cat_file(self, path, start=None, end=None, **kwargs):
          """Return the bytes of ``path``, optionally from ``start`` to ``end``.

          Reads with an explicit ``end`` or a negative ``start`` go to the
          base-class implementation. Otherwise the file is fetched with a
          single RETR, resuming at ``start`` via REST.

          Raises FileNotFoundError if the server refuses the transfer.
          """
          if end is not None or (start is not None and start < 0):
              # REST takes a non-negative byte offset, so offsets counted
              # from the end of the file cannot be sent to the server.
              return super().cat_file(path, start, end, **kwargs)
          out = []
          try:
              self.ftp.retrbinary(
                  f"RETR {path}",
                  blocksize=self.blocksize,
                  rest=start,
                  callback=out.append,
              )
          except Error as orig_exc:  # error_perm is a subclass of Error
              raise FileNotFoundError(path) from orig_exc
          return b"".join(out)

      def _open(
          self,
          path,
          mode="rb",
          block_size=None,
          cache_options=None,
          autocommit=True,
          **kwargs,
      ):
          """Return an FTPFile for ``path``, defaulting to the fs block size."""
          path = self._strip_protocol(path)
          block_size = block_size or self.blocksize
          return FTPFile(
              self,
              path,
              mode=mode,
              block_size=block_size,
              tempdir=self.tempdir,
              autocommit=autocommit,
              cache_options=cache_options,
          )

      def _rm(self, path):
          """Delete a single remote file and drop its parent's cached listing."""
          path = self._strip_protocol(path)
          self.ftp.delete(path)
          self.invalidate_cache(self._parent(path))

      def rm(self, path, recursive=False, maxdepth=None):
          """Remove path(s).

          Files go through rm_file and directories through rmdir. Expanded
          paths are processed in reverse order.
          """
          paths = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
          for p in reversed(paths):
              if self.isfile(p):
                  self.rm_file(p)
              else:
                  self.rmdir(p)

      def mkdir(self, path: str, create_parents: bool = True, **kwargs: Any) -> None:
          """Create directory ``path``; missing ancestors too if create_parents."""
          path = self._strip_protocol(path)
          parent = self._parent(path)
          if parent != self.root_marker and not self.exists(parent) and create_parents:
              self.mkdir(parent, create_parents=create_parents)

          self.ftp.mkd(path)
          self.invalidate_cache(self._parent(path))

      def makedirs(self, path: str, exist_ok: bool = False) -> None:
          """Create ``path`` and its parents.

          Raises FileExistsError if ``path`` already exists and ``exist_ok``
          is False.
          """
          path = self._strip_protocol(path)
          if self.exists(path):
              # NB: "/" does not "exist" as it has no directory entry
              if not exist_ok:
                  raise FileExistsError(f"{path} exists without `exist_ok`")
              # exists_ok=True -> no-op
          else:
              self.mkdir(path, create_parents=True)

      def rmdir(self, path):
          """Remove the (empty) remote directory ``path``."""
          path = self._strip_protocol(path)
          self.ftp.rmd(path)
          self.invalidate_cache(self._parent(path))

      def mv(self, path1, path2, **kwargs):
          """Rename ``path1`` to ``path2`` on the server."""
          path1 = self._strip_protocol(path1)
          path2 = self._strip_protocol(path2)
          self.ftp.rename(path1, path2)
          self.invalidate_cache(self._parent(path1))
          self.invalidate_cache(self._parent(path2))

      def __del__(self):
          # __init__ may have failed before a connection object was created.
          ftp = getattr(self, "ftp", None)
          if ftp is not None:
              ftp.close()

      def invalidate_cache(self, path=None):
          """Drop the cached listing for ``path``, or all listings if None."""
          if path is None:
              self.dircache.clear()
          else:
              self.dircache.pop(path, None)
          super().invalidate_cache(path)
  class TransferDone(Exception):
      """Internal signal for ending an in-progress transfer early.

      A ``retrbinary`` callback raises it once enough bytes have arrived.
      The caller catches it and aborts the transfer.
      """
  class FTPFile(AbstractBufferedFile):
      """Interact with a remote FTP file with read/write buffering"""

      def __init__(
          self,
          fs,
          path,
          mode="rb",
          block_size="default",
          autocommit=True,
          cache_type="readahead",
          cache_options=None,
          **kwargs,
      ):
          super().__init__(
              fs,
              path,
              mode=mode,
              block_size=block_size,
              autocommit=autocommit,
              cache_type=cache_type,
              cache_options=cache_options,
              **kwargs,
          )
          if not autocommit:
              # Write to a unique temporary name. commit() moves it into
              # place and discard() deletes it. Fall back to the filesystem's
              # tempdir when none is passed explicitly (previously a KeyError).
              self.target = self.path
              tempdir = kwargs.get("tempdir") or getattr(fs, "tempdir", "/tmp")
              self.path = "/".join([tempdir, str(uuid.uuid4())])

      def commit(self):
          """Move the temporary upload to its final target path."""
          self.fs.mv(self.path, self.target)

      def discard(self):
          """Delete the temporary upload."""
          self.fs.rm(self.path)

      def _fetch_range(self, start, end):
          """Get bytes between given byte limits

          Implemented by raising an exception in the fetch callback when the
          number of bytes received reaches the requested amount.

          Will fail if the server does not respect the REST command on
          retrieve requests.
          """
          out = []
          total = [0]

          def callback(x):
              total[0] += len(x)
              if total[0] > end - start:
                  # Keep only the part of this chunk that lies inside the range.
                  out.append(x[: (end - start) - total[0]])
                  if end < self.size:
                      raise TransferDone
              else:
                  out.append(x)

                  if total[0] == end - start and end < self.size:
                      raise TransferDone

          try:
              self.fs.ftp.retrbinary(
                  f"RETR {self.path}",
                  blocksize=self.blocksize,
                  rest=start,
                  callback=callback,
              )
          except TransferDone:
              try:
                  # stop transfer, we got enough bytes for this block
                  self.fs.ftp.abort()
                  self.fs.ftp.getmultiline()
              except (Error, OSError, EOFError):
                  # Same set as ftplib.all_errors. Socket-level failures during
                  # abort leave the shared connection unusable, so reconnect.
                  self.fs._connect()

          return b"".join(out)

      def _upload_chunk(self, final=False):
          """Store the buffer on the server, resuming at the current offset."""
          self.buffer.seek(0)
          self.fs.ftp.storbinary(
              f"STOR {self.path}", self.buffer, blocksize=self.blocksize, rest=self.offset
          )
          return True
  314. def _mlsd2(ftp, path="."):
  315. """
  316. Fall back to using `dir` instead of `mlsd` if not supported.
  317. This parses a Linux style `ls -l` response to `dir`, but the response may
  318. be platform dependent.
  319. Parameters
  320. ----------
  321. ftp: ftplib.FTP
  322. path: str
  323. Expects to be given path, but defaults to ".".
  324. """
  325. lines = []
  326. minfo = []
  327. ftp.dir(path, lines.append)
  328. for line in lines:
  329. split_line = line.split()
  330. if len(split_line) < 9:
  331. continue
  332. this = (
  333. split_line[-1],
  334. {
  335. "modify": " ".join(split_line[5:8]),
  336. "unix.owner": split_line[2],
  337. "unix.group": split_line[3],
  338. "unix.mode": split_line[0],
  339. "size": split_line[4],
  340. },
  341. )
  342. if this[1]["unix.mode"][0] == "d":
  343. this[1]["type"] = "dir"
  344. else:
  345. this[1]["type"] = "file"
  346. minfo.append(this)
  347. return minfo