As a follow-up to this question, I'm looking for a way to limit memory usage when scanning, filtering, and joining a huge dataset stored on S3 against a small in-memory DataFrame.

Assume my code looks like the following:

import pyarrow.dataset as ds
import polars as pl
import s3fs

# S3 credentials and bucket
secret = ...
key = ...
endpoint_url = ...
s3_bucket = ...

# some tiny dataframe
sellers_df = pl.DataFrame({'seller_id': ['0332649223', '0192491683', '0336435426']})

# scan, filter and join with huge dataframe on S3
fs = s3fs.S3FileSystem(endpoint_url=endpoint_url, key=key, secret=secret)
dataset = ds.dataset(f'{s3_bucket}/benchmark_dt/dt_partitions', filesystem=fs, partitioning='hive')
scan_df = pl.scan_pyarrow_dataset(dataset) \
    .filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner').collect()

My parquet file layout looks like this:

-- dt_partitions
    -- dt=2023-06-09
        -- data.parquet
    -- dt=2023-06-10
        -- data.parquet
    -- dt=2023-06-11
        -- data.parquet
    -- dt=2023-06-12
        -- data.parquet
    ...

When running the code, I notice that Polars first loads the entire dataset for the given date range into memory and only then performs the join.
This causes me severe memory problems.

Is there any way to perform the join in predefined batches/streams to save memory?

Thanks in advance.

Edit:

Here is the explain plan (you can see that streaming is not applied):

INNER JOIN:
LEFT PLAN ON: [col("seller_id")]

    PYTHON SCAN 
    PROJECT */3 COLUMNS
    SELECTION: ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18'))
RIGHT PLAN ON: [col("seller_id")]
  DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN
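
For reference, a plan like this can be printed from the LazyFrame before collecting. A minimal sketch using the same query as above (LazyFrame.explain() prints the optimized plan without executing the query):

lazy_df = pl.scan_pyarrow_dataset(dataset) \
    .filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner')

print(lazy_df.explain())  # inspect the plan before calling collect()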

However, when using is_in, the filter is pushed into the scan:

PYTHON SCAN 
  PROJECT */3 COLUMNS
  SELECTION: ((pa.compute.field('seller_id')).isin(["0332649223","0192491683","0336435426","3628932648","5241104373","1414317462","4028203396","6445502649","1131069079","9027417785","6509736571","9214134975","7722199293","1617136891","8786329949","8260764409","5103636478","3444202168","9066806312","3961998994","7345385102","2756955097","7038039666","0148664533","5120870693","8843132164","6424549457","8242686761","3148647530","8329075741","0803877447","2228154163","8661602117","2544985488","3241983296","4756084729","5317176976","0658022895","3802149808","2368104663","0835399702","0806598632","9753553141","3473629988","1145080603","5731199445","7622500016","4980968502","6713967792","8469333969"]) & ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18')))
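
The query that produces a plan like this looks roughly like the following sketch (the seller IDs come from sellers_df; note that the whole predicate now lands in the scan's SELECTION):

seller_ids = sellers_df['seller_id'].to_list()

scan_df = pl.scan_pyarrow_dataset(dataset) \
    .filter(pl.col('seller_id').is_in(seller_ids)) \
    .filter(pl.col('dt') >= '2023-10-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .collect()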

Following @Dean MacGregor's answer and adding os.environ['AWS_ALLOW_HTTP'] = 'true', it worked:

--- STREAMING
INNER JOIN:
LEFT PLAN ON: [col("seller_id")]

    Parquet SCAN s3://test-bucket/benchmark_dt/dt_partitions/dt=2023-10-17/part-0.parquet
    PROJECT */3 COLUMNS
RIGHT PLAN ON: [col("seller_id")]
  DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN  --- END STREAMING

Accepted answer

Polars doesn't (know how to) do predicate pushdown into pyarrow datasets. Development effort is going toward supporting its own cloud hive reading, so perhaps give that a try?

The syntax for using it is slightly different from pyarrow.

It should look roughly like this, though there may be nuances in how you supply the authentication info.

import polars as pl

scan_df = pl.scan_parquet(
    f's3://{s3_bucket}/benchmark_dt/dt_partitions/**/*.parquet',
    storage_options={'access_key_id': key,
                     'secret_access_key': secret}
)

Note the syntax differences: the path needs the "s3://" prefix, and you must use a glob pattern for the hive structure. For auth, use the storage_options parameter with a dict of key/values supported by object_store. It doesn't rely on or leverage fsspec/s3fs the way pyarrow does.

From there you can do:

scan_df.filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner').collect()

However, for predicate pushdown on seller_id to work, since the hive isn't partitioned by seller_id, the underlying parquet files need row groups separated by seller_id and statistics that reflect that separation.
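
For illustration, a hypothetical rewrite step that would produce such files: sort each partition by seller_id so every row group covers a narrow seller_id range, and write per-row-group statistics (the paths here are just examples):

import pyarrow.parquet as pq

# Sort so that row groups cover disjoint seller_id ranges
table = pq.read_table('dt_partitions/dt=2023-06-09/data.parquet')
table = table.sort_by('seller_id')

pq.write_table(
    table,
    'dt_partitions/dt=2023-06-09/data_sorted.parquet',
    row_group_size=100_000,  # smaller row groups allow finer-grained skipping
    write_statistics=True,   # per-row-group min/max stats enable pushdown
)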

Even without predicate pushdown, you can still stream the data; you just need to change collect() to collect(streaming=True).
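
Reusing scan_df from above, the streaming variant would be:

scan_df.filter(pl.col('dt') >= '2023-05-17') \
    .filter(pl.col('dt') <= '2023-10-18') \
    .join(sellers_df.lazy(), on='seller_id', how='inner') \
    .collect(streaming=True)  # execute in batches instead of materializing the full scan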

If you need to access a local/custom S3 endpoint, set os.environ['AWS_ALLOW_HTTP'] = 'true' to tell object_store to connect to non-AWS URLs. Here are more environment variables it will look for.
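
A sketch for a custom endpoint; the 'endpoint_url' storage-option key is an assumption here, so check the linked object_store docs for the exact key names it accepts:

import os

os.environ['AWS_ALLOW_HTTP'] = 'true'  # permit non-AWS / plain-HTTP endpoints

scan_df = pl.scan_parquet(
    f's3://{s3_bucket}/benchmark_dt/dt_partitions/**/*.parquet',
    storage_options={'access_key_id': key,
                     'secret_access_key': secret,
                     'endpoint_url': endpoint_url},  # assumed key name
)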
