想象一下一张桌子:

PersonID Date HasDoneWorkout
A 31-01-2001 1
A 01-02-2001 1
A 02-02-2001 1
A 03-02-2001 0
A 04-02-2001 1
B 02-02-2001 1

我想创建一个spark 源聚合函数,它将计算一个人已经连续锻炼了多少天.如果个人有多个连续记录- Select 最长的记录.

预期输出:

PersonID HasDoneWorkout
A 3
B 1

因为我没有找到任何使用spark 的解决方案-我试图采用Pandas 的方法.但未能将其转化为星星之火.

推荐答案

分步解决方案

创建一个窗口规范以按PersonID和ORDER BY Date对DataFrame进行分组,然后使用to_date函数将字符串解析为Date类型.

W = Window.partitionBy('PersonID').orderBy('Date')
df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))

# df1.show()
# +--------+----------+--------------+
# |PersonID|      Date|HasDoneWorkout|
# +--------+----------+--------------+
# |       A|2001-01-31|             1|
# |       A|2001-02-01|             1|
# |       A|2001-02-02|             1|
# |       A|2001-02-03|             0|
# |       A|2001-02-04|             1|
# |       B|2001-02-02|             1|
# +--------+----------+--------------+

计算上一行和当前行中的日期之间的差异,以标记日期连续的行

diff = F.datediff('Date', F.lag('Date').over(W))
df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)

# df1.show()
# +--------+----------+--------------+------------------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|
# +--------+----------+--------------+------------------+
# |       A|2001-01-31|             1|             false|
# |       A|2001-02-01|             1|              true|
# |       A|2001-02-02|             1|              true|
# |       A|2001-02-03|             0|              true|
# |       A|2001-02-04|             1|              true|
# |       B|2001-02-02|             1|             false|
# +--------+----------+--------------+------------------+

创建一个Boolean列,以标识具有连续日期的行和个人已进行锻炼的行.

df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))

# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
# +--------+----------+--------------+------------------+-----------------------------+
# |       A|2001-01-31|             1|             false|                        false|
# |       A|2001-02-01|             1|              true|                         true|
# |       A|2001-02-02|             1|              true|                         true|
# |       A|2001-02-03|             0|              true|                        false|
# |       A|2001-02-04|             1|              true|                         true|
# |       B|2001-02-02|             1|             false|                        false|
# +--------+----------+--------------+------------------+-----------------------------+

在倒置条件is_workout_on_consecutive_day上的累积总和,以区分其中人已经连续进行锻炼的不同行组

df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))


# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+------+
# |PersonID|      Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
# +--------+----------+--------------+------------------+-----------------------------+------+
# |       A|2001-01-31|             1|             false|                        false|     1|
# |       A|2001-02-01|             1|              true|                         true|     1|
# |       A|2001-02-02|             1|              true|                         true|     1|
# |       A|2001-02-03|             0|              true|                        false|     2|
# |       A|2001-02-04|             1|              true|                         true|     2|
# |       B|2001-02-02|             1|             false|                        false|     1|
# +--------+----------+--------------+------------------+-----------------------------+------+

将数据帧按PersonIDgroups分组,将聚合HasDoneWorkoutsum分组,以获得所有连续条纹的计数

df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))

# df1.show()
# +--------+------+-------+
# |PersonID|groups|streaks|
# +--------+------+-------+
# |       A|     1|      3|
# |       A|     2|      1|
# |       B|     1|      1|
# +--------+------+-------+

再次将数据帧按PersonID分组并聚合以找到最大连续条带

df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))

# df1.show()
# +--------+-------+
# |PersonID|streaks|
# +--------+-------+
# |       A|      3|
# |       B|      1|
# +--------+-------+

Python相关问答推荐

Python中两个矩阵的自定义Hadamard风格产物

合并同名列,但一列为空,另一列包含值

按 struct 值对Polars列表[struct[]]排序

指示组内的rejected_time是否在creation_timestamp后5分钟内

如何防止Plotly在输出到PDF时减少行中的点数?

从包含数字和单词的文件中读取和获取数据集

计算相同形状的两个张量的SSE损失

在Pandas 日历中插入一行

当密钥是复合且唯一时,Pandas合并抱怨标签不唯一

如何获取TFIDF Transformer中的值?

我们可以为Flask模型中的id字段主键设置默认uuid吗

如何使用表达式将字符串解压缩到Polars DataFrame中的多个列中?

在极性中创建条件累积和

cv2.matchTemplate函数匹配失败

如何在表中添加重复的列?

在www.example.com中使用`package_data`包含不包含__init__. py的非Python文件

如何在达到end_time时自动将状态字段从1更改为0

在代码执行后关闭ChromeDriver窗口

在Admin中显示从ManyToMany通过模型的筛选结果

如何获取Python synsets列表的第一个内容?