我有这个数据框:

df = (
spark
.createDataFrame([
    [20210101, 'A', 103, "abc"], 
    [20210101, 'A', 102, "def"], 
    [20210101, 'A', 101, "def"], 
    [20210102, 'A', 34, "ghu"], 
    [20210101, 'B', 180, "xyz"], 
    [20210102, 'B', 123, "kqt"]
]
    ).toDF("txn_date", "txn_type", "txn_amount", "other_attributes")
)

每个日期都有不同类型的多笔交易.我的任务是计算每个记录金额的标准差(对于相同类型,追溯到30天).

最明显的方法(我try 过)是基于类型创建一个窗口,并包括过go 30天的记录.

days = lambda i: i * 86400
win = Window.partitionBy("txn_type").orderBy(F.col("txn_date").cast(LongType())).rangeBetween(-days(30), 0)
df = df.withColumn("stddev_last_30days", F.stddev(F.col("txn_amount")).over(win))

由于某些交易类型每天有数百万笔交易,因此会出现OOM.

我试着分部分进行(每次只取几个日期的记录),但由于标准差不是累加性的,这会导致计算容易出错.

我还try 对交易类型和日期的所有记录使用"collect\u set"(因此所有金额在一列中以数组形式出现),但这也会遇到OOM.

我试着一次处理一个月(我需要至少两个月的数据,因为我需要返回1个月),但即使这样,我的遗嘱执行人也无法承受.

解决这个问题的可扩展方法是什么?

笔记:

  • 在原始数据中,列txn_date以"yyyyMMdd"格式存储为长.

  • 对于每个日期和类型,数据框中的其他列可能相同,也可能不同.为了简单起见,我没有将它们包括在示例代码中.

推荐答案

Filtering

It's always good to remove data which is not needed. You said you need just last 60 days, so You could filter out what's not needed.
This line would keep only rows with date not older than 60 last days (until today):

df = df.filter(F.to_date('txn_date', 'yyyyMMdd').between(F.current_date()-61, F.current_date()))

我现在不使用它来说明其他问题.

Window

第一件简单的事,如果它已经是长格式的,你不需要再次转换为长格式,所以我们可以删除.cast(LongType()).

另一个是100,窗口的下限是错误的.看,让我们在输入中再添加一行:

[19990101, 'B', 9999999, "xxxxxxx"],

该行表示1999年的日期.添加行后,运行代码,我们得到以下结果:

# +--------+--------+----------+----------------+------------------+
# |txn_date|txn_type|txn_amount|other_attributes|stddev_last_30days|
# +--------+--------+----------+----------------+------------------+
# |20210101|       A|       103|             abc|               1.0|
# |20210101|       A|       102|             def|               1.0|
# |20210101|       A|       101|             def|               1.0|
# |20210102|       A|        34|             ghu|34.009802508492555|
# |19990101|       B|   9999999|         xxxxxxx|              null|
# |20210101|       B|       180|             xyz|  7070939.82553808|
# |20210102|       B|       123|             kqt|  5773414.64605055|
# +--------+--------+----------+----------------+------------------+

您可以看到,2021的stddev线路也受到了影响,因此30天窗口不起作用,您的窗口实际上需要all个数据.我们可以判断日期202all01的下限是多少:

print(20210101-days(30))  # Returns 17618101 - I doubt you wanted this date as lower bound

这可能是你最大的问题.100

您可以使用此窗口:

days = lambda i: i * 86400
w = Window.partitionBy('txn_type').orderBy(F.unix_timestamp(F.col('txn_date').cast('string'), 'yyyyMMdd')).rangeBetween(-days(30), 0)
df = df.withColumn('stddev_last_30days', F.stddev('txn_amount').over(w))

df.show()
# +--------+--------+----------+----------------+------------------+
# |txn_date|txn_type|txn_amount|other_attributes|stddev_last_30days|
# +--------+--------+----------+----------------+------------------+
# |20210101|       A|       103|             abc|               1.0|
# |20210101|       A|       102|             def|               1.0|
# |20210101|       A|       101|             def|               1.0|
# |20210102|       A|        34|             ghu|34.009802508492555|
# |19990101|       B|   9999999|         xxxxxxx|              null|
# |20210101|       B|       180|             xyz|              null|
# |20210102|       B|       123|             kqt| 40.30508652763321|
# +--------+--------+----------+----------------+------------------+

unix_timestamp可以将"yyyyMMdd"格式转换为适当的长格式数字(UNIX时间以秒为单位).现在你可以从中减go 秒(相当于30天的秒).

Python相关问答推荐

试图找到Python方法来部分填充numpy数组

处理带有间隙(空)的duckDB上的重复副本并有效填充它们

聚合具有重复元素的Python字典列表,并添加具有重复元素数量的新键

如何过滤包含2个指定子字符串的收件箱列名?

Python键入协议默认值

Python,Fitting into a System of Equations

在Python中动态计算范围

调用decorator返回原始函数的输出

python panda ExcelWriter切换动态公式到数组公式

交替字符串位置的正则表达式

如何在Gekko中处理跨矢量优化

如何使用大量常量优化代码?

随机森林n_估计器的计算

高效地计算数字数组中三行上三个点之间的Angular

Match-Case构造中的对象可调用性测试

有什么方法可以在不对多索引DataFrame的列进行排序的情况下避免词法排序警告吗?

如何在不遇到IndexError的情况下将基数10的整数转换为基数80?

将索引表转换为Numy数组

将参数从另一个python脚本中传递给main(argv

GEKKO中若干参数的线性插值动态优化