我有一个数据帧,如下所示:

+-------+----------+-----+
|user_id|      date|valor|
+-------+----------+-----+
|      1|2022-01-01|    0|
|      1|2022-01-02|    0|
|      1|2022-01-03|    1|
|      1|2022-01-04|    1|
|      1|2022-01-05|    1|
|      1|2022-01-06|    0|
|      1|2022-01-07|    0|
|      1|2022-01-08|    0|
|      1|2022-01-09|    1|
|      1|2022-01-10|    1|
|      1|2022-01-11|    1|
|      1|2022-01-12|    0|
|      1|2022-01-13|    0|
|      1|2022-01-14|   -1|
|      1|2022-01-15|   -1|
|      1|2022-01-16|   -1|
|      1|2022-01-17|   -1|
|      1|2022-01-18|   -1|
|      1|2022-01-19|   -1|
|      1|2022-01-20|    0|
+-------+----------+-----+

目标是使用valor作为基数计算用户id的分数,它将从3开始,在valor列the main problem here is that my score cant be under 1 and cant be over 5, so the sum bust always stay on the range and don't lose the last value so I can compute it right中增加或减少1.所以我期望的是:

+-------+----------+-----+-----+
|user_id|      date|valor|score|
+-------+----------+-----+-----+
|      1|2022-01-01|    0|    3|
|      1|2022-01-02|    0|    3|
|      1|2022-01-03|    1|    4|
|      1|2022-01-04|    1|    5|
|      1|2022-01-05|    1|    5|
|      1|2022-01-06|    0|    5|
|      1|2022-01-07|    0|    5|
|      1|2022-01-08|    0|    5|
|      1|2022-01-09|    1|    5|
|      1|2022-01-10|   -1|    4|
|      1|2022-01-11|   -1|    3|
|      1|2022-01-12|    0|    3|
|      1|2022-01-13|    0|    3|
|      1|2022-01-14|   -1|    2|
|      1|2022-01-15|   -1|    1|
|      1|2022-01-16|    1|    2|
|      1|2022-01-17|   -1|    1|
|      1|2022-01-18|   -1|    1|
|      1|2022-01-19|    1|    2|
|      1|2022-01-20|    0|    2|
+-------+----------+-----+-----+

到目前为止,我已经做了一个窗口来对列valor进行排序,这样我可以跟踪序列中增加或减少的数量,并从valor中删除大于4的序列,但我不知道如何将valor_的和保持在范围(1:5)内:

+-------+----------+----+-----+------+
|user_id|      date|rank|valor|valor_|
+-------+----------+----+-----+------+
|      1|2022-01-01|   0|    0|     0|
|      1|2022-01-02|   0|    0|     0|
|      1|2022-01-03|   1|    1|     1|
|      1|2022-01-04|   2|    1|     1|
|      1|2022-01-05|   3|    1|     1|
|      1|2022-01-06|   0|    0|     0|
|      1|2022-01-07|   0|    0|     0|
|      1|2022-01-08|   0|    0|     0|
|      1|2022-01-09|   1|    1|     1|
|      1|2022-01-10|   2|    1|     1|
|      1|2022-01-11|   3|    1|     1|
|      1|2022-01-12|   0|    0|     0|
|      1|2022-01-13|   0|    0|     0|
|      1|2022-01-14|   1|   -1|    -1|
|      1|2022-01-15|   2|   -1|    -1|
|      1|2022-01-16|   3|   -1|    -1|
|      1|2022-01-17|   4|   -1|    -1|
|      1|2022-01-18|   5|   -1|     0|
|      1|2022-01-19|   6|   -1|     0|

如你所见,这里的结果是not what I expected:

+-------+----------+----+-----+------+-----+
|user_id|      date|rank|valor|valor_|score|
+-------+----------+----+-----+------+-----+
|      1|2022-01-01|   0|    0|     0|    3|
|      1|2022-01-02|   0|    0|     0|    3|
|      1|2022-01-03|   1|    1|     1|    4|
|      1|2022-01-04|   2|    1|     1|    5|
|      1|2022-01-05|   3|    1|     1|    6|
|      1|2022-01-06|   0|    0|     0|    6|
|      1|2022-01-07|   0|    0|     0|    6|
|      1|2022-01-08|   0|    0|     0|    6|
|      1|2022-01-09|   1|    1|     1|    7|
|      1|2022-01-10|   2|    1|     1|    8|
|      1|2022-01-11|   3|    1|     1|    9|
|      1|2022-01-12|   0|    0|     0|    9|
|      1|2022-01-13|   0|    0|     0|    9|
|      1|2022-01-14|   1|   -1|    -1|    8|
|      1|2022-01-15|   2|   -1|    -1|    7|
|      1|2022-01-16|   3|   -1|    -1|    6|
|      1|2022-01-17|   4|   -1|    -1|    5|
|      1|2022-01-18|   5|   -1|     0|    5|
|      1|2022-01-19|   6|   -1|     0|    5|
|      1|2022-01-20|   0|    0|     0|    5|

