For your data, you shouldn't need to create a custom UDF. This is easily done with native Spark functions:
from pyspark.sql import functions as F

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

def calculateAmount(loandate, loanamount):
    # builds a Column expression; dates outside both ranges yield null
    fine = F.when(F.col('loandate').between('2010-01-01', '2015-12-31'), 10) \
            .when(F.col('loandate').between('2016-01-01', F.current_date()), 5)
    return ((100 + fine) * F.col('loanamount').cast('long')) / 100

df = df.withColumn("NewLoanAmount", calculateAmount("loandate", "loanamount"))
df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89 |20000 |2020-10-01|21000.0 |
# |Monty|02 |45 |10000 |2015-05-05|11000.0 |
# |Roxy |03 |76 |30000 |2010-10-10|33000.0 |
# +-----+---+-------+----------+----------+-------------+
To answer the original question...
You already have the function definition, so a lambda is not needed. And overall, it's simpler to use the @udf decorator instead of the line

calAmount = udf(lambda interest, amount: calculateAmount(interest, amount), DecimalType())
The following works:
from pyspark.sql.functions import udf, col
from datetime import date

df = spark.createDataFrame(
    [('John', '01', '89', 20000, '2020-10-01'),
     ('Monty', '02', '45', 10000, '2015-05-05'),
     ('Roxy', '03', '76', 30000, '2010-10-10')],
    ['name', 'id', 'loannum', 'loanamount', 'loandate'])

@udf
def calculateAmount(loandate, loanamount):
    y, m, d = loandate.split('-')
    ld = date(int(y), int(m), int(d))
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        fine = 10
    elif date(2016, 1, 1) <= ld <= date.today():
        fine = 5
    else:
        fine = 0  # avoids an UnboundLocalError for dates outside both ranges
    return ((100 + fine) * int(loanamount)) / 100

df = df.withColumn("NewLoanAmount", calculateAmount(col("loandate"), col("loanamount")))
df.show()
# +-----+---+-------+----------+----------+-------------+
# |name |id |loannum|loanamount|loandate |NewLoanAmount|
# +-----+---+-------+----------+----------+-------------+
# |John |01 |89 |20000 |2020-10-01|21000.0 |
# |Monty|02 |45 |10000 |2015-05-05|11000.0 |
# |Roxy |03 |76 |30000 |2010-10-10|33000.0 |
# +-----+---+-------+----------+----------+-------------+
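Because the UDF body is plain Python, its branching can be sanity-checked without a Spark session. Here `fine_for` is a hypothetical helper that mirrors the UDF's date logic (returning 0 for dates outside both ranges is an assumption, not part of the original code):

```python
from datetime import date

def fine_for(loandate: str) -> int:
    # mirrors the UDF's branching; 0 for out-of-range dates is assumed
    y, m, d = loandate.split('-')
    ld = date(int(y), int(m), int(d))
    if date(2010, 1, 1) <= ld <= date(2015, 12, 31):
        return 10
    if date(2016, 1, 1) <= ld <= date.today():
        return 5
    return 0

# same arithmetic as the UDF's return statement
print((100 + fine_for('2010-10-10')) * 30000 / 100)  # 33000.0
```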
Without the @udf decorator, the same thing can be written as:
calAmount = udf(calculateAmount)
or
calAmount = udf(calculateAmount, DoubleType())
In your original code, you provide DecimalType(), but Python does not do these calculations in decimal numbers. Your calculations are done on double-precision floats, not decimals, so the function returns a double while Spark expects a decimal... This is why you get nulls. To make the Python function return a decimal, you would need to use the decimal library; without it, you just have double-precision numbers.
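A minimal sketch of that fix (the function name and the fine-as-argument signature are illustrative, not from the original code): doing the arithmetic in Decimal makes the Python return value match a DecimalType() annotation.

```python
from decimal import Decimal

def calculate_amount_decimal(loanamount, fine):
    # all arithmetic stays in Decimal, so the result is a decimal.Decimal,
    # which is what a DecimalType() return type expects
    return (Decimal(100 + fine) * Decimal(loanamount)) / Decimal(100)

print(calculate_amount_decimal(20000, 5))  # Decimal('21000')
```

Wrapped as a UDF, this would be something like udf(calculate_amount_decimal, DecimalType()) (precision and scale may need adjusting for your data).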