import io
import json
import warnings

from .core import url_to_fs
from .utils import merge_offset_ranges

# Parquet-Specific Utilities for fsspec
#
# Most of the functions defined in this module are NOT
# intended for public consumption. The only exception
# to this is `open_parquet_file`, which should be used
# in place of `fs.open()` to open parquet-formatted files
# on remote file systems.
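#
# A minimal usage sketch (the URL and column name below are
# illustrative assumptions, not real locations or requirements):
#
#     from fsspec.parquet import open_parquet_file
#
#     with open_parquet_file("s3://bucket/data.parquet", columns=["a"]) as f:
#         ...  # hand `f` to any parquet reader that accepts file-like objects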


def open_parquet_file(
    path,
    mode="rb",
    fs=None,
    metadata=None,
    columns=None,
    row_groups=None,
    storage_options=None,
    strict=False,
    engine="auto",
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    **kwargs,
):
    """
    Return a file-like object for a single Parquet file.

    The specified parquet `engine` will be used to parse the
    footer metadata, and determine the required byte ranges
    from the file. The target path will then be opened with
    the "parts" (`KnownPartsOfAFile`) caching strategy.

    Note that this method is intended for usage with remote
    file systems, and is unlikely to improve parquet-read
    performance on local file systems.

    Parameters
    ----------
    path: str
        Target file path.
    mode: str, optional
        Mode option to be passed through to `fs.open`. Default is "rb".
    fs: AbstractFileSystem, optional
        Filesystem object to use for opening the file. If nothing is
        specified, an `AbstractFileSystem` object will be inferred from
        `path` (using `storage_options`, if provided).
    metadata: Any, optional
        Parquet metadata object. Object type must be supported
        by the backend parquet engine. For now, only the "fastparquet"
        engine supports an explicit `ParquetFile` metadata object.
        If a metadata object is supplied, the remote footer metadata
        will not need to be transferred into local memory.
    engine : str, default "auto"
        Parquet engine to use for metadata parsing. Allowed options
        include "fastparquet", "pyarrow", and "auto". The specified
        engine must be installed in the current environment. If
        "auto" is specified, and both engines are installed,
        "fastparquet" will take precedence over "pyarrow".
    columns: list, optional
        List of all column names that may be read from the file.
    row_groups : list, optional
        List of all row-groups that may be read from the file. This
        may be a list of row-group indices (integers), or it may be
        a list of `RowGroup` metadata objects (if the "fastparquet"
        engine is used).
    storage_options : dict, optional
        Used to generate an `AbstractFileSystem` object if `fs` was
        not specified.
    strict : bool, optional
        Whether the resulting `KnownPartsOfAFile` cache should
        fetch reads that go beyond a known byte-range boundary.
        If `False` (the default), any read that ends outside a
        known part will be zero padded. Note that using
        `strict=True` may be useful for debugging.
    max_gap : int, optional
        Neighboring byte ranges will only be merged when their
        inter-range gap is <= `max_gap`. Default is 64KB.
    max_block : int, optional
        Neighboring byte ranges will only be merged when the size of
        the aggregated range is <= `max_block`. Default is 256MB.
    footer_sample_size : int, optional
        Number of bytes to read from the end of the file to look
        for the footer metadata. If the sampled bytes do not contain
        the footer, a second read request will be required, and
        performance will suffer. Default is 1MB.
    **kwargs :
        Optional key-word arguments to pass to `fs.open`.
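
    Examples
    --------
    A minimal usage sketch. The path, column names, and the use of
    `pandas` below are illustrative assumptions only::

        import pandas as pd
        import fsspec.parquet

        with fsspec.parquet.open_parquet_file(
            "s3://bucket/data.parquet",
            columns=["x", "y"],
        ) as f:
            df = pd.read_parquet(f, columns=["x", "y"])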
    """

    # Make sure we have an `AbstractFileSystem` object
    # to work with
    if fs is None:
        fs = url_to_fs(path, **(storage_options or {}))[0]

    # For now, `columns == []` is not supported. Just use
    # the default `open` command with `path` input
    if columns is not None and len(columns) == 0:
        return fs.open(path, mode=mode)

    # Set the engine
    engine = _set_engine(engine)

    # Fetch the known byte ranges needed to read
    # `columns` and/or `row_groups`
    data = _get_parquet_byte_ranges(
        [path],
        fs,
        metadata=metadata,
        columns=columns,
        row_groups=row_groups,
        engine=engine,
        max_gap=max_gap,
        max_block=max_block,
        footer_sample_size=footer_sample_size,
    )

    # Extract file name from `data`
    fn = next(iter(data)) if data else path

    # Call fs.open with "parts" caching
    options = kwargs.pop("cache_options", {}).copy()
    return fs.open(
        fn,
        mode=mode,
        cache_type="parts",
        cache_options={
            **options,
            "data": data.get(fn, {}),
            "strict": strict,
        },
        **kwargs,
    )


def _get_parquet_byte_ranges(
    paths,
    fs,
    metadata=None,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    engine="auto",
):
    """Get a dictionary of the known byte ranges needed
    to read a specific column/row-group selection from a
    Parquet dataset. Each value in the output dictionary
    is intended for use as the `data` argument for the
    `KnownPartsOfAFile` caching strategy of a single path.
    """
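
    # The returned mapping has the following shape (the path and byte
    # offsets below are illustrative only):
    #
    #     {
    #         "s3://bucket/data.parquet": {
    #             (0, 4): b"PAR1",
    #             (120, 64_120): b"<column-chunk bytes>",
    #             (9_900_000, 10_000_000): b"<footer bytes>",
    #         },
    #     }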

    # Set engine if necessary
    if isinstance(engine, str):
        engine = _set_engine(engine)

    # Pass to specialized function if metadata is defined
    if metadata is not None:
        # Use the provided parquet metadata object
        # to avoid transferring/parsing footer metadata
        return _get_parquet_byte_ranges_from_metadata(
            metadata,
            fs,
            engine,
            columns=columns,
            row_groups=row_groups,
            max_gap=max_gap,
            max_block=max_block,
        )

    # Get file sizes asynchronously
    file_sizes = fs.sizes(paths)

    # Populate global paths, starts, & ends
    result = {}
    data_paths = []
    data_starts = []
    data_ends = []
    add_header_magic = True
    if columns is None and row_groups is None:
        # We are NOT selecting specific columns or row-groups.
        #
        # We can avoid sampling the footers, and just transfer
        # all file data with cat_ranges
        for i, path in enumerate(paths):
            result[path] = {}
            for b in range(0, file_sizes[i], max_block):
                data_paths.append(path)
                data_starts.append(b)
                data_ends.append(min(b + max_block, file_sizes[i]))
        add_header_magic = False  # "Magic" should already be included
    else:
        # We ARE selecting specific columns or row-groups.
        #
        # Gather file footers.
        # We just take the last `footer_sample_size` bytes of each
        # file (or the entire file if it is smaller than that)
        footer_starts = []
        footer_ends = []
        for i, path in enumerate(paths):
            footer_ends.append(file_sizes[i])
            sample_size = max(0, file_sizes[i] - footer_sample_size)
            footer_starts.append(sample_size)
        footer_samples = fs.cat_ranges(paths, footer_starts, footer_ends)

        # Check our footer samples and re-sample if necessary.
        missing_footer_starts = footer_starts.copy()
        large_footer = 0
        for i, path in enumerate(paths):
            footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
            real_footer_start = file_sizes[i] - (footer_size + 8)
            if real_footer_start < footer_starts[i]:
                missing_footer_starts[i] = real_footer_start
                large_footer = max(large_footer, (footer_size + 8))
        if large_footer:
            warnings.warn(
                f"Not enough data was used to sample the parquet footer. "
                f"Try setting footer_sample_size >= {large_footer}."
            )
            for i, block in enumerate(
                fs.cat_ranges(
                    paths,
                    missing_footer_starts,
                    footer_starts,
                )
            ):
                footer_samples[i] = block + footer_samples[i]
                footer_starts[i] = missing_footer_starts[i]

        # Calculate required byte ranges for each path
        for i, path in enumerate(paths):

            # Deal with small-file case.
            # Just include all remaining bytes of the file
            # in a single range.
            if file_sizes[i] < max_block:
                if footer_starts[i] > 0:
                    # Only need to transfer the data if the
                    # footer sample isn't already the whole file
                    data_paths.append(path)
                    data_starts.append(0)
                    data_ends.append(footer_starts[i])
                continue

            # Use "engine" to collect data byte ranges
            path_data_starts, path_data_ends = engine._parquet_byte_ranges(
                columns,
                row_groups=row_groups,
                footer=footer_samples[i],
                footer_start=footer_starts[i],
            )

            data_paths += [path] * len(path_data_starts)
            data_starts += path_data_starts
            data_ends += path_data_ends

        # Merge adjacent offset ranges
        data_paths, data_starts, data_ends = merge_offset_ranges(
            data_paths,
            data_starts,
            data_ends,
            max_gap=max_gap,
            max_block=max_block,
            sort=False,  # Should already be sorted
        )

        # Start by populating `result` with footer samples
        for i, path in enumerate(paths):
            result[path] = {(footer_starts[i], footer_ends[i]): footer_samples[i]}

    # Transfer the data byte-ranges into local memory
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to header if necessary
    if add_header_magic:
        _add_header_magic(result)

    return result


def _get_parquet_byte_ranges_from_metadata(
    metadata,
    fs,
    engine,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
):
    """Simplified version of `_get_parquet_byte_ranges` for
    the case that an engine-specific `metadata` object is
    provided, and the remote footer metadata does not need to
    be transferred before calculating the required byte ranges.
    """
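
    # Illustrative only: `metadata` could be a fastparquet.ParquetFile
    # that the caller constructed ahead of time, e.g. from a
    # dataset-level "_metadata" file (hypothetical path):
    #
    #     import fastparquet
    #     metadata = fastparquet.ParquetFile("dataset/_metadata")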

    # Use "engine" to collect data byte ranges
    data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
        columns,
        row_groups=row_groups,
        metadata=metadata,
    )

    # Merge adjacent offset ranges
    data_paths, data_starts, data_ends = merge_offset_ranges(
        data_paths,
        data_starts,
        data_ends,
        max_gap=max_gap,
        max_block=max_block,
        sort=False,  # Should be sorted
    )

    # Transfer the data byte-ranges into local memory
    result = {fn: {} for fn in list(set(data_paths))}
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to header
    _add_header_magic(result)

    return result


def _transfer_ranges(fs, blocks, paths, starts, ends):
    # Use cat_ranges to gather the data byte_ranges
    ranges = (paths, starts, ends)
    for path, start, stop, data in zip(*ranges, fs.cat_ranges(*ranges)):
        blocks[path][(start, stop)] = data


def _add_header_magic(data):
    # Add b"PAR1" to file headers
    for path in list(data.keys()):
        add_magic = True
        for k in data[path]:
            # The magic bytes are already covered if a known part
            # starts at offset 0 and spans at least 4 bytes
            if k[0] == 0 and k[1] >= 4:
                add_magic = False
                break
        if add_magic:
            data[path][(0, 4)] = b"PAR1"


def _set_engine(engine_str):

    # Define a list of parquet engines to try
    if engine_str == "auto":
        try_engines = ("fastparquet", "pyarrow")
    elif not isinstance(engine_str, str):
        raise ValueError(
            "Failed to set parquet engine! "
            "Please pass 'fastparquet', 'pyarrow', or 'auto'"
        )
    elif engine_str not in ("fastparquet", "pyarrow"):
        raise ValueError(f"{engine_str} engine not supported by `fsspec.parquet`")
    else:
        try_engines = [engine_str]

    # Try importing the engines in `try_engines`,
    # and choose the first one that succeeds
    for engine in try_engines:
        try:
            if engine == "fastparquet":
                return FastparquetEngine()
            elif engine == "pyarrow":
                return PyarrowEngine()
        except ImportError:
            pass

    # Raise an error if a supported parquet engine
    # was not found
    raise ImportError(
        f"The following parquet engines are not installed "
        f"in your python environment: {try_engines}. "
        f"Please install 'fastparquet' or 'pyarrow' to "
        f"utilize the `fsspec.parquet` module."
    )


class FastparquetEngine:
    # The purpose of the FastparquetEngine class is
    # to check if fastparquet can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to fastparquet.

    def __init__(self):
        import fastparquet as fp

        self.fp = fp

    def _row_group_filename(self, row_group, pf):
        return pf.row_group_filename(row_group)

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
    ):
        # Initialize offset ranges and define ParquetFile metadata
        pf = metadata
        data_paths, data_starts, data_ends = [], [], []
        if pf is None:
            pf = self.fp.ParquetFile(io.BytesIO(footer))

        # Convert columns to a set and add any index columns
        # specified in the pandas metadata (just in case)
        column_set = None if columns is None else set(columns)
        if column_set is not None and hasattr(pf, "pandas_metadata"):
            md_index = [
                ind
                for ind in pf.pandas_metadata.get("index_columns", [])
                # Ignore RangeIndex information
                if not isinstance(ind, dict)
            ]
            column_set |= set(md_index)

        # Check if row_groups is a list of integers
        # or a list of row-group metadata
        if row_groups and not isinstance(row_groups[0], int):
            # Input row_groups contains row-group metadata
            row_group_indices = None
        else:
            # Input row_groups contains row-group indices
            row_group_indices = row_groups
            row_groups = pf.row_groups

        # Loop through column chunks to add required byte ranges
        for r, row_group in enumerate(row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_group_indices is None or r in row_group_indices:

                # Find the target parquet-file path for `row_group`
                fn = self._row_group_filename(row_group, pf)

                for column in row_group.columns:
                    name = column.meta_data.path_in_schema[0]
                    # Skip this column if we are targeting
                    # specific columns
                    if column_set is None or name in column_set:
                        file_offset0 = column.meta_data.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.meta_data.data_page_offset
                        num_bytes = column.meta_data.total_compressed_size
                        if footer_start is None or file_offset0 < footer_start:
                            data_paths.append(fn)
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(
                                    file_offset0 + num_bytes,
                                    footer_start or (file_offset0 + num_bytes),
                                )
                            )

        if metadata:
            # The metadata in this call may map to multiple
            # file paths. Need to include `data_paths`
            return data_paths, data_starts, data_ends
        return data_starts, data_ends


class PyarrowEngine:
    # The purpose of the PyarrowEngine class is
    # to check if pyarrow can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to pyarrow.

    def __init__(self):
        import pyarrow.parquet as pq

        self.pq = pq

    def _row_group_filename(self, row_group, metadata):
        raise NotImplementedError

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
    ):
        if metadata is not None:
            raise ValueError("metadata input not supported for PyarrowEngine")

        data_starts, data_ends = [], []
        md = self.pq.ParquetFile(io.BytesIO(footer)).metadata

        # Convert columns to a set and add any index columns
        # specified in the pandas metadata (just in case)
        column_set = None if columns is None else set(columns)
        if column_set is not None:
            schema = md.schema.to_arrow_schema()
            has_pandas_metadata = (
                schema.metadata is not None and b"pandas" in schema.metadata
            )
            if has_pandas_metadata:
                md_index = [
                    ind
                    for ind in json.loads(
                        schema.metadata[b"pandas"].decode("utf8")
                    ).get("index_columns", [])
                    # Ignore RangeIndex information
                    if not isinstance(ind, dict)
                ]
                column_set |= set(md_index)

        # Loop through column chunks to add required byte ranges
        for r in range(md.num_row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_groups is None or r in row_groups:
                row_group = md.row_group(r)
                for c in range(row_group.num_columns):
                    column = row_group.column(c)
                    name = column.path_in_schema
                    # Skip this column if we are targeting
                    # specific columns
                    split_name = name.split(".")[0]
                    if (
                        column_set is None
                        or name in column_set
                        or split_name in column_set
                    ):
                        file_offset0 = column.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.data_page_offset
                        num_bytes = column.total_compressed_size
                        if file_offset0 < footer_start:
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(file_offset0 + num_bytes, footer_start)
                            )
        return data_starts, data_ends