我正在使用一个如下所示的PySpark DataFrame:

client family_code start_date end_date
0001 1111 2019-10-10 2019-11-06
0002 1112 2020-11-26 2021-01-06
0003 1113 2020-09-24 2020-10-21

我想将每一行展开为多行,其中每一新行表示从start_dateend_date的日期范围内的一周.第一个week_start_date应与start_date对齐,最后week_end_date应与end_date对齐.对于所有其他新行,week_start_dateweek_end_date应分别表示一周的开始和结束.

以下是所需输出的示例:

client family_code week_start_date week_end_date start_date end_date
0001 1111 2019-10-10 2019-10-13 2019-10-10 2019-11-06
0001 1111 2019-10-14 2019-10-20 2019-10-10 2019-11-06
0001 1111 2019-10-21 2019-10-27 2019-10-10 2019-11-06
0001 1111 2019-10-28 2019-11-03 2019-10-10 2019-11-06
0001 1111 2019-11-04 2019-11-06 2019-10-10 2019-11-06
0002 1112 2020-11-26 2020-11-29 2020-11-26 2021-01-06
0002 1112 2020-11-30 2020-12-06 2020-11-26 2021-01-06
0002 1112 2020-12-07 2020-12-13 2020-11-26 2021-01-06
0002 1112 2020-12-14 2020-12-20 2020-11-26 2021-01-06
0002 1112 2020-12-21 2020-12-27 2020-11-26 2021-01-06
0002 1112 2020-12-28 2021-01-03 2020-11-26 2021-01-06
0002 1112 2021-01-04 2021-01-06 2020-11-26 2021-01-06
... ... ... ... ... ...

我如何在PySpark中实现这一转变?如果有任何指导,我将不胜感激!

推荐答案

请判断我的代码.它不是完美的(对不起,我没时间了:d)但它可以是你一个很好的起点

我能够用几个函数生成与您的输出类似的结果:

SEQUENCE(START_DATE,END_DATE,间隔1天)-我使用它来生成START_DATE和END_DATE之间的行

F.date_trunc(timestamp="date", format="week")-我在这里找到一周的第一天,以获得我使用类似东西的最后一天:

F.date_sub(
            F.date_trunc(timestamp=F.date_add(F.col("date"), 7), format="week"), 1
        )

然后使用Case/When to Cut Not Full Week将一周的开始/结束日期与开始/结束日期对齐

当一切准备就绪后,我只是将其分组以获得所需的输出

示例代码:

import datetime
import pyspark.sql.functions as F

data = [
    ("0001", "1111", datetime.date(2019, 10, 10), datetime.date(2019, 11, 6)),
    ("0002", "1112", datetime.date(2020, 11, 26), datetime.date(2021, 1, 6)),
    ("0003", "1113", datetime.date(2020, 9, 24), datetime.date(2020, 10, 21)),
]

df_1 = spark.createDataFrame(
    data, schema=["client", "family_code", "start_date", "end_date"]
)

windowSpec = Window.partitionBy("client", "family_code").orderBy("date")

dfExplodedAndAlligned = (
    df_1.withColumn(
        "date", F.explode(F.expr("sequence(start_date, end_date, interval 1 day)"))
    )
    .withColumn("start_of_week", F.date_trunc(timestamp="date", format="week"))
    .withColumn(
        "end_of_week",
        F.date_sub(
            F.date_trunc(timestamp=F.date_add(F.col("date"), 7), format="week"), 1
        ),
    )
    .withColumn(
        "start_of_week",
        F.when(
            F.col("start_date") > F.col("start_of_week"), F.col("start_date")
        ).otherwise(F.col("start_of_week")),
    )
    .withColumn(
        "end_of_week",
        F.when(F.col("end_date") < F.col("end_of_week"), F.col("end_date")).otherwise(
            F.col("end_of_week")
        ),
    )
)

#probably agg is not needed here, i wasnt able to run it without any agg, need to figure out somethin better
dfExplodedAndAlligned.groupBy(
    F.col("client"),
    F.col("family_code"),
    F.col("start_date"),
    F.col("end_date"),
    F.col("start_of_week"),
    F.col("end_of_week"),
).agg(F.max("start_date")).orderBy("start_of_week").drop("max(start_date)").show()

输出:

+------+-----------+----------+----------+-------------------+-----------+
|client|family_code|start_date|  end_date|      start_of_week|end_of_week|
+------+-----------+----------+----------+-------------------+-----------+
|  0001|       1111|2019-10-10|2019-11-06|2019-10-10 00:00:00| 2019-10-13|
|  0001|       1111|2019-10-10|2019-11-06|2019-10-14 00:00:00| 2019-10-20|
|  0001|       1111|2019-10-10|2019-11-06|2019-10-21 00:00:00| 2019-10-27|
|  0001|       1111|2019-10-10|2019-11-06|2019-10-28 00:00:00| 2019-11-03|
|  0001|       1111|2019-10-10|2019-11-06|2019-11-04 00:00:00| 2019-11-06|
|  0003|       1113|2020-09-24|2020-10-21|2020-09-24 00:00:00| 2020-09-27|
|  0003|       1113|2020-09-24|2020-10-21|2020-09-28 00:00:00| 2020-10-04|
|  0003|       1113|2020-09-24|2020-10-21|2020-10-05 00:00:00| 2020-10-11|
|  0003|       1113|2020-09-24|2020-10-21|2020-10-12 00:00:00| 2020-10-18|
|  0003|       1113|2020-09-24|2020-10-21|2020-10-19 00:00:00| 2020-10-21|
|  0002|       1112|2020-11-26|2021-01-06|2020-11-26 00:00:00| 2020-11-29|
|  0002|       1112|2020-11-26|2021-01-06|2020-11-30 00:00:00| 2020-12-06|
|  0002|       1112|2020-11-26|2021-01-06|2020-12-07 00:00:00| 2020-12-13|
|  0002|       1112|2020-11-26|2021-01-06|2020-12-14 00:00:00| 2020-12-20|
|  0002|       1112|2020-11-26|2021-01-06|2020-12-21 00:00:00| 2020-12-27|
|  0002|       1112|2020-11-26|2021-01-06|2020-12-28 00:00:00| 2021-01-03|
|  0002|       1112|2020-11-26|2021-01-06|2021-01-04 00:00:00| 2021-01-06|
+------+-----------+----------+----------+-------------------+-----------+

Python相关问答推荐

在Python中对分层父/子列表进行排序

大Pandas 胚胎中产生组合

Odoo 14 hr. emergency.public内的二进制字段

将特定列信息移动到当前行下的新行

如何记录脚本输出

在Polars(Python库)中将二进制转换为具有非UTF-8字符的字符串变量

图像 pyramid .难以创建所需的合成图像

如何过滤包含2个指定子字符串的收件箱列名?

OR—Tools CP SAT条件约束

如何将一个动态分配的C数组转换为Numpy数组,并在C扩展模块中返回给Python

如何在WSL2中更新Python到最新版本(3.12.2)?

如何从pandas的rame类继承并使用filepath实例化

在www.example.com中使用`package_data`包含不包含__init__. py的非Python文件

用渐近模计算含符号的矩阵乘法

Maya Python脚本将纹理应用于所有对象,而不是选定对象

将标签移动到matplotlib饼图中楔形块的开始处

如何获取包含`try`外部堆栈的`__traceback__`属性的异常

用由数据帧的相应元素形成的列表的函数来替换列的行中的值

在聚合中使用python-polars时如何计算模式

奇怪的Base64 Python解码