I am trying to convert a Python function into a PySpark user-defined function, as shown below:

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf,col,array
from pyspark.sql.types import StringType,IntegerType,DecimalType
from datetime import date

def calculateAmount(loandate,loanamount):
    y,m,d = loandate.split('-')[0],loandate.split('-')[1],loandate.split('-')[2]
    ld = date(int(y),int(m),int(d))
    if (date(2010,1,1) <= ld <= date(2015,12,31)):
        fine = 10
    elif (date(2016,1,1) <=ld <= date.today()):
        fine = 5
    return ((100+fine)*int(loanamount))/100

spark = SparkSession.builder.appName("User Defined Functions").getOrCreate()
df = spark.read.options(delimiter = "\t",header = True).csv("../input/applicationloan/loan.txt")

calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())

df = df.withColumn("NewLoanAmount",calAmount(col("loandate"),col("loanamount")))
df.show()

The above code produces the following output:

[Output screenshot]

Screenshot of the source file "loan.txt":

[loan.txt screenshot]

The source file above is tab-delimited.

I am using a PySpark udf to create a new column "NewLoanAmount", but the udf keeps returning NULL values. Something seems to go wrong when the lambda function is called inside the PySpark udf.

How do I write a PySpark UDF with multiple arguments? I understand how to write one with a single argument, but multiple arguments have me confused.

Accepted answer

For your data, you shouldn't create a UDF at all. This is easily done with native Spark functions:

from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

def calculateAmount(loandate, loanamount):
    fine = F.when(F.col('loandate').between('2010-01-01', '2015-12-31'), 10) \
            .when(F.col('loandate').between('2016-01-01', F.current_date()), 5)
    return ((100 + fine) * F.col('loanamount').cast('long')) / 100

df = df.withColumn("NewLoanAmount", calculateAmount("loandate", "loanamount"))

df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+
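
One caveat with this native version: when(...).when(...) without an otherwise produces null for loan dates outside both windows, and that null propagates into NewLoanAmount. A minimal variant, assuming a default fine of 0 is the intended rule:

def calculateAmount(loandate, loanamount):
    fine = F.when(F.col('loandate').between('2010-01-01', '2015-12-31'), 10) \
            .when(F.col('loandate').between('2016-01-01', F.current_date()), 5) \
            .otherwise(0)  # assumption: loans outside both windows carry no fine
    return ((100 + fine) * F.col('loanamount').cast('long')) / 100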

To answer the original question...

You already have the function definition, so the lambda is not needed. And overall, it's simpler to use the @udf decorator instead of the line
calAmount = udf(lambda interest,amount : calculateAmount(interest,amount),DecimalType())

The following works:

from pyspark.sql.functions import udf, col, array
from pyspark.sql.types import StringType, IntegerType, DecimalType
from datetime import date

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

@udf
def calculateAmount(loandate, loanamount):
    y, m, d = loandate.split('-')[0], loandate.split('-')[1], loandate.split('-')[2]
    ld = date(int(y), int(m), int(d))
    if (date(2010, 1, 1) <= ld <= date(2015, 12, 31)):
        fine = 10
    elif (date(2016, 1, 1) <= ld <= date.today()):
        fine = 5
    return ((100 + fine) * int(loanamount)) / 100

df = df.withColumn("NewLoanAmount", calculateAmount(col("loandate"), col("loanamount")))

df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate  |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89     |20000     |2020-10-01|21000.0      |
# |Monty|02 |45     |10000     |2015-05-05|11000.0      |
# |Roxy |03 |76     |30000     |2010-10-10|33000.0      |
# +-----+---+-------+----------+----------+-------------+

Without the @udf decorator:

# both forms assume calculateAmount is the plain, undecorated Python function
calAmount = udf(calculateAmount)                # default return type is StringType

calAmount = udf(calculateAmount, DoubleType())  # DoubleType must be imported from pyspark.sql.types

In your original code, you provide DecimalType(), but Python does not do its arithmetic in decimal numbers: the calculation is carried out on double-precision floats. So your function returns a double while Spark expects a decimal, and that mismatch is why you get nulls. To make the Python function return a decimal you would need the decimal library; without it, you just have double-precision numbers.
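
Building on that last point, here is a minimal sketch of a variant that returns an actual decimal.Decimal so the declared DecimalType matches the returned value. The (12, 2) precision/scale and the else branch (defaulting the fine to 0) are assumptions, not part of the original:

from datetime import date
from decimal import Decimal
from pyspark.sql.functions import udf
from pyspark.sql.types import DecimalType

@udf(DecimalType(12, 2))
def calculateAmountDec(loandate, loanamount):
    y, m, d = loandate.split('-')
    ld = date(int(y), int(m), int(d))
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        fine = 10
    elif date(2016, 1, 1) <= ld <= date.today():
        fine = 5
    else:
        fine = 0  # assumption: no fine outside the two date ranges
    # Decimal / Decimal stays a Decimal, so Spark can map it to DecimalType
    return Decimal((100 + fine) * int(loanamount)) / Decimal(100)

df = df.withColumn("NewLoanAmount", calculateAmountDec("loandate", "loanamount"))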
