以下是一些转换为RDD的示例数据:

my_data = [{'id': '001', 'name': 'Sam', 'class': "classA", 'age': 15, 'exam_score': '90'},
           {'id': '002', 'name': 'Tom', 'class': "classA", 'age': 15, 'exam_score': '78'},
           {'id': '003', 'name': 'Ben', 'class': "classB", 'age': 16, 'exam_score': '91'},
           {'id': '004', 'name': 'Max', 'class': "classB", 'age': 16, 'exam_score': '76'},
           {'id': '005', 'name': 'Ana', 'class': "classA", 'age': 15, 'exam_score': '88'},
           {'id': '006', 'name': 'Ivy', 'class': "classA", 'age': 16, 'exam_score': '77'},
           {'id': '007', 'name': 'Eva', 'class': "classB", 'age': 15, 'exam_score': '86'},
           {'id': '008', 'name': 'Zoe', 'class': "classB", 'age': 16, 'exam_score': '89'}]

my_rdd = sc.parallelize(my_data)

仅运行my_rdd就会返回:

#>>> ParallelCollectionRDD[117] at readRDDFromFile at PythonRDD.scala:274

我知道你可以用my_rdd.collect()来显示RDD,它返回:

#[{'age': 15,
#  'class': 'classA',
#  'exam_score': '90',
#  'id': '001',
#  'name': 'Sam'},
# {'age': 15,
#  'class': 'classA',
#  'exam_score': '78',
#  'id': '002',
#  'name': 'Tom'}, ...]

我发现我可以通过运行my_rdd.keys()来访问密钥,但这将返回:

#>>> PythonRDD[121] at RDD at PythonRDD.scala:53

我想返回RDD中所有不同键的列表(我知道每行的键都是相同的,但我想知道它们不同的场景)-因此如下所示:

#>>> ['id', 'name', 'class', 'age', 'exam_score']

因此,我假设我可以通过运行my_rdd.keys().distinct.collect()得到这个结果,但是我得到了一个错误.

我还在学习PYSSPARK,如果有人能提供一些意见,我将不胜感激:)

推荐答案

my_data = [{'id': '001', 'name': 'Sam', 'class': "classA", 'age': 15, 'exam_score': '90'},
           {'id': '008', 'name': 'Zoe', 'xxxxx': "classB", 'age': 16, 'exam_score': '89'},
           {'id': '007', 'name': 'Eva', 'class': "classB", 'age': 15, 'exam_score': '86'},
        ]

my_rdd = sc.parallelize(my_data)

key_rdd = my_rdd.flatMap(lambda x: x) # flatMap

print( key_rdd.collect() )
# ['id', 'name', 'class', 'age', 'exam_score', 'id', 'name', 'xxxxx', 'age', 'exam_score', 'id', 'name', 'class', 'age', 'exam_score']
print( key_rdd.distinct().collect() )
# ['class', 'id', 'exam_score', 'age', 'name', 'xxxxx']

Python相关问答推荐

使用setuptools pyproject.toml和自定义目录树构建PyPi包

Python,Fitting into a System of Equations

为什么以这种方式调用pd.ExcelWriter会创建无效的文件格式或扩展名?

无法在Docker内部运行Python的Matlab SDK模块,但本地没有问题

如何在UserSerializer中添加显式字段?

根据列值添加时区

我的字符串搜索算法的平均时间复杂度和最坏时间复杂度是多少?

在两极中过滤

如何更改groupby作用域以找到满足掩码条件的第一个值?

使用BeautifulSoup抓取所有链接

在Python中计算连续天数

寻找Regex模式返回与我当前函数类似的结果

找到相对于列表索引的当前最大值列表""

Python—压缩叶 map html作为邮箱附件并通过sendgrid发送

判断Python操作:如何从字面上得到所有decorator ?

获取git修订版中每个文件的最后修改时间的最有效方法是什么?

PySpark:如何最有效地读取不同列位置的多个CSV文件

Django.core.exceptions.SynchronousOnlyOperation您不能从异步上下文中调用它-请使用线程或SYNC_TO_ASYNC

如何在Polars中创建条件增量列?

ValueError:必须在Pandas 中生成聚合值