有没有一种标准的方法来计算pyspark中的加权平均值,忽略分母和中的缺失值?

以下面的例子为例:

# create data
data2 = [(1,1,1,1),
         (1,None,1,2),
         (2,1,1,1),
         (2,3,1,2),
  ]

schema = (StructType([ 
    StructField("group",IntegerType(),True), 
    StructField("var1",IntegerType(),True), 
    StructField("var2",IntegerType(),True), 
    StructField("wght", IntegerType(), True), 
  ]))


df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)

+-----+----+----+----+
|group|var1|var2|wght|
+-----+----+----+----+
|1    |1   |1   |1   |
|1    |null|1   |2   |
|2    |1   |1   |1   |
|2    |3   |1   |2   |
+-----+----+----+----+

我可以计算其他地方记录的加权平均值:

(df.groupBy("group").agg(
     (F.sum(col("var1")*col("wght"))/F.sum("wght")).alias("wgtd_var1"),
     (F.sum(col("var2")*col("wght"))/F.sum("wght")).alias("wgtd_var2")).show(truncate=False))

+-----+------------------+---------+
|group|wgtd_var1         |wgtd_var2|
+-----+------------------+---------+
|1    |0.3333333333333333|1.0      |
|2    |2.3333333333333335|1.0      |
+-----+------------------+---------+

但问题是,对于第一组,Weighted average应该是1,因为第二次观察不能使用.我可以

# get new weights
df = (df.withColumn("wghtvar1", F.when(col("var1").isNull(), None)
                                 .otherwise(col("wght")))
        .withColumn("wghtvar2", F.when(col("var2").isNull(), None)
                                 .otherwise(col("wght"))))

# compute correct weighted average
(df.groupBy("group").agg(
     (F.sum(col("var1")*col("wghtvar1"))/F.sum("wghtvar1")).alias("wgtd_var1"),
     (F.sum(col("var2")*col("wghtvar2"))/F.sum("wghtvar2")).alias("wgtd_var2")).show(truncate=False))

+-----+------------------+---------+
|group|wgtd_var1         |wgtd_var2|
+-----+------------------+---------+
|1    |1.0               |1.0      |
|2    |2.3333333333333335|1.0      |
+-----+------------------+---------+

有没有一种规范的方法可以做到这一点?

推荐答案

虽然差别不大,但至少这样可以避免为每个变量创建新的wght列.

条件聚合.

df = (df.groupby('group')
      .agg(
          (F.sum(F.when(F.col('var1').isNotNull(), F.col('var1') * F.col('wght'))) 
           /
          (F.sum(F.when(F.col('var1').isNotNull(), F.col('wght'))))
          ).alias('wgtd_var1')
      ))

为了将其应用于var个数字,我可以使用列表理解.

df = (df.groupby('group')
      .agg(*[
          (F.sum(F.when(F.col(x).isNotNull(), F.col(x) * F.col('wght'))) 
           /
          (F.sum(F.when(F.col(x).isNotNull(), F.col('wght'))))
          ).alias(f'wgtd_{x}')
          for x in ['var1', 'var2']
      ]))

Python相关问答推荐

如何创建多个继承类的实例?

如何计算Pandas 数据框中单元格内的行数,这些行数不是空行

Ctypes 数据类型转换

我正在try 堆叠、融化、grouby 或reshape Pandas DataFrame

反转的正则表达式模式比原始模式慢得多

使用 Selenium 提取/保存元素的文本和图像

计算区间中素数的算法不起作用

如何在我的 HTML 代码中使用 pyscript 并返回输出

如何在python中的多处理器之间进行竞赛

将 pandasql 输出分配给 DataFrame 中的新列

如何为 ManyToMany 字段添加排序?

遍历python中的列表并删除元素中字符的第二个实例之后的字符

cupy.asnumpy() 和 get() 之间的区别

使用groupby后如何获取数据框中的最小索引

在 Python 中的变量后使用括号有什么作用?

NNC 的整数规划

Python:如何将列表中以某个字母开头的元素复制到新列表中或从列表中删除不以字母开头的元素

带有 APIRouter 插件系统的 FastAPI 无法正常工作

Python:去除成对的列名

使用 Python OpenCV cv2.VideoCapture() 直接以灰度读取视频帧