我有一个PySpark DataFrame,看起来像这样:

df = spark.createDataFrame(
    data=[
    (1, "GERMANY", "20230606", True),
    (2, "GERMANY", "20230620", False),
    (3, "GERMANY", "20230627", True),
    (4, "GERMANY", "20230705", True),
    (5, "GERMANY", "20230714", False),
    (6, "GERMANY", "20230715", True),
    ],
    schema=["ID", "COUNTRY", "DATE", "FLAG"]
)
df.show()
+---+-------+--------+-----+
| ID|COUNTRY|    DATE| FLAG|
+---+-------+--------+-----+
|  1|GERMANY|20230606| true|
|  2|GERMANY|20230620|false|
|  3|GERMANY|20230627| true|
|  4|GERMANY|20230705| true|
|  5|GERMANY|20230714|false|
|  6|GERMANY|20230715| true|
+---+-------+--------+-----+

DataFrame有更多的国家.我想创建一个新的列COUNT_WITH_RESET,遵循以下逻辑:

  • 如果是FLAG=False,那么是COUNT_WITH_RESET=0.
  • 如果为FLAG=True,则COUNT_WITH_RESET应计算从上次日期开始的行数,其中FLAG=False表示该特定国家/地区.

这应该是上面示例的输出.

+---+-------+--------+-----+----------------+
| ID|COUNTRY|    DATE| FLAG|COUNT_WITH_RESET|
+---+-------+--------+-----+----------------+
|  1|GERMANY|20230606| true|               1|
|  2|GERMANY|20230620|false|               0|
|  3|GERMANY|20230627| true|               1|
|  4|GERMANY|20230705| true|               2|
|  5|GERMANY|20230714|false|               0|
|  6|GERMANY|20230715| true|               1|
+---+-------+--------+-----+----------------+

我试着把row_number()个放在windows 上,但我没办法重置计数.我也试过.rowsBetween(Window.unboundedPreceding, Window.currentRow).以下是我的做法:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

window_reset = Window.partitionBy("COUNTRY").orderBy("DATE")

df_with_reset = (
    df
    .withColumn("COUNT_WITH_RESET", F.when(~F.col("FLAG"), 0)
                .otherwise(F.row_number().over(window_reset)))
)

df_with_reset.show()
+---+-------+--------+-----+----------------+
| ID|COUNTRY|    DATE| FLAG|COUNT_WITH_RESET|
+---+-------+--------+-----+----------------+
|  1|GERMANY|20230606| true|               1|
|  2|GERMANY|20230620|false|               0|
|  3|GERMANY|20230627| true|               3|
|  4|GERMANY|20230705| true|               4|
|  5|GERMANY|20230714|false|               0|
|  6|GERMANY|20230715| true|               6|
+---+-------+--------+-----+----------------+

这显然是错误的,因为我的窗口仅按国家/地区进行分区,但我的思路正确吗?在PySpark中有没有特定的内置函数来实现这一点?我需要UDF吗?任何帮助都将不胜感激.

推荐答案

将数据帧划分为COUNTRY,然后计算倒置的FLAG列上的累积和以分配组号,以便区分以false开始的不同blocks

W1 = Window.partitionBy('COUNTRY').orderBy('DATE')
df1 = df.withColumn('blocks', F.sum((~F.col('FLAG')).cast('long')).over(W1))

df1.show()
# +---+-------+--------+-----+------+
# | ID|COUNTRY|    DATE| FLAG|blocks|
# +---+-------+--------+-----+------+
# |  1|GERMANY|20230606| true|     0|
# |  2|GERMANY|20230620|false|     1|
# |  3|GERMANY|20230627| true|     1|
# |  4|GERMANY|20230705| true|     1|
# |  5|GERMANY|20230714|false|     2|
# |  6|GERMANY|20230715| true|     2|
# +---+-------+--------+-----+------+

COUNTRYblocks对该框架进行分区,然后计算有序分区上的行数,以创建顺序计数器

W2 = Window.partitionBy('COUNTRY', 'blocks').orderBy('DATE')
df1 = df1.withColumn('COUNT_WITH_RESET', F.row_number().over(W2) - 1)


df1.show()
# +---+-------+--------+-----+------+----------------+
# | ID|COUNTRY|    DATE| FLAG|blocks|COUNT_WITH_RESET|
# +---+-------+--------+-----+------+----------------+
# |  1|GERMANY|20230606| true|     0|               0|
# |  2|GERMANY|20230620|false|     1|               0|
# |  3|GERMANY|20230627| true|     1|               1|
# |  4|GERMANY|20230705| true|     1|               2|
# |  5|GERMANY|20230714|false|     2|               0|
# |  6|GERMANY|20230715| true|     2|               1|
# +---+-------+--------+-----+------+----------------+

Python相关问答推荐

使用Keras的线性回归参数估计

线性模型PanelOLS和statmodels OLS之间的区别

Polars比较了两个预设-有没有方法在第一次不匹配时立即失败

如何列举Pandigital Prime Set

将9个3x3矩阵按特定顺序排列成9x9矩阵

如何在UserSerializer中添加显式字段?

转换为浮点,pandas字符串列,混合千和十进制分隔符

使用特定值作为引用替换数据框行上的值

Pandas:计算中间时间条目的总时间增量

人口全部乱序 - Python—Matplotlib—映射

删除特定列后的所有列

如何在Airflow执行日期中保留日期并将时间转换为00:00

裁剪数字.nd数组引发-ValueError:无法将空图像写入JPEG

提取最内层嵌套链接

如何使用大量常量优化代码?

Django.core.exceptions.SynchronousOnlyOperation您不能从异步上下文中调用它-请使用线程或SYNC_TO_ASYNC

对于标准的原始类型注释,从键入`和`从www.example.com `?

如何在Quarto中的标题页之前创建序言页

如何获取给定列中包含特定值的行号?

利用广播使减法更有效率