推荐答案

输入:

from pyspark.sql import functions as F
df = spark.createDataFrame(
    [(1, '2022-01-01',  0),
     (1, '2022-01-02',  0),
     (1, '2022-01-03',  1),
     (1, '2022-01-04',  1),
     (1, '2022-01-05',  1),
     (1, '2022-01-06',  0),
     (1, '2022-01-07',  0),
     (1, '2022-01-08',  0),
     (1, '2022-01-09',  1),
     (1, '2022-01-10',  1),
     (1, '2022-01-11',  1),
     (1, '2022-01-12',  0),
     (1, '2022-01-13',  0),
     (1, '2022-01-14', -1),
     (1, '2022-01-15', -1),
     (1, '2022-01-16', -1),
     (1, '2022-01-17', -1),
     (1, '2022-01-18', -1),
     (1, '2022-01-19', -1),
     (1, '2022-01-20',  0)],
    ['user_id', 'date', 'valor'])

脚本:

df = df.groupBy('user_id').agg(F.array_sort(F.collect_list(F.array('date', 'valor'))).alias('a'))
df = df.withColumn(
    'a',
    F.filter(
        F.aggregate(
            'a',
            F.expr("array(struct('' as date, 0 as valor, 3 as cum))"),
            lambda acc, x: F.array_union(
                acc,
                F.array(F.struct(
                    x[0].alias('date'),
                    x[1].cast('int').alias('valor'),
                    F.greatest(F.lit(1), F.least(F.lit(5), x[1].cast('int') + F.element_at(acc, -1)['cum'])).alias('cum')
                ))
            )
        ),
        lambda x: x['date'] != ''
    )
)
df = df.selectExpr("user_id", "inline(a)")

df.show()
# +-------+----------+-----+---+
# |user_id|      date|valor|cum|
# +-------+----------+-----+---+
# |      1|2022-01-01|    0|  3|
# |      1|2022-01-02|    0|  3|
# |      1|2022-01-03|    1|  4|
# |      1|2022-01-04|    1|  5|
# |      1|2022-01-05|    1|  5|
# |      1|2022-01-06|    0|  5|
# |      1|2022-01-07|    0|  5|
# |      1|2022-01-08|    0|  5|
# |      1|2022-01-09|    1|  5|
# |      1|2022-01-10|    1|  5|
# |      1|2022-01-11|    1|  5|
# |      1|2022-01-12|    0|  5|
# |      1|2022-01-13|    0|  5|
# |      1|2022-01-14|   -1|  4|
# |      1|2022-01-15|   -1|  3|
# |      1|2022-01-16|   -1|  2|
# |      1|2022-01-17|   -1|  1|
# |      1|2022-01-18|   -1|  1|
# |      1|2022-01-19|   -1|  1|
# |      1|2022-01-20|    0|  1|
# +-------+----------+-----+---+

Python相关问答推荐

如何将Docker内部运行的mariadb与主机上Docker外部运行的Python脚本连接起来

当从Docker的--env-file参数读取Python中的环境变量时,每个\n都会添加一个\'.如何没有额外的?

使用groupby Pandas的一些操作

删除字符串中第一次出现单词后的所有内容

"使用odbc_connect(raw)连接字符串登录失败;可用于pyodbc"

当点击tkinter菜单而不是菜单选项时,如何执行命令?

Python导入某些库时非法指令(核心转储)(beautifulsoup4."" yfinance)

跳过嵌套JSON中的级别并转换为Pandas Rame

不允许 Select 北极滚动?

jsonschema日期格式

使用polars. pivot()旋转一个框架(类似于R中的pivot_longer)

多个矩阵的张量积

如何在Django模板中显示串行化器错误

当我定义一个继承的类时,我可以避免使用`metaclass=`吗?

Pandas:根据相邻行之间的差异过滤数据帧

Django-修改后的管理表单返回对象而不是文本

如何将ManyToManyfield用于Self类

为什么fizzbuzz在两个数字的条件出现在一个数字的条件之后时不起作用?

搜索结果未显示.我的URL选项卡显示:http://127.0.0.1:8000/search?";,而不是这个:";http://127.0.0.1:8000/search?q=name";

将COLUMN BY GROUP中的值连接为列表,并将其赋值给PANAS数据框中的变量