12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739 |
- from __future__ import annotations
- import contextlib
- import logging
- import math
- import os
- import re
- import sys
- import tempfile
- from functools import partial
- from hashlib import md5
- from importlib.metadata import version
- from typing import (
- IO,
- TYPE_CHECKING,
- Any,
- Callable,
- Iterable,
- Iterator,
- Sequence,
- TypeVar,
- )
- from urllib.parse import urlsplit
- if TYPE_CHECKING:
- import pathlib
- from typing_extensions import TypeGuard
- from fsspec.spec import AbstractFileSystem
- DEFAULT_BLOCK_SIZE = 5 * 2**20
- T = TypeVar("T")
def infer_storage_options(
    urlpath: str, inherit_storage_options: dict[str, Any] | None = None
) -> dict[str, Any]:
    """Infer storage options from URL path and merge it with existing storage
    options.

    Parameters
    ----------
    urlpath: str or unicode
        Either local absolute file path or URL (hdfs://namenode:8020/file.csv)
    inherit_storage_options: dict (optional)
        Its contents will get merged with the inferred information from the
        given path

    Returns
    -------
    Storage options dict.

    Examples
    --------
    >>> infer_storage_options('/mnt/datasets/test.csv')  # doctest: +SKIP
    {"protocol": "file", "path", "/mnt/datasets/test.csv"}
    >>> infer_storage_options(
    ...     'hdfs://username:pwd@node:123/mnt/datasets/test.csv?q=1',
    ...     inherit_storage_options={'extra': 'value'},
    ... )  # doctest: +SKIP
    {"protocol": "hdfs", "username": "username", "password": "pwd",
    "host": "node", "port": 123, "path": "/mnt/datasets/test.csv",
    "url_query": "q=1", "extra": "value"}
    """
    # Handle Windows paths including disk name in this special case;
    # anything without an explicit "scheme://" also falls back to local file.
    if (
        re.match(r"^[a-zA-Z]:[\\/]", urlpath)
        or re.match(r"^[a-zA-Z0-9]+://", urlpath) is None
    ):
        return {"protocol": "file", "path": urlpath}

    parsed_path = urlsplit(urlpath)
    protocol = parsed_path.scheme or "file"
    # urlsplit strips the fragment from .path; re-attach it so the full path
    # round-trips (some filesystems use "#" inside paths).
    if parsed_path.fragment:
        path = "#".join([parsed_path.path, parsed_path.fragment])
    else:
        path = parsed_path.path
    if protocol == "file":
        # Special case parsing file protocol URL on Windows according to:
        # https://msdn.microsoft.com/en-us/library/jj710207.aspx
        windows_path = re.match(r"^/([a-zA-Z])[:|]([\\/].*)$", path)
        if windows_path:
            drive, path = windows_path.groups()
            path = f"{drive}:{path}"

    if protocol in ["http", "https"]:
        # for HTTP, we don't want to parse, as requests will anyway
        return {"protocol": protocol, "path": urlpath}

    options: dict[str, Any] = {"protocol": protocol, "path": path}

    if parsed_path.netloc:
        # Parse `hostname` from netloc manually because `parsed_path.hostname`
        # lowercases the hostname which is not always desirable (e.g. in S3):
        # https://github.com/dask/dask/issues/1417
        options["host"] = parsed_path.netloc.rsplit("@", 1)[-1].rsplit(":", 1)[0]

        # For object stores the "host" is really the bucket, which belongs at
        # the front of the path.  (Removed a former no-op else branch that
        # re-assigned options["host"] to itself.)
        if protocol in ("s3", "s3a", "gcs", "gs"):
            options["path"] = options["host"] + options["path"]

        if parsed_path.port:
            options["port"] = parsed_path.port
        if parsed_path.username:
            options["username"] = parsed_path.username
        if parsed_path.password:
            options["password"] = parsed_path.password

    if parsed_path.query:
        options["url_query"] = parsed_path.query
    if parsed_path.fragment:
        options["url_fragment"] = parsed_path.fragment

    if inherit_storage_options:
        update_storage_options(options, inherit_storage_options)

    return options
