- from __future__ import annotations
- import io
- import json
- import logging
- import os
- import threading
- import warnings
- import weakref
- from errno import ESPIPE
- from glob import has_magic
- from hashlib import sha256
- from typing import Any, ClassVar
- from .callbacks import DEFAULT_CALLBACK
- from .config import apply_config, conf
- from .dircache import DirCache
- from .transaction import Transaction
- from .utils import (
- _unstrip_protocol,
- glob_translate,
- isfilelike,
- other_paths,
- read_block,
- stringify_path,
- tokenize,
- )
- logger = logging.getLogger("fsspec")
- def make_instance(cls, args, kwargs):
- return cls(*args, **kwargs)
- class _Cached(type):
- """
- Metaclass for caching file system instances.
- Notes
- -----
- Instances are cached according to
- * The values of the class attributes listed in `_extra_tokenize_attributes`
- * The arguments passed to ``__init__``.
- This creates an additional reference to the filesystem, which prevents the
- filesystem from being garbage collected when all *user* references go away.
- A call to the :meth:`AbstractFileSystem.clear_instance_cache` must *also*
- be made for a filesystem instance to be garbage collected.
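- Examples
- --------
- A minimal sketch of the caching behavior, using the in-memory
- implementation as an illustrative backend:
- >>> import fsspec  # doctest: +SKIP
- >>> fs1 = fsspec.filesystem("memory")  # doctest: +SKIP
- >>> fs2 = fsspec.filesystem("memory")  # doctest: +SKIP
- >>> fs1 is fs2  # identical arguments yield the cached instance  # doctest: +SKIP
- True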
- """
- def __init__(cls, *args, **kwargs):
- super().__init__(*args, **kwargs)
- # Note: we intentionally create a reference here, to avoid garbage
- # collecting instances when all other references are gone. To really
- # delete a FileSystem, the cache must be cleared.
- if conf.get("weakref_instance_cache"): # pragma: no cover
- # debug option for analysing fork/spawn conditions
- cls._cache = weakref.WeakValueDictionary()
- else:
- cls._cache = {}
- cls._pid = os.getpid()
- def __call__(cls, *args, **kwargs):
- kwargs = apply_config(cls, kwargs)
- extra_tokens = tuple(
- getattr(cls, attr, None) for attr in cls._extra_tokenize_attributes
- )
- token = tokenize(
- cls, cls._pid, threading.get_ident(), *args, *extra_tokens, **kwargs
- )
- skip = kwargs.pop("skip_instance_cache", False)
- if os.getpid() != cls._pid:
- cls._cache.clear()
- cls._pid = os.getpid()
- if not skip and cls.cachable and token in cls._cache:
- cls._latest = token
- return cls._cache[token]
- else:
- obj = super().__call__(*args, **kwargs)
- # Setting _fs_token here causes some static linters to complain.
- obj._fs_token_ = token
- obj.storage_args = args
- obj.storage_options = kwargs
- if obj.async_impl and obj.mirror_sync_methods:
- from .asyn import mirror_sync_methods
- mirror_sync_methods(obj)
- if cls.cachable and not skip:
- cls._latest = token
- cls._cache[token] = obj
- return obj
- class AbstractFileSystem(metaclass=_Cached):
- """
- An abstract super-class for pythonic file-systems
- Implementations are expected to be compatible with or, better, subclass
- from here.
- """
- cachable = True # this class can be cached, instances reused
- _cached = False
- blocksize = 2**22
- sep = "/"
- protocol: ClassVar[str | tuple[str, ...]] = "abstract"
- _latest = None
- async_impl = False
- mirror_sync_methods = False
- root_marker = "" # For some FSs, may require leading '/' or other character
- transaction_type = Transaction
- #: Extra *class attributes* that should be considered when hashing.
- _extra_tokenize_attributes = ()
- # Set by _Cached metaclass
- storage_args: tuple[Any, ...]
- storage_options: dict[str, Any]
- def __init__(self, *args, **storage_options):
- """Create and configure file-system instance
- Instances may be cachable, so if similar enough arguments are seen
- a new instance is not required. The token attribute exists to allow
- implementations to cache instances if they wish.
- A reasonable default should be provided if there are no arguments.
- Subclasses should call this method.
- Parameters
- ----------
- use_listings_cache, listings_expiry_time, max_paths:
- passed to ``DirCache``, if the implementation supports
- directory listing caching. Pass use_listings_cache=False
- to disable such caching.
- skip_instance_cache: bool
- If this is a cachable implementation, pass True here to force
- creating a new instance even if a matching instance exists, and prevent
- storing this instance.
- asynchronous: bool
- loop: asyncio-compatible IOLoop or None
- """
- if self._cached:
- # reusing instance, don't change
- return
- self._cached = True
- self._intrans = False
- self._transaction = None
- self._invalidated_caches_in_transaction = []
- self.dircache = DirCache(**storage_options)
- if storage_options.pop("add_docs", None):
- warnings.warn("add_docs is no longer supported.", FutureWarning)
- if storage_options.pop("add_aliases", None):
- warnings.warn("add_aliases has been removed.", FutureWarning)
- # This is set in _Cached
- self._fs_token_ = None
- @property
- def fsid(self):
- """Persistent filesystem id that can be used to compare filesystems
- across sessions.
- """
- raise NotImplementedError
- @property
- def _fs_token(self):
- return self._fs_token_
- def __dask_tokenize__(self):
- return self._fs_token
- def __hash__(self):
- return int(self._fs_token, 16)
- def __eq__(self, other):
- return isinstance(other, type(self)) and self._fs_token == other._fs_token
- def __reduce__(self):
- return make_instance, (type(self), self.storage_args, self.storage_options)
- @classmethod
- def _strip_protocol(cls, path):
- """Turn path from fully-qualified to file-system-specific
- May require FS-specific handling, e.g., for relative paths or links.
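- Examples
- --------
- A sketch of the default behavior; subclasses may strip differently:
- >>> AbstractFileSystem._strip_protocol("abstract://bucket/key/")  # doctest: +SKIP
- 'bucket/key'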
- """
- if isinstance(path, list):
- return [cls._strip_protocol(p) for p in path]
- path = stringify_path(path)
- protos = (cls.protocol,) if isinstance(cls.protocol, str) else cls.protocol
- for protocol in protos:
- if path.startswith(protocol + "://"):
- path = path[len(protocol) + 3 :]
- elif path.startswith(protocol + "::"):
- path = path[len(protocol) + 2 :]
- path = path.rstrip("/")
- # use of root_marker to make minimum required path, e.g., "/"
- return path or cls.root_marker
- def unstrip_protocol(self, name: str) -> str:
- """Format FS-specific path to generic, including protocol"""
- protos = (self.protocol,) if isinstance(self.protocol, str) else self.protocol
- for protocol in protos:
- if name.startswith(f"{protocol}://"):
- return name
- return f"{protos[0]}://{name}"
- @staticmethod
- def _get_kwargs_from_urls(path):
- """If kwargs can be encoded in the paths, extract them here
- This should happen before instantiation of the class; incoming paths
- then should be amended to strip the options in methods.
- Examples may look like an sftp path "sftp://user@host:/my/path", where
- the user and host should become kwargs and later get stripped.
- """
- # by default, nothing happens
- return {}
- @classmethod
- def current(cls):
- """Return the most recently instantiated FileSystem
- If no instance has been created, then create one with defaults
- """
- if cls._latest in cls._cache:
- return cls._cache[cls._latest]
- return cls()
- @property
- def transaction(self):
- """A context within which files are committed together upon exit
- Requires the file class to implement `.commit()` and `.discard()`
- for the normal and exception cases.
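- Examples
- --------
- Typical use as a context manager (path is illustrative):
- >>> with fs.transaction:  # doctest: +SKIP
- ...     with fs.open("data/out.bin", "wb") as f:
- ...         f.write(b"...")  # committed on clean exit, discarded on error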
- """
- if self._transaction is None:
- self._transaction = self.transaction_type(self)
- return self._transaction
- def start_transaction(self):
- """Begin write transaction for deferring files, non-context version"""
- self._intrans = True
- self._transaction = self.transaction_type(self)
- return self.transaction
- def end_transaction(self):
- """Finish write transaction, non-context version"""
- self.transaction.complete()
- self._transaction = None
- # The invalid cache must be cleared after the transaction is completed.
- for path in self._invalidated_caches_in_transaction:
- self.invalidate_cache(path)
- self._invalidated_caches_in_transaction.clear()
- def invalidate_cache(self, path=None):
- """
- Discard any cached directory information
- Parameters
- ----------
- path: string or None
- If None, clear all listings cached else listings at or under given
- path.
- """
- # Not necessary to implement an invalidation mechanism; the backend may
- # have no cache. But if it does, your subclass should call this parent
- # method to ensure caches expire correctly after transactions.
- # See the implementation of FTPFileSystem in ftp.py
- if self._intrans:
- self._invalidated_caches_in_transaction.append(path)
- def mkdir(self, path, create_parents=True, **kwargs):
- """
- Create directory entry at path
- For systems that don't have true directories, may create an entry for
- this instance only and not touch the real filesystem
- Parameters
- ----------
- path: str
- location
- create_parents: bool
- if True, this is equivalent to ``makedirs``
- kwargs:
- may be permissions, etc.
- """
- pass # not necessary to implement, may not have directories
- def makedirs(self, path, exist_ok=False):
- """Recursively make directories
- Creates directory at path and any intervening required directories.
- Raises exception if, for instance, the path already exists but is a
- file.
- Parameters
- ----------
- path: str
- leaf directory name
- exist_ok: bool (False)
- If False, will error if the target already exists
- """
- pass # not necessary to implement, may not have directories
- def rmdir(self, path):
- """Remove a directory, if empty"""
- pass # not necessary to implement, may not have directories
- def ls(self, path, detail=True, **kwargs):
- """List objects at path.
- This should include subdirectories and files at that location. The
- difference between a file and a directory must be clear when details
- are requested.
- The specific keys, or perhaps a FileInfo class, or similar, is TBD,
- but must be consistent across implementations.
- Must include:
- - full path to the entry (without protocol)
- - size of the entry, in bytes. If the value cannot be determined, will
- be ``None``.
- - type of entry, "file", "directory" or other
- Additional information
- may be present, appropriate to the file-system, e.g., generation,
- checksum, etc.
- May use refresh=True|False to allow use of self._ls_from_cache to
- check for a saved listing and avoid calling the backend. This would be
- common where listing may be expensive.
- Parameters
- ----------
- path: str
- detail: bool
- if True, gives a list of dictionaries, where each is the same as
- the result of ``info(path)``. If False, gives a list of paths
- (str).
- kwargs: may have additional backend-specific options, such as version
- information
- Returns
- -------
- List of strings if detail is False, or list of directory information
- dicts if detail is True.
- """
- raise NotImplementedError
- def _ls_from_cache(self, path):
- """Check cache for listing
- Returns listing, if found (may be empty list for a directory that exists
- but contains nothing), None if not in cache.
- """
- parent = self._parent(path)
- try:
- return self.dircache[path.rstrip("/")]
- except KeyError:
- pass
- try:
- files = [
- f
- for f in self.dircache[parent]
- if f["name"] == path
- or (f["name"] == path.rstrip("/") and f["type"] == "directory")
- ]
- if len(files) == 0:
- # parent dir was listed but did not contain this file
- raise FileNotFoundError(path)
- return files
- except KeyError:
- pass
- def walk(self, path, maxdepth=None, topdown=True, on_error="omit", **kwargs):
- """Return all files under the given path.
- List all files, recursing into subdirectories; output is iterator-style,
- like ``os.walk()``. For a simple list of files, ``find()`` is available.
- When topdown is True, the caller can modify the dirnames list in-place (perhaps
- using del or slice assignment), and walk() will
- only recurse into the subdirectories whose names remain in dirnames;
- this can be used to prune the search, impose a specific order of visiting,
- or even to inform walk() about directories the caller creates or renames before
- it resumes walk() again.
- Modifying dirnames when topdown is False has no effect. (see os.walk)
- Note that the "files" yielded will include anything that is not
- a directory, such as links.
- Parameters
- ----------
- path: str
- Root to recurse into
- maxdepth: int
- Maximum recursion depth. None means limitless, but not recommended
- on link-based file-systems.
- topdown: bool (True)
- Whether to walk the directory tree from the top downwards or from
- the bottom upwards.
- on_error: "omit", "raise", a callable
- if "omit" (default), a path that raises an exception will simply yield
- no entries; if "raise", the underlying exception will be raised;
- if a callable, it will be called with a single OSError instance as argument
- kwargs: passed to ``ls``
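- Examples
- --------
- Iteration mirrors ``os.walk`` (paths are illustrative):
- >>> for root, dirnames, filenames in fs.walk("data", maxdepth=2):  # doctest: +SKIP
- ...     print(root, dirnames, filenames)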
- """
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- path = self._strip_protocol(path)
- full_dirs = {}
- dirs = {}
- files = {}
- detail = kwargs.pop("detail", False)
- try:
- listing = self.ls(path, detail=True, **kwargs)
- except (FileNotFoundError, OSError) as e:
- if on_error == "raise":
- raise
- if callable(on_error):
- on_error(e)
- return
- for info in listing:
- # each info name must be at least [path]/part, but here
- # we check also for names like [path]/part/
- pathname = info["name"].rstrip("/")
- name = pathname.rsplit("/", 1)[-1]
- if info["type"] == "directory" and pathname != path:
- # do not include "self" path
- full_dirs[name] = pathname
- dirs[name] = info
- elif pathname == path:
- # file-like with same name as the given path
- files[""] = info
- else:
- files[name] = info
- if not detail:
- dirs = list(dirs)
- files = list(files)
- if topdown:
- # Yield before recursion if walking top down
- yield path, dirs, files
- if maxdepth is not None:
- maxdepth -= 1
- if maxdepth < 1:
- if not topdown:
- yield path, dirs, files
- return
- for d in dirs:
- yield from self.walk(
- full_dirs[d],
- maxdepth=maxdepth,
- detail=detail,
- topdown=topdown,
- **kwargs,
- )
- if not topdown:
- # Yield after recursion if walking bottom up
- yield path, dirs, files
- def find(self, path, maxdepth=None, withdirs=False, detail=False, **kwargs):
- """List all files below path.
- Like posix ``find`` command without conditions
- Parameters
- ----------
- path : str
- maxdepth: int or None
- If not None, the maximum number of levels to descend
- withdirs: bool
- Whether to include directory paths in the output. This is True
- when used by glob, but users usually only want files.
- kwargs are passed to ``ls``.
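- Examples
- --------
- A sketch with illustrative paths:
- >>> fs.find("data", maxdepth=1, withdirs=True)  # doctest: +SKIP
- ['data', 'data/a.bin', 'data/sub']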
- """
- # TODO: allow equivalent of -name parameter
- path = self._strip_protocol(path)
- out = {}
- # Add the root directory if withdirs is requested
- # This is needed for posix glob compliance
- if withdirs and path != "" and self.isdir(path):
- out[path] = self.info(path)
- for _, dirs, files in self.walk(path, maxdepth, detail=True, **kwargs):
- if withdirs:
- files.update(dirs)
- out.update({info["name"]: info for info in files.values()})
- if not out and self.isfile(path):
- # walk works on directories, but find should also return [path]
- # when path happens to be a file
- out[path] = {}
- names = sorted(out)
- if not detail:
- return names
- else:
- return {name: out[name] for name in names}
- def du(self, path, total=True, maxdepth=None, withdirs=False, **kwargs):
- """Space used by files and optionally directories within a path
- Directory size does not include the size of its contents.
- Parameters
- ----------
- path: str
- total: bool
- Whether to sum all the file sizes
- maxdepth: int or None
- Maximum number of directory levels to descend, None for unlimited.
- withdirs: bool
- Whether to include directory paths in the output.
- kwargs: passed to ``find``
- Returns
- -------
- Dict of {path: size} if total=False, or int otherwise, where numbers
- refer to bytes used.
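- Examples
- --------
- Illustrative paths and sizes:
- >>> fs.du("data", total=False)  # doctest: +SKIP
- {'data/a.bin': 1000, 'data/b.bin': 2000}
- >>> fs.du("data")  # doctest: +SKIP
- 3000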
- """
- sizes = {}
- if withdirs and self.isdir(path):
- # Include top-level directory in output
- info = self.info(path)
- sizes[info["name"]] = info["size"]
- for f in self.find(path, maxdepth=maxdepth, withdirs=withdirs, **kwargs):
- info = self.info(f)
- sizes[info["name"]] = info["size"]
- if total:
- return sum(sizes.values())
- else:
- return sizes
- def glob(self, path, maxdepth=None, **kwargs):
- """
- Find files by glob-matching.
- If the path ends with '/', only folders are returned.
- We support ``"**"``,
- ``"?"`` and ``"[..]"``. We do not support ^ for pattern negation.
- The `maxdepth` option is applied on the first `**` found in the path.
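- Examples
- --------
- A sketch with illustrative paths:
- >>> fs.glob("data/**/*.csv", maxdepth=2)  # doctest: +SKIP
- ['data/a.csv', 'data/sub/b.csv']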
- kwargs are passed to ``ls``.
- """
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- import re
- seps = (os.path.sep, os.path.altsep) if os.path.altsep else (os.path.sep,)
- ends_with_sep = path.endswith(seps) # _strip_protocol strips trailing slash
- path = self._strip_protocol(path)
- append_slash_to_dirname = ends_with_sep or path.endswith(
- tuple(sep + "**" for sep in seps)
- )
- idx_star = path.find("*") if path.find("*") >= 0 else len(path)
- idx_qmark = path.find("?") if path.find("?") >= 0 else len(path)
- idx_brace = path.find("[") if path.find("[") >= 0 else len(path)
- min_idx = min(idx_star, idx_qmark, idx_brace)
- detail = kwargs.pop("detail", False)
- if not has_magic(path):
- if self.exists(path, **kwargs):
- if not detail:
- return [path]
- else:
- return {path: self.info(path, **kwargs)}
- else:
- if not detail:
- return [] # glob of non-existent returns empty
- else:
- return {}
- elif "/" in path[:min_idx]:
- min_idx = path[:min_idx].rindex("/")
- root = path[: min_idx + 1]
- depth = path[min_idx + 1 :].count("/") + 1
- else:
- root = ""
- depth = path[min_idx + 1 :].count("/") + 1
- if "**" in path:
- if maxdepth is not None:
- idx_double_stars = path.find("**")
- depth_double_stars = path[idx_double_stars:].count("/") + 1
- depth = depth - depth_double_stars + maxdepth
- else:
- depth = None
- allpaths = self.find(root, maxdepth=depth, withdirs=True, detail=True, **kwargs)
- pattern = glob_translate(path + ("/" if ends_with_sep else ""))
- pattern = re.compile(pattern)
- out = {
- p: info
- for p, info in sorted(allpaths.items())
- if pattern.match(
- p + "/"
- if append_slash_to_dirname and info["type"] == "directory"
- else p
- )
- }
- if detail:
- return out
- else:
- return list(out)
- def exists(self, path, **kwargs):
- """Is there a file at the given path"""
- try:
- self.info(path, **kwargs)
- return True
- except: # noqa: E722
- # any exception allowed bar FileNotFoundError?
- return False
- def lexists(self, path, **kwargs):
- """If there is a file at the given path (including
- broken links)"""
- return self.exists(path)
- def info(self, path, **kwargs):
- """Give details of entry at path
- Returns a single dictionary, with exactly the same information as ``ls``
- would with ``detail=True``.
- The default implementation calls ls and could be overridden by a
- shortcut. kwargs are passed on to ``ls()``.
- Some file systems might not be able to measure the file's size, in
- which case, the returned dict will include ``'size': None``.
- Returns
- -------
- dict with keys: name (full path in the FS), size (in bytes), type (file,
- directory, or something else) and other FS-specific keys.
- """
- path = self._strip_protocol(path)
- out = self.ls(self._parent(path), detail=True, **kwargs)
- out = [o for o in out if o["name"].rstrip("/") == path]
- if out:
- return out[0]
- out = self.ls(path, detail=True, **kwargs)
- path = path.rstrip("/")
- out1 = [o for o in out if o["name"].rstrip("/") == path]
- if len(out1) == 1:
- if "size" not in out1[0]:
- out1[0]["size"] = None
- return out1[0]
- elif len(out1) > 1 or out:
- return {"name": path, "size": 0, "type": "directory"}
- else:
- raise FileNotFoundError(path)
- def checksum(self, path):
- """Unique value for current version of file
- If the checksum is the same from one moment to another, the contents
- are guaranteed to be the same. If the checksum changes, the contents
- *might* have changed.
- This should normally be overridden; default will probably capture
- creation/modification timestamp (which would be good) or maybe
- access timestamp (which would be bad)
- """
- return int(tokenize(self.info(path)), 16)
- def size(self, path):
- """Size in bytes of file"""
- return self.info(path).get("size", None)
- def sizes(self, paths):
- """Size in bytes of each file in a list of paths"""
- return [self.size(p) for p in paths]
- def isdir(self, path):
- """Is this entry directory-like?"""
- try:
- return self.info(path)["type"] == "directory"
- except OSError:
- return False
- def isfile(self, path):
- """Is this entry file-like?"""
- try:
- return self.info(path)["type"] == "file"
- except: # noqa: E722
- return False
- def read_text(self, path, encoding=None, errors=None, newline=None, **kwargs):
- """Get the contents of the file as a string.
- Parameters
- ----------
- path: str
- URL of file on this filesystem
- encoding, errors, newline: same as `open`.
- """
- with self.open(
- path,
- mode="r",
- encoding=encoding,
- errors=errors,
- newline=newline,
- **kwargs,
- ) as f:
- return f.read()
- def write_text(
- self, path, value, encoding=None, errors=None, newline=None, **kwargs
- ):
- """Write the text to the given file.
- An existing file will be overwritten.
- Parameters
- ----------
- path: str
- URL of file on this filesystem
- value: str
- Text to write.
- encoding, errors, newline: same as `open`.
- """
- with self.open(
- path,
- mode="w",
- encoding=encoding,
- errors=errors,
- newline=newline,
- **kwargs,
- ) as f:
- return f.write(value)
- def cat_file(self, path, start=None, end=None, **kwargs):
- """Get the content of a file
- Parameters
- ----------
- path: URL of file on this filesystem
- start, end: int
- Bytes limits of the read. If negative, backwards from end,
- like usual python slices. Either can be None for start or
- end of file, respectively
- kwargs: passed to ``open()``.
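- Examples
- --------
- A sketch with an illustrative path:
- >>> fs.cat_file("data/file.bin", start=0, end=16)  # doctest: +SKIP
- b'...'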
- """
- # explicitly set buffering off?
- with self.open(path, "rb", **kwargs) as f:
- if start is not None:
- if start >= 0:
- f.seek(start)
- else:
- f.seek(max(0, f.size + start))
- if end is not None:
- if end < 0:
- end = f.size + end
- return f.read(end - f.tell())
- return f.read()
- def pipe_file(self, path, value, mode="overwrite", **kwargs):
- """Set the bytes of given file"""
- if mode == "create" and self.exists(path):
- # non-atomic but simple way; or could use "xb" in open(), which is likely
- # not as well supported
- raise FileExistsError
- with self.open(path, "wb", **kwargs) as f:
- f.write(value)
- def pipe(self, path, value=None, **kwargs):
- """Put value into path
- (counterpart to ``cat``)
- Parameters
- ----------
- path: string or dict(str, bytes)
- If a string, a single remote location to put ``value`` bytes; if a dict,
- a mapping of {path: bytesvalue}.
- value: bytes, optional
- If using a single path, these are the bytes to put there. Ignored if
- ``path`` is a dict
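- Examples
- --------
- Paths are illustrative:
- >>> fs.pipe("data/a.bin", b"payload")  # doctest: +SKIP
- >>> fs.pipe({"data/a.bin": b"one", "data/b.bin": b"two"})  # doctest: +SKIP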
- """
- if isinstance(path, str):
- self.pipe_file(self._strip_protocol(path), value, **kwargs)
- elif isinstance(path, dict):
- for k, v in path.items():
- self.pipe_file(self._strip_protocol(k), v, **kwargs)
- else:
- raise ValueError("path must be str or dict")
- def cat_ranges(
- self, paths, starts, ends, max_gap=None, on_error="return", **kwargs
- ):
- """Get the contents of byte ranges from one or more files
- Parameters
- ----------
- paths: list
- A list of filepaths on this filesystem
- starts, ends: int or list
- Bytes limits of the read. If using a single int, the same value will be
- used to read all the specified files.
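- Examples
- --------
- Read the first 100 bytes of two files (paths are illustrative):
- >>> fs.cat_ranges(["data/a.bin", "data/b.bin"], 0, 100)  # doctest: +SKIP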
- """
- if max_gap is not None:
- raise NotImplementedError
- if not isinstance(paths, list):
- raise TypeError
- if not isinstance(starts, list):
- starts = [starts] * len(paths)
- if not isinstance(ends, list):
- ends = [ends] * len(paths)
- if len(starts) != len(paths) or len(ends) != len(paths):
- raise ValueError
- out = []
- for p, s, e in zip(paths, starts, ends):
- try:
- out.append(self.cat_file(p, s, e))
- except Exception as e:
- if on_error == "return":
- out.append(e)
- else:
- raise
- return out
- def cat(self, path, recursive=False, on_error="raise", **kwargs):
- """Fetch (potentially multiple) paths' contents
- Parameters
- ----------
- recursive: bool
- If True, assume the path(s) are directories, and get all the
- contained files
- on_error : "raise", "omit", "return"
- If raise, an underlying exception will be raised (converted to KeyError
- if the type is in self.missing_exceptions); if omit, keys with exception
- will simply not be included in the output; if "return", all keys are
- included in the output, but the value will be bytes or an exception
- instance.
- kwargs: passed to cat_file
- Returns
- -------
- dict of {path: contents} if there are multiple paths
- or the path has been otherwise expanded
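- Examples
- --------
- A sketch with illustrative paths:
- >>> fs.cat("data/a.bin")  # doctest: +SKIP
- b'...'
- >>> fs.cat(["data/a.bin", "data/b.bin"])  # doctest: +SKIP
- {'data/a.bin': b'...', 'data/b.bin': b'...'}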
- """
- paths = self.expand_path(path, recursive=recursive)
- if (
- len(paths) > 1
- or isinstance(path, list)
- or paths[0] != self._strip_protocol(path)
- ):
- out = {}
- for path in paths:
- try:
- out[path] = self.cat_file(path, **kwargs)
- except Exception as e:
- if on_error == "raise":
- raise
- if on_error == "return":
- out[path] = e
- return out
- else:
- return self.cat_file(paths[0], **kwargs)
- def get_file(self, rpath, lpath, callback=DEFAULT_CALLBACK, outfile=None, **kwargs):
- """Copy single remote file to local"""
- from .implementations.local import LocalFileSystem
- if isfilelike(lpath):
- outfile = lpath
- elif self.isdir(rpath):
- os.makedirs(lpath, exist_ok=True)
- return None
- fs = LocalFileSystem(auto_mkdir=True)
- fs.makedirs(fs._parent(lpath), exist_ok=True)
- with self.open(rpath, "rb", **kwargs) as f1:
- if outfile is None:
- outfile = open(lpath, "wb")
- try:
- callback.set_size(getattr(f1, "size", None))
- data = True
- while data:
- data = f1.read(self.blocksize)
- segment_len = outfile.write(data)
- if segment_len is None:
- segment_len = len(data)
- callback.relative_update(segment_len)
- finally:
- if not isfilelike(lpath):
- outfile.close()
- def get(
- self,
- rpath,
- lpath,
- recursive=False,
- callback=DEFAULT_CALLBACK,
- maxdepth=None,
- **kwargs,
- ):
- """Copy file(s) to local.
- Copies a specific file or tree of files (if recursive=True). If lpath
- ends with a "/", it will be assumed to be a directory, and target files
- will go within. Can submit a list of paths, which may be glob-patterns
- and will be expanded.
- Calls get_file for each source.
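- Examples
- --------
- Paths are illustrative:
- >>> fs.get("remote/dir/", "local/dir/", recursive=True)  # doctest: +SKIP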
- """
- if isinstance(lpath, list) and isinstance(rpath, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- rpaths = rpath
- lpaths = lpath
- else:
- from .implementations.local import (
- LocalFileSystem,
- make_path_posix,
- trailing_sep,
- )
- source_is_str = isinstance(rpath, str)
- rpaths = self.expand_path(rpath, recursive=recursive, maxdepth=maxdepth)
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- rpaths = [p for p in rpaths if not (trailing_sep(p) or self.isdir(p))]
- if not rpaths:
- return
- if isinstance(lpath, str):
- lpath = make_path_posix(lpath)
- source_is_file = len(rpaths) == 1
- dest_is_dir = isinstance(lpath, str) and (
- trailing_sep(lpath) or LocalFileSystem().isdir(lpath)
- )
- exists = source_is_str and (
- (has_magic(rpath) and source_is_file)
- or (not has_magic(rpath) and dest_is_dir and not trailing_sep(rpath))
- )
- lpaths = other_paths(
- rpaths,
- lpath,
- exists=exists,
- flatten=not source_is_str,
- )
- callback.set_size(len(lpaths))
- for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
- with callback.branched(rpath, lpath) as child:
- self.get_file(rpath, lpath, callback=child, **kwargs)
- def put_file(
- self, lpath, rpath, callback=DEFAULT_CALLBACK, mode="overwrite", **kwargs
- ):
- """Copy single file to remote"""
- if mode == "create" and self.exists(rpath):
- raise FileExistsError
- if os.path.isdir(lpath):
- self.makedirs(rpath, exist_ok=True)
- return None
- with open(lpath, "rb") as f1:
- size = f1.seek(0, 2)
- callback.set_size(size)
- f1.seek(0)
- self.mkdirs(self._parent(os.fspath(rpath)), exist_ok=True)
- with self.open(rpath, "wb", **kwargs) as f2:
- while f1.tell() < size:
- data = f1.read(self.blocksize)
- segment_len = f2.write(data)
- if segment_len is None:
- segment_len = len(data)
- callback.relative_update(segment_len)
- def put(
- self,
- lpath,
- rpath,
- recursive=False,
- callback=DEFAULT_CALLBACK,
- maxdepth=None,
- **kwargs,
- ):
- """Copy file(s) from local.
- Copies a specific file or tree of files (if recursive=True). If rpath
- ends with a "/", it will be assumed to be a directory, and target files
- will go within.
- Calls put_file for each source.
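- Examples
- --------
- Paths are illustrative:
- >>> fs.put("local/dir/", "remote/dir/", recursive=True)  # doctest: +SKIP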
- """
- if isinstance(lpath, list) and isinstance(rpath, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- rpaths = rpath
- lpaths = lpath
- else:
- from .implementations.local import (
- LocalFileSystem,
- make_path_posix,
- trailing_sep,
- )
- source_is_str = isinstance(lpath, str)
- if source_is_str:
- lpath = make_path_posix(lpath)
- fs = LocalFileSystem()
- lpaths = fs.expand_path(lpath, recursive=recursive, maxdepth=maxdepth)
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- lpaths = [p for p in lpaths if not (trailing_sep(p) or fs.isdir(p))]
- if not lpaths:
- return
- source_is_file = len(lpaths) == 1
- dest_is_dir = isinstance(rpath, str) and (
- trailing_sep(rpath) or self.isdir(rpath)
- )
- rpath = (
- self._strip_protocol(rpath)
- if isinstance(rpath, str)
- else [self._strip_protocol(p) for p in rpath]
- )
- exists = source_is_str and (
- (has_magic(lpath) and source_is_file)
- or (not has_magic(lpath) and dest_is_dir and not trailing_sep(lpath))
- )
- rpaths = other_paths(
- lpaths,
- rpath,
- exists=exists,
- flatten=not source_is_str,
- )
- callback.set_size(len(rpaths))
- for lpath, rpath in callback.wrap(zip(lpaths, rpaths)):
- with callback.branched(lpath, rpath) as child:
- self.put_file(lpath, rpath, callback=child, **kwargs)
- def head(self, path, size=1024):
- """Get the first ``size`` bytes from file"""
- with self.open(path, "rb") as f:
- return f.read(size)
- def tail(self, path, size=1024):
- """Get the last ``size`` bytes from file"""
- with self.open(path, "rb") as f:
- f.seek(max(-size, -f.size), 2)
- return f.read()
- def cp_file(self, path1, path2, **kwargs):
- raise NotImplementedError
- def copy(
- self, path1, path2, recursive=False, maxdepth=None, on_error=None, **kwargs
- ):
- """Copy within two locations in the filesystem
- on_error : "raise", "ignore"
- If raise, any not-found exceptions will be raised; if ignore, any
- not-found exceptions will cause the path to be skipped; defaults to
- raise unless recursive is true, where the default is ignore
- """
- if on_error is None and recursive:
- on_error = "ignore"
- elif on_error is None:
- on_error = "raise"
- if isinstance(path1, list) and isinstance(path2, list):
- # No need to expand paths when both source and destination
- # are provided as lists
- paths1 = path1
- paths2 = path2
- else:
- from .implementations.local import trailing_sep
- source_is_str = isinstance(path1, str)
- paths1 = self.expand_path(path1, recursive=recursive, maxdepth=maxdepth)
- if source_is_str and (not recursive or maxdepth is not None):
- # Non-recursive glob does not copy directories
- paths1 = [p for p in paths1 if not (trailing_sep(p) or self.isdir(p))]
- if not paths1:
- return
- source_is_file = len(paths1) == 1
- dest_is_dir = isinstance(path2, str) and (
- trailing_sep(path2) or self.isdir(path2)
- )
- exists = source_is_str and (
- (has_magic(path1) and source_is_file)
- or (not has_magic(path1) and dest_is_dir and not trailing_sep(path1))
- )
- paths2 = other_paths(
- paths1,
- path2,
- exists=exists,
- flatten=not source_is_str,
- )
- for p1, p2 in zip(paths1, paths2):
- try:
- self.cp_file(p1, p2, **kwargs)
- except FileNotFoundError:
- if on_error == "raise":
- raise
- def expand_path(self, path, recursive=False, maxdepth=None, **kwargs):
- """Turn one or more globs or directories into a list of all matching paths
- to files or directories.
- kwargs are passed to ``glob`` or ``find``, which may in turn call ``ls``
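- Examples
- --------
- A sketch with illustrative paths:
- >>> fs.expand_path("data/*.csv")  # doctest: +SKIP
- ['data/a.csv', 'data/b.csv']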
- """
- if maxdepth is not None and maxdepth < 1:
- raise ValueError("maxdepth must be at least 1")
- if isinstance(path, (str, os.PathLike)):
- out = self.expand_path([path], recursive, maxdepth)
- else:
- out = set()
- path = [self._strip_protocol(p) for p in path]
- for p in path:
- if has_magic(p):
- bit = set(self.glob(p, maxdepth=maxdepth, **kwargs))
- out |= bit
- if recursive:
- # glob call above expanded one depth so if maxdepth is defined
- # then decrement it in expand_path call below. If it is zero
- # after decrementing then avoid expand_path call.
- if maxdepth is not None and maxdepth <= 1:
- continue
- out |= set(
- self.expand_path(
- list(bit),
- recursive=recursive,
- maxdepth=maxdepth - 1 if maxdepth is not None else None,
- **kwargs,
- )
- )
- continue
- elif recursive:
- rec = set(
- self.find(
- p, maxdepth=maxdepth, withdirs=True, detail=False, **kwargs
- )
- )
- out |= rec
- if p not in out and (recursive is False or self.exists(p)):
- # should only check once, for the root
- out.add(p)
- if not out:
- raise FileNotFoundError(path)
- return sorted(out)
- def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
- """Move file(s) from one location to another"""
- if path1 == path2:
- logger.debug("%s mv: The paths are the same, so no files were moved.", self)
- else:
- # explicitly raise exception to prevent data corruption
- self.copy(
- path1, path2, recursive=recursive, maxdepth=maxdepth, on_error="raise"
- )
- self.rm(path1, recursive=recursive)
- def rm_file(self, path):
- """Delete a file"""
- self._rm(path)
- def _rm(self, path):
- """Delete one file"""
- # this is the old name for the method, prefer rm_file
- raise NotImplementedError
- def rm(self, path, recursive=False, maxdepth=None):
- """Delete files.
- Parameters
- ----------
- path: str or list of str
- File(s) to delete.
- recursive: bool
- If file(s) are directories, recursively delete contents and then
- also remove the directory
- maxdepth: int or None
- Depth to pass to walk for finding files to delete, if recursive.
- If None, there will be no limit and infinite recursion may be
- possible.
- """
- path = self.expand_path(path, recursive=recursive, maxdepth=maxdepth)
- for p in reversed(path):
- self.rm_file(p)
- @classmethod
- def _parent(cls, path):
- path = cls._strip_protocol(path)
- if "/" in path:
- parent = path.rsplit("/", 1)[0].lstrip(cls.root_marker)
- return cls.root_marker + parent
- else:
- return cls.root_marker
- def _open(
- self,
- path,
- mode="rb",
- block_size=None,
- autocommit=True,
- cache_options=None,
- **kwargs,
- ):
- """Return raw bytes-mode file-like from the file-system"""
- return AbstractBufferedFile(
- self,
- path,
- mode,
- block_size,
- autocommit,
- cache_options=cache_options,
- **kwargs,
- )
- def open(
- self,
- path,
- mode="rb",
- block_size=None,
- cache_options=None,
- compression=None,
- **kwargs,
- ):
- """
- Return a file-like object from the filesystem
- The resultant instance must function correctly in a context ``with``
- block.
- Parameters
- ----------
- path: str
- Target file
- mode: str like 'rb', 'w'
- See builtin ``open()``
- Mode "x" (exclusive write) may be implemented by the backend. Even if
- it is, whether it is checked up front or on commit, and whether it is
- atomic is implementation-dependent.
- block_size: int
- Some indication of buffering - this is a value in bytes
- cache_options : dict, optional
- Extra arguments to pass through to the cache.
- compression: string or None
- If given, open file using compression codec. Can either be a compression
- name (a key in ``fsspec.compression.compr``) or "infer" to guess the
- compression from the filename suffix.
- encoding, errors, newline: passed on to TextIOWrapper for text mode
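- Examples
- --------
- Text mode with inferred compression (path is illustrative):
- >>> with fs.open("data/file.csv.gz", "rt", compression="infer") as f:  # doctest: +SKIP
- ...     header = f.readline()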
- """
- path = self._strip_protocol(path)
- if "b" not in mode:
- mode = mode.replace("t", "") + "b"
- text_kwargs = {
- k: kwargs.pop(k)
- for k in ["encoding", "errors", "newline"]
- if k in kwargs
- }
- return io.TextIOWrapper(
- self.open(
- path,
- mode,
- block_size=block_size,
- cache_options=cache_options,
- compression=compression,
- **kwargs,
- ),
- **text_kwargs,
- )
- else:
- ac = kwargs.pop("autocommit", not self._intrans)
- f = self._open(
- path,
- mode=mode,
- block_size=block_size,
- autocommit=ac,
- cache_options=cache_options,
- **kwargs,
- )
- if compression is not None:
- from fsspec.compression import compr
- from fsspec.core import get_compression
- compression = get_compression(path, compression)
- compress = compr[compression]
- f = compress(f, mode=mode[0])
- if not ac and "r" not in mode:
- self.transaction.files.append(f)
- return f
- def touch(self, path, truncate=True, **kwargs):
- """Create empty file, or update timestamp
- Parameters
- ----------
- path: str
- file location
- truncate: bool
- If True, always set file size to 0; if False, update timestamp and
- leave file unchanged, if backend allows this
- """
- if truncate or not self.exists(path):
- with self.open(path, "wb", **kwargs):
- pass
- else:
- raise NotImplementedError # update timestamp, if possible
- def ukey(self, path):
- """Hash of file properties, to tell if it has changed"""
- return sha256(str(self.info(path)).encode()).hexdigest()
- def read_block(self, fn, offset, length, delimiter=None):
- """Read a block of bytes from
- Starting at ``offset`` of the file, read ``length`` bytes. If
- ``delimiter`` is set then we ensure that the read starts and stops at
- delimiter boundaries that follow the locations ``offset`` and ``offset
- + length``. If ``offset`` is zero then we start at zero. The
- bytestring returned WILL include the end delimiter string.
- If offset+length is beyond the eof, reads to eof.
- Parameters
- ----------
- fn: string
- Path to filename
- offset: int
- Byte offset to start read
- length: int
- Number of bytes to read. If None, read to end.
- delimiter: bytes (optional)
- Ensure reading starts and stops at delimiter bytestring
- Examples
- --------
- >>> fs.read_block('data/file.csv', 0, 13) # doctest: +SKIP
- b'Alice, 100\\nBo'
- >>> fs.read_block('data/file.csv', 0, 13, delimiter=b'\\n') # doctest: +SKIP
- b'Alice, 100\\nBob, 200\\n'
- Use ``length=None`` to read to the end of the file.
- >>> fs.read_block('data/file.csv', 0, None, delimiter=b'\\n') # doctest: +SKIP
- b'Alice, 100\\nBob, 200\\nCharlie, 300'
- See Also
- --------
- :func:`fsspec.utils.read_block`
- """
- with self.open(fn, "rb") as f:
- size = f.size
- if length is None:
- length = size
- if size is not None and offset + length > size:
- length = size - offset
- return read_block(f, offset, length, delimiter)
- def to_json(self, *, include_password: bool = True) -> str:
- """
- JSON representation of this filesystem instance.
- Parameters
- ----------
- include_password: bool, default True
- Whether to include the password (if any) in the output.
- Returns
- -------
- JSON string with keys ``cls`` (the python location of this class),
- protocol (text name of this class's protocol, first one in case of
- multiple), ``args`` (positional args, usually empty), and all other
- keyword arguments as their own keys.
- Warnings
- --------
- Serialized filesystems may contain sensitive information which has been
- passed to the constructor, such as passwords and tokens. Make sure you
- store and send them in a secure environment!
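- Examples
- --------
- Round-trip through JSON (assumes the instance is reconstructible from
- its stored arguments):
- >>> blob = fs.to_json()  # doctest: +SKIP
- >>> fs2 = AbstractFileSystem.from_json(blob)  # doctest: +SKIP
- >>> fs2 == fs  # doctest: +SKIP
- True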
- """
- from .json import FilesystemJSONEncoder
- return json.dumps(
- self,
- cls=type(
- "_FilesystemJSONEncoder",
- (FilesystemJSONEncoder,),
- {"include_password": include_password},
- ),
- )
- @staticmethod
- def from_json(blob: str) -> AbstractFileSystem:
- """
- Recreate a filesystem instance from JSON representation.
- See ``.to_json()`` for the expected structure of the input.
- Parameters
- ----------
- blob: str
- Returns
- -------
- file system instance, not necessarily of this particular class.
- Warnings
- --------
- This can import arbitrary modules (as determined by the ``cls`` key).
- Make sure you haven't installed any modules that may execute malicious code
- at import time.
- """
- from .json import FilesystemJSONDecoder
- return json.loads(blob, cls=FilesystemJSONDecoder)
- def to_dict(self, *, include_password: bool = True) -> dict[str, Any]:
- """
- JSON-serializable dictionary representation of this filesystem instance.
- Parameters
- ----------
- include_password: bool, default True
- Whether to include the password (if any) in the output.
- Returns
- -------
- Dictionary with keys ``cls`` (the python location of this class),
- protocol (text name of this class's protocol, first one in case of
- multiple), ``args`` (positional args, usually empty), and all other
- keyword arguments as their own keys.
- Warnings
- --------
- Serialized filesystems may contain sensitive information which has been
- passed to the constructor, such as passwords and tokens. Make sure you
- store and send them in a secure environment!
- """
- from .json import FilesystemJSONEncoder
- json_encoder = FilesystemJSONEncoder()
- cls = type(self)
- proto = self.protocol
- storage_options = dict(self.storage_options)
- if not include_password:
- storage_options.pop("password", None)
- return dict(
- cls=f"{cls.__module__}:{cls.__name__}",
- protocol=proto[0] if isinstance(proto, (tuple, list)) else proto,
- args=json_encoder.make_serializable(self.storage_args),
- **json_encoder.make_serializable(storage_options),
- )
- @staticmethod
- def from_dict(dct: dict[str, Any]) -> AbstractFileSystem:
- """
- Recreate a filesystem instance from dictionary representation.
- See ``.to_dict()`` for the expected structure of the input.
- Parameters
- ----------
- dct: Dict[str, Any]
- Returns
- -------
- file system instance, not necessarily of this particular class.
- Warnings
- --------
- This can import arbitrary modules (as determined by the ``cls`` key).
- Make sure you haven't installed any modules that may execute malicious code
- at import time.
- """
- from .json import FilesystemJSONDecoder
- json_decoder = FilesystemJSONDecoder()
- dct = dict(dct) # Defensive copy
- cls = FilesystemJSONDecoder.try_resolve_fs_cls(dct)
- if cls is None:
- raise ValueError("Not a serialized AbstractFileSystem")
- dct.pop("cls", None)
- dct.pop("protocol", None)
- return cls(
- *json_decoder.unmake_serializable(dct.pop("args", ())),
- **json_decoder.unmake_serializable(dct),
- )
- def _get_pyarrow_filesystem(self):
- """
- Make a version of the FS instance which will be acceptable to pyarrow
- """
- # all instances already also derive from pyarrow
- return self
- def get_mapper(self, root="", check=False, create=False, missing_exceptions=None):
- """Create key/value store based on this file-system
- Makes a MutableMapping interface to the FS at the given root path.
- See ``fsspec.mapping.FSMap`` for further details.
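- Examples
- --------
- Keys are paths relative to the root (names are illustrative):
- >>> m = fs.get_mapper("data")  # doctest: +SKIP
- >>> m["a.bin"] = b"payload"  # doctest: +SKIP
- >>> m["a.bin"]  # doctest: +SKIP
- b'payload'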
- """
- from .mapping import FSMap
- return FSMap(
- root,
- self,
- check=check,
- create=create,
- missing_exceptions=missing_exceptions,
- )
- @classmethod
- def clear_instance_cache(cls):
- """
- Clear the cache of filesystem instances.
- Notes
- -----
- Unless overridden by setting the ``cachable`` class attribute to False,
- the filesystem class stores a reference to newly created instances. This
- prevents Python's normal rules around garbage collection from working,
- since the instance's refcount will not drop to zero until
- ``clear_instance_cache`` is called.
- """
- cls._cache.clear()
- def created(self, path):
- """Return the created timestamp of a file as a datetime.datetime"""
- raise NotImplementedError
- def modified(self, path):
- """Return the modified timestamp of a file as a datetime.datetime"""
- raise NotImplementedError
- def tree(
- self,
- path: str = "/",
- recursion_limit: int = 2,
- max_display: int = 25,
- display_size: bool = False,
- prefix: str = "",
- is_last: bool = True,
- first: bool = True,
- indent_size: int = 4,
- ) -> str:
- """
- Return a tree-like structure of the filesystem starting from the given path as a string.
- Parameters
- ----------
- path: Root path to start traversal from
- recursion_limit: Maximum depth of directory traversal
- max_display: Maximum number of items to display per directory
- display_size: Whether to display file sizes
- prefix: Current line prefix for visual tree structure
- is_last: Whether current item is last in its level
- first: Whether this is the first call (displays root path)
- indent_size: Number of spaces per indent level
- Returns
- -------
- str: A string representing the tree structure.
- Example
- -------
- >>> from fsspec import filesystem
- >>> fs = filesystem('ftp', host='test.rebex.net', user='demo', password='password')
- >>> tree = fs.tree(display_size=True, recursion_limit=3, indent_size=8, max_display=10)
- >>> print(tree)
- """
- def format_bytes(n: int) -> str:
- """Format bytes as text."""
- for prefix, k in (
- ("P", 2**50),
- ("T", 2**40),
- ("G", 2**30),
- ("M", 2**20),
- ("k", 2**10),
- ):
- if n >= 0.9 * k:
- return f"{n / k:.2f} {prefix}b"
- return f"{n}B"
- result = []
- if first:
- result.append(path)
- if recursion_limit:
- indent = " " * indent_size
- contents = self.ls(path, detail=True)
- contents.sort(
- key=lambda x: (x.get("type") != "directory", x.get("name", ""))
- )
- if max_display is not None and len(contents) > max_display:
- displayed_contents = contents[:max_display]
- remaining_count = len(contents) - max_display
- else:
- displayed_contents = contents
- remaining_count = 0
- for i, item in enumerate(displayed_contents):
- is_last_item = (i == len(displayed_contents) - 1) and (
- remaining_count == 0
- )
- branch = (
- "└" + ("─" * (indent_size - 2))
- if is_last_item
- else "├" + ("─" * (indent_size - 2))
- )
- branch += " "
- new_prefix = prefix + (
- indent if is_last_item else "│" + " " * (indent_size - 1)
- )
- name = os.path.basename(item.get("name", ""))
- if display_size and item.get("type") == "directory":
- sub_contents = self.ls(item.get("name", ""), detail=True)
- num_files = sum(
- 1 for sub_item in sub_contents if sub_item.get("type") == "file"
- )
- num_folders = sum(
- 1
- for sub_item in sub_contents
- if sub_item.get("type") == "directory"
- )
- if num_files == 0 and num_folders == 0:
- size = " (empty folder)"
- elif num_files == 0:
- size = f" ({num_folders} subfolder{'s' if num_folders > 1 else ''})"
- elif num_folders == 0:
- size = f" ({num_files} file{'s' if num_files > 1 else ''})"
- else:
- size = f" ({num_files} file{'s' if num_files > 1 else ''}, {num_folders} subfolder{'s' if num_folders > 1 else ''})"
- elif display_size and item.get("type") == "file":
- size = f" ({format_bytes(item.get('size', 0))})"
- else:
- size = ""
- result.append(f"{prefix}{branch}{name}{size}")
- if item.get("type") == "directory" and recursion_limit > 0:
- result.append(
- self.tree(
- path=item.get("name", ""),
- recursion_limit=recursion_limit - 1,
- max_display=max_display,
- display_size=display_size,
- prefix=new_prefix,
- is_last=is_last_item,
- first=False,
- indent_size=indent_size,
- )
- )
- if remaining_count > 0:
- more_message = f"{remaining_count} more item(s) not displayed."
- result.append(
- f"{prefix}{'└' + ('─' * (indent_size - 2))} {more_message}"
- )
- return "\n".join(_ for _ in result if _)
    # ------------------------------------------------------------------------
    # Aliases

    def read_bytes(self, path, start=None, end=None, **kwargs):
        """Alias of `AbstractFileSystem.cat_file`."""
        return self.cat_file(path, start=start, end=end, **kwargs)

    def write_bytes(self, path, value, **kwargs):
        """Alias of `AbstractFileSystem.pipe_file`."""
        self.pipe_file(path, value, **kwargs)

    def makedir(self, path, create_parents=True, **kwargs):
        """Alias of `AbstractFileSystem.mkdir`."""
        return self.mkdir(path, create_parents=create_parents, **kwargs)

    def mkdirs(self, path, exist_ok=False):
        """Alias of `AbstractFileSystem.makedirs`."""
        return self.makedirs(path, exist_ok=exist_ok)

    def listdir(self, path, detail=True, **kwargs):
        """Alias of `AbstractFileSystem.ls`."""
        return self.ls(path, detail=detail, **kwargs)

    def cp(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.copy`."""
        return self.copy(path1, path2, **kwargs)

    def move(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def stat(self, path, **kwargs):
        """Alias of `AbstractFileSystem.info`."""
        return self.info(path, **kwargs)

    def disk_usage(self, path, total=True, maxdepth=None, **kwargs):
        """Alias of `AbstractFileSystem.du`."""
        return self.du(path, total=total, maxdepth=maxdepth, **kwargs)

    def rename(self, path1, path2, **kwargs):
        """Alias of `AbstractFileSystem.mv`."""
        return self.mv(path1, path2, **kwargs)

    def delete(self, path, recursive=False, maxdepth=None):
        """Alias of `AbstractFileSystem.rm`."""
        return self.rm(path, recursive=recursive, maxdepth=maxdepth)

    def upload(self, lpath, rpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.put`."""
        return self.put(lpath, rpath, recursive=recursive, **kwargs)

    def download(self, rpath, lpath, recursive=False, **kwargs):
        """Alias of `AbstractFileSystem.get`."""
        return self.get(rpath, lpath, recursive=recursive, **kwargs)
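
    # The aliases above delegate directly, so the pairs are interchangeable; a
    # small sketch, assuming some instantiated filesystem ``fs`` and an
    # existing ``path``:
    #
    #     assert fs.stat(path) == fs.info(path)
    #     assert fs.listdir(path) == fs.ls(path, detail=True)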
    def sign(self, path, expiration=100, **kwargs):
        """Create a signed URL representing the given path

        Some implementations allow temporary URLs to be generated, as a
        way of delegating credentials.

        Parameters
        ----------
        path : str
            The path on the filesystem
        expiration : int
            Number of seconds to enable the URL for (if supported)

        Returns
        -------
        URL : str
            The signed URL

        Raises
        ------
        NotImplementedError : if method is not implemented for a filesystem
        """
        raise NotImplementedError("Sign is not implemented for this filesystem")
    def _isfilestore(self):
        # Originally inherited from pyarrow DaskFileSystem. Keeping this
        # here for backwards compatibility as long as pyarrow uses its
        # legacy fsspec-compatible filesystems and thus accepts fsspec
        # filesystems as well
        return False


class AbstractBufferedFile(io.IOBase):
    """Convenient class to derive from to provide buffering

    In the case that the backend does not provide a pythonic file-like object
    already, this class contains much of the logic to build one. The only
    methods that need to be overridden are ``_upload_chunk``,
    ``_initiate_upload`` and ``_fetch_range``.
    """

    DEFAULT_BLOCK_SIZE = 5 * 2**20
    _details = None

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        block_size="default",
        autocommit=True,
        cache_type="readahead",
        cache_options=None,
        size=None,
        **kwargs,
    ):
        """
        Template for files with buffered reading and writing

        Parameters
        ----------
        fs: instance of FileSystem
        path: str
            location in file-system
        mode: str
            Normal file modes. Currently only 'rb', 'wb', 'xb' or 'ab'. Some
            file systems may be read-only, and some may not support append.
        block_size: int
            Buffer size for reading or writing, 'default' for class default
        autocommit: bool
            Whether to write to final destination; may only impact what
            happens when file is being closed.
        cache_type: {"readahead", "none", "mmap", "bytes"}, default "readahead"
            Caching policy in read mode. See the definitions in ``core``.
        cache_options : dict
            Additional options passed to the constructor for the cache specified
            by `cache_type`.
        size: int
            If given and in read mode, suppresses having to look up the file size
        kwargs:
            Gets stored as self.kwargs
        """
        from .core import caches

        self.path = path
        self.fs = fs
        self.mode = mode
        self.blocksize = (
            self.DEFAULT_BLOCK_SIZE if block_size in ["default", None] else block_size
        )
        self.loc = 0
        self.autocommit = autocommit
        self.end = None
        self.start = None
        self.closed = False

        if cache_options is None:
            cache_options = {}

        if "trim" in kwargs:
            warnings.warn(
                "Passing 'trim' to control the cache behavior has been deprecated. "
                "Specify it within the 'cache_options' argument instead.",
                FutureWarning,
            )
            cache_options["trim"] = kwargs.pop("trim")

        self.kwargs = kwargs

        if mode not in {"ab", "rb", "wb", "xb"}:
            raise NotImplementedError("File mode not supported")
        if mode == "rb":
            if size is not None:
                self.size = size
            else:
                self.size = self.details["size"]
            self.cache = caches[cache_type](
                self.blocksize, self._fetch_range, self.size, **cache_options
            )
        else:
            self.buffer = io.BytesIO()
            self.offset = None
            self.forced = False
            self.location = None
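
    # Note on the deprecated ``trim`` kwarg handled above; a sketch of the old
    # and the preferred spellings, assuming an instantiated filesystem ``fs``
    # whose open() forwards kwargs to this constructor:
    #
    #     f = fs.open("data.bin", "rb", trim=False)  # emits FutureWarning
    #     f = fs.open("data.bin", "rb", cache_options={"trim": False})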
    @property
    def details(self):
        if self._details is None:
            self._details = self.fs.info(self.path)
        return self._details

    @details.setter
    def details(self, value):
        self._details = value
        self.size = value["size"]

    @property
    def full_name(self):
        return _unstrip_protocol(self.path, self.fs)

    @property
    def closed(self):
        # get around this attr being read-only in IOBase
        # use getattr here, since this can be called during del
        return getattr(self, "_closed", True)

    @closed.setter
    def closed(self, c):
        self._closed = c

    def __hash__(self):
        if "w" in self.mode:
            return id(self)
        else:
            return int(tokenize(self.details), 16)

    def __eq__(self, other):
        """Files are equal if they have the same checksum, only in read mode"""
        if self is other:
            return True
        return (
            isinstance(other, type(self))
            and self.mode == "rb"
            and other.mode == "rb"
            and hash(self) == hash(other)
        )
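
    # Consequence of the hash/eq definitions above: two read-mode handles on
    # the same (unchanged) path compare equal, while write-mode handles hash
    # by identity and are only equal to themselves. A sketch, assuming ``fs``:
    #
    #     f1 = fs.open("data.bin", "rb")
    #     f2 = fs.open("data.bin", "rb")
    #     assert f1 == f2          # same details -> same tokenize() hash
    #     assert f1 is not f2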
    def commit(self):
        """Move from temp to final destination"""

    def discard(self):
        """Throw away temporary file"""

    def info(self):
        """File information about this path"""
        if self.readable():
            return self.details
        else:
            raise ValueError("Info not available while writing")

    def tell(self):
        """Current file location"""
        return self.loc

    def seek(self, loc, whence=0):
        """Set current file location

        Parameters
        ----------
        loc: int
            byte location
        whence: {0, 1, 2}
            from start of file, current location or end of file, resp.
        """
        loc = int(loc)
        if not self.mode == "rb":
            raise OSError(ESPIPE, "Seek only available in read mode")
        if whence == 0:
            nloc = loc
        elif whence == 1:
            nloc = self.loc + loc
        elif whence == 2:
            nloc = self.size + loc
        else:
            raise ValueError(f"invalid whence ({whence}, should be 0, 1 or 2)")
        if nloc < 0:
            raise ValueError("Seek before start of file")
        self.loc = nloc
        return self.loc
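
    # The ``whence`` semantics mirror ``io.IOBase.seek``; a sketch of reading
    # a fixed-size trailer, assuming a read-mode handle ``f``:
    #
    #     f.seek(-16, 2)      # position 16 bytes before end of file
    #     trailer = f.read()  # exactly the last 16 bytes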
    def write(self, data):
        """
        Write data to buffer.

        Buffer only sent on flush() or if buffer is greater than
        or equal to blocksize.

        Parameters
        ----------
        data: bytes
            Set of bytes to be written.
        """
        if not self.writable():
            raise ValueError("File not in write mode")
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if self.forced:
            raise ValueError("This file has been force-flushed, can only close")
        out = self.buffer.write(data)
        self.loc += out
        if self.buffer.tell() >= self.blocksize:
            self.flush()
        return out

    def flush(self, force=False):
        """
        Write buffered data to backend store.

        Writes the current buffer, if it is larger than the block-size, or if
        the file is being closed.

        Parameters
        ----------
        force: bool
            When closing, write the last block even if it is smaller than
            blocks are allowed to be. Disallows further writing to this file.
        """
        if self.closed:
            raise ValueError("Flush on closed file")
        if force and self.forced:
            raise ValueError("Force flush cannot be called more than once")
        if force:
            self.forced = True

        if self.readable():
            # no-op to flush on read-mode
            return

        if not force and self.buffer.tell() < self.blocksize:
            # Defer write on small block
            return

        if self.offset is None:
            # Initialize a multipart upload
            self.offset = 0
            try:
                self._initiate_upload()
            except:  # noqa: E722
                self.closed = True
                raise

        if self._upload_chunk(final=force) is not False:
            self.offset += self.buffer.seek(0, 2)
            self.buffer = io.BytesIO()
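
    # Putting write() and flush() together: data accumulates in the in-memory
    # buffer and is only shipped to the backend in >= blocksize pieces, with
    # any final short piece sent when the file closes. A sketch, assuming some
    # writable filesystem ``fs``:
    #
    #     with fs.open("out.bin", "wb", block_size=5 * 2**20) as f:
    #         f.write(b"x" * 1024)        # buffered, nothing uploaded yet
    #         f.write(b"y" * 6 * 2**20)   # crosses blocksize -> chunk uploaded
    #     # close() calls flush(force=True) to upload the remainder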
    def _upload_chunk(self, final=False):
        """Write one part of a multi-block file upload

        Parameters
        ----------
        final: bool
            This is the last block, so should complete file, if
            self.autocommit is True.
        """
        # may not yet have been initialized, may need to call _initiate_upload

    def _initiate_upload(self):
        """Create remote file/upload"""
        pass

    def _fetch_range(self, start, end):
        """Get the specified set of bytes from remote"""
        return self.fs.cat_file(self.path, start=start, end=end)
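
    # The three hooks above are the whole backend contract, as the class
    # docstring says. A minimal, hypothetical subclass sketch; the
    # ``backend_*`` helpers are assumptions standing in for a real store's
    # create/append/range-read primitives:
    #
    #     class MyFile(AbstractBufferedFile):
    #         def _initiate_upload(self):
    #             backend_create(self.path)  # start an empty remote object
    #
    #         def _upload_chunk(self, final=False):
    #             self.buffer.seek(0)
    #             backend_append(self.path, self.buffer.read())
    #             return True  # chunk consumed; flush() resets the buffer
    #
    #         def _fetch_range(self, start, end):
    #             return backend_read(self.path, start, end)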
    def read(self, length=-1):
        """
        Return data from cache, or fetch pieces as necessary

        Parameters
        ----------
        length: int (-1)
            Number of bytes to read; if <0, all remaining bytes.
        """
        length = -1 if length is None else int(length)
        if self.mode != "rb":
            raise ValueError("File not in read mode")
        if length < 0:
            length = self.size - self.loc
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        if length == 0:
            # don't even bother calling fetch
            return b""
        out = self.cache._fetch(self.loc, self.loc + length)

        logger.debug(
            "%s read: %i - %i %s",
            self,
            self.loc,
            self.loc + length,
            self.cache._log_stats(),
        )
        self.loc += len(out)
        return out

    def readinto(self, b):
        """mirrors builtin file's readinto method

        https://docs.python.org/3/library/io.html#io.RawIOBase.readinto
        """
        out = memoryview(b).cast("B")
        data = self.read(out.nbytes)
        out[: len(data)] = data
        return len(data)
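
    # readinto() fills a caller-supplied buffer instead of allocating a new
    # bytes object; a sketch, assuming a read-mode handle ``f``:
    #
    #     buf = bytearray(4096)
    #     n = f.readinto(buf)    # n may be < 4096 near the end of the file
    #     data = bytes(buf[:n])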
    def readuntil(self, char=b"\n", blocks=None):
        """Return data between current position and first occurrence of char

        char is included in the output, except if the end of the file is
        encountered first.

        Parameters
        ----------
        char: bytes
            Thing to find
        blocks: None or int
            How much to read in each go. Defaults to file blocksize - which may
            mean a new read on every call.
        """
        out = []
        while True:
            start = self.tell()
            part = self.read(blocks or self.blocksize)
            if len(part) == 0:
                break
            found = part.find(char)
            if found > -1:
                out.append(part[: found + len(char)])
                self.seek(start + found + len(char))
                break
            out.append(part)
        return b"".join(out)
    def readline(self):
        """Read until and including the first occurrence of newline character

        Note that, because of character encoding, this is not necessarily a
        true line ending.
        """
        return self.readuntil(b"\n")

    def __next__(self):
        out = self.readline()
        if out:
            return out
        raise StopIteration

    def __iter__(self):
        return self

    def readlines(self):
        """Return all data, split by the newline character, including the newline character"""
        data = self.read()
        lines = data.split(b"\n")
        out = [l + b"\n" for l in lines[:-1]]
        if data.endswith(b"\n"):
            # trailing newline: split() leaves an empty final element, drop it
            return out
        else:
            return out + [lines[-1]]
        # return list(self) ???

    def readinto1(self, b):
        return self.readinto(b)
    def close(self):
        """Close file

        Finalizes writes, discards cache
        """
        if getattr(self, "_unclosable", False):
            return
        if self.closed:
            return
        try:
            if self.mode == "rb":
                self.cache = None
            else:
                if not self.forced:
                    self.flush(force=True)

                if self.fs is not None:
                    self.fs.invalidate_cache(self.path)
                    self.fs.invalidate_cache(self.fs._parent(self.path))
        finally:
            self.closed = True

    def readable(self):
        """Whether opened for reading"""
        return "r" in self.mode and not self.closed

    def seekable(self):
        """Whether is seekable (only in read mode)"""
        return self.readable()

    def writable(self):
        """Whether opened for writing"""
        return self.mode in {"wb", "ab", "xb"} and not self.closed

    def __reduce__(self):
        if self.mode != "rb":
            raise RuntimeError("Pickling a writeable file is not supported")
        return reopen, (
            self.fs,
            self.path,
            self.mode,
            self.blocksize,
            self.loc,
            self.size,
            self.autocommit,
            self.cache.name if self.cache else "none",
            self.kwargs,
        )

    def __del__(self):
        if not self.closed:
            self.close()

    def __str__(self):
        return f"<File-like object {type(self.fs).__name__}, {self.path}>"

    __repr__ = __str__

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


def reopen(fs, path, mode, blocksize, loc, size, autocommit, cache_type, kwargs):
    file = fs.open(
        path,
        mode=mode,
        block_size=blocksize,
        autocommit=autocommit,
        cache_type=cache_type,
        size=size,
        **kwargs,
    )
    if loc > 0:
        file.seek(loc)
    return file
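
# ``reopen`` exists so that read-mode files pickle: ``__reduce__`` above
# captures (fs, path, mode, ...) and pickle calls ``reopen`` to rebuild an
# equivalent handle at the same offset. A sketch, assuming a filesystem ``fs``:
#
#     import pickle
#     f = fs.open("data.bin", "rb")
#     f.seek(100)
#     g = pickle.loads(pickle.dumps(f))
#     assert g.tell() == 100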