我想找出这对曾经联系过的人.以下是数据:

Input is
K-\> M, H 
M-\> K, E
H-\> F
B-\> T, H
E-\> K, H
F-\> K, H, E
A-\> Z

输出结果为:

Output:
K, M //(this means K has supplied goods to M and M has also supplied some good to K)
H, F

这是我写的代码.

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, SQLContext
from pyspark.ml.regression import LinearRegression
import re
from itertools import combinations

spark = SparkContext("local", "DoubleRDD")


def findpairs(ls):
    lst = []
    for i in range(0,len(ls)-1):
        for j in range(i+1, len(ls)):
            if ls[i] == tuple(reversed(ls[j])):
                lst.append(ls[i])   
    return(lst)

text  = spark.textFile("path to the .txt")
text  = text.map(lambda s: s.replace("->",","))
text  = text.map(lambda s: s.replace(",",""))
text  = text.map(lambda s: s.replace(" ",""))
pairs = text.flatMap(lambda x:  [(x[0],y) for y in x[1:]])
commonpairs = pairs.filter(lambda x: findpairs(x))
pairs.collect()
The output is: []

推荐答案

不要使用RDD,这个问题可以使用原生的spark框架函数来解决.将文本文件作为spark框架读取

df = spark.read.csv('data.txt', header=False, sep=r'-\\> ').toDF('x', 'y')

# +---+-------+
# |  x|      y|
# +---+-------+
# |  K|   M, H|
# |  M|   K, E|
# |  H|      F|
# |  B|   T, H|
# |  E|   K, H|
# |  F|K, H, E|
# |  A|     Zs|
# +---+-------+

拆分和分解收件人(Y)列

df1 = df.withColumn('y', F.explode(F.split('y', r',\s+')))

# +---+---+
# |  x|  y|
# +---+---+
# |  K|  M|
# |  K|  H|
# |  M|  K|
# |  M|  E|
# |  H|  F|
# |  B|  T|
# |  B|  H|
# |  E|  K|
# |  E|  H|
# |  F|  K|
# |  F|  H|
# |  F|  E|
# |  A| Zs|
# +---+---+

自己加入数据帧,其中左边的接收者是右边的数据帧的发送者.然后过滤数据帧,使左侧的发送方与右侧的接收方相同

df1 = df1.alias('left').join(df1.alias('right'), on=F.expr("left.y == right.x"))
df1 = df1.filter("left.x == right.y")

# +---+---+---+---+
# |  x|  y|  x|  y|
# +---+---+---+---+
# |  K|  M|  M|  K|
# |  M|  K|  K|  M|
# |  H|  F|  F|  H|
# |  F|  H|  H|  F|
# +---+---+---+---+

删除发件人和收件人的重复组合

df1 = df1.select('left.*').withColumn('pairs', F.array_sort(F.array('x', 'y')))
df1 = df1.dropDuplicates(['pairs']).drop('pairs')

# +---+---+
# |  x|  y|
# +---+---+
# |  H|  F|
# |  K|  M|
# +---+---+

Python相关问答推荐

从Python调用GMP C函数时的分段错误和内存泄漏

customtkinter中使用的这个小部件的名称是什么

如何将Matplotlib的fig.add_axes本地坐标与我的坐标关联起来?

模型序列化器中未调用现场验证器

更改Seaborn条形图中的x轴日期时间限制

阅读Polars Python中管道的函数定义

基本链合同的地址是如何计算的?

如何使用Python中的clinicalTrials.gov API获取完整结果?

比较两个数据帧并并排附加结果(获取性能警告)

Polars LazyFrame在收集后未返回指定的模式顺序

Gekko:Spring-Mass系统的参数识别

使可滚动框架在tkinter环境中看起来自然

从groupby执行计算后创建新的子框架

SQLAlchemy Like ALL ORM analog

部分视图的DataFrame

在含噪声的3D点网格中识别4连通点模式

driver. find_element无法通过class_name找到元素'""

在Python中计算连续天数

如何在BeautifulSoup/CSS Select 器中处理regex?

我对这个简单的异步者的例子有什么错误的理解吗?