def update_storage_options(
    options: dict[str, Any], inherited: dict[str, Any] | None = None
) -> None:
    """Merge *inherited* into *options* in place.

    Raises KeyError if both dicts define the same key with different values;
    identical duplicate values are tolerated.
    """
    inherited = inherited or {}
    for key in set(options).intersection(inherited):
        if options.get(key) != inherited.get(key):
            raise KeyError(
                f"Collision between inferred and specified storage option:\n{key}"
            )
    options.update(inherited)
# Compression extensions registered via fsspec.compression.register_compression
compressions: dict[str, str] = {}


def infer_compression(filename: str) -> str | None:
    """Infer a registered compression name from *filename*'s extension.

    Looks the lowercased extension up in the ``compressions`` registry
    (populated by ``fsspec.compression.register_compression``); returns
    None when the extension is unknown or unregistered.
    """
    suffix = os.path.splitext(filename)[-1].strip(".").lower()
    return compressions.get(suffix)
def build_name_function(max_int: float) -> Callable[[int], str]:
    """Return a function mapping an int to a zero-padded string.

    Pad width is chosen so every integer up to ``max_int`` aligns.

    >>> name_f = build_name_function(57)
    >>> name_f(7)
    '07'
    >>> build_name_function(1000)(42)
    '0042'
    >>> build_name_function(999)(42)
    '042'
    >>> build_name_function(0)(0)
    '0'
    """
    # Nudge above zero / exact powers of ten so log10 is defined and the
    # width comes out right at those corner cases.
    width = int(math.ceil(math.log10(max_int + 1e-8)))

    def name_function(i: int) -> str:
        return str(i).zfill(width)

    return name_function
def seek_delimiter(file: IO[bytes], delimiter: bytes, blocksize: int) -> bool:
    r"""Seek current file to file start, file end, or byte after delimiter seq.

    Seeks file to next chunk delimiter, where chunks are defined on file start,
    a delimiting sequence, and file end. Use file.tell() to see location afterwards.
    Note that file start is a valid split, so must be at offset > 0 to seek for
    delimiter.

    Parameters
    ----------
    file: a file
    delimiter: bytes
        a delimiter like ``b'\n'`` or message sentinel, matching file .read() type
    blocksize: int
        Number of bytes to read from the file at once.

    Returns
    -------
    Returns True if a delimiter was found, False if at file start or end.
    """
    if file.tell() == 0:
        # beginning-of-file, return without seek
        return False

    # Interface is for binary IO, with delimiter as bytes, but initialize last
    # with result of file.read to preserve compatibility with text IO.
    last: bytes | None = None
    while True:
        current = file.read(blocksize)
        if not current:
            # end-of-file without delimiter
            return False
        # Prepend the tail of the previous chunk so a delimiter straddling a
        # chunk boundary is still found.
        full = last + current if last else current
        try:
            if delimiter in full:
                i = full.index(delimiter)
                # Rewind from current position to just past the delimiter.
                file.seek(file.tell() - (len(full) - i) + len(delimiter))
                return True
            elif len(current) < blocksize:
                # end-of-file without delimiter
                return False
        except (OSError, ValueError):
            # NOTE(review): presumably guards text-mode file objects where the
            # bytes operations above can raise — confirm before removing.
            pass
        # Keep only as many trailing bytes as could begin a split delimiter.
        last = full[-len(delimiter) :]
