我有一个DataFrame(DF_Testing),其中包含以下示例数据:
我需要从Amount列中获取最大值.因此,输出DataFrame(Dfnew)如下所示:
我还是一名初学者,所以我使用以下代码遍历了数据帧:
import numpy as np
import pandas as pd
rec_count = df_testing.count()
MaxValuesArray = [] #empty array
TransactionArray = [] #empty array
for i in range(0, rec_count):
vMaxValue = max(df_testing.cache().collect()[i]["Amount"].split(","))
vTransactionId = df_testing.cache().collect()[i]["Id"]
TransactionArray.append(vTransactionId)
MaxValuesArray.append(vMaxValue)
X = np.array([TransactionArray,MaxValuesArray])
Y = {'Id':X[0], 'MaxValue':X[1]}
df = pd.DataFrame(Y) #convert array to panda dataframe
SparkDF = spark.createDataFrame(df) #convert to spark dataframe
a=df_testing.alias("a")
b=SparkDF.alias("b")
dfnew = a.join(b,a.Id == b.Id,"inner").select('a.*','b.MaxValue') #join dataframes
dfnew.show(truncate=False)
虽然上面的代码可以工作,但它的效率非常低.该样本有3条记录,但每天我需要处理大约25000条记录.循环(附在小spark 盘上)25000条记录需要2个多小时.
我的理解是,Pyspark DataFrame非常强大,但我只是不具备作为DataSet的一部分获得最大值的专业知识,而不是遍历DataFrame.
任何帮助都将不胜感激.