我使用一个已经写入磁盘的大型数据集作为拼图分区数据集.

我如何将这些数据直接读入Polar中,以获得一些聚合计算结果? 我希望避免将镶木地板数据转换为Pandas (pq_df.to_pandas()),因为我的数据大于我的计算机内存.

以下是一个可重复使用的示例代码. 我很感谢你的意见.

import polars as pl    # Version 0.20.3
import pyarrow as pa   # Version 11.0.0
import pyarrow.parquet as pq
 
 
pl_df = pl.DataFrame({
                          "Name": ["ABC","DEF","GHI",'JKL'],
                          "date": ["2024-01-01","2024-01-10","2023-01-29","2023-01-29"],
                          "price":[1000,1500,1800,2100] ,
                          })
 
pl_df = pl_df.with_columns(date= pl.col("date").cast(pl.Date))
 
# write Polars data frame to disk as parquet dataset    
pq.write_to_dataset( pl_df.to_arrow(), root_path=r"C:\Users\desktop PC\Downloads\test_pl", partition_cols=["date"],
                        compression ='gzip',existing_data_behavior='overwrite_or_ignore')
                        
# Have a schema object of data written to parquet dataset
pd_df_schema = pa.Schema.from_pandas(pl_df.to_pandas())
 
# Read data written to parquet dataset
pq_df = pq.read_table(r"C:\Users\desktop PC\Downloads\test_pl",
                      schema=pd_df_schema,
                      )
 
# I want to use this parquest object to create a aggregate result via Polars with out using #"pq_df.to_pandas()" method.
 
df = (pl.from_pandas(pq_df.to_pandas()).lazy()
      .group_by(["date"])
      .agg(
          [
              pl.col("price").sum().alias("grouped_sum"),
              pl.col("price").count().alias("grouped_count"),])
      ).collect(streaming=True)

推荐答案

您可以使用from_arrow()方法:

(
    pl.from_arrow(pq_df).lazy()
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count")
    ).collect(streaming=True)
)

┌──────┬─────────────────────┬───────┐
│ Name ┆ date                ┆ price │
│ ---  ┆ ---                 ┆ ---   │
│ str  ┆ datetime[ms]        ┆ i64   │
╞══════╪═════════════════════╪═══════╡
│ GHI  ┆ 2023-01-29 00:00:00 ┆ 1800  │
│ JKL  ┆ 2023-01-29 00:00:00 ┆ 2100  │
│ ABC  ┆ 2024-01-01 00:00:00 ┆ 1000  │
│ DEF  ┆ 2024-01-10 00:00:00 ┆ 1500  │
└──────┴─────────────────────┴───────┘

但正确的方法可能是使用scan_parquet()种功能,它允许您扫描路径:

(
    pl.scan_parquet(r"test_pl/*/*.parquet")
    .group_by("date")
    .agg(
        pl.col("price").sum().alias("grouped_sum"),
        pl.col("price").count().alias("grouped_count")
    ).collect(streaming=True)
)

┌────────────┬─────────────┬───────────────┐
│ date       ┆ grouped_sum ┆ grouped_count │
│ ---        ┆ ---         ┆ ---           │
│ str        ┆ i64         ┆ u32           │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000        ┆ 1             │
│ 2024-01-10 ┆ 1500        ┆ 1             │
│ 2023-01-29 ┆ 3900        ┆ 2             │
└────────────┴─────────────┴───────────────┘

此外,您还可以始终使用duckdb:

duckdb.sql("""
    select
        a.date,
        sum(a.price) as grouped_sum,
        sum(a.price) as grouped_count
    from read_parquet('test_pl/*/*.parquet') as a
    group by
        a.date
""").pl()

┌────────────┬─────────────┬───────────────┐
│ date       ┆ grouped_sum ┆ grouped_count │
│ ---        ┆ ---         ┆ ---           │
│ date       ┆ f64         ┆ i64           │
╞════════════╪═════════════╪═══════════════╡
│ 2024-01-01 ┆ 1000.0      ┆ 1             │
│ 2024-01-10 ┆ 1500.0      ┆ 1             │
│ 2023-01-29 ┆ 3900.0      ┆ 2             │
└────────────┴─────────────┴───────────────┘

Python-3.x相关问答推荐

使用 Fetch 提交表单到 Django 视图

如何将 OLS 趋势线添加到使用 updatemenus 显示数据子集的 plotly 散点图图形对象?

如何在Pandas 中按条件计算分组?

以编程方式映射 uniprot ID 时如何解决 400 客户端错误?

在字符串中查找正则表达式的所有模式

正则表达式从文本文件中捕获包含制表符/空格和子字符串的部分字符串

机器学习实验笔记本的工作区 url

排队多个子进程

在气流中运行 DAG 时出现处理信号:ttou消息

为什么 setattr 在绑定方法上失败

如何禁用 pylint 禁止自用警告?

Pandas 的 EMA 与股票的 EMA 不匹配?

所有 Python dunder 方法的列表 - 您需要实现哪些方法才能正确代理对象?

如何使用 asyncio 添加连接超时?

在 ubuntu 20.04 中安装 libpq-dev 时出现问题

谁能给我一个 Python 3 中标准输入和标准输出的快速教程?

将 numpy.float64 列表快速转换为 Python 中的浮点数

是否可以在每个路由的基础上限制 Flask POST 数据大小?

在 linux mint 上安装 python3-venv 模块

__iter__ 和 __getitem__ 有什么区别?