我想使用Polars LazyFrames将两个数量众多的时间戳数组彼此同步.

让我们假设我有两个使用LazyFrames存储的时间戳的Numty数组:

import polars as pl


timestamps = pl.LazyFrame(
    np.array(
        [
            np.datetime64("1970-01-01T00:00:00.500000000"),
            np.datetime64("1970-01-01T00:00:01.500000000"),
            np.datetime64("1970-01-01T00:00:02.600000000"),
            np.datetime64("1970-01-01T00:00:03.400000000"),
            np.datetime64("1970-01-01T00:00:04.500000000"),
            np.datetime64("1970-01-01T00:00:05.300000000"),
            np.datetime64("1970-01-01T00:00:06.200000000"),
            np.datetime64("1970-01-01T00:00:07.400000000"),
            np.datetime64("1970-01-01T00:00:08.500000000"),
        ]
    ),
    schema={"values": pl.Datetime}
)

other_timestamps = pl.LazyFrame(
    np.array(
        [
            np.datetime64("1970-01-01T00:00:01.500000000"),
            np.datetime64("1970-01-01T00:00:02.000000000"),
            np.datetime64("1970-01-01T00:00:02.500000000"),
            np.datetime64("1970-01-01T00:00:04.500000000"),
            np.datetime64("1970-01-01T00:00:06.000000000"),
            np.datetime64("1970-01-01T00:00:06.500000000"),
        ]
    ),
    schema={"values": pl.Datetime}
)

我还在numpy中实现了预期的功能:

import numpy as np
import numpy.typing as npt


def _np_sync_to(
    timestamps: npt.ArrayLike[np.datetime64],
    other: npt.ArrayLike[np.datetime64],
    tolerance: str,
):
    outer_diffs = np.abs(np.subtract.outer(other, timestamps))
    closest_timestamps_indices = outer_diffs.argmin(0)
    closest_timestamps = other[closest_timestamps_indices]
    diffs = np.abs(closest_timestamps - timestamps)
    tolerance = parse_timedelta(tolerance)
    within_tolerance = diffs <= tolerance

    ts1_synced = timestamps[within_tolerance]
    ts2_synced = closest_timestamps[within_tolerance]

    return ts1_synced, ts2_synced


np_ts1_synced, np_ts2_synced = _np_sync_to(
    timestamps=np.squeeze(timestamps.collect().to_numpy()),
    other=np.squeeze(other_timestamps.collect().to_numpy()),
    tolerance="500ms",
)

预期结果为:

np_ts1_synced = np.array([
    np.datetime64('1970-01-01T00:00:01.500000000'), 
    np.datetime64('1970-01-01T00:00:02.600000000'), 
    np.datetime64('1970-01-01T00:00:04.500000000'), 
    np.datetime64('1970-01-01T00:00:06.200000000')
])

np_ts2_synced = np.array([
    np.datetime64('1970-01-01T00:00:01.500000000'), 
    np.datetime64('1970-01-01T00:00:02.500000000'), 
    np.datetime64('1970-01-01T00:00:04.500000000'), 
    np.datetime64('1970-01-01T00:00:06.000000000')
])

因此,同步的时间戳基本上是指定容差内最近的时间戳. 现在我想使用Polars LazyFrames实现相同的功能来处理 Big Data .

我试着用Polars等效地实现它,但是外部减法的维数不正确,我想有一种更好的方法来进行一般的计算:

# inspired by https://stackoverflow.com/questions/77748729/polars-equivalent-to-np-outer
def sync_to(timestamps, other, tolerance: str):
    def _outer(
        a: pl.DataFrame | pl.LazyFrame, b: pl.DataFrame | pl.LazyFrame
    ):
        # I guess the following line is incorrect
        nrows = pl.len().sqrt().cast(pl.Int32)
        return (
            a.select("values")
            .join(b.select("values"), how="cross")
            .select(
                computed=(pl.col("values") - pl.col("values_right")).abs()
            )
            .group_by(pl.arange(0, pl.len()) // nrows, maintain_order=True)
            .agg("computed")
            .select(pl.col("computed").list.to_struct())
            .unnest("computed")
        )

    outer_diffs = timestamps.pipe(_outer, other)

我的另一个 idea 是:

ts1 = timestamps.sort("values").join_asof(
    other.sort("values"),
    on="values",
    strategy="nearest",
    tolerance=tolerance,
)

但yields 并不是我想要的.

推荐答案

看来你的join_as_of个例子很有效.唯一的问题是,据我所知,join_as_of是一个left连接,所以你必须额外过滤掉非连接值is_not_null()(或者更好,如@ Hericks在 comments 中建议的drop_nulls()):

timestamps.sort("values").join_asof(
    other_timestamps.with_columns(
        pl.col('values').alias('other_values')
    ).sort("values"),
    on="values",
    strategy="nearest",
    tolerance="500ms"
).drop_nulls()
# ).filter(pl.col('right_values').is_not_null())

┌─────────────────────────┬─────────────────────────┐
│ values                  ┆ other_values            │
│ ---                     ┆ ---                     │
│ datetime[ns]            ┆ datetime[ns]            │
╞═════════════════════════╪═════════════════════════╡
│ 1970-01-01 00:00:01.500 ┆ 1970-01-01 00:00:01.500 │
│ 1970-01-01 00:00:02.600 ┆ 1970-01-01 00:00:02.500 │
│ 1970-01-01 00:00:04.500 ┆ 1970-01-01 00:00:04.500 │
│ 1970-01-01 00:00:06.200 ┆ 1970-01-01 00:00:06     │
└─────────────────────────┴─────────────────────────┘

Python相关问答推荐

@Property方法上的inspect.getmembers出现意外行为,引发异常

比较两个数据帧并并排附加结果(获取性能警告)

PMMLPipeline._ fit()需要2到3个位置参数,但给出了4个位置参数

django禁止直接分配到多对多集合的前端.使用user.set()

Python+线程\TrocessPoolExecutor

导入...从...混乱

Django admin Csrf令牌未设置

如何使用OpenGL使球体遵循Python中的八样路径?

ConversationalRetrivalChain引发键错误

从列表中获取n个元素,其中list [i][0]== value''

查看pandas字符列是否在字符串列中

使用polars. pivot()旋转一个框架(类似于R中的pivot_longer)

需要帮助使用Python中的Google的People API更新联系人的多个字段'

如何在PythonPandas 中对同一个浮动列进行逐行划分?

用来自另一个数据框的列特定标量划分Polars数据框中的每一列,

无法在盐流道中获得柱子

是否需要依赖反转来确保呼叫方和被呼叫方之间的分离?

如何通过特定导入在类中执行Python代码

来自任务调度程序的作为系统的Python文件

Pandas 数据框自定义排序功能