我有镶木地板文件存储在S3位置是按日期键分区.使用Polars,我需要从最新日期键文件夹读取镶木地板文件.下面是我的s3 struct 的一个例子:

亚马逊人S3>Bucket名称>dev/>目标/>refed/>StUDENTS.parquet/

低于STUDENTS.parquet时,有几个文件夹按DATA_KEY分区,即,

enter image description here

每个文件夹都包含镶木地板文件.

使用Polars,我需要从最新的日期键文件夹(在本例中,DATA_KEY = 2024—03—06/)中读取镶木地板文件到Polars打印机中.

你认为在名称文件夹上进行降序排序是实现这一点的一种方法吗?

有人能帮我在他的,因为我在追求北极星,而不是Pandas .

推荐答案

我认为有两种方法可以实现这一点.

  1. 将整个数据集扫描到LazyFrame中并进行过滤.
  2. 读取S3中文件夹的名称,并仅扫描文件夹中最新日期的镶木地板文件.

Option 1. Scan entire dataset into a pl.LazyFrame and filter on the fly.

import boto3
import polars as pl

# 
profile = "your-profile"
s3_path = "s3://path/to/your/dataset/*/*.parquet"

# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()

df = (
    # scan entire dataset
    pl.scan_parquet(
        s3_path,
        storage_options={  
            "aws_access_key_id": credentials.access_key,
            "aws_secret_access_key": credentials.secret_key,
            "aws_session_token": credentials.token,
            "aws_region": session.region_name,
        },
    )
    # filter for latest partition
    .filter(
        pl.col("DATE_KEY") == pl.col("DATE_KEY").max()
    )
    .collect()
)

备选办法2.获取最新分区文件夹的名称,并只读取相应的数据.

import polars as pl
import boto3
import os

# 
profile = "your-profile"
s3_bucket = 'bucket-name'
s3_prefix = "path/to/dataset"

# create session and obtain credentials
session = boto3.session.Session(profile_name=profile)
credentials = session.get_credentials().get_frozen_credentials()

# get path of latest partition
response = session.client("s3").list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix, Delimiter='/')
s3_prefix_latest = max(prefix["Prefix"] for prefix in response["CommonPrefixes"])
s3_path_latest = os.path.join("s3://", s3_bucket, s3_prefix_latest, "*.parquet")

# read data only from latest partition
df = pl.read_parquet(
    s3_path_latest,
    storage_options={  
        "aws_access_key_id": credentials.access_key,
        "aws_secret_access_key": credentials.secret_key,
        "aws_session_token": credentials.token,
        "aws_region": session.region_name,
    },
)

Python-3.x相关问答推荐

TypeError:&Quot;Value&Quot;参数必须是标量、Dict或Series,但您传递了&Quot;Index&Quot;

Django 3.2/Django-cms 3.11:查找错误:型号帐户.客户用户未注册

为什么我在BLE中的广告代码在发送包裹之间需要大约1秒

我正在try 从 10*3 矩阵中删除随机值并将其变为 10*2 矩阵

估计列表中连续对的数量

替换 .txt 文件中的项目列表

如何向 scikit-learn 函数添加类型提示?

Pandas 按值和索引对 DF 进行排序

pip 找不到最新的软件包版本

如何使用 regex sub 根据列表中的变量替换字符

如何在 django 中没有循环的情况下获得前键的前键?

ImportError:没有名为资源的模块

为什么 List 不能包含多种类型?

如何禁用 pylint 禁止自用警告?

Python:如何判断一个项目是否被添加到一个集合中,没有 2x(hash,lookup)

Python 异步调试示例

使用逗号时,除了处理程序中的语法无效

ValueError:预期的 2D 数组,得到 1D 数组:

同步调用协程

如何从集合中删除多个元素?