I've run into a problem where I save different parquet files with slightly different schemas, but the files share a partition column. As a minimal reproducible example, I created the following:
from dask import dataframe as dd
import pandas as pd


def save_parquet():
    df1 = pd.DataFrame({"A": [1], "B": [1]})
    df2 = pd.DataFrame({"A": [2], "C": [2]})
    df1.to_parquet("test.parquet", partition_cols=["A"])
    df2.to_parquet("test.parquet", partition_cols=["A"])


def load_parquet():
    filters = [[
        ("A", "==", 2)
    ]]
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)


def main():
    save_parquet()
    load_parquet()


if __name__ == "__main__":
    main()
Running the above raises the following exception:
Traceback (most recent call last):
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 133, in wrapper
return func(*args, **kwargs)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 578, in read_parquet
meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 1487, in set_index_columns
raise ValueError(
ValueError: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mre.py", line 26, in <module>
main()
File "mre.py", line 23, in main
load_parquet()
File "mre.py", line 15, in load_parquet
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)
File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
raise type(e)(
ValueError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')
My expectation was that the ("A", "==", 2) filter would prevent us from loading the schema from df1, and that, whether or not it loads df1, it should be able to find the "C" column in df2. Am I missing something?
Changing the columns field to columns=["A", "B"] reads the data successfully, so I feel that what I'm trying to do should be possible.
This post says that read_parquet reads the schema from the first parquet file it encounters, but that you can specify a schema to avoid this.
Specifying schema like

import pyarrow as pa
...
ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters, schema=pa.schema({"A": pa.int64(), "C": pa.int64()}))

still triggers the exception.
Specifying schema without specifying columns does not trigger the exception, but returns a Dask DataFrame without the "C" column (seemingly regardless of what is in the schema):
>>> print(ddf.columns)
Index(['B', 'A'], dtype='object')
Is there a way to prevent read_parquet from using the .parquet files that should have been filtered out?