我有一个PySpark数据框,看起来像这样:

Id               timestamp           col1               col2
abc                789                0                  1
def                456                1                  0
abc                123                1                  0
def                321                0                  1

我想按ID列分组或分区,然后应根据时间戳的顺序创建col1和col2的列表.

Id               timestamp            col1             col2
abc              [123,789]           [1,0]             [0,1]
def              [321,456]           [0,1]             [1,0]

我的方法:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

window_spec = W.partitionBy("id").orderBy('timestamp')
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

df1 = df.withColumn("col1", F.collect_list("reco").over(window_spec))\
  .withColumn("col2", F.collect_list("score").over(window_spec))\
df1.show()

但这并没有返回col1和col2的列表.

推荐答案

我认为使用groupBy个聚合无法可靠地保存订单.因此,窗口功能似乎是一条出路.

设置:

from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
    [('abc', 789, 0, 1),
     ('def', 456, 1, 0),
     ('abc', 123, 1, 0),
     ('def', 321, 0, 1)],
    ['Id', 'timestamp', 'col1', 'col2'])

脚本:

w1 = W.partitionBy('Id').orderBy('timestamp')
w2 = W.partitionBy('Id').orderBy(F.desc('timestamp'))
df = df.select(
    'Id',
     *[F.collect_list(c).over(w1).alias(c) for c in df.columns if c != 'Id']
)
df = (df
    .withColumn('_rn', F.row_number().over(w2))
    .filter('_rn=1')
    .drop('_rn')
)

结果:

df.show()
# +---+----------+------+------+
# | Id| timestamp|  col1|  col2|
# +---+----------+------+------+
# |abc|[123, 789]|[1, 0]|[0, 1]|
# |def|[321, 456]|[0, 1]|[1, 0]|
# +---+----------+------+------+

你也非常接近你所需要的.我玩过,这似乎也很管用:

window_spec = W.partitionBy("Id").orderBy('timestamp')
ranged_spec = window_spec.rowsBetween(W.unboundedPreceding, W.unboundedFollowing)

df1 = (df
    .withColumn("timestamp", F.collect_list("timestamp").over(ranged_spec))
    .withColumn("col1", F.collect_list("col1").over(ranged_spec))
    .withColumn("col2", F.collect_list("col2").over(ranged_spec))
).drop_duplicates()
df1.show()

Python相关问答推荐

matplotlib图中的复杂箭头形状

Python如何导入类的实例

类型对象';敌人';没有属性';损害';

有没有一种方法可以根据不同索引集的数组从2D数组的对称子矩阵高效地构造3D数组?

Python:在cmd中添加参数时的语法

如何有效地计算所有输出相对于参数的梯度?

在FastAPI/Starlette中使用WebSockets时如何运行后台任务?

如何在保持sibling 姐妹美汤的同时插入和删除标签?

将COLUMN BY GROUP中的值连接为列表,并将其赋值给PANAS数据框中的变量

使用Python下载pdf url

有条件的滚动平均数(面试问题)

如何在pandas DataFrame列中保持一个只增加的数字序列?

每像素级图像处理的毕达式优化

带参数约束的类型提示函数*args->;tuple[*args]

返回最后一行以上的所有行,直到满足Python中的条件

如何在Python中返回多行SQL查询

递归函数根据词法作用域的不同而失败

如何从polars中的列表中 Select 所有列

卡方检验中的不同结果

形状摘要_绘图的子图