As a follow-up to this question, I am looking for a way to limit memory usage when scanning, filtering and joining a large dataset stored on S3 against a small, in-memory DataFrame.
Assume my code looks like the following:
import pyarrow.dataset as ds
import polars as pl
import s3fs

# S3 credentials
secret = ...
key = ...
endpoint_url = ...
s3_bucket = ...

# some tiny dataframe
sellers_df = pl.DataFrame({'seller_id': ['0332649223', '0192491683', '0336435426']})

# scan, filter and join with the huge dataset on S3
fs = s3fs.S3FileSystem(endpoint_url=endpoint_url, key=key, secret=secret)
dataset = ds.dataset(f'{s3_bucket}/benchmark_dt/dt_partitions', filesystem=fs, partitioning='hive')
scan_df = (
    pl.scan_pyarrow_dataset(dataset)
    .filter(pl.col('dt') >= '2023-05-17')
    .filter(pl.col('dt') <= '2023-10-18')
    .join(sellers_df.lazy(), on='seller_id', how='inner')
    .collect()
)
My Parquet file layout looks like this:
-- dt_partitions
   -- dt=2023-06-09
      -- data.parquet
   -- dt=2023-06-10
      -- data.parquet
   -- dt=2023-06-11
      -- data.parquet
   -- dt=2023-06-12
      -- data.parquet
   ...
When running the code, I notice that Polars first loads the entire dataset (for the given date range) into memory and only then performs the join.
This causes me severe memory problems.
Is there any way to perform the join in predefined batches/streams to save memory?
Thanks in advance.
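For illustration, one manual workaround would be to join one dt partition at a time and concatenate the per-partition results, so that only a single partition is materialized at once. A sketch reusing dataset and sellers_df from above (the daterange helper is just for illustration, and this is not the solution I ended up with):

from datetime import date, timedelta

def daterange(start: date, end: date):
    # Yield each date in [start, end] as an ISO string matching the dt values.
    d = start
    while d <= end:
        yield d.isoformat()
        d += timedelta(days=1)

chunks = []
for dt in daterange(date(2023, 5, 17), date(2023, 10, 18)):
    # Only one partition's rows are in memory at a time; dates with no
    # matching partition simply produce an empty frame.
    chunk = (
        pl.scan_pyarrow_dataset(dataset)
        .filter(pl.col('dt') == dt)
        .join(sellers_df.lazy(), on='seller_id', how='inner')
        .collect()
    )
    chunks.append(chunk)

result = pl.concat(chunks)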
Edit:
Here is the explain plan (you can see that streaming is not applied):
INNER JOIN:
LEFT PLAN ON: [col("seller_id")]
PYTHON SCAN
PROJECT */3 COLUMNS
SELECTION: ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18'))
RIGHT PLAN ON: [col("seller_id")]
DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN
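Plans like the one above are what LazyFrame.explain() prints. A minimal sketch, assuming the query is kept lazy in a variable (here hypothetically named scan_query, i.e. the same chain as above without the final .collect()):

# Print the optimized plan without executing the query; streaming=True
# additionally marks the parts that can run in the streaming engine
# (the "--- STREAMING" sections further below).
print(scan_query.explain(streaming=True))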
However, when using is_in:
PYTHON SCAN
PROJECT */3 COLUMNS
SELECTION: ((pa.compute.field('seller_id')).isin(["0332649223","0192491683","0336435426","3628932648","5241104373","1414317462","4028203396","6445502649","1131069079","9027417785","6509736571","9214134975","7722199293","1617136891","8786329949","8260764409","5103636478","3444202168","9066806312","3961998994","7345385102","2756955097","7038039666","0148664533","5120870693","8843132164","6424549457","8242686761","3148647530","8329075741","0803877447","2228154163","8661602117","2544985488","3241983296","4756084729","5317176976","0658022895","3802149808","2368104663","0835399702","0806598632","9753553141","3473629988","1145080603","5731199445","7622500016","4980968502","6713967792","8469333969"]) & ((pa.compute.field('dt') >= '2023-10-17') & (pa.compute.field('dt') <= '2023-10-18')))
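For reference, a sketch of the kind of query that produces a plan like the one above: the join is replaced by an is_in filter on the seller ids, which Polars can push down into the PyArrow scan so that only matching rows are materialized.

# Push the seller ids down as an is_in predicate instead of joining.
seller_ids = sellers_df['seller_id'].to_list()

scan_df = (
    pl.scan_pyarrow_dataset(dataset)
    .filter(pl.col('seller_id').is_in(seller_ids))
    .filter(pl.col('dt') >= '2023-10-17')
    .filter(pl.col('dt') <= '2023-10-18')
    .collect()
)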
Following @Dean MacGregor's answer, adding os.environ['AWS_ALLOW_HTTP'] = 'true' made it work:
--- STREAMING
INNER JOIN:
LEFT PLAN ON: [col("seller_id")]
Parquet SCAN s3://test-bucket/benchmark_dt/dt_partitions/dt=2023-10-17/part-0.parquet
PROJECT */3 COLUMNS
RIGHT PLAN ON: [col("seller_id")]
DF ["seller_id"]; PROJECT */1 COLUMNS; SELECTION: "None"
END INNER JOIN --- END STREAMING
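For completeness, a sketch of a setup that yields a streaming plan like the one above, assuming the fix switches from scan_pyarrow_dataset to Polars' native scan_parquet (the exact storage_options key names and hive-partition handling depend on your Polars version, so treat this as an approximation of the working code rather than the answer verbatim):

import os

import polars as pl

# Required here because the endpoint is plain HTTP (assumption based on the fix).
os.environ['AWS_ALLOW_HTTP'] = 'true'

storage_options = {
    'aws_access_key_id': key,
    'aws_secret_access_key': secret,
    'endpoint_url': endpoint_url,  # key names may differ between versions
}

scan_df = (
    pl.scan_parquet(
        's3://test-bucket/benchmark_dt/dt_partitions/**/*.parquet',
        storage_options=storage_options,
    )
    # dt comes from the hive-style directory names; the filter prunes
    # whole partitions before any row data is downloaded.
    .filter(pl.col('dt') >= '2023-10-17')
    .filter(pl.col('dt') <= '2023-10-18')
    .join(sellers_df.lazy(), on='seller_id', how='inner')
    .collect(streaming=True)  # run in the streaming engine, as shown in the plan
)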