def read_block(
    f: IO[bytes],
    offset: int,
    length: int | None,
    delimiter: bytes | None = None,
    split_before: bool = False,
) -> bytes:
    """Read a block of bytes from a file.

    Parameters
    ----------
    f: File
        Open file
    offset: int
        Byte offset to start read
    length: int
        Number of bytes to read, read through end of file if None
    delimiter: bytes (optional)
        Ensure reading starts and stops at delimiter bytestring
    split_before: bool (optional)
        Start/stop read *before* delimiter bytestring.

    With ``delimiter=`` the block is widened so it starts and stops at the
    delimiter boundaries following ``offset`` and ``offset + length``; offset
    zero always starts at zero, and the terminating delimiter IS included.

    Examples
    --------
    >>> from io import BytesIO  # doctest: +SKIP
    >>> f = BytesIO(b'Alice, 100\\nBob, 200\\nCharlie, 300')  # doctest: +SKIP
    >>> read_block(f, 0, 13)  # doctest: +SKIP
    b'Alice, 100\\nBo'
    >>> read_block(f, 0, 13, delimiter=b'\\n')  # doctest: +SKIP
    b'Alice, 100\\nBob, 200\\n'
    """
    if delimiter:
        f.seek(offset)
        found_start = seek_delimiter(f, delimiter, 2**16)
        if length is None:
            return f.read()
        start = f.tell()
        # Seek to the nominal end of the block (offset + length), then walk
        # forward to the next delimiter so the block ends on a boundary.
        f.seek(offset + length)
        found_end = seek_delimiter(f, delimiter, 2**16)
        end = f.tell()

        if split_before:
            # Move each boundary to just before its delimiter — but only when
            # a delimiter (not file start/end) produced that boundary.
            if found_start:
                start -= len(delimiter)
            if found_end:
                end -= len(delimiter)

        offset, length = start, end - start

    f.seek(offset)

    # TODO: allow length to be None and read to the end of the file?
    assert length is not None
    return f.read(length)
def tokenize(*args: Any, **kwargs: Any) -> str:
    """Deterministic token for arbitrary picklable-ish arguments.

    (modified from dask.base)

    >>> tokenize([1, 2, '3'])
    '9d71491b50023b06fc76928e6eddb952'
    >>> tokenize('Hello') == tokenize('Hello')
    True
    """
    if kwargs:
        args = args + (kwargs,)
    payload = str(args).encode()
    try:
        digest = md5(payload)
    except ValueError:
        # FIPS systems: https://github.com/fsspec/filesystem_spec/issues/380
        digest = md5(payload, usedforsecurity=False)
    return digest.hexdigest()
def stringify_path(filepath: str | os.PathLike[str] | pathlib.Path) -> str:
    """Attempt to convert a path-like object to a string.

    Parameters
    ----------
    filepath: object to be converted

    Returns
    -------
    filepath_str: maybe a string version of the object

    Notes
    -----
    Objects implementing the fspath protocol are coerced via ``__fspath__``;
    objects exposing a ``.path`` attribute are coerced to that. Anything else
    (bytes, buffers, non-path-likes) is passed through unchanged.
    """
    if isinstance(filepath, str):
        return filepath
    if hasattr(filepath, "__fspath__"):
        return filepath.__fspath__()
    if hasattr(filepath, "path"):
        return filepath.path
    return filepath  # type: ignore[return-value]
def make_instance(
    cls: Callable[..., T], args: Sequence[Any], kwargs: dict[str, Any]
) -> T:
    """Construct ``cls(*args, **kwargs)`` and run its ``_determine_worker`` hook.

    NOTE(review): the instance is assumed to provide ``_determine_worker``
    (not verified here) — used as a pickle-friendly factory.
    """
    obj = cls(*args, **kwargs)
    obj._determine_worker()  # type: ignore[attr-defined]
    return obj
def common_prefix(paths: Iterable[str]) -> str:
    """Return the longest "/"-separated prefix shared by all *paths*."""
    split_paths = [p.split("/") for p in paths]
    depth = min(len(sp) for sp in split_paths)
    first = split_paths[0]
    matched = depth
    for idx in range(depth):
        if any(sp[idx] != first[idx] for sp in split_paths):
            matched = idx
            break
    return "/".join(first[:matched])
def other_paths(
    paths: list[str],
    path2: str | list[str],
    exists: bool = False,
    flatten: bool = False,
) -> list[str]:
    """In bulk file operations, construct a new file tree from a list of files.

    Parameters
    ----------
    paths: list of str
        The input file tree
    path2: str or list of str
        Root to construct the new list in. If this is already a list of str, we just
        assert it has the right number of elements.
    exists: bool (optional)
        For a str destination, it is already exists (and is a dir), files should
        end up inside.
    flatten: bool (optional)
        Whether to flatten the input directory tree structure so that the output files
        are in the same directory.

    Returns
    -------
    list of str
    """
    # Explicit destination list: just validate its length and use it verbatim.
    if not isinstance(path2, str):
        assert len(paths) == len(path2)
        return path2

    root = path2.rstrip("/")
    if flatten:
        # Drop source directory structure; keep only each file's basename.
        return ["/".join((root, p.split("/")[-1])) for p in paths]

    cp = common_prefix(paths)
    if exists:
        # Destination dir already exists: keep one more source level.
        cp = cp.rsplit("/", 1)[0]
    if not cp and all(not s.startswith("/") for s in paths):
        return ["/".join([root, p]) for p in paths]
    return [p.replace(cp, root, 1) for p in paths]
