我将列出复制(df1['x'].diff() < 0).cumsum()
的所有步骤(并保留输出中的所有列),这很容易使用lag
计算.
但是,重要的是,您的数据具有一个具有DataFrame顺序的ID列,因为与PANG不同,Spark不保留DataFrame的排序(由于其分布式性质).对于本例,我假设您的数据有一个名为idx
的ID列,这是您在示例输入中打印的索引.
# input data
data_sdf.show(5)
# +---+---+
# |idx|val|
# +---+---+
# | 0| 1|
# | 1| 2|
# | 2| 2|
# | 3| 2|
# | 4| 3|
# +---+---+
# only showing top 5 rows
# calculating the group column
data_sdf. \
withColumn('diff_with_prevval',
func.col('val') - func.lag('val').over(wd.orderBy('idx'))
). \
withColumn('diff_lt_0',
func.coalesce((func.col('diff_with_prevval') < 0).cast('int'), func.lit(0))
). \
withColumn('diff_lt_0_cumsum',
func.sum('diff_lt_0').over(wd.orderBy('idx').rowsBetween(-sys.maxsize, 0))
). \
show()
# +---+---+-----------------+---------+----------------+
# |idx|val|diff_with_prevval|diff_lt_0|diff_lt_0_cumsum|
# +---+---+-----------------+---------+----------------+
# | 0| 1| null| 0| 0|
# | 1| 2| 1| 0| 0|
# | 2| 2| 0| 0| 0|
# | 3| 2| 0| 0| 0|
# | 4| 3| 1| 0| 0|
# | 5| 3| 0| 0| 0|
# | 6| 4| 1| 0| 0|
# | 7| 5| 1| 0| 0|
# | 8| 4| -1| 1| 1|
# | 9| 3| -1| 1| 2|
# | 10| 2| -1| 1| 3|
# | 11| 1| -1| 1| 4|
# | 12| 2| 1| 0| 4|
# | 13| 3| 1| 0| 4|
# | 14| 4| 1| 0| 4|
# | 15| 5| 1| 0| 4|
# | 16| 5| 0| 0| 4|
# | 17| 6| 1| 0| 4|
# +---+---+-----------------+---------+----------------+
现在,您可以使用groupBy()
中的diff_lt_0_cumsum
列进一步使用.