I have a solution that works for your sample data. I'm not sure it handles every edge case, but it should be a good starting point.

I use the Spark SQL sequence function together with explode to generate new rows between the current and the next timestamp. I use an interval of 100 ms to match your use case; it can be adjusted.

I drop the overlapping records afterwards because I couldn't handle that case inside the explode generator (in Spark, conditional expressions are not allowed inside a generator).
from pyspark.sql import Window
from pyspark.sql.functions import expr, lead, col, to_timestamp, when

data = [
    ("node1", 7777, "2023-10-28 14:22:41.9"),
    ("node1", 8888, "2023-10-28 14:22:42.5"),
    ("node1", 1111, "2023-10-28 14:22:42.7"),
    ("node2", 2222, "2023-10-28 14:22:41.2"),
    ("node2", 6666, "2023-10-28 14:22:41.5"),
]
columns = ["node", "value", "timestamp"]
df = spark.createDataFrame(data, columns)

# For each node, look up the timestamp of the following record.
window_spec = Window.partitionBy("node").orderBy("timestamp")
df = df.withColumn(
    "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
)

# Generate one row per 100 ms step between the current and the next timestamp.
# For the last record of each node, next_timestamp is NULL, so nvl() falls
# back to the record's own timestamp and produces a single row.
result_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 100 milliseconds))"
    ),
).withColumn(
    # sequence() is inclusive of its upper bound, so the last generated row of
    # each record collides with the next record's timestamp; mark it for removal.
    "to_drop",
    when(col("timestamp") == col("next_timestamp"), True).otherwise(False),
)

result_df.filter(~col("to_drop")).drop("next_timestamp", "to_drop").show(truncate=False)
My output:
+-----+-----+---------------------+
|node |value|timestamp |
+-----+-----+---------------------+
|node1|7777 |2023-10-28 14:22:41.9|
|node1|7777 |2023-10-28 14:22:42 |
|node1|7777 |2023-10-28 14:22:42.1|
|node1|7777 |2023-10-28 14:22:42.2|
|node1|7777 |2023-10-28 14:22:42.3|
|node1|7777 |2023-10-28 14:22:42.4|
|node1|8888 |2023-10-28 14:22:42.5|
|node1|8888 |2023-10-28 14:22:42.6|
|node1|1111 |2023-10-28 14:22:42.7|
|node2|2222 |2023-10-28 14:22:41.2|
|node2|2222 |2023-10-28 14:22:41.3|
|node2|2222 |2023-10-28 14:22:41.4|
|node2|6666 |2023-10-28 14:22:41.5|
+-----+-----+---------------------+
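As a side note: if the 100 ms step is fixed and the timestamps within each node are known to be distinct and at least 100 ms apart, a possible variation (an untested sketch, not the answer above; alt_df is just an illustrative name) is to end each sequence one interval before next_timestamp, which makes the to_drop filtering unnecessary:

# Sketch: stop each sequence one interval before next_timestamp, so the
# generated rows never overlap with the next record. This assumes timestamps
# within a node are distinct and at least 100 ms apart; otherwise sequence()
# would receive a stop value earlier than its start and fail.
alt_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), "
        "nvl(next_timestamp - interval 100 milliseconds, to_timestamp(timestamp)), "
        "interval 100 milliseconds))"
    ),
).drop("next_timestamp")
alt_df.show(truncate=False)

The trade-off is that this version bakes the 100 ms assumption into the upper bound as well, whereas the to_drop approach tolerates records closer together than one interval.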