12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743 |
- from __future__ import annotations
- import io
- import logging
- import os
- import re
- from glob import has_magic
- from pathlib import Path
- # for backwards compat, we export cache things from here too
- from fsspec.caching import ( # noqa: F401
- BaseCache,
- BlockCache,
- BytesCache,
- MMapCache,
- ReadAheadCache,
- caches,
- )
- from fsspec.compression import compr
- from fsspec.config import conf
- from fsspec.registry import filesystem, get_filesystem_class
- from fsspec.utils import (
- _unstrip_protocol,
- build_name_function,
- infer_compression,
- stringify_path,
- )
# Module-wide logger shared by all helpers in this file.
logger = logging.getLogger("fsspec")
class OpenFile:
    """
    File-like object to be used in a context

    Can layer (buffered) text-mode and compression over any file-system, which
    are typically binary-only.

    These instances are safe to serialize, as the low-level file object
    is not created until invoked using ``with``.

    Parameters
    ----------
    fs: FileSystem
        The file system to use for opening the file. Should be a subclass or duck-type
        with ``fsspec.spec.AbstractFileSystem``
    path: str
        Location to open
    mode: str like 'rb', optional
        Mode of the opened file
    compression: str or None, optional
        Compression to apply
    encoding: str or None, optional
        The encoding to use if opened in text mode.
    errors: str or None, optional
        How to handle encoding errors if opened in text mode.
    newline: None or str
        Passed to TextIOWrapper in text mode, how to handle line endings.
    """

    def __init__(
        self,
        fs,
        path,
        mode="rb",
        compression=None,
        encoding=None,
        errors=None,
        newline=None,
    ):
        self.fs = fs
        self.path = path
        self.mode = mode
        # resolves "infer" and validates against known codecs
        self.compression = get_compression(path, compression)
        self.encoding = encoding
        self.errors = errors
        self.newline = newline
        # stack of layered file objects, innermost first; filled on __enter__
        self.fobjects = []

    def __reduce__(self):
        # Serialize only the construction arguments, never live file handles.
        return (
            OpenFile,
            (
                self.fs,
                self.path,
                self.mode,
                self.compression,
                self.encoding,
                self.errors,
                self.newline,
            ),
        )

    def __repr__(self):
        return f"<OpenFile '{self.path}'>"

    def __enter__(self):
        # The underlying filesystem is always opened binary; text is layered on.
        mode = self.mode.replace("t", "").replace("b", "") + "b"

        try:
            f = self.fs.open(self.path, mode=mode)
        except FileNotFoundError as e:
            if has_magic(self.path):
                # BUG FIX: the message was previously passed logging-style as
                # ("%s ...", self.path), so the placeholder never interpolated.
                raise FileNotFoundError(
                    "%s not found. The URL contains glob characters: you maybe needed\n"
                    "to pass expand=True in fsspec.open() or the storage_options of \n"
                    "your library. You can also set the config value 'open_expand'\n"
                    "before import, or fsspec.core.DEFAULT_EXPAND at runtime, to True."
                    % self.path
                ) from e
            raise

        self.fobjects = [f]

        if self.compression is not None:
            compress = compr[self.compression]
            f = compress(f, mode=mode[0])
            self.fobjects.append(f)

        if "b" not in self.mode:
            # assume, for example, that 'r' is equivalent to 'rt' as in builtin
            f = PickleableTextIOWrapper(
                f, encoding=self.encoding, errors=self.errors, newline=self.newline
            )
            self.fobjects.append(f)

        # hand back the outermost wrapper
        return self.fobjects[-1]

    def __exit__(self, *args):
        self.close()

    @property
    def full_name(self):
        # Path including its protocol prefix, e.g. "s3://bucket/key"
        return _unstrip_protocol(self.path, self.fs)

    def open(self):
        """Materialise this as a real open file without context

        The OpenFile object should be explicitly closed to avoid enclosed file
        instances persisting. You must, therefore, keep a reference to the OpenFile
        during the life of the file-like it generates.
        """
        return self.__enter__()

    def close(self):
        """Close all encapsulated file objects"""
        # outermost first, so buffered layers flush into inner ones
        for f in reversed(self.fobjects):
            if "r" not in self.mode and not f.closed:
                f.flush()
            f.close()
        self.fobjects.clear()
