示例代码。其中的 cumulative_pass 列是我想要以编程方式创建的——
import sys

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql import Window
from pyspark.sql.types import IntegerType
spark_session = SparkSession.builder.getOrCreate()

# Reference data: `cumulative_pass` is the column to be derived programmatically;
# it is included here only so the expected result is visible in the output.
_columns = ["username", "session", "year_start", "year_end", "pass", "cumulative_pass"]
_rows = [
    ("bob", 1, 2020, 2020, 1, 0),
    ("bob", 2, 2020, 2020, 0, 0),
    ("bob", 3, 2020, 2020, 0, 0),
    ("bob", 4, 2020, 2020, 0, 0),
    ("bob", 5, 2020, 2021, 0, 0),
    ("bob", 6, 2021, 2021, 1, 1),
    ("bob", 7, 2022, 2022, 1, 2),
    ("bob", 8, 2023, 2023, 0, 3),
]
df_pandas = pd.DataFrame(_rows, columns=_columns)
df = spark_session.createDataFrame(df_pandas)
df.show()
最后的 show() 输出将是这个——
+--------+-------+----------+--------+----+---------------+
|username|session|year_start|year_end|pass|cumulative_pass|
+--------+-------+----------+--------+----+---------------+
| bob| 1| 2020| 2020| 1| 0|
| bob| 2| 2020| 2020| 0| 0|
| bob| 3| 2020| 2020| 0| 0|
| bob| 4| 2020| 2020| 0| 0|
| bob| 5| 2020| 2021| 0| 0|
| bob| 6| 2021| 2021| 1| 1|
| bob| 7| 2022| 2022| 1| 2|
| bob| 8| 2023| 2023| 0| 3|
+--------+-------+----------+--------+----+---------------+
cumulative_pass 列的规则:对于当前行,把所有先前行中 year_end 严格小于当前行 year_start 的那些行的 pass 列求和。
我的尝试(由于作用域/语法问题不起作用)——
def conditional_sum(data: pd.DataFrame) -> int:
    """Grouped-aggregate pandas UDF body.

    `data` holds the struct column (year_start, year_end, pass) for every
    row in the current window frame.  The frame is defined below to run up
    to AND INCLUDING the current row, so the current row is the last row
    of `data`.

    Returns the sum of `pass` over the rows whose session ended strictly
    before the current row's session started (year_end < current
    year_start).  In the sample data every row has year_end >= year_start,
    so the strict comparison excludes the current row itself.

    Bug fixes vs. the original attempt: the body used the *global* Spark
    DataFrame `df` instead of the `data` parameter, and approximated the
    current row's year_start with `max()` over a frame that excluded the
    current row entirely.
    """
    current_start = data["year_start"].iloc[-1]
    return int(data.loc[data["year_end"] < current_start, "pass"].sum())


# Grouped-aggregate pandas UDF: the UDF type must be GROUPED_AGG for use
# with `.over(window)`.  IntegerType comes from pyspark.sql.types (the
# original snippet used it without importing it).
udf_conditional_sum = F.pandas_udf(
    conditional_sum, IntegerType(), F.PandasUDFType.GROUPED_AGG
)

# The frame must include the current row (upper bound = currentRow) so the
# UDF can read the current row's year_start; the original frame
# (-sys.maxsize, -1) stopped one row short of it.
w = (
    Window.partitionBy("username")
    .orderBy(F.asc("year_start"), F.desc("year_end"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df = df.withColumn(
    "calculate_cumulative_pass",
    udf_conditional_sum(F.struct("year_start", "year_end", "pass")).over(w),
)