从你的问题中,我知道你对PySpark很熟悉.这就是如何使用PySpark数据帧来完成它.即使它使用外部itertools
库,它也应该表现良好,因为该部件驻留在pandas_udf
内,该pandas_udf
是为了性能而矢量化的.
输入df:
import pandas as pd
pdf = pd.DataFrame({
'date': {
0: pd.Timestamp('2021-08-01'),
1: pd.Timestamp('2021-08-01'),
2: pd.Timestamp('2021-08-02'),
3: pd.Timestamp('2021-08-03'),
4: pd.Timestamp('2021-08-03'),
5: pd.Timestamp('2021-08-02'),
6: pd.Timestamp('2021-08-02')
},
'product_nr': {0: '1', 1: '2', 2: '3', 3: '4', 4: '5', 5: '6', 6: '7'},
'Category': {0: 'Cars', 1: 'Cars', 2: 'Cats', 3: 'Dogs', 4: 'Dogs', 5: 'Cats', 6 :'Cats'},
'price': {
0: '34',
1: '24',
2: '244',
3: '284',
4: '274',
5: '354',
6 : '250'
}
})
df = spark.createDataFrame(pdf)
这是一个脚本:
from pyspark.sql import functions as F
from itertools import combinations
@F.pandas_udf('array<array<string>>')
def arr_combinations(c: pd.Series) -> pd.Series:
return c.apply(lambda x: list(combinations(x, 2)))
df2 = df.groupBy('Category', 'date').agg(F.collect_list('product_nr').alias('ps'))
df2 = df2.withColumn('ps', F.explode(arr_combinations('ps')))
df2 = df2.select(
'Category', 'date',
F.col('ps')[0].alias('product_nr'),
F.col('ps')[1].alias('product_to_be_compared')
)
df3 = df.join(df2, ['product_nr', 'Category', 'date'])
df3.show()
# +----------+--------+-------------------+-----+----------------------+
# |product_nr|Category| date|price|product_to_be_compared|
# +----------+--------+-------------------+-----+----------------------+
# | 3| Cats|2021-08-02 00:00:00| 244| 7|
# | 3| Cats|2021-08-02 00:00:00| 244| 6|
# | 1| Cars|2021-08-01 00:00:00| 34| 2|
# | 6| Cats|2021-08-02 00:00:00| 354| 7|
# | 4| Dogs|2021-08-03 00:00:00| 284| 5|
# +----------+--------+-------------------+-----+----------------------+
如果要在此表中直接比较价格,请使用以下选项:
from pyspark.sql import functions as F
from itertools import combinations
@F.pandas_udf('array<array<array<string>>>')
def arr_combinations(c: pd.Series) -> pd.Series:
return c.apply(lambda x: list(combinations(x, 2)))
df2 = df.groupBy('Category', 'date').agg(F.collect_list(F.array('product_nr', 'price')).alias('ps'))
df2 = df2.withColumn('ps', F.explode(arr_combinations('ps')))
df2 = df2.select(
F.col('ps')[0][0].alias('product_nr'),
'Category',
'date',
F.col('ps')[0][1].alias('product_price'),
F.col('ps')[1][0].alias('product_to_be_compared'),
F.col('ps')[1][1].alias('product_to_be_compared_price'),
)
df2.show()
# +----------+--------+-------------------+-------------+----------------------+----------------------------+
# |product_nr|Category| date|product_price|product_to_be_compared|product_to_be_compared_price|
# +----------+--------+-------------------+-------------+----------------------+----------------------------+
# | 1| Cars|2021-08-01 00:00:00| 34| 2| 24|
# | 3| Cats|2021-08-02 00:00:00| 244| 6| 354|
# | 3| Cats|2021-08-02 00:00:00| 244| 7| 250|
# | 6| Cats|2021-08-02 00:00:00| 354| 7| 250|
# | 4| Dogs|2021-08-03 00:00:00| 284| 5| 274|
# +----------+--------+-------------------+-------------+----------------------+----------------------------+