我有以下简化的PySpark输入数据帧:

Category    Time    Stock-level    Stock-change
apple       1       4              null
apple       2       null           -2
apple       3       null           5
banana      1       12             null
banana      2       null           4
orange      1       1              null
orange      2       null           -7

我希望对于每个Category,按Time升序,当前行的Stock-level值用前一行的Stock-level+该行本身的Stock-change填充.更清楚的是:

Stock-level[row n] = Stock-level[row n-1] + Stock-change[row n]

输出数据帧应如下所示:

Category    Time    Stock-level    Stock-change
apple       1       4              null
apple       2       2              -2
apple       3       7              5
banana      1       12             null
banana      2       16             4
orange      1       1              null
orange      2       -6             -7

我知道Pyspark窗口函数,它们似乎对此很有用,但我找不到一个示例来解决这种特定类型的问题,即将当前行和前一行的值相加.

提前谢谢!

推荐答案

你可以取coalesce(level, change)英镑的总和.

import pyspark.sql.functions as func
from pyspark.sql.window import Window as wd

data_sdf. \
    withColumn('new_stock_level', 
               func.sum(func.coalesce('stock_level', 'stock_change')).
               over(wd.partitionBy('cat').orderBy('time').rowsBetween(-sys.maxsize, 0))
               ). \
    show()

# +------+----+-----------+------------+---------------+
# |   cat|time|stock_level|stock_change|new_stock_level|
# +------+----+-----------+------------+---------------+
# | apple|   1|          4|        null|              4|
# | apple|   2|       null|          -2|              2|
# | apple|   3|       null|           5|              7|
# |banana|   1|         12|        null|             12|
# |banana|   2|       null|           4|             16|
# |orange|   1|          1|        null|              1|
# |orange|   2|       null|          -7|             -6|
# +------+----+-----------+------------+---------------+

Python相关问答推荐

如果AST请求默认受csref保护,那么在Django中使用@ system_decorator(csref_protect)的目的是什么?

如何在Python中使用ijson解析SON期间检索文件位置?

使文本输入中的文本与标签中的文本相同

指示组内的rejected_time是否在creation_timestamp后5分钟内

覆盖Django rest响应,仅返回PK

使用from_pandas将GeDataFrame转换为polars失败,ArrowType错误:未传递numpy. dype对象

如何根据条件在多指标框架上进行groupby

Polars比较了两个预设-有没有方法在第一次不匹配时立即失败

如何使用symy打印方程?

查找两极rame中组之间的所有差异

我对我应该做什么以及我如何做感到困惑'

我想一列Panadas的Rashrame,这是一个URL,我保存为CSV,可以直接点击

如何启动下载并在不击中磁盘的情况下呈现响应?

考虑到同一天和前2天的前2个数值,如何估算电力时间序列数据中的缺失值?

Python全局变量递归得到不同的结果

pandas:对多级列框架的列进行排序/重新排序

未调用自定义JSON编码器

下三角形掩码与seaborn clustermap bug

在输入行运行时停止代码

如何在Python 3.9.6和MacOS Sonoma 14.3.1下安装Pyregion