Step-by-step solution
Create a window specification that partitions the DataFrame by PersonID and orders by Date, and use the to_date function to parse the date strings into DateType.
W = Window.partitionBy('PersonID').orderBy('Date')
df1 = df.withColumn('Date', F.to_date('Date', format='dd-MM-yyyy'))
# df1.show()
# +--------+----------+--------------+
# |PersonID| Date|HasDoneWorkout|
# +--------+----------+--------------+
# | A|2001-01-31| 1|
# | A|2001-02-01| 1|
# | A|2001-02-02| 1|
# | A|2001-02-03| 0|
# | A|2001-02-04| 1|
# | B|2001-02-02| 1|
# +--------+----------+--------------+
Compute the difference in days between the previous row's date and the current row's date to flag rows whose dates are consecutive. The first row of each partition has no previous date, so coalesce defaults the NULL difference to 0, marking that row as not consecutive.
diff = F.datediff('Date', F.lag('Date').over(W))
df1 = df1.withColumn('is_consecutive_day', F.coalesce(diff, F.lit(0)) == 1)
# df1.show()
# +--------+----------+--------------+------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|
# +--------+----------+--------------+------------------+
# | A|2001-01-31| 1| false|
# | A|2001-02-01| 1| true|
# | A|2001-02-02| 1| true|
# | A|2001-02-03| 0| true|
# | A|2001-02-04| 1| true|
# | B|2001-02-02| 1| false|
# +--------+----------+--------------+------------------+
Create a Boolean column that flags rows where the date is consecutive and the person has done a workout.
df1 = df1.withColumn('is_workout_on_consecutive_day', F.col('is_consecutive_day') & (F.col('HasDoneWorkout') == 1))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|
# +--------+----------+--------------+------------------+-----------------------------+
# | A|2001-01-31| 1| false| false|
# | A|2001-02-01| 1| true| true|
# | A|2001-02-02| 1| true| true|
# | A|2001-02-03| 0| true| false|
# | A|2001-02-04| 1| true| true|
# | B|2001-02-02| 1| false| false|
# +--------+----------+--------------+------------------+-----------------------------+
Take a cumulative sum over the inverted is_workout_on_consecutive_day condition to label the distinct groups of rows in which the person worked out on consecutive days.
df1 = df1.withColumn('groups', F.sum((~F.col('is_workout_on_consecutive_day')).cast('int')).over(W))
# df1.show()
# +--------+----------+--------------+------------------+-----------------------------+------+
# |PersonID| Date|HasDoneWorkout|is_consecutive_day|is_workout_on_consecutive_day|groups|
# +--------+----------+--------------+------------------+-----------------------------+------+
# | A|2001-01-31| 1| false| false| 1|
# | A|2001-02-01| 1| true| true| 1|
# | A|2001-02-02| 1| true| true| 1|
# | A|2001-02-03| 0| true| false| 2|
# | A|2001-02-04| 1| true| true| 2|
# | B|2001-02-02| 1| false| false| 1|
# +--------+----------+--------------+------------------+-----------------------------+------+
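This cumulative sum over a negated flag is a standard group-labeling idiom: each row that breaks the streak increments the counter, so all rows of one streak share the same label. A minimal plain-Python illustration of the same idea:

```python
from itertools import accumulate

# is_workout_on_consecutive_day values for person A, in date order.
flags = [False, True, True, False, True]

# Each False (a streak break) adds 1 to the running total, producing group labels.
groups = list(accumulate(int(not f) for f in flags))
print(groups)  # [1, 1, 1, 2, 2]
```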
Group the DataFrame by PersonID and groups, and aggregate HasDoneWorkout with sum to get the length of every streak.
df1 = df1.groupBy('PersonID', 'groups').agg(F.sum('HasDoneWorkout').alias('streaks'))
# df1.show()
# +--------+------+-------+
# |PersonID|groups|streaks|
# +--------+------+-------+
# | A| 1| 3|
# | A| 2| 1|
# | B| 1| 1|
# +--------+------+-------+
Group the DataFrame by PersonID again and aggregate with max to find the longest streak.
df1 = df1.groupBy('PersonID').agg(F.max('streaks').alias('streaks'))
# df1.show()
# +--------+-------+
# |PersonID|streaks|
# +--------+-------+
# | A| 3|
# | B| 1|
# +--------+-------+
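For a quick sanity check without a Spark session, the same streak logic can be sketched in plain Python over the sample data (a rough equivalent for verification, not the Spark code itself):

```python
from datetime import date
from itertools import groupby

# Sample rows: (PersonID, date, HasDoneWorkout), sorted by person and date.
rows = [
    ('A', date(2001, 1, 31), 1),
    ('A', date(2001, 2, 1), 1),
    ('A', date(2001, 2, 2), 1),
    ('A', date(2001, 2, 3), 0),
    ('A', date(2001, 2, 4), 1),
    ('B', date(2001, 2, 2), 1),
]

def longest_streaks(rows):
    result = {}
    for person, grp in groupby(rows, key=lambda r: r[0]):
        best = current = 0
        prev_day = None
        for _, day, worked in grp:
            consecutive = prev_day is not None and (day - prev_day).days == 1
            if worked and consecutive:
                current += 1   # extend the running streak
            elif worked:
                current = 1    # start a new streak
            else:
                current = 0    # streak broken by a rest day
            best = max(best, current)
            prev_day = day
        result[person] = best
    return result

print(longest_streaks(rows))  # {'A': 3, 'B': 1}
```

This matches the final Spark result: person A's longest run of workouts on consecutive days is 3, person B's is 1.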