我想按单调递增趋势将我的spark 源数据帧拆分成组,并保留大小大于10的组.

在这里,我try 了部分代码,

from pyspark.sql import functions as F, Window

df = df1.withColumn(
    "FLAG_INCREASE",
    F.when(
        F.col("x")
        > F.lag("x").over(Window.partitionBy("x1").orderBy("time")),
        1,
    ).otherwise(0),
)

我不知道怎么在spark 源里玩团体舞...如果有人有更好的解决方案

在Pandas 身上,我们也可以这样做:

df=df1.groupby((df1['x'].diff() < 0).cumsum())

如何将此代码转换为PYSPARK?

数据帧示例:

    x
0   1
1   2
2   2
3   2
4   3
5   3
6   4
7   5
8   4
9   3
10  2
11  1
12  2
13  3
14  4
15  5
16  5
17  6

预期yields

第一组:

   x
0  1
1  2
2  2
3  2
4  3
5  3
6  4
7  5

第二组:

   x
0  1
1  2
2  3
3  4
4  5
5  5
6  6

推荐答案

我将列出复制(df1['x'].diff() < 0).cumsum()的所有步骤(并保留输出中的所有列),这很容易使用lag计算.

但是,重要的是,您的数据具有一个具有DataFrame顺序的ID列,因为与PANG不同,Spark不保留DataFrame的排序(由于其分布式性质).对于本例,我假设您的数据有一个名为idx的ID列,这是您在示例输入中打印的索引.

# input data
data_sdf.show(5)

# +---+---+
# |idx|val|
# +---+---+
# |  0|  1|
# |  1|  2|
# |  2|  2|
# |  3|  2|
# |  4|  3|
# +---+---+
# only showing top 5 rows

# calculating the group column
data_sdf. \
    withColumn('diff_with_prevval', 
               func.col('val') - func.lag('val').over(wd.orderBy('idx'))
               ). \
    withColumn('diff_lt_0', 
               func.coalesce((func.col('diff_with_prevval') < 0).cast('int'), func.lit(0))
               ). \
    withColumn('diff_lt_0_cumsum', 
               func.sum('diff_lt_0').over(wd.orderBy('idx').rowsBetween(-sys.maxsize, 0))
               ). \
    show()

# +---+---+-----------------+---------+----------------+
# |idx|val|diff_with_prevval|diff_lt_0|diff_lt_0_cumsum|
# +---+---+-----------------+---------+----------------+
# |  0|  1|             null|        0|               0|
# |  1|  2|                1|        0|               0|
# |  2|  2|                0|        0|               0|
# |  3|  2|                0|        0|               0|
# |  4|  3|                1|        0|               0|
# |  5|  3|                0|        0|               0|
# |  6|  4|                1|        0|               0|
# |  7|  5|                1|        0|               0|
# |  8|  4|               -1|        1|               1|
# |  9|  3|               -1|        1|               2|
# | 10|  2|               -1|        1|               3|
# | 11|  1|               -1|        1|               4|
# | 12|  2|                1|        0|               4|
# | 13|  3|                1|        0|               4|
# | 14|  4|                1|        0|               4|
# | 15|  5|                1|        0|               4|
# | 16|  5|                0|        0|               4|
# | 17|  6|                1|        0|               4|
# +---+---+-----------------+---------+----------------+

现在,您可以使用groupBy()中的diff_lt_0_cumsum列进一步使用.

Python-3.x相关问答推荐

网站抓取:当我使用Chrome DevTools中的网络选项卡时,找不到正确的URL来提供我想要的数据

将strid()映射到Pandas DataFrame中的字符串不会更改NaN条目,但仍然声称它们不同?

与 pandas 0.22 相比,pandas 2.0.3 中的 df.replace() 会抛出 ValueError 错误

如何创建与导航抽屉一起使用的导航栏

合并所有文件并获取特定列数据

双轴上的刻度和标签

这种类型提示有什么作用?

在字符串中查找正则表达式的所有模式

用于 BIG 数组计算的多处理池映射比预期的要慢

RGB 图像中最主要的 colored颜色 - OpenCV / NumPy / Python

Python socket.error: [Errno 13] 权限被拒绝

活动屏幕上的 PyQt4 中心窗口

在两个数据框之间查找相等的列

如何从同一文件夹中的模块导入功能?

TensorFlow:dataset.train.next_batch 是如何定义的?

django - 值更改后自动更新日期

Python configparser 不会接受没有值的键

如何制作函数Collection

有没有一种标准方法来确保 python 脚本将由 python2 而不是 python3 解释?

字典理解中的操作顺序