class OpenFiles(list):
    """A list of OpenFile instances usable as a single context manager.

    Entering the context opens every contained file; exiting closes them all.
    Normal list indexing and slicing work as usual (slices keep the mode and
    filesystem, returning another ``OpenFiles``).

    Caching filesystems get special treatment: if any layer of the filesystem
    stack implements ``open_many``, files are down/uploaded together at the
    start or end of the context, possibly concurrently.
    """

    def __init__(self, *args, mode="rb", fs=None):
        self.mode = mode
        self.fs = fs
        self.files = []
        super().__init__(*args)

    def __enter__(self):
        if self.fs is None:
            raise ValueError("Context has already been used")

        # Walk down the filesystem stack looking for batch-open support.
        layer = self.fs
        while True:
            if hasattr(layer, "open_many"):
                # concurrent cache download, or staging for upload
                self.files = layer.open_many(self)
                return self.files
            inner = getattr(layer, "fs", None)
            if inner is None:
                break
            layer = inner

        return [of.__enter__() for of in self]

    def __exit__(self, *args):
        layer = self.fs
        for of in self:
            of.__exit__(*args)
        if "r" in self.mode:
            return
        # Writing: find a layer that batches commits, if any.
        while True:
            if hasattr(layer, "open_many"):
                # concurrent cache upload
                layer.commit_many(self.files)
                return
            inner = getattr(layer, "fs", None)
            if inner is None:
                return
            layer = inner

    def __getitem__(self, item):
        picked = super().__getitem__(item)
        if not isinstance(item, slice):
            return picked
        # slices stay OpenFiles so they remain usable as a context
        return OpenFiles(picked, mode=self.mode, fs=self.fs)

    def __repr__(self):
        return f"<List of {len(self)} OpenFile instances>"
