How to read partitioned parquet files from S3 using pyarrow in python

2023-12-28

I am looking for ways to read data from multiple partitioned directories on s3 using python.

data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
data_folder/serial_number=2/cur_date=27-12-2012/asdsdfsd0324324.snappy.parquet
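
(This field=value layout is standard Hive-style partitioning, which is what, for example, pyarrow's write_to_dataset produces. A minimal sketch of how such a layout could be generated, with made-up column values for illustration:)

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# hypothetical rows carrying the two partition columns from the layout above
df = pd.DataFrame({'serial_number': [1, 2],
                   'cur_date': ['20-12-2012', '27-12-2012'],
                   'value': [0.1, 0.2]})

# writes data_folder/serial_number=.../cur_date=.../<uuid>.parquet
pq.write_to_dataset(pa.Table.from_pandas(df), root_path='data_folder',
                    partition_cols=['serial_number', 'cur_date'])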

pyarrow's ParquetDataset module has the ability to read from partitions, so I tried the following code:

>>> import pandas as pd
>>> import pyarrow.parquet as pq
>>> import s3fs
>>> a = "s3://my_bucker/path/to/data_folder/"
>>> dataset = pq.ParquetDataset(a)

It raised the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 601, in _make_manifest
    .format(path))
OSError: Passed non-file path: s3://my_bucker/path/to/data_folder/

Per pyarrow's documentation, I tried using s3fs as the file system, i.e.:

>>> dataset = pq.ParquetDataset(a,filesystem=s3fs)

This throws the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 502, in __init__
    self.metadata_path) = _make_manifest(path_or_paths, self.fs)
  File "/home/my_username/anaconda3/lib/python3.6/site-packages/pyarrow/parquet.py", line 583, in _make_manifest
    if is_string(path_or_paths) and fs.isdir(path_or_paths):
AttributeError: module 's3fs' has no attribute 'isdir'

I am restricted to using an ECS cluster, so Spark/pyspark is not an option.

Is there a way to easily read parquet files in python from such partitioned directories in s3? I feel that listing all the directories and then reading them is not a good practice, as suggested in this link: https://stackoverflow.com/questions/45043554/how-to-read-a-list-of-parquet-files-from-s3-as-a-pandas-dataframe-using-pyarrow. I need to convert the read data into a pandas dataframe for further processing, so I would prefer options related to fastparquet or pyarrow, but I am open to other options in python as well.


I managed to get this working with the latest releases of fastparquet and s3fs. Below is the code for it:

import s3fs
import fastparquet as fp

fs = s3fs.S3FileSystem()

# e.g. mybucket/data_folder/serial_number=1/cur_date=20-12-2012/abcdsd0324324.snappy.parquet
s3_path = "mybucket/data_folder/*/*/*.parquet"
all_paths_from_s3 = fs.glob(path=s3_path)

myopen = fs.open
# use s3fs as the filesystem
fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen)
# convert to pandas dataframe
df = fp_obj.to_pandas()

Thanks to Martin for pointing me in the right direction via our conversation: https://stackoverflow.com/questions/44301238/error-with-snappy-while-importing-fastparquet-in-python/44310390?noredirect=1#comment77258713_44310390

NB: This will be slower than using pyarrow, based on this benchmark: http://wesmckinney.com/blog/python-parquet-update/. I will update my answer once s3fs support is implemented in pyarrow, per ARROW-1213 (https://issues.apache.org/jira/browse/ARROW-1213).

I did a quick benchmark of individual iterations with pyarrow versus a list of files sent as a glob to fastparquet. fastparquet with s3fs is faster than pyarrow plus my hackish code, but I reckon pyarrow + s3fs will be faster once that support is implemented.

The code and benchmarks are below:
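
The pyarrow benchmark below relies on a few names defined earlier in my session. A minimal setup sketch (the partition-key strings are taken from the paths above; the bucket path is an assumption):

import pandas as pd
import pyarrow.parquet as pq
import s3fs

fs = s3fs.S3FileSystem()

# names of the partition keys embedded in the file paths
date_partition = 'cur_date'
dma_partition = 'serial_number'

# every leaf parquet file under the partitioned layout
list_parquet_files = fs.glob('mybucket/data_folder/*/*/*.parquet')

# accumulator for the per-file dataframes
list_ = []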

>>> def test_pq():
...     for current_file in list_parquet_files:
...         f = fs.open(current_file)
...         df = pq.read_table(f).to_pandas()
...         # following code is to extract the serial_number & cur_date values so that we can add them to the dataframe
...         #probably not the best way to split :)
...         elements_list=current_file.split('/')
...         for item in elements_list:
...             if item.find(date_partition) != -1:
...                 current_date = item.split('=')[1]
...             elif item.find(dma_partition) != -1:
...                 current_dma = item.split('=')[1]
...         df['serial_number'] = current_dma
...         df['cur_date'] = current_date
...         list_.append(df)
...     frame = pd.concat(list_)
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
12.078817503992468

>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.961556333000317

Update 2019

After all the PRs, issues such as ARROW-2038 (https://issues.apache.org/jira/browse/ARROW-2038) and fastparquet PR#182 (https://github.com/dask/fastparquet/issues/182) have been resolved.

Read parquet files using pyarrow

# pip install pyarrow
# pip install s3fs

>>> import s3fs
>>> import pyarrow.parquet as pq
>>> fs = s3fs.S3FileSystem()

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'  # if it's a directory, omit the trailing /
>>> bucket_uri = f's3://{bucket}/{path}'
>>> bucket_uri
's3://your-bucket-name/directory_name'

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
>>> table = dataset.read()
>>> df = table.to_pandas()
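
If only some partitions are needed, ParquetDataset also accepts a filters argument that prunes partition directories before any data is read. A short sketch, reusing the partition columns from the question (the filter value is an assumption for illustration):

>>> dataset = pq.ParquetDataset(bucket_uri, filesystem=fs,
...                             filters=[('serial_number', '=', '1')])
>>> df = dataset.read().to_pandas()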

Read parquet files using fastparquet

# pip install s3fs
# pip install fastparquet

>>> import s3fs
>>> import fastparquet as fp

>>> fs = s3fs.S3FileSystem()
>>> myopen = fs.open

>>> bucket = 'your-bucket-name'
>>> path = 'directory_name'
>>> root_dir_path = f'{bucket}/{path}'
# the first two wildcards represent the 1st and 2nd partition columns of your data, and so forth
>>> s3_path = f"{root_dir_path}/*/*/*.parquet"
>>> all_paths_from_s3 = fs.glob(path=s3_path)

>>> fp_obj = fp.ParquetFile(all_paths_from_s3, open_with=myopen, root=root_dir_path)
>>> df = fp_obj.to_pandas()
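
fastparquet can likewise limit what it materialises. A short sketch using the columns argument of to_pandas (column names reused from the layout in the question):

>>> df = fp_obj.to_pandas(columns=['serial_number', 'cur_date'])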

Quick benchmark

This is probably not the best way to benchmark it; read the blog post (http://wesmckinney.com/blog/python-parquet-update/) for a more thorough benchmark.

#pyarrow
>>> import timeit
>>> def test_pq():
...     dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
...     table = dataset.read()
...     df = table.to_pandas()
...
>>> timeit.timeit('test_pq()',number =10,globals=globals())
1.2677053569998407

#fastparquet
>>> def test_fp():
...     fp_obj = fp.ParquetFile(all_paths_from_s3,open_with=myopen, root=root_dir_path)
...     df = fp_obj.to_pandas()

>>> timeit.timeit('test_fp()',number =10,globals=globals())
2.931876824000028
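
As an aside, newer pyarrow releases can read such layouts from s3:// URIs directly, without an s3fs wrapper. A minimal sketch, assuming a reasonably recent pyarrow and AWS credentials available in the environment:

import pyarrow.dataset as ds

# pyarrow discovers the field=value directories as Hive partitions
dataset = ds.dataset('s3://your-bucket-name/directory_name',
                     format='parquet', partitioning='hive')
df = dataset.to_table().to_pandas()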

Further reading on pyarrow's speed: http://wesmckinney.com/blog/python-parquet-update/

References:

  • fastparquet: http://fastparquet.readthedocs.io/en/latest/filesystems.html
  • s3fs: http://s3fs.readthedocs.io/en/latest/index.html
  • pyarrow: https://arrow.apache.org/docs/python/
  • pyarrow code based on the discussion at https://github.com/apache/arrow/pull/916#issuecomment-337619158 and the documentation
  • fastparquet code based on the discussions at https://github.com/apache/arrow/pull/916#issuecomment-337619158 and PR-182 (https://github.com/dask/fastparquet/issues/182), and the documentation