def is_exception(obj: Any) -> bool:
    """Return True if *obj* is an exception instance (any BaseException subclass)."""
    return isinstance(obj, BaseException)
def isfilelike(f: Any) -> TypeGuard[IO[bytes]]:
    """Duck-type check: does *f* expose ``read``, ``close`` and ``tell``?"""
    return hasattr(f, "read") and hasattr(f, "close") and hasattr(f, "tell")
def get_protocol(url: str) -> str:
    """Return the protocol portion of *url* (before '://' or chained '::').

    Falls back to "file" when no protocol separator is present.
    """
    url = stringify_path(url)
    head, *rest = re.split(r"(\:\:|\://)", url, maxsplit=1)
    return head if rest else "file"
def can_be_local(path: str) -> bool:
    """Can the given URL be used with open_local?"""
    from fsspec import get_filesystem_class

    try:
        fs_cls = get_filesystem_class(get_protocol(path))
    except (ValueError, ImportError):
        # not in registry or import failed
        return False
    return getattr(fs_cls, "local_file", False)
def get_package_version_without_import(name: str) -> str | None:
    """For given package name, try to find the version without importing it.

    Checks an already-imported module's ``__version__`` first, then package
    metadata; importing the package and reading ``__version__`` is the last
    resort, so an import *might* still happen.

    Returns either the version string, or None if the package
    or the version was not readily found.
    """
    mod = sys.modules.get(name)
    if mod is not None and hasattr(mod, "__version__"):
        return mod.__version__
    try:
        return version(name)
    except:  # noqa: E722
        # deliberately broad: any metadata failure falls through to import
        pass
    try:
        import importlib

        return importlib.import_module(name).__version__
    except (ImportError, AttributeError):
        return None
def setup_logging(
    logger: logging.Logger | None = None,
    logger_name: str | None = None,
    level: str = "DEBUG",
    clear: bool = True,
) -> logging.Logger:
    """Attach a stream handler with the standard fsspec format to a logger.

    Exactly one of *logger* / *logger_name* must identify the target logger.
    With ``clear=True`` any pre-existing handlers are removed first, so
    repeated calls don't stack duplicate handlers.
    """
    if logger is None and logger_name is None:
        raise ValueError("Provide either logger object or logger name")
    target = logger or logging.getLogger(logger_name)
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(
        logging.Formatter(
            "%(asctime)s - %(name)s - %(levelname)s - %(funcName)s -- %(message)s"
        )
    )
    if clear:
        target.handlers.clear()
    target.addHandler(stream_handler)
    target.setLevel(level)
    return target
def _unstrip_protocol(name: str, fs: AbstractFileSystem) -> str:
    # Thin delegation to the filesystem's own unstrip_protocol — presumably
    # re-attaches the protocol prefix (e.g. "s3://") to a bare path; the
    # actual behavior lives on AbstractFileSystem.
    return fs.unstrip_protocol(name)
def mirror_from(
    origin_name: str, methods: Iterable[str]
) -> Callable[[type[T]], type[T]]:
    """Class decorator: expose each name in *methods* as a read-only property
    that forwards to the same attribute on ``self.<origin_name>``."""

    def _forward(attr: str, self: Any) -> Any:
        target = getattr(self, origin_name)
        return getattr(target, attr)

    def decorator(cls: type[T]) -> type[T]:
        for attr in methods:
            setattr(cls, attr, property(partial(_forward, attr)))
        return cls

    return decorator
