一些模拟数据:

pd.DataFrame({'date': {0: Timestamp('2021-08-01 '),
  1: Timestamp('2022-08-01 '),
  2: Timestamp('2021-08-02 '),
  3: Timestamp('2021-08-01 '),
  4: Timestamp('2022-08-01 '),
  5: Timestamp('2022-08-01 '),
  6: Timestamp('2022-08-01 ')                   },
 'product_nr': {0: 1, 1: 2, 2: 3, 3: 4, 4: 5, 5: 6, 6: 7},
 'Category': {0:  'Cars', 1: 'Cars', 2: 'Cats', 3: 'Dogs', 4: 'Dogs', 5: 'Cats', 6 :'Cats'},
 'price': {0: '34',
  1: '24',
  2: '244',
  3: '284',
  4: '274',
  5: '354',
  6 : '250'}} )

如何在具有特定条件的同一数据帧上执行内部联接?我想比较同一类别的行之间的价格. 所需输出:

pd.DataFrame({
 'product_nr': {0: 1,  1: 3,  2: 5, 3: 7, 4:7},
 'Category': {0:  'Cars',  1: 'Cats', 2: 'Dogs', 3:'Cats', 4:'Cats'},
 'price': {0: '34',
  1: '244',
  2: '274',
  3: '250',
  4: '250'},
 'product_to_be_compared' : {0: 2,  1: 6,  2: 4, 3:3 , 4:6}
} )

也就是说,我想进行内连接/交叉连接(不确定哪种连接最合适).我有一个很大的数据帧,如果行属于相同的类别和日期,我想将它们配对在一起.理想情况下,我更喜欢删除重复的对,这意味着我想要的输出应该是4行.

推荐答案

从你的问题中,我知道你对PySpark很熟悉.这就是如何使用PySpark数据帧来完成它.即使它使用外部itertools库,它也应该表现良好,因为该部件驻留在pandas_udf内,该pandas_udf是为了性能而矢量化的.

输入df:

import pandas as pd

pdf = pd.DataFrame({
    'date': {
        0: pd.Timestamp('2021-08-01'),
        1: pd.Timestamp('2021-08-01'),
        2: pd.Timestamp('2021-08-02'),
        3: pd.Timestamp('2021-08-03'),
        4: pd.Timestamp('2021-08-03'),
        5: pd.Timestamp('2021-08-02'),
        6: pd.Timestamp('2021-08-02')
    },
    'product_nr': {0: '1', 1: '2', 2: '3', 3: '4', 4: '5', 5: '6', 6: '7'},
    'Category': {0:  'Cars', 1: 'Cars', 2: 'Cats', 3: 'Dogs', 4: 'Dogs', 5: 'Cats', 6 :'Cats'},
    'price': {
        0: '34',
        1: '24',
        2: '244',
        3: '284',
        4: '274',
        5: '354',
        6 : '250'
    }
})
df = spark.createDataFrame(pdf)

这是一个脚本:

from pyspark.sql import functions as F
from itertools import combinations

@F.pandas_udf('array<array<string>>')
def arr_combinations(c: pd.Series) -> pd.Series:
    return c.apply(lambda x: list(combinations(x, 2)))

df2 = df.groupBy('Category', 'date').agg(F.collect_list('product_nr').alias('ps'))
df2 = df2.withColumn('ps', F.explode(arr_combinations('ps')))
df2 = df2.select(
    'Category', 'date',
    F.col('ps')[0].alias('product_nr'),
    F.col('ps')[1].alias('product_to_be_compared')
)
df3 = df.join(df2, ['product_nr', 'Category', 'date'])

df3.show()
# +----------+--------+-------------------+-----+----------------------+
# |product_nr|Category|               date|price|product_to_be_compared|
# +----------+--------+-------------------+-----+----------------------+
# |         3|    Cats|2021-08-02 00:00:00|  244|                     7|
# |         3|    Cats|2021-08-02 00:00:00|  244|                     6|
# |         1|    Cars|2021-08-01 00:00:00|   34|                     2|
# |         6|    Cats|2021-08-02 00:00:00|  354|                     7|
# |         4|    Dogs|2021-08-03 00:00:00|  284|                     5|
# +----------+--------+-------------------+-----+----------------------+

如果要在此表中直接比较价格,请使用以下选项:

from pyspark.sql import functions as F
from itertools import combinations

@F.pandas_udf('array<array<array<string>>>')
def arr_combinations(c: pd.Series) -> pd.Series:
    return c.apply(lambda x: list(combinations(x, 2)))

df2 = df.groupBy('Category', 'date').agg(F.collect_list(F.array('product_nr', 'price')).alias('ps'))
df2 = df2.withColumn('ps', F.explode(arr_combinations('ps')))
df2 = df2.select(
    F.col('ps')[0][0].alias('product_nr'),
    'Category',
    'date',
    F.col('ps')[0][1].alias('product_price'),
    F.col('ps')[1][0].alias('product_to_be_compared'),
    F.col('ps')[1][1].alias('product_to_be_compared_price'),
)
df2.show()
# +----------+--------+-------------------+-------------+----------------------+----------------------------+
# |product_nr|Category|               date|product_price|product_to_be_compared|product_to_be_compared_price|
# +----------+--------+-------------------+-------------+----------------------+----------------------------+
# |         1|    Cars|2021-08-01 00:00:00|           34|                     2|                          24|
# |         3|    Cats|2021-08-02 00:00:00|          244|                     6|                         354|
# |         3|    Cats|2021-08-02 00:00:00|          244|                     7|                         250|
# |         6|    Cats|2021-08-02 00:00:00|          354|                     7|                         250|
# |         4|    Dogs|2021-08-03 00:00:00|          284|                     5|                         274|
# +----------+--------+-------------------+-------------+----------------------+----------------------------+

Python相关问答推荐

Pandas :多索引组

仅从风格中获取 colored颜色 循环

Pydantic 2.7.0模型接受字符串日期时间或无

Pandas实际上如何对基于自定义的索引(integer和非integer)执行索引

Odoo 14 hr. emergency.public内的二进制字段

试图找到Python方法来部分填充numpy数组

如何使用matplotlib在Python中使用规范化数据和原始t测试值创建组合热图?

Python上的Instagram API:缺少client_id参数"

' osmnx.shortest_track '返回有效源 node 和目标 node 的'无'

如何让程序打印新段落上的每一行?

Pandas:将多级列名改为一级

如何使Matplotlib标题以图形为中心,而图例框则以图形为中心

当点击tkinter菜单而不是菜单选项时,如何执行命令?

从嵌套的yaml创建一个嵌套字符串,后面跟着点

在Python中调用变量(特别是Tkinter)

使用字典或列表的值组合

从嵌套极轴列的列表中删除元素

如何在Python中解析特定的文本,这些文本包含了同一行中的所有内容,

TypeError:';Locator';对象无法在PlayWriter中使用.first()调用

将像素信息写入文件并读取该文件