我有一些例子数据:

my_data = [{'id': '001', 'name': 'Sam', 'class': "classA", 'age': 15, 'exam_score': 90},
           {'id': '002', 'name': 'Tom', 'class': "classA", 'age': 15, 'exam_score': 78},
           {'id': '003', 'name': 'Ben', 'class': "classB", 'age': 16, 'exam_score': 91},
           {'id': '004', 'name': 'Max', 'class': "classB", 'age': 16, 'exam_score': 76},
           {'id': '005', 'name': 'Ana', 'class': "classA", 'age': 15, 'exam_score': 88},
           {'id': '006', 'name': 'Ivy', 'class': "classA", 'age': 16, 'exam_score': 77},
           {'id': '007', 'name': 'Eva', 'class': "classB", 'age': 15, 'exam_score': 86},
           {'id': '008', 'name': 'Zoe', 'class': "classB", 'age': 16, 'exam_score': 89}]

my_rdd = sc.parallelize(my_data)
my_rdd

假设我有一些简单的函数:

def divide_by_100(value):
  return value/100

目标是用函数将所有考试分数除以100,然后求出最小分数.我的 idea 是:

  • 按键筛选my_rdd,以便仅保留exam_score中的值
  • divide_by_100()函数应用于此
  • 使用.min().collect()功能打印数据中的最低考试分数

I'm aware that 100 could also be used.

问题是,我不知道如何在使用pyspark的情况下将其付诸实践.会很感激你能帮上忙的.

推荐答案

没有任何关键字的最小分数:

score_rdd = my_rdd.map(lambda my_dict: my_dict['exam_score']/100)
print(score_rdd.min())
# 0.76

这是一个使用PYSPARK RDD的解决方案:

score_rdd = my_rdd.map(lambda my_dict: (my_dict['class'], my_dict['exam_score']/100))
print(score_rdd.collect())
# [('classA', 0.9), ('classA', 0.78), ('classB', 0.91), ('classB', 0.76), ('classA', 0.88), ('classA', 0.77), ('classB', 0.86), ('classB', 0.89)]
from builtins import min # import python function: min
class_min_rdd = score_rdd.reduceByKey(min)
print(class_min_rdd.collect())
# [('classA', 0.77), ('classB', 0.76)]

这是一个使用PYSPARK数据帧的解决方案:

from pyspark.sql.types import *
from pyspark.sql.functions import *

df = my_rdd.toDF()
df.printSchema()
# root
#  |-- age: long (nullable = true)
#  |-- class: string (nullable = true)
#  |-- exam_score: long (nullable = true)
#  |-- id: string (nullable = true)
#  |-- name: string (nullable = true)
df = df.withColumn('score', col('exam_score')/100).groupBy('class').agg(min('score').alias('score'))
df.show(10, False)
# +------+-----+
# |class |score|
# +------+-----+
# |classB|0.76 |
# |classA|0.77 |
# +------+-----+
print(df.collect())
# [Row(class='classB', score=0.76), Row(class='classA', score=0.77)]

Python相关问答推荐

将列表中的元素替换为收件箱中的元素

code _tkinter. Tcl错误:窗口路径名称错误.!按钮4"

在上下文管理器中更改异常类型

使用Keras的线性回归参数估计

将DF中的名称与另一DF拆分并匹配并返回匹配的公司

如何标记Spacy中不包含特定符号的单词?

海运图:调整行和列标签

Telethon加入私有频道

Python虚拟环境的轻量级使用

如何使用它?

如何设置视频语言时上传到YouTube与Python API客户端

如何在Python中找到线性依赖mod 2

使用groupby方法移除公共子字符串

Python导入某些库时非法指令(核心转储)(beautifulsoup4."" yfinance)

ruamel.yaml dump:如何阻止map标量值被移动到一个新的缩进行?

如何获取Python synsets列表的第一个内容?

并行编程:同步进程

Discord.py -

无法在Spyder上的Pandas中将本地CSV转换为数据帧

在Python中控制列表中的数据步长