@contextlib.contextmanager
def nullcontext(obj: T) -> Iterator[T]:
    """No-op context manager: yields *obj* unchanged and performs no cleanup."""
    yield obj
def merge_offset_ranges(
    paths: list[str],
    starts: list[int] | int,
    ends: list[int] | int,
    max_gap: int = 0,
    max_block: int | None = None,
    sort: bool = True,
) -> tuple[list[str], list[int], list[int]]:
    """Merge adjacent byte-offset ranges when the inter-range
    gap is <= `max_gap`, and when the merged byte range does not
    exceed `max_block` (if specified). By default, this function
    will re-order the input paths and byte ranges to ensure sorted
    order. If the user can guarantee that the inputs are already
    sorted, passing `sort=False` will skip the re-ordering.
    """
    # Check input
    if not isinstance(paths, list):
        raise TypeError
    # Scalar starts/ends are broadcast to every path.
    if not isinstance(starts, list):
        starts = [starts] * len(paths)
    if not isinstance(ends, list):
        ends = [ends] * len(paths)
    if len(starts) != len(paths) or len(ends) != len(paths):
        raise ValueError
    # Early Return
    if len(starts) <= 1:
        return paths, starts, ends
    # Normalize falsy (None/0) starts to 0 so the gap arithmetic below works.
    starts = [s or 0 for s in starts]
    # Sort by paths and then ranges if `sort=True`
    if sort:
        paths, starts, ends = (
            list(v)
            for v in zip(
                *sorted(
                    zip(paths, starts, ends),
                )
            )
        )
    if paths:
        # Loop through the coupled `paths`, `starts`, and
        # `ends`, and merge adjacent blocks when appropriate
        new_paths = paths[:1]
        new_starts = starts[:1]
        new_ends = ends[:1]
        for i in range(1, len(paths)):
            if paths[i] == paths[i - 1] and new_ends[-1] is None:
                # Previous merged block for this path already reads to EOF
                # (end=None); further ranges on the same path are subsumed.
                continue
            elif (
                paths[i] != paths[i - 1]
                or ((starts[i] - new_ends[-1]) > max_gap)
                or (max_block is not None and (ends[i] - new_starts[-1]) > max_block)
            ):
                # Cannot merge with previous block.
                # Add new `paths`, `starts`, and `ends` elements
                new_paths.append(paths[i])
                new_starts.append(starts[i])
                new_ends.append(ends[i])
            else:
                # Merge with previous block by updating the
                # last element of `ends`
                new_ends[-1] = ends[i]
        return new_paths, new_starts, new_ends
    # `paths` is empty. Just return input lists
    return paths, starts, ends
def file_size(filelike: IO[bytes]) -> int:
    """Return the byte length of an open, seekable, read-mode file-like.

    The current position is saved and restored, even if the end-seek raises.
    """
    original_pos = filelike.tell()
    try:
        # seek() returns the new absolute position, i.e. the file length.
        return filelike.seek(0, 2)
    finally:
        filelike.seek(original_pos)
@contextlib.contextmanager
def atomic_write(path: str, mode: str = "wb"):
    """
    Yield a temporary file created next to *path*; on clean exit the temp
    file atomically replaces *path* (via os.replace). On error the temp file
    is removed best-effort and the exception propagates, leaving *path*
    untouched.
    """
    # Create the temp file in the destination directory so the final
    # os.replace is a same-filesystem (atomic) rename.
    fd, tmp_name = tempfile.mkstemp(
        dir=os.path.dirname(path), prefix=os.path.basename(path) + "-"
    )
    try:
        with open(fd, mode) as stream:
            yield stream
    except BaseException:
        with contextlib.suppress(FileNotFoundError):
            os.unlink(tmp_name)
        raise
    else:
        os.replace(tmp_name, path)
