import io
import json
import warnings

from .core import url_to_fs
from .utils import merge_offset_ranges

# Parquet-Specific Utilities for fsspec
#
# Most of the functions defined in this module are NOT
# intended for public consumption. The only exception
# to this is `open_parquet_file`, which should be used
# in place of `fs.open()` to open parquet-formatted files
# on remote file systems.
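#
# For example (an illustrative sketch only; the URL and column name below
# are hypothetical, and the corresponding filesystem backend, e.g. `s3fs`,
# would need to be installed):
#
#     from fsspec.parquet import open_parquet_file
#
#     with open_parquet_file("s3://bucket/data.parquet", columns=["a"]) as f:
#         ...  # read `f` as a normal file-like object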


def open_parquet_file(
    path,
    mode="rb",
    fs=None,
    metadata=None,
    columns=None,
    row_groups=None,
    storage_options=None,
    strict=False,
    engine="auto",
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    **kwargs,
):
  28. """
  29. Return a file-like object for a single Parquet file.
  30. The specified parquet `engine` will be used to parse the
  31. footer metadata, and determine the required byte ranges
  32. from the file. The target path will then be opened with
  33. the "parts" (`KnownPartsOfAFile`) caching strategy.
  34. Note that this method is intended for usage with remote
  35. file systems, and is unlikely to improve parquet-read
  36. performance on local file systems.
  37. Parameters
  38. ----------
  39. path: str
  40. Target file path.
  41. mode: str, optional
  42. Mode option to be passed through to `fs.open`. Default is "rb".
  43. metadata: Any, optional
  44. Parquet metadata object. Object type must be supported
  45. by the backend parquet engine. For now, only the "fastparquet"
  46. engine supports an explicit `ParquetFile` metadata object.
  47. If a metadata object is supplied, the remote footer metadata
  48. will not need to be transferred into local memory.
  49. fs: AbstractFileSystem, optional
  50. Filesystem object to use for opening the file. If nothing is
  51. specified, an `AbstractFileSystem` object will be inferred.
  52. engine : str, default "auto"
  53. Parquet engine to use for metadata parsing. Allowed options
  54. include "fastparquet", "pyarrow", and "auto". The specified
  55. engine must be installed in the current environment. If
  56. "auto" is specified, and both engines are installed,
  57. "fastparquet" will take precedence over "pyarrow".
  58. columns: list, optional
  59. List of all column names that may be read from the file.
  60. row_groups : list, optional
  61. List of all row-groups that may be read from the file. This
  62. may be a list of row-group indices (integers), or it may be
  63. a list of `RowGroup` metadata objects (if the "fastparquet"
  64. engine is used).
  65. storage_options : dict, optional
  66. Used to generate an `AbstractFileSystem` object if `fs` was
  67. not specified.
  68. strict : bool, optional
  69. Whether the resulting `KnownPartsOfAFile` cache should
  70. fetch reads that go beyond a known byte-range boundary.
  71. If `False` (the default), any read that ends outside a
  72. known part will be zero padded. Note that using
  73. `strict=True` may be useful for debugging.
  74. max_gap : int, optional
  75. Neighboring byte ranges will only be merged when their
  76. inter-range gap is <= `max_gap`. Default is 64KB.
  77. max_block : int, optional
  78. Neighboring byte ranges will only be merged when the size of
  79. the aggregated range is <= `max_block`. Default is 256MB.
  80. footer_sample_size : int, optional
  81. Number of bytes to read from the end of the path to look
  82. for the footer metadata. If the sampled bytes do not contain
  83. the footer, a second read request will be required, and
  84. performance will suffer. Default is 1MB.
  85. **kwargs :
  86. Optional key-word arguments to pass to `fs.open`
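
    Examples
    --------
    A minimal illustrative sketch (the URL, the column names, and the use
    of ``pandas`` below are hypothetical, not part of this module)::

        import pandas as pd
        from fsspec.parquet import open_parquet_file

        with open_parquet_file(
            "s3://bucket/data.parquet",
            columns=["x", "y"],
        ) as f:
            df = pd.read_parquet(f, columns=["x", "y"])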
  87. """

    # Make sure we have an `AbstractFileSystem` object
    # to work with
    if fs is None:
        fs = url_to_fs(path, **(storage_options or {}))[0]

    # For now, `columns == []` not supported. Just use
    # default `open` command with `path` input
    if columns is not None and len(columns) == 0:
        return fs.open(path, mode=mode)

    # Set the engine
    engine = _set_engine(engine)

    # Fetch the known byte ranges needed to read
    # `columns` and/or `row_groups`
    data = _get_parquet_byte_ranges(
        [path],
        fs,
        metadata=metadata,
        columns=columns,
        row_groups=row_groups,
        engine=engine,
        max_gap=max_gap,
        max_block=max_block,
        footer_sample_size=footer_sample_size,
    )

    # Extract file name from `data`
    fn = next(iter(data)) if data else path

    # Call fs.open with "parts" caching
    options = kwargs.pop("cache_options", {}).copy()
    return fs.open(
        fn,
        mode=mode,
        cache_type="parts",
        cache_options={
            **options,
            "data": data.get(fn, {}),
            "strict": strict,
        },
        **kwargs,
    )


def _get_parquet_byte_ranges(
    paths,
    fs,
    metadata=None,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
    footer_sample_size=1_000_000,
    engine="auto",
):
    """Get a dictionary of the known byte ranges needed
    to read a specific column/row-group selection from a
    Parquet dataset. Each value in the output dictionary
    is intended for use as the `data` argument for the
    `KnownPartsOfAFile` caching strategy of a single path.
    """

    # Set engine if necessary
    if isinstance(engine, str):
        engine = _set_engine(engine)

    # Pass to specialized function if metadata is defined
    if metadata is not None:
        # Use the provided parquet metadata object
        # to avoid transferring/parsing footer metadata
        return _get_parquet_byte_ranges_from_metadata(
            metadata,
            fs,
            engine,
            columns=columns,
            row_groups=row_groups,
            max_gap=max_gap,
            max_block=max_block,
        )

    # Get file sizes asynchronously
    file_sizes = fs.sizes(paths)

    # Populate global paths, starts, & ends
    result = {}
    data_paths = []
    data_starts = []
    data_ends = []
    add_header_magic = True
    if columns is None and row_groups is None:
        # We are NOT selecting specific columns or row-groups.
        #
        # We can avoid sampling the footers, and just transfer
        # all file data with cat_ranges
        for i, path in enumerate(paths):
            result[path] = {}
            for b in range(0, file_sizes[i], max_block):
                data_paths.append(path)
                data_starts.append(b)
                data_ends.append(min(b + max_block, file_sizes[i]))
        add_header_magic = False  # "Magic" should already be included
    else:
        # We ARE selecting specific columns or row-groups.
        #
        # Gather file footers.
        # We just take the last `footer_sample_size` bytes of each
        # file (or the entire file if it is smaller than that)
        footer_starts = []
        footer_ends = []
        for i, path in enumerate(paths):
            footer_ends.append(file_sizes[i])
            sample_size = max(0, file_sizes[i] - footer_sample_size)
            footer_starts.append(sample_size)
        footer_samples = fs.cat_ranges(paths, footer_starts, footer_ends)

        # Check our footer samples and re-sample if necessary.
        missing_footer_starts = footer_starts.copy()
        large_footer = 0
        for i, path in enumerate(paths):
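            # A Parquet file ends with a 4-byte little-endian footer-metadata
            # length followed by the 4-byte b"PAR1" magic, so the last 8 bytes
            # of the sample tell us how large the real footer is.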
            footer_size = int.from_bytes(footer_samples[i][-8:-4], "little")
            real_footer_start = file_sizes[i] - (footer_size + 8)
            if real_footer_start < footer_starts[i]:
                missing_footer_starts[i] = real_footer_start
                large_footer = max(large_footer, (footer_size + 8))
        if large_footer:
            warnings.warn(
                f"Not enough data was used to sample the parquet footer. "
                f"Try setting footer_sample_size >= {large_footer}."
            )
            for i, block in enumerate(
                fs.cat_ranges(
                    paths,
                    missing_footer_starts,
                    footer_starts,
                )
            ):
                footer_samples[i] = block + footer_samples[i]
                footer_starts[i] = missing_footer_starts[i]

        # Calculate required byte ranges for each path
        for i, path in enumerate(paths):

            # Deal with small-file case.
            # Just include all remaining bytes of the file
            # in a single range.
            if file_sizes[i] < max_block:
                if footer_starts[i] > 0:
                    # Only need to transfer the data if the
                    # footer sample isn't already the whole file
                    data_paths.append(path)
                    data_starts.append(0)
                    data_ends.append(footer_starts[i])
                continue

            # Use "engine" to collect data byte ranges
            path_data_starts, path_data_ends = engine._parquet_byte_ranges(
                columns,
                row_groups=row_groups,
                footer=footer_samples[i],
                footer_start=footer_starts[i],
            )
            data_paths += [path] * len(path_data_starts)
            data_starts += path_data_starts
            data_ends += path_data_ends

        # Merge adjacent offset ranges
        data_paths, data_starts, data_ends = merge_offset_ranges(
            data_paths,
            data_starts,
            data_ends,
            max_gap=max_gap,
            max_block=max_block,
            sort=False,  # Should already be sorted
        )

        # Start by populating `result` with footer samples
        for i, path in enumerate(paths):
            result[path] = {(footer_starts[i], footer_ends[i]): footer_samples[i]}

    # Transfer the data byte-ranges into local memory
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to header if necessary
    if add_header_magic:
        _add_header_magic(result)

    return result


def _get_parquet_byte_ranges_from_metadata(
    metadata,
    fs,
    engine,
    columns=None,
    row_groups=None,
    max_gap=64_000,
    max_block=256_000_000,
):
    """Simplified version of `_get_parquet_byte_ranges` for
    the case that an engine-specific `metadata` object is
    provided, and the remote footer metadata does not need to
    be transferred before calculating the required byte ranges.
    """

    # Use "engine" to collect data byte ranges
    data_paths, data_starts, data_ends = engine._parquet_byte_ranges(
        columns,
        row_groups=row_groups,
        metadata=metadata,
    )

    # Merge adjacent offset ranges
    data_paths, data_starts, data_ends = merge_offset_ranges(
        data_paths,
        data_starts,
        data_ends,
        max_gap=max_gap,
        max_block=max_block,
        sort=False,  # Should be sorted
    )

    # Transfer the data byte-ranges into local memory
    result = {fn: {} for fn in list(set(data_paths))}
    _transfer_ranges(fs, result, data_paths, data_starts, data_ends)

    # Add b"PAR1" to header
    _add_header_magic(result)

    return result


def _transfer_ranges(fs, blocks, paths, starts, ends):
    # Use cat_ranges to gather the data byte_ranges
    ranges = (paths, starts, ends)
    for path, start, stop, data in zip(*ranges, fs.cat_ranges(*ranges)):
        blocks[path][(start, stop)] = data


def _add_header_magic(data):
    # Add b"PAR1" to file headers
    for path in list(data.keys()):
        add_magic = True
        for k in data[path]:
            if k[0] == 0 and k[1] >= 4:
                add_magic = False
                break
        if add_magic:
            data[path][(0, 4)] = b"PAR1"


def _set_engine(engine_str):

    # Define a list of parquet engines to try
    if engine_str == "auto":
        try_engines = ("fastparquet", "pyarrow")
    elif not isinstance(engine_str, str):
        raise ValueError(
            "Failed to set parquet engine! "
            "Please pass 'fastparquet', 'pyarrow', or 'auto'"
        )
    elif engine_str not in ("fastparquet", "pyarrow"):
        raise ValueError(f"{engine_str} engine not supported by `fsspec.parquet`")
    else:
        try_engines = [engine_str]

    # Try importing the engines in `try_engines`,
    # and choose the first one that succeeds
    for engine in try_engines:
        try:
            if engine == "fastparquet":
                return FastparquetEngine()
            elif engine == "pyarrow":
                return PyarrowEngine()
        except ImportError:
            pass

    # Raise an error if a supported parquet engine
    # was not found
    raise ImportError(
        f"The following parquet engines are not installed "
        f"in your python environment: {try_engines}. "
        f"Please install 'fastparquet' or 'pyarrow' to "
        f"utilize the `fsspec.parquet` module."
    )


class FastparquetEngine:
    # The purpose of the FastparquetEngine class is
    # to check if fastparquet can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to fastparquet.

    def __init__(self):
        import fastparquet as fp

        self.fp = fp

    def _row_group_filename(self, row_group, pf):
        return pf.row_group_filename(row_group)

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
    ):
        # Initialize offset ranges and define ParquetFile metadata
        pf = metadata
        data_paths, data_starts, data_ends = [], [], []
        if pf is None:
            pf = self.fp.ParquetFile(io.BytesIO(footer))

        # Convert columns to a set and add any index columns
        # specified in the pandas metadata (just in case)
        column_set = None if columns is None else set(columns)
        if column_set is not None and hasattr(pf, "pandas_metadata"):
            md_index = [
                ind
                for ind in pf.pandas_metadata.get("index_columns", [])
                # Ignore RangeIndex information
                if not isinstance(ind, dict)
            ]
            column_set |= set(md_index)

        # Check if row_groups is a list of integers
        # or a list of row-group metadata
        if row_groups and not isinstance(row_groups[0], int):
            # Input row_groups contains row-group metadata
            row_group_indices = None
        else:
            # Input row_groups contains row-group indices
            row_group_indices = row_groups
            row_groups = pf.row_groups

        # Loop through column chunks to add required byte ranges
        for r, row_group in enumerate(row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_group_indices is None or r in row_group_indices:

                # Find the target parquet-file path for `row_group`
                fn = self._row_group_filename(row_group, pf)

                for column in row_group.columns:
                    name = column.meta_data.path_in_schema[0]
                    # Skip this column if we are targeting a
                    # specific set of columns
                    if column_set is None or name in column_set:
                        file_offset0 = column.meta_data.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.meta_data.data_page_offset
                        num_bytes = column.meta_data.total_compressed_size
                        if footer_start is None or file_offset0 < footer_start:
                            data_paths.append(fn)
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(
                                    file_offset0 + num_bytes,
                                    footer_start or (file_offset0 + num_bytes),
                                )
                            )

        if metadata:
            # The metadata in this call may map to multiple
            # file paths. Need to include `data_paths`
            return data_paths, data_starts, data_ends
        return data_starts, data_ends


class PyarrowEngine:
    # The purpose of the PyarrowEngine class is
    # to check if pyarrow can be imported (on initialization)
    # and to define a `_parquet_byte_ranges` method. In the
    # future, this class may also be used to define other
    # methods/logic that are specific to pyarrow.

    def __init__(self):
        import pyarrow.parquet as pq

        self.pq = pq

    def _row_group_filename(self, row_group, metadata):
        raise NotImplementedError

    def _parquet_byte_ranges(
        self,
        columns,
        row_groups=None,
        metadata=None,
        footer=None,
        footer_start=None,
    ):
        if metadata is not None:
            raise ValueError("metadata input not supported for PyarrowEngine")

        data_starts, data_ends = [], []
        md = self.pq.ParquetFile(io.BytesIO(footer)).metadata

        # Convert columns to a set and add any index columns
        # specified in the pandas metadata (just in case)
        column_set = None if columns is None else set(columns)
        if column_set is not None:
            schema = md.schema.to_arrow_schema()
            has_pandas_metadata = (
                schema.metadata is not None and b"pandas" in schema.metadata
            )
            if has_pandas_metadata:
                md_index = [
                    ind
                    for ind in json.loads(
                        schema.metadata[b"pandas"].decode("utf8")
                    ).get("index_columns", [])
                    # Ignore RangeIndex information
                    if not isinstance(ind, dict)
                ]
                column_set |= set(md_index)

        # Loop through column chunks to add required byte ranges
        for r in range(md.num_row_groups):
            # Skip this row-group if we are targeting
            # specific row-groups
            if row_groups is None or r in row_groups:
                row_group = md.row_group(r)
                for c in range(row_group.num_columns):
                    column = row_group.column(c)
                    name = column.path_in_schema
                    # Skip this column if we are targeting a
                    # specific set of columns
                    split_name = name.split(".")[0]
                    if (
                        column_set is None
                        or name in column_set
                        or split_name in column_set
                    ):
                        file_offset0 = column.dictionary_page_offset
                        if file_offset0 is None:
                            file_offset0 = column.data_page_offset
                        num_bytes = column.total_compressed_size
                        if file_offset0 < footer_start:
                            data_starts.append(file_offset0)
                            data_ends.append(
                                min(file_offset0 + num_bytes, footer_start)
                            )
        return data_starts, data_ends