我还定义了一个名为info的列:

|     Timestamp     |   info   |
+-------------------+----------+
|2016-01-01 17:54:30|     0    |
|2016-02-01 12:16:18|     0    |
|2016-03-01 12:17:57|     0    |
|2016-04-01 10:05:21|     0    |
|2016-05-11 18:58:25|     1    |
|2016-06-11 11:18:29|     1    |
|2016-07-01 12:05:21|     0    |
|2016-08-11 11:58:25|     0    |
|2016-09-11 15:18:29|     1    |

我想计算连续出现的1,否则插入0.最后一列是:

--------------------+----------+----------+
|     Timestamp     |   info   |    res   |
+-------------------+----------+----------+
|2016-01-01 17:54:30|     0    |     0    |
|2016-02-01 12:16:18|     0    |     0    |
|2016-03-01 12:17:57|     0    |     0    |
|2016-04-01 10:05:21|     0    |     0    |
|2016-05-11 18:58:25|     1    |     1    |
|2016-06-11 11:18:29|     1    |     2    |
|2016-07-01 12:05:21|     0    |     0    |
|2016-08-11 11:58:25|     0    |     0    |
|2016-09-11 15:18:29|     1    |     1    |

我try 使用以下函数,但没有成功.

df_input = df_input.withColumn(
    "res",
    F.when(
        df_input.info == F.lag(df_input.info).over(w1),
        F.sum(F.lit(1)).over(w1)
    ).otherwise(0)
)

推荐答案

Adding a column counting cumulative pervious repeating values,学分到@Blackishop

from pyspark.sql import functions as F, Window

df = spark.createDataFrame([0, 0, 0, 0, 1, 1, 0, 0, 1], 'int').toDF('info')

df.withColumn("ID", F.monotonically_increasing_id()) \
    .withColumn("group",
            F.row_number().over(Window.orderBy("ID"))
            - F.row_number().over(Window.partitionBy("info").orderBy("ID"))
    ) \
    .withColumn("Result", F.when(F.col('info') != 0, F.row_number().over(Window.partitionBy("group").orderBy("ID"))).otherwise(F.lit(0)))\
    .orderBy("ID")\
    .drop("ID", "group")\
    .show()

+----+------+
|info|Result|
+----+------+
|   0|     0|
|   0|     0|
|   0|     0|
|   0|     0|
|   1|     1|
|   1|     2|
|   0|     0|
|   0|     0|
|   1|     1|
+----+------+

Python相关问答推荐

在后台运行的Python函数

如何计算部分聚合数据的统计数据

如何在不使用字符串的情况下将namedtuple属性传递给方法?

无法导入已安装的模块

在for循环中仅执行一次此操作

如何计算列表列行之间的公共元素

根据给定日期的状态过滤查询集

2维数组9x9,不使用numpy.数组(MutableSequence的子类)

如何在Python中将returns.context. DeliverresContext与Deliverc函数一起使用?

使用miniconda创建环境的问题

PMMLPipeline._ fit()需要2到3个位置参数,但给出了4个位置参数

如何从.cgi网站刮一张表到rame?

如何获取numpy数组的特定索引值?

Python+线程\TrocessPoolExecutor

如何根据一列的值有条件地 Select 前N个组,然后按两列分组?

使用groupby方法移除公共子字符串

Django—cte给出:QuerySet对象没有属性with_cte''''

以逻辑方式获取自己的pyproject.toml依赖项

ModuleNotFoundError:没有模块名为x时try 运行我的代码''

Pandas—堆栈多索引头,但不包括第一列