我的数据框如下所示:

df1 = spark.createDataFrame([(1, "a"), (1, "b"), (1, "c")], ("col1", "col2"))

        +----+----+
        |col1|col2|
        +----+----+
        |   1|   a|
        |   1|   b|
        |   1|   c|
        +----+----+
        
df2 = spark.createDataFrame([(1, "k1"), (1, "k2"), (1, "k3"),(1,"k4")], ("col1", "col3"))

        +----+----+
        |col1|col3|
        +----+----+
        |   1|  k1|
        |   1|  k2|
        |   1|  k3|
        |   1|  k4|
        +----+----+

我想要生成

df3 = spark.createDataFrame([(1, "a", "k1"), (1, "b", "k2"), (1, "c", "k3"),(1, None, "k4")], ("col1", "col2", "col3"))

即所需输出:

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   a|  k1|
    |   1|   b|  k2|
    |   1|   c|  k3|
    |   1|null|  k4|
    +----+----+----+

我试了df1.join(df2, on='col1', how="leftouter")次,得到了:

    +----+----+----+
    |col1|col2|col3|
    +----+----+----+
    |   1|   a|  k4|
    |   1|   a|  k3|
    |   1|   a|  k2|
    |   1|   a|  k1|
    |   1|   b|  k4|
    |   1|   b|  k3|
    |   1|   b|  k2|
    |   1|   b|  k1|
    |   1|   c|  k4|
    |   1|   c|  k3|
    |   1|   c|  k2|
    |   1|   c|  k1|
    +----+----+----+

我查了Merge rows from one dataframe that do not match specific columns in another dataframe Python Pandas个.这几乎就是我想要的.但是,它使用的是Pandas DF.我不确定从spark DF切换到Pandas DF来做这个手术是不是一个好主意.有没有一种原生的喷雾方式来做到这一点?

在提供所需输出的转换方面需要帮助.

推荐答案

您可以使用window创建一个表示两个数据帧中行顺序的新rank列,在第一列和这rank列上连接两个数据帧,最后删除rank列,如下所示:

from pyspark.sql import Window
from pyspark.sql import functions as F

window1 = Window.partitionBy("col1").orderBy("col2")
window2 = Window.partitionBy("col1").orderBy("col3")

ranked_df1 = df1.withColumn("rank", F.row_number().over(window1))
ranked_df2 = df2.withColumn("rank", F.row_number().over(window2))

result_df = ranked_df1.join(
    ranked_df2, 
    on=['col1', 'rank'], 
    how='full_outer'
).drop('rank')

使用您问题中定义的df1df2数据帧,您将获得以下result_df个数据帧:

+----+----+----+
|col1|col2|col3|
+----+----+----+
|1   |a   |k1  |
|1   |b   |k2  |
|1   |c   |k3  |
|1   |null|k4  |
+----+----+----+

Python相关问答推荐

Pandas或pyspark跨越列创建

从流程获取定期更新

有没有办法清除气流中的僵尸

我可以使用极点优化这个面向cpu的pandas代码吗?

使用Python OpenCV的文本检测分割

将列表中的元素替换为收件箱中的元素

在Arrow上迭代的快速方法.Julia中包含3000万行和25列的表

运行回文查找器代码时发生错误:[类型错误:builtin_index_or_system对象不可订阅]

如何计算两极打印机中 * 所有列 * 的出现次数?

我从带有langchain的mongoDB中的vector serch获得一个空数组

如何检测背景有噪的图像中的正方形

当使用keras.utils.Image_dataset_from_directory仅加载测试数据集时,结果不同

如何将双框框列中的成对变成两个新列

rame中不兼容的d类型

有没有一种ONE—LINER的方法给一个框架的每一行一个由整数和字符串组成的唯一id?

如何在图中标记平均点?

Python脚本使用蓝牙运行在Windows 11与raspberry pi4

如何使用SentenceTransformers创建矢量嵌入?

我的字符串搜索算法的平均时间复杂度和最坏时间复杂度是多少?

在Python 3中,如何让客户端打开一个套接字到服务器,发送一行JSON编码的数据,读回一行JSON编码的数据,然后继续?