123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- import datetime
- import logging
- import os
- import types
- import uuid
- from stat import S_ISDIR, S_ISLNK
- import paramiko
- from .. import AbstractFileSystem
- from ..utils import infer_storage_options
- logger = logging.getLogger("fsspec.sftp")
- class SFTPFileSystem(AbstractFileSystem):
- """Files over SFTP/SSH
- Peer-to-peer filesystem over SSH using paramiko.
- Note: if using this with the ``open`` or ``open_files``, with full URLs,
- there is no way to tell if a path is relative, so all paths are assumed
- to be absolute.
- """
- protocol = "sftp", "ssh"
- def __init__(self, host, **ssh_kwargs):
- """
- Parameters
- ----------
- host: str
- Hostname or IP as a string
- temppath: str
- Location on the server to put files, when within a transaction
- ssh_kwargs: dict
- Parameters passed on to connection. See details in
- https://docs.paramiko.org/en/3.3/api/client.html#paramiko.client.SSHClient.connect
- May include port, username, password...
- """
- if self._cached:
- return
- super().__init__(**ssh_kwargs)
- self.temppath = ssh_kwargs.pop("temppath", "/tmp") # remote temp directory
- self.host = host
- self.ssh_kwargs = ssh_kwargs
- self._connect()
- def _connect(self):
- logger.debug("Connecting to SFTP server %s", self.host)
- self.client = paramiko.SSHClient()
- self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
- self.client.connect(self.host, **self.ssh_kwargs)
- self.ftp = self.client.open_sftp()
- @classmethod
- def _strip_protocol(cls, path):
- return infer_storage_options(path)["path"]
- @staticmethod
- def _get_kwargs_from_urls(urlpath):
- out = infer_storage_options(urlpath)
- out.pop("path", None)
- out.pop("protocol", None)
- return out
- def mkdir(self, path, create_parents=True, mode=511):
- logger.debug("Creating folder %s", path)
- if self.exists(path):
- raise FileExistsError(f"File exists: {path}")
- if create_parents:
- self.makedirs(path)
- else:
- self.ftp.mkdir(path, mode)
- def makedirs(self, path, exist_ok=False, mode=511):
- if self.exists(path) and not exist_ok:
- raise FileExistsError(f"File exists: {path}")
- parts = path.split("/")
- new_path = "/" if path[:1] == "/" else ""
- for part in parts:
- if part:
- new_path = f"{new_path}/{part}" if new_path else part
- if not self.exists(new_path):
- self.ftp.mkdir(new_path, mode)
- def rmdir(self, path):
- logger.debug("Removing folder %s", path)
- self.ftp.rmdir(path)
- def info(self, path):
- stat = self._decode_stat(self.ftp.stat(path))
- stat["name"] = path
- return stat
- @staticmethod
- def _decode_stat(stat, parent_path=None):
- if S_ISDIR(stat.st_mode):
- t = "directory"
- elif S_ISLNK(stat.st_mode):
- t = "link"
- else:
- t = "file"
- out = {
- "name": "",
- "size": stat.st_size,
- "type": t,
- "uid": stat.st_uid,
- "gid": stat.st_gid,
- "time": datetime.datetime.fromtimestamp(
- stat.st_atime, tz=datetime.timezone.utc
- ),
- "mtime": datetime.datetime.fromtimestamp(
- stat.st_mtime, tz=datetime.timezone.utc
- ),
- }
- if parent_path:
- out["name"] = "/".join([parent_path.rstrip("/"), stat.filename])
- return out
- def ls(self, path, detail=False):
- logger.debug("Listing folder %s", path)
- stats = [self._decode_stat(stat, path) for stat in self.ftp.listdir_iter(path)]
- if detail:
- return stats
- else:
- paths = [stat["name"] for stat in stats]
- return sorted(paths)
- def put(self, lpath, rpath, callback=None, **kwargs):
- logger.debug("Put file %s into %s", lpath, rpath)
- self.ftp.put(lpath, rpath)
- def get_file(self, rpath, lpath, **kwargs):
- if self.isdir(rpath):
- os.makedirs(lpath, exist_ok=True)
- else:
- self.ftp.get(self._strip_protocol(rpath), lpath)
- def _open(self, path, mode="rb", block_size=None, **kwargs):
- """
- block_size: int or None
- If 0, no buffering, if 1, line buffering, if >1, buffer that many
- bytes, if None use default from paramiko.
- """
- logger.debug("Opening file %s", path)
- if kwargs.get("autocommit", True) is False:
- # writes to temporary file, move on commit
- path2 = "/".join([self.temppath, str(uuid.uuid4())])
- f = self.ftp.open(path2, mode, bufsize=block_size if block_size else -1)
- f.temppath = path2
- f.targetpath = path
- f.fs = self
- f.commit = types.MethodType(commit_a_file, f)
- f.discard = types.MethodType(discard_a_file, f)
- else:
- f = self.ftp.open(path, mode, bufsize=block_size if block_size else -1)
- return f
- def _rm(self, path):
- if self.isdir(path):
- self.ftp.rmdir(path)
- else:
- self.ftp.remove(path)
- def mv(self, old, new):
- logger.debug("Renaming %s into %s", old, new)
- self.ftp.posix_rename(old, new)
- def commit_a_file(self):
- self.fs.mv(self.temppath, self.targetpath)
- def discard_a_file(self):
- self.fs._rm(self.temppath)
|