我认为有两种方法可以实现这一点：
- 将整个数据集扫描为 LazyFrame，并即时进行过滤。
- 读取 S3 中分区文件夹的名称，仅扫描日期最新的那个文件夹中的 Parquet 文件。
方案 1：将整个数据集扫描为 pl.LazyFrame，并即时进行过滤。
import boto3
import polars as pl
# Option 1: lazily scan every partition of the dataset, then keep only the rows
# whose DATE_KEY equals the maximum DATE_KEY found across the whole dataset.
profile = "your-profile"
# Glob matching the Parquet files one folder level below the dataset root
# (one folder per partition).
s3_path = "s3://path/to/your/dataset/*/*.parquet"
# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()
df = (
    # Build a lazy query over the entire dataset; it is executed by .collect().
    pl.scan_parquet(
        s3_path,
        storage_options={
            "aws_access_key_id": credentials.access_key,
            "aws_secret_access_key": credentials.secret_key,
            "aws_session_token": credentials.token,
            "aws_region": session.region_name,
        },
        # Polars infers Hive partition columns (folders named like
        # DATE_KEY=2024-01-01) automatically only when a single directory is
        # passed; for a glob pattern the default is off. Enable it explicitly so
        # DATE_KEY is available as a column even when it lives only in the
        # folder names. NOTE(review): assumes a Hive-style folder layout.
        hive_partitioning=True,
    )
    # keep only the rows belonging to the latest partition
    .filter(pl.col("DATE_KEY") == pl.col("DATE_KEY").max())
    .collect()
)
方案 2：获取最新分区文件夹的名称，并只读取该文件夹中的数据。
import polars as pl
import boto3
import os
# Option 2: find the most recent partition folder by listing S3, then read only
# the Parquet files inside that folder.
profile = "your-profile"
s3_bucket = 'bucket-name'
# Key prefix of the dataset "folder" inside the bucket (no leading "/").
s3_prefix = "path/to/dataset"
# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()
# With Delimiter="/", S3 groups keys into CommonPrefixes one level below Prefix,
# so Prefix must end in "/". Without it, the only CommonPrefix returned is the
# dataset folder itself ("path/to/dataset/") rather than its partition folders.
list_prefix = s3_prefix if not s3_prefix or s3_prefix.endswith("/") else s3_prefix + "/"
# A single list_objects_v2 call returns at most 1000 entries, so page through
# all results to be sure every partition folder is seen.
paginator = session.client("s3").get_paginator("list_objects_v2")
partition_prefixes = [
    common_prefix["Prefix"]
    for page in paginator.paginate(Bucket=s3_bucket, Prefix=list_prefix, Delimiter="/")
    for common_prefix in page.get("CommonPrefixes", [])
]
if not partition_prefixes:
    raise FileNotFoundError(
        f"No partition folders found under s3://{s3_bucket}/{list_prefix}"
    )
# max() compares the prefixes as plain strings. It picks the latest date only if
# partition names sort lexicographically in date order (e.g. DATE_KEY=YYYYMMDD
# or DATE_KEY=YYYY-MM-DD) — confirm this for your layout.
s3_prefix_latest = max(partition_prefixes)
# Build the URL with explicit "/" separators (os.path.join would use "\\" on
# Windows). CommonPrefixes already end with "/", so no extra separator is needed.
s3_path_latest = f"s3://{s3_bucket}/{s3_prefix_latest}*.parquet"
# read data only from latest partition
df = pl.read_parquet(
    s3_path_latest,
    storage_options={
        "aws_access_key_id": credentials.access_key,
        "aws_secret_access_key": credentials.secret_key,
        "aws_session_token": credentials.token,
        "aws_region": session.region_name,
    },
)