我有以下数据框:

+----+----------+-----+------+
|  id|      date|reset|cumsum|
+----+----------+-----+------+
|1001|2023-04-01|false|     0|
|1001|2023-04-02|false|     0|
|1001|2023-04-03|false|     1|
|1001|2023-04-04|false|     1|
|1001|2023-04-05| true|     4|
|1001|2023-04-06|false|     4|
|1001|2023-04-07|false|     4|
|1001|2023-04-08|false|    10|
|1001|2023-04-09| true|    10|
|1001|2023-04-10|false|    12|
|1001|2023-04-11|false|    13|
+----+----------+-----+------+

我正在try 创建一个新列,该列在特定日期的reset列为True时重置cumsum.

预期输出:

+----+----------+-----+------+----------+
|  id|      date|reset|cumsum|new_cumsum|
+----+----------+-----+------+----------+
|1001|2023-04-01|false|     0|         0|
|1001|2023-04-02|false|     0|         0|
|1001|2023-04-03|false|     1|         1|
|1001|2023-04-04|false|     1|         1|
|1001|2023-04-05| true|     4|         3|
|1001|2023-04-06|false|     4|         3|
|1001|2023-04-07|false|     4|         3|
|1001|2023-04-08|false|    10|         6|
|1001|2023-04-09| true|    10|         0|
|1001|2023-04-10|false|    12|         2|
|1001|2023-04-11|false|    13|         3|
+----+----------+-----+------+----------+

对上面的DF中正在发生的事情的解释:

  1. 从4/01到4/04,没有重置标志,因此每个日期的新值都应该反映原始的值.
  2. 在4/05,我们进行了第一次重置,如果原始值保持为1,则新值将为0.但因为它在那一天从1增加到4,所以新值就是差值(3)
  3. 从4/05到4/07,原始值保持在4不变,因此新值应该保持在3不变.
  4. 在4/08时,原来的值从4增加到10,因此新值应该是差值(6).
  5. 在09年4月4日,我们进行了第二次重置,因为该值保持在10不变,该日期的新值应该是0.
  6. 在4/10时,原始值从10增加到12,因此新值应该是差值(2).
  7. 在4/11时,原始值从12增加到13,因此新值应为4/11时的值与最近重置日期4/09(3)时的值之间的差值.

Df代码:

df = spark.createDataFrame(
    [
        (1001, "2023-04-01", False, 0, 0),
        (1001, "2023-04-02", False, 0, 0),
        (1001, "2023-04-03", False, 1, 1),
        (1001, "2023-04-04", False, 1, 1),
        (1001, "2023-04-05", True, 4, 3),
        (1001, "2023-04-06", False, 4, 3),
        (1001, "2023-04-07", False, 4, 3),
        (1001, "2023-04-08", False, 10, 6),
        (1001, "2023-04-09", True, 10, 0),
        (1001, "2023-04-10", False, 12, 2),
        (1001, "2023-04-11", False, 13, 3),
    ],
    ["id", "date", "reset", "cumsum", "new_cumsum"],
)

我希望这是有道理的.解释起来有点棘手.提前感谢您的任何答复.

推荐答案

我能够用窗口函数的组合来解决这个问题,关键的观察是使用"Reset"列构造的名为"Partition"的分区上的前缀sum,而且我认为您在第4点中犯了一个错误,值应该是9而不是6:

df = spark.createDataFrame(
    [
        (1001, "2023-04-01", False, 0, 0),
        (1001, "2023-04-02", False, 0, 0),
        (1001, "2023-04-03", False, 1, 1),
        (1001, "2023-04-04", False, 1, 1),
        (1001, "2023-04-05", True, 4, 3),
        (1001, "2023-04-06", False, 4, 3),
        (1001, "2023-04-07", False, 4, 3),
        (1001, "2023-04-08", False, 10, 6),
        (1001, "2023-04-09", True, 10, 0),
        (1001, "2023-04-10", False, 12, 2),
        (1001, "2023-04-11", False, 13, 3),
    ],
    ["id", "date", "reset", "cumsum", "new_cumsum"],
)

w = Window.orderBy("date")
w2 = Window.partitionBy("partition").orderBy("date")
df = df.withColumn("diff", col("cumsum") - lag("cumsum", default=0).over(w)) \
    .withColumn("partition", when(~col("reset"), 0).otherwise(1)) \
    .withColumn("partition", sum("partition").over(w)) \
    .withColumn("new_cumsum_2", sum(col("diff")).over(w2)).drop("diff", "partition")

df.show()

结果:

+----+----------+-----+------+----------+------------+
|  id|      date|reset|cumsum|new_cumsum|new_cumsum_2|
+----+----------+-----+------+----------+------------+
|1001|2023-04-01|false|     0|         0|           0|
|1001|2023-04-02|false|     0|         0|           0|
|1001|2023-04-03|false|     1|         1|           1|
|1001|2023-04-04|false|     1|         1|           1|
|1001|2023-04-05| true|     4|         3|           3|
|1001|2023-04-06|false|     4|         3|           3|
|1001|2023-04-07|false|     4|         3|           3|
|1001|2023-04-08|false|    10|         6|           9|
|1001|2023-04-09| true|    10|         0|           0|
|1001|2023-04-10|false|    12|         2|           2|
|1001|2023-04-11|false|    13|         3|           3|
+----+----------+-----+------+----------+------------+

Python相关问答推荐

有什么方法可以修复奇怪的y轴Python matplotlib图吗?

如何将自动创建的代码转换为类而不是字符串?

Python:MultiIndex Dataframe到类似json的字典列表

将numpy数组与空数组相加

自定义新元未更新参数

在编写要Excel的数据透视框架时修复标题行

Python无法在已导入的目录中看到新模块

Pandas 在时间序列中设定频率

Python -Polars库中的滚动索引?

在Python中为变量的缺失值创建虚拟值

如何处理嵌套的SON?

点到面的Y距离

2D空间中的反旋算法

C#使用程序从Python中执行Exec文件

数据抓取失败:寻求帮助

从groupby执行计算后创建新的子框架

如何使用它?

用砂箱开发Web统计分析

Pandas Data Wrangling/Dataframe Assignment

无论输入分辨率如何,稳定扩散管道始终输出512 * 512张图像