def _translate(pat, STAR, QUESTION_MARK):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # Translate one fnmatch-style path component into a list of regex pieces.
    # STAR / QUESTION_MARK are the regex fragments substituted for "*" and
    # "?"; keeping this verbatim from upstream eases future syncing.
    res: list[str] = []
    add = res.append
    i, n = 0, len(pat)
    while i < n:
        c = pat[i]
        i = i + 1
        if c == "*":
            # compress consecutive `*` into one
            if (not res) or res[-1] is not STAR:
                add(STAR)
        elif c == "?":
            add(QUESTION_MARK)
        elif c == "[":
            # Scan ahead for the matching "]" of a character class.
            j = i
            if j < n and pat[j] == "!":
                j = j + 1
            if j < n and pat[j] == "]":
                j = j + 1
            while j < n and pat[j] != "]":
                j = j + 1
            if j >= n:
                # Unterminated "[": treat it as a literal bracket.
                add("\\[")
            else:
                stuff = pat[i:j]
                if "-" not in stuff:
                    stuff = stuff.replace("\\", r"\\")
                else:
                    # Split the class on range hyphens so each chunk can be
                    # escaped individually.
                    chunks = []
                    k = i + 2 if pat[i] == "!" else i + 1
                    while True:
                        k = pat.find("-", k, j)
                        if k < 0:
                            break
                        chunks.append(pat[i:k])
                        i = k + 1
                        k = k + 3
                    chunk = pat[i:j]
                    if chunk:
                        chunks.append(chunk)
                    else:
                        chunks[-1] += "-"
                    # Remove empty ranges -- invalid in RE.
                    for k in range(len(chunks) - 1, 0, -1):
                        if chunks[k - 1][-1] > chunks[k][0]:
                            chunks[k - 1] = chunks[k - 1][:-1] + chunks[k][1:]
                            del chunks[k]
                    # Escape backslashes and hyphens for set difference (--).
                    # Hyphens that create ranges shouldn't be escaped.
                    stuff = "-".join(
                        s.replace("\\", r"\\").replace("-", r"\-") for s in chunks
                    )
                # Escape set operations (&&, ~~ and ||).
                stuff = re.sub(r"([&~|])", r"\\\1", stuff)
                i = j + 1
                if not stuff:
                    # Empty range: never match.
                    add("(?!)")
                elif stuff == "!":
                    # Negated empty range: match any character.
                    add(".")
                else:
                    if stuff[0] == "!":
                        stuff = "^" + stuff[1:]
                    elif stuff[0] in ("^", "["):
                        stuff = "\\" + stuff
                    add(f"[{stuff}]")
        else:
            # Any other character matches itself, escaped for regex safety.
            add(re.escape(c))
    assert i == n
    return res
def glob_translate(pat):
    # Copied from: https://github.com/python/cpython/pull/106703.
    # The keyword parameters' values are fixed to:
    # recursive=True, include_hidden=True, seps=None
    """Translate a pathname with shell wildcards to a regular expression."""
    # Build separator character classes from the platform's path separators.
    if os.path.altsep:
        seps = os.path.sep + os.path.altsep
    else:
        seps = os.path.sep
    escaped_seps = "".join(map(re.escape, seps))
    any_sep = f"[{escaped_seps}]" if len(seps) > 1 else escaped_seps
    not_sep = f"[^{escaped_seps}]"
    # Regex fragments: one path segment, possibly in last position, and the
    # "any number of segments" forms used for "**".
    one_last_segment = f"{not_sep}+"
    one_segment = f"{one_last_segment}{any_sep}"
    any_segments = f"(?:.+{any_sep})?"
    any_last_segments = ".*"
    results = []
    parts = re.split(any_sep, pat)
    last_part_idx = len(parts) - 1
    for idx, part in enumerate(parts):
        if part == "*":
            # "*" matches exactly one segment.
            results.append(one_segment if idx < last_part_idx else one_last_segment)
            continue
        if part == "**":
            # "**" matches any number of segments (recursive=True).
            results.append(any_segments if idx < last_part_idx else any_last_segments)
            continue
        elif "**" in part:
            raise ValueError(
                "Invalid pattern: '**' can only be an entire path component"
            )
        if part:
            # Ordinary component: translate fnmatch syntax within it.
            results.extend(_translate(part, f"{not_sep}*", not_sep))
        if idx < last_part_idx:
            results.append(any_sep)
    res = "".join(results)
    # (?s:...) makes "." match newlines too; \Z anchors at end of string.
    return rf"(?s:{res})\Z"
|