我遇到了一个问题,我用略有不同的模式保存不同的拼图文件,但这些文件具有共享的分区列.作为一个最小的可重现示例,我创建了以下内容:

from dask import dataframe as dd
import pandas as pd
import shutil

def save_parquet():
    df1 = pd.DataFrame({"A": [1], "B": [1]})
    df2 = pd.DataFrame({"A": [2], "C": [2]})
    df1.to_parquet("test.parquet", partition_cols=["A"])
    df2.to_parquet("test.parquet", partition_cols=["A"])

def load_parquet():
    filters = [[
        ("A", "==", 2)
    ]]
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)

def main():
    save_parquet()
    load_parquet()

if __name__=="__main__":
    main()

运行以上命令会导致以下异常:

Traceback (most recent call last):
  File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 133, in wrapper
    return func(*args, **kwargs)
  File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 578, in read_parquet
    meta, index, columns = set_index_columns(meta, index, columns, auto_index_allowed)
  File "/home/.../.local/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py", line 1487, in set_index_columns
    raise ValueError(
ValueError: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "mre.py", line 26, in <module>
    main()
  File "mre.py", line 23, in main
    load_parquet()
  File "mre.py", line 15, in load_parquet
    ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters)
  File "/home/.../.local/lib/python3.8/site-packages/dask/backends.py", line 135, in wrapper
    raise type(e)(
ValueError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: The following columns were not found in the dataset {'C'}
The following columns were found Index(['B', 'A'], dtype='object')

我的期望是,("A", "==", 2)过滤器应该阻止我们从df1加载模式,无论它是否加载df1,它应该能够从df2找到"C"列.我是不是漏掉了什么?

columns字段更改为columns=["A", "B"]成功地读取了数据,因此我觉得我想要做的事情应该是可能的.

This post表示READ_PARQUET从它遇到的第一个拼图面板文件中读取模式,但是您可以指定一个模式来避免这种情况.

指定schema个喜欢

import pyarrow as pa

...

ddf = dd.read_parquet("test.parquet", columns=["A", "C"], filters=filters, schema=pa.schema({"A": pa.int64(), "C": pa.int64()}))

仍会触发异常.

指定schema而不指定columns不会触发异常,但会返回不带"C"列的DaskDataFrame(似乎与模式中的内容无关):

>>> print(ddf.columns)
Index(['B', 'A'], dtype='object')

有没有办法阻止read_parquet使用应该过滤掉的.parquet文件?

推荐答案

我发现了一个有点愚蠢的解决办法.

Dask.dataframe.read_parket可以获取分区中的拼图文件列表(而不是顶级拼图文件夹).我只是使用路径全局过滤来手动筛选出我想要使用的相关拼图文件(基于分区文件夹),并直接传递该列表.

Python相关问答推荐

双情节在单个图上切换-pPython

从单个列创建多个列并按pandas分组

仅对matplotlib的条标签中的一个条标签应用不同的格式

不允许AMBIMA API请求方法

Pandas 在时间序列中设定频率

从管道将Python应用程序部署到Azure Web应用程序,不包括需求包

Python会扔掉未使用的表情吗?

在Python中对分层父/子列表进行排序

2维数组9x9,不使用numpy.数组(MutableSequence的子类)

如何在Windows上用Python提取名称中带有逗号的文件?

如何在类和classy-fastapi -fastapi- followup中使用FastAPI创建路由

如何使用数组的最小条目拆分数组

如何使用它?

把一个pandas文件夹从juyter笔记本放到堆栈溢出问题中的最快方法?

使用Python从URL下载Excel文件

让函数调用方程

如何从需要点击/切换的网页中提取表格?

可以bcrypts AES—256 GCM加密损坏ZIP文件吗?

如何检测鼠标/键盘的空闲时间,而不是其他输入设备?

替换现有列名中的字符,而不创建新列