Filtering
It's always good to remove data that isn't needed. You said you need only the last 60 days, so you could filter
out everything else.
This line keeps only rows whose date lies between 61 days ago and today (both ends inclusive), which covers the last 60 days:
df = df.filter(F.to_date('txn_date', 'yyyyMMdd').between(F.current_date()-61, F.current_date()))
I don't apply this filter in the rest of this answer, so that I can illustrate the other issues.
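To make the filter's semantics concrete, here is a plain-Python sketch (no Spark) of which rows `between(F.current_date()-61, F.current_date())` would keep. It assumes `txn_date` is a yyyyMMdd integer as in the question; the helper name `keep_recent` and the fixed "today" of 2021-01-15 are hypothetical, chosen only so the result is reproducible. Like `between`, the check is inclusive at both ends.

```python
from datetime import date, datetime, timedelta


def keep_recent(rows, today, back_days=61):
    """Keep rows whose yyyyMMdd txn_date lies in [today - back_days, today], inclusive."""
    lower = today - timedelta(days=back_days)
    kept = []
    for row in rows:
        txn_date = datetime.strptime(str(row[0]), "%Y%m%d").date()
        if lower <= txn_date <= today:
            kept.append(row)
    return kept


rows = [
    [20210101, 'A', 103, 'abc'],
    [20210102, 'A', 34, 'ghu'],
    [19990101, 'B', 9999999, 'xxxxxxx'],
]
# Hypothetical "today" so the example is deterministic; the 1999 row is dropped
print(keep_recent(rows, today=date(2021, 1, 15)))
```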
Window
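First, a simple one: if the column is already of long type, you don't need to cast it to long again, so we can remove .cast(LongType()).
The other, bigger one: the lower bound of your window is wrong. To see it, let's add one more row to the input: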
[19990101, 'B', 9999999, "xxxxxxx"],
This row represents a date in 1999. After adding it and running your code, we get the following result:
# +--------+--------+----------+----------------+------------------+
# |txn_date|txn_type|txn_amount|other_attributes|stddev_last_30days|
# +--------+--------+----------+----------------+------------------+
# |20210101|       A|       103|             abc|               1.0|
# |20210101|       A|       102|             def|               1.0|
# |20210101|       A|       101|             def|               1.0|
# |20210102|       A|        34|             ghu|34.009802508492555|
# |19990101|       B|   9999999|         xxxxxxx|              null|
# |20210101|       B|       180|             xyz|  7070939.82553808|
# |20210102|       B|       123|             kqt|  5773414.64605055|
# +--------+--------+----------+----------------+------------------+
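You can see that the stddev of the 2021 rows was affected too, so the 30-day window doesn't work: your window actually takes in all the data. We can check what the lower bound is for the date 20210101: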
print(20210101-days(30)) # Returns 17618101 - I doubt you wanted this date as lower bound
This is probably your biggest problem.
You could use this window instead:
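The arithmetic behind this can be checked without Spark. The plain-Python sketch below compares subtracting 30 days' worth of seconds from the raw yyyyMMdd integer with subtracting them from a real UNIX timestamp. The helper names `to_epoch` and `from_epoch` are illustrative, and the sketch assumes UTC, whereas Spark's `unix_timestamp` uses the session time zone.

```python
from datetime import datetime, timezone


def days(i: int) -> int:
    """Number of seconds in i days, same as the days lambda in the question."""
    return i * 86400


def to_epoch(yyyymmdd: int) -> int:
    """Interpret a yyyyMMdd integer as midnight UTC and return UNIX seconds."""
    dt = datetime.strptime(str(yyyymmdd), "%Y%m%d").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def from_epoch(seconds: int) -> int:
    """Convert UNIX seconds back to a yyyyMMdd integer (UTC)."""
    return int(datetime.fromtimestamp(seconds, tz=timezone.utc).strftime("%Y%m%d"))


# Ordering by the raw integer: the "30 days back" bound is not a real date at all
naive_lower = 20210101 - days(30)
# Ordering by UNIX seconds: the same subtraction lands exactly 30 days earlier
proper_lower = from_epoch(to_epoch(20210101) - days(30))

print(naive_lower)                          # 17618101
print(proper_lower)                         # 20201202
print(naive_lower <= 19990101 <= 20210101)  # True: the 1999 row falls inside the frame
```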
from pyspark.sql import Window, functions as F

days = lambda i: i * 86400  # number of seconds in i days
w = Window.partitionBy('txn_type').orderBy(F.unix_timestamp(F.col('txn_date').cast('string'), 'yyyyMMdd')).rangeBetween(-days(30), 0)
df = df.withColumn('stddev_last_30days', F.stddev('txn_amount').over(w))
df.show()
# +--------+--------+----------+----------------+------------------+
# |txn_date|txn_type|txn_amount|other_attributes|stddev_last_30days|
# +--------+--------+----------+----------------+------------------+
# |20210101|       A|       103|             abc|               1.0|
# |20210101|       A|       102|             def|               1.0|
# |20210101|       A|       101|             def|               1.0|
# |20210102|       A|        34|             ghu|34.009802508492555|
# |19990101|       B|   9999999|         xxxxxxx|              null|
# |20210101|       B|       180|             xyz|              null|
# |20210102|       B|       123|             kqt| 40.30508652763321|
# +--------+--------+----------+----------------+------------------+
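unix_timestamp can convert the "yyyyMMdd" format into a proper long number (UNIX time in seconds). Now you can subtract seconds from it (the number of seconds equivalent to 30 days).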
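The difference between the two tables can be reproduced with a small pure-Python emulation of a `rangeBetween(-days(30), 0)` frame. This is a sketch, not how Spark actually evaluates windows: for each row it collects the amounts of the same `txn_type` whose ordering key lies in `[key - 30 days, key]` (rows with an equal key are included, as in a RANGE frame) and takes the sample standard deviation, returning `None` for single-value frames to mirror the `null` in the tables. The helper names are hypothetical, and UTC is assumed for the date conversion.

```python
from datetime import datetime, timezone
from statistics import stdev

DAY = 86400  # seconds per day, i.e. days(1)

# (txn_date, txn_type, txn_amount) rows from the example, including the 1999 row
ROWS = [
    (20210101, "A", 103),
    (20210101, "A", 102),
    (20210101, "A", 101),
    (20210102, "A", 34),
    (19990101, "B", 9999999),
    (20210101, "B", 180),
    (20210102, "B", 123),
]


def epoch_seconds(yyyymmdd):
    """yyyyMMdd integer -> UNIX seconds at midnight UTC (stand-in for unix_timestamp)."""
    dt = datetime.strptime(str(yyyymmdd), "%Y%m%d").replace(tzinfo=timezone.utc)
    return int(dt.timestamp())


def range_stddev(rows, order_key, lower):
    """Sample stddev over a RANGE frame [key + lower, key] within each txn_type."""
    result = []
    for date, typ, _ in rows:
        key = order_key(date)
        frame = [amount for d, t, amount in rows
                 if t == typ and key + lower <= order_key(d) <= key]
        # stdev needs at least two values; the tables above show null in that case
        result.append(stdev(frame) if len(frame) > 1 else None)
    return result


# Ordering by the raw integer (the original window) vs. by UNIX seconds (the fix)
print(range_stddev(ROWS, lambda d: d, -30 * DAY))
print(range_stddev(ROWS, epoch_seconds, -30 * DAY))
```

Only the 2021 `B` rows differ between the two printouts, which matches the two tables: with integer ordering the 1999 amount leaks into their frames.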