def open_files(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    name_function=None,
    num=1,
    protocol=None,
    newline=None,
    auto_mkdir=True,
    expand=True,
    **kwargs,
):
    """Given a path or paths, return a list of ``OpenFile`` objects.

    For writing, a str path must contain the "*" character, which will be filled
    in by increasing numbers, e.g., "part*" -> "part1", "part2" if num=2.

    For either reading or writing, can instead provide explicit list of paths.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath(s). Prefix with a protocol like ``s3://``
        to read from alternative filesystems. To read from multiple files you
        can pass a globstring or a list of paths, with the caveat that they
        must all have the same protocol.
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    name_function: function or None
        if opening a set of files for writing, those files do not yet exist,
        so we need to generate their names by formatting the urlpath for
        each sequence number
    num: int [1]
        if writing mode, number of files we expect to create (passed to
        name_function)
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    auto_mkdir: bool (True)
        If in write mode, this will ensure the target directory exists before
        writing, by calling ``fs.mkdirs(exist_ok=True)``.
    expand: bool
        Whether to expand globs / "*" templates into concrete paths.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> files = open_files('2015-*-*.csv')  # doctest: +SKIP
    >>> files = open_files(
    ...     's3://bucket/2015-*-*.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP

    Returns
    -------
    An ``OpenFiles`` instance, which is a list of ``OpenFile`` objects that can
    be used as a single context

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    fs, _, paths = get_fs_token_paths(
        urlpath,
        mode,
        num=num,
        name_function=name_function,
        storage_options=kwargs,
        protocol=protocol,
        expand=expand,
    )

    if fs.protocol == "file":
        # local filesystem can create directories lazily on write
        fs.auto_mkdir = auto_mkdir
    elif "r" not in mode and auto_mkdir:
        # pre-create target directories on remote filesystems
        for parent in {fs._parent(p) for p in paths}:
            try:
                fs.makedirs(parent, exist_ok=True)
            except PermissionError:
                pass

    files = [
        OpenFile(
            fs,
            p,
            mode=mode,
            compression=compression,
            encoding=encoding,
            errors=errors,
            newline=newline,
        )
        for p in paths
    ]
    return OpenFiles(files, mode=mode, fs=fs)
def _un_chain(path, kwargs):
    """Unpack a chained URL (``outer::inner::...``) into per-layer triples.

    Returns a list of ``(path, protocol, kwargs)`` tuples, one per filesystem
    layer, in the same order the layers appear in ``path`` (outermost first).
    ``kwargs`` may namespace options per protocol (``{"s3": {...}}``); any
    un-namespaced leftovers go to the first (outermost) layer.
    """
    # Avoid a circular import
    from fsspec.implementations.cached import CachingFileSystem

    if "::" in path:
        x = re.compile(".*[^a-z]+.*")  # test for non protocol-like single word
        bits = []
        for p in path.split("::"):
            if "://" in p or x.match(p):
                bits.append(p)
            else:
                # bare protocol word, e.g. "simplecache" -> "simplecache://"
                bits.append(p + "://")
    else:
        bits = [path]
    # [[url, protocol, kwargs], ...]
    out = []
    previous_bit = None
    # copy so the pops below don't mutate the caller's dict
    kwargs = kwargs.copy()
    for bit in reversed(bits):
        protocol = kwargs.pop("protocol", None) or split_protocol(bit)[0] or "file"
        cls = get_filesystem_class(protocol)
        # options embedded in the URL itself (e.g. username/host)
        extra_kwargs = cls._get_kwargs_from_urls(bit)
        kws = kwargs.pop(protocol, {})
        if bit is bits[0]:
            # outermost layer also receives any remaining un-namespaced kwargs
            kws.update(kwargs)
        # explicit kwargs win over URL-derived ones
        kw = dict(
            **{k: v for k, v in extra_kwargs.items() if k not in kws or v != kws[k]},
            **kws,
        )
        bit = cls._strip_protocol(bit)
        if "target_protocol" not in kw and issubclass(cls, CachingFileSystem):
            # a caching layer's "path" is really the previous (inner) layer's path
            bit = previous_bit
        out.append((bit, protocol, kw))
        previous_bit = bit
    out.reverse()
    return out
def url_to_fs(url, **kwargs):
    """
    Turn fully-qualified and potentially chained URL into filesystem instance

    Parameters
    ----------
    url : str
        The fsspec-compatible URL
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Returns
    -------
    filesystem : FileSystem
        The new filesystem discovered from ``url`` and created with
        ``**kwargs``.
    urlpath : str
        The file-systems-specific URL for ``url``.
    """
    url = stringify_path(url)

    # Strip arguments that belong to fsspec.open(), not to any filesystem;
    # inspect could keep this in sync with open()'s signature.
    known_kwargs = {
        "compression",
        "encoding",
        "errors",
        "expand",
        "mode",
        "name_function",
        "newline",
        "num",
    }
    kwargs = {k: v for k, v in kwargs.items() if k not in known_kwargs}

    chain = _un_chain(url, kwargs)

    # Build the nested target_* option structure from the innermost layer
    # outwards; the outermost layer (chain[0]) is handled after the loop.
    inkwargs = {}
    for urls, nested_protocol, kw in reversed(chain[1:]):
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls

    urlpath, protocol, kw = chain[0]
    inkwargs = dict(**kw, **inkwargs)

    fs = filesystem(protocol, **inkwargs)
    return fs, urlpath
# Default for ``open(expand=...)``: seeded from the "open_expand" config
# value at import time; may be overridden at runtime by assigning to it.
DEFAULT_EXPAND = conf.get("open_expand", False)
def open(
    urlpath,
    mode="rb",
    compression=None,
    encoding="utf8",
    errors=None,
    protocol=None,
    newline=None,
    expand=None,
    **kwargs,
):
    """Given a path or paths, return one ``OpenFile`` object.

    Parameters
    ----------
    urlpath: string or list
        Absolute or relative filepath. Prefix with a protocol like ``s3://``
        to read from alternative filesystems. Should not include glob
        character(s).
    mode: 'rb', 'wt', etc.
    compression: string or None
        If given, open file using compression codec. Can either be a compression
        name (a key in ``fsspec.compression.compr``) or "infer" to guess the
        compression from the filename suffix.
    encoding: str
        For text mode only
    errors: None or str
        Passed to TextIOWrapper in text mode
    protocol: str or None
        If given, overrides the protocol found in the URL.
    newline: bytes or None
        Used for line terminator in text mode. If None, uses system default;
        if blank, uses no translation.
    expand: bool or None
        Whether to regard file paths containing special glob characters as needing
        expansion (finding the first match) or absolute. Setting False allows using
        paths which do embed such characters. If None (default), this argument
        takes its value from the DEFAULT_EXPAND module variable, which takes
        its initial value from the "open_expand" config value at startup, which will
        be False if not set.
    **kwargs: dict
        Extra options that make sense to a particular storage connection, e.g.
        host, port, username, password, etc.

    Examples
    --------
    >>> openfile = open('2015-01-01.csv')  # doctest: +SKIP
    >>> openfile = open(
    ...     's3://bucket/2015-01-01.csv.gz', compression='gzip'
    ... )  # doctest: +SKIP
    >>> with openfile as f:
    ...     df = pd.read_csv(f)  # doctest: +SKIP
    ...

    Returns
    -------
    ``OpenFile`` object.

    Notes
    -----
    For a full list of the available protocols and the implementations that
    they map across to see the latest online documentation:

    - For implementations built into ``fsspec`` see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#built-in-implementations
    - For implementations in separate packages see
      https://filesystem-spec.readthedocs.io/en/latest/api.html#other-known-implementations
    """
    # fall back to the module-level default when the caller did not decide
    if expand is None:
        expand = DEFAULT_EXPAND

    out = open_files(
        urlpath=[urlpath],
        mode=mode,
        compression=compression,
        encoding=encoding,
        errors=errors,
        protocol=protocol,
        newline=newline,
        expand=expand,
        **kwargs,
    )
    if not out:
        raise FileNotFoundError(urlpath)
    return out[0]
def open_local(
    url: str | list[str] | Path | list[Path],
    mode: str = "rb",
    **storage_options: dict,
) -> str | list[str]:
    """Open file(s) which can be resolved to local

    For files which either are local, or get downloaded upon open
    (e.g., by file caching)

    Parameters
    ----------
    url: str or list(str)
    mode: str
        Must be read mode
    storage_options:
        passed on to FS for or used by open_files (e.g., compression)
    """
    if "r" not in mode:
        raise ValueError("Can only ensure local files when reading")

    of = open_files(url, mode=mode, **storage_options)
    # only filesystems that materialise real local files qualify
    if not getattr(of[0].fs, "local_file", False):
        raise ValueError(
            "open_local can only be used on a filesystem which"
            " has attribute local_file=True"
        )

    with of as files:
        paths = [f.name for f in files]

    # a single non-glob input yields a single path, not a list
    if isinstance(url, Path) or (isinstance(url, str) and not has_magic(url)):
        return paths[0]
    return paths
def get_compression(urlpath, compression):
    """Resolve a compression argument, inferring from the filename if "infer"."""
    if compression == "infer":
        compression = infer_compression(urlpath)
    if compression is None:
        return None
    if compression not in compr:
        raise ValueError(f"Compression type {compression} not supported")
    return compression
def split_protocol(urlpath):
    """Return protocol, path pair"""
    urlpath = stringify_path(urlpath)
    protocol, sep, path = urlpath.partition("://")
    # single-letter "protocols" are Windows drive letters, not protocols
    if sep and len(protocol) > 1:
        return protocol, path
    if urlpath.startswith("data:"):
        return urlpath.split(":", 1)
    return None, urlpath
def strip_protocol(urlpath):
    """Return only path part of full URL, according to appropriate backend"""
    protocol = split_protocol(urlpath)[0]
    return get_filesystem_class(protocol)._strip_protocol(urlpath)
def expand_paths_if_needed(paths, mode, num, fs, name_function):
    """Expand paths if they have a ``*`` in them (write mode) or any of ``*?[]``
    in them (read mode).

    Parameters
    ----------
    paths: list of str
    mode: str
        Mode in which to open files.
    num: int
        If opening in writing mode, number of files we expect to create.
    fs: filesystem object
    name_function: callable
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.

    Returns
    -------
    list of expanded paths
    """
    paths = list(paths)

    if "w" in mode:  # write mode: expand '*' templates via name_function
        if sum("*" in p for p in paths) > 1:
            raise ValueError(
                "When writing data, only one filename mask can be specified."
            )
        num = max(num, len(paths))

        out = []
        for p in paths:
            if "*" in p:
                out.extend(_expand_paths(p, name_function, num))
            else:
                out.append(p)
        # if the mask generated more paths than asked for, trim the list
        return out[:num]

    # read mode: expand glob patterns against the filesystem
    out = []
    for p in paths:
        if has_magic(p):
            out.extend(fs.glob(p))
        else:
            out.append(p)
    return out
def get_fs_token_paths(
    urlpath,
    mode="rb",
    num=1,
    name_function=None,
    storage_options=None,
    protocol=None,
    expand=True,
):
    """Filesystem, deterministic token, and paths from a urlpath and options.

    Parameters
    ----------
    urlpath: string or iterable
        Absolute or relative filepath, URL (may include protocols like
        ``s3://``), or globstring pointing to data.
    mode: str, optional
        Mode in which to open files.
    num: int, optional
        If opening in writing mode, number of files we expect to create.
    name_function: callable, optional
        If opening in writing mode, this callable is used to generate path
        names. Names are generated for each partition by
        ``urlpath.replace('*', name_function(partition_index))``.
    storage_options: dict, optional
        Additional keywords to pass to the filesystem class.
    protocol: str or None
        To override the protocol specifier in the URL
    expand: bool
        Expand string paths for writing, assuming the path is a directory

    Returns
    -------
    (filesystem, token, list of paths)
    """
    if isinstance(urlpath, (list, tuple, set)):
        if not urlpath:
            raise ValueError("empty urlpath sequence")
        urlpath0 = stringify_path(next(iter(urlpath)))
    else:
        urlpath0 = stringify_path(urlpath)
    storage_options = storage_options or {}
    if protocol:
        storage_options["protocol"] = protocol
    chain = _un_chain(urlpath0, storage_options)
    inkwargs = {}
    # Reverse iterate the chain, creating a nested target_* structure
    for i, ch in enumerate(reversed(chain)):
        urls, nested_protocol, kw = ch
        if i == len(chain) - 1:
            inkwargs = dict(**kw, **inkwargs)
            continue
        inkwargs["target_options"] = dict(**kw, **inkwargs)
        inkwargs["target_protocol"] = nested_protocol
        inkwargs["fo"] = urls
    paths, protocol, _ = chain[0]
    fs = filesystem(protocol, **inkwargs)
    if isinstance(urlpath, (list, tuple, set)):
        pchains = [_un_chain(stringify_path(u), storage_options)[0] for u in urlpath]
        if len({pc[1] for pc in pchains}) > 1:
            # BUG FIX: the message was previously passed logging-style as
            # ("... %s", urlpath), so the placeholder never interpolated.
            raise ValueError(f"Protocol mismatch getting fs from {urlpath}")
        paths = [pc[0] for pc in pchains]
    else:
        paths = fs._strip_protocol(paths)
    if isinstance(paths, (list, tuple, set)):
        if expand:
            paths = expand_paths_if_needed(paths, mode, num, fs, name_function)
        elif not isinstance(paths, list):
            paths = list(paths)
    else:
        if ("w" in mode or "x" in mode) and expand:
            paths = _expand_paths(paths, name_function, num)
        elif "*" in paths:
            # globbing never yields directories for reading
            paths = [f for f in sorted(fs.glob(paths)) if not fs.isdir(f)]
        else:
            paths = [paths]

    return fs, fs._fs_token, paths
- def _expand_paths(path, name_function, num):
- if isinstance(path, str):
- if path.count("*") > 1:
- raise ValueError("Output path spec must contain exactly one '*'.")
- elif "*" not in path:
- path = os.path.join(path, "*.part")
- if name_function is None:
- name_function = build_name_function(num - 1)
- paths = [path.replace("*", name_function(i)) for i in range(num)]
- if paths != sorted(paths):
- logger.warning(
- "In order to preserve order between partitions"
- " paths created with ``name_function`` should "
- "sort to partition order"
- )
- elif isinstance(path, (tuple, list)):
- assert len(path) == num
- paths = list(path)
- else:
- raise ValueError(
- "Path should be either\n"
- "1. A list of paths: ['foo.json', 'bar.json', ...]\n"
- "2. A directory: 'foo/\n"
- "3. A path with a '*' in it: 'foo.*.json'"
- )
- return paths
class PickleableTextIOWrapper(io.TextIOWrapper):
    """TextIOWrapper cannot be pickled. This solves it.

    Requires that ``buffer`` be pickleable, which all instances of
    AbstractBufferedFile are.
    """

    def __init__(
        self,
        buffer,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        # remember the constructor arguments so __reduce__ can rebuild us
        self.args = (buffer, encoding, errors, newline, line_buffering, write_through)
        super().__init__(*self.args)

    def __reduce__(self):
        # reconstruct by re-calling the constructor with the saved args
        return PickleableTextIOWrapper, self.args
|