I want to forward-fill the previous data for every millisecond.

input dataframe:

node value timestamp
node1 7777 '2023-10-28 14:22:41.9'
node1 8888 '2023-10-28 14:22:42.5'
node1 1111 '2023-10-28 14:22:42.7'
node2 2222 '2023-10-28 14:22:41.2'
node2 6666 '2023-10-28 14:22:41.5'

output dataframe:

node value timestamp
node1 7777 '2023-10-28 14:22:41.9'
node1 7777 '2023-10-28 14:22:42.0'
node1 7777 '2023-10-28 14:22:42.1'
node1 7777 '2023-10-28 14:22:42.2'
node1 7777 '2023-10-28 14:22:42.3'
node1 7777 '2023-10-28 14:22:42.4'
node1 8888 '2023-10-28 14:22:42.5'
node1 8888 '2023-10-28 14:22:42.6'
node1 1111 '2023-10-28 14:22:42.7'
node2 2222 '2023-10-28 14:22:41.2'
node2 2222 '2023-10-28 14:22:41.3'
node2 2222 '2023-10-28 14:22:41.4'
node2 6666 '2023-10-28 14:22:41.5'

For me, this does not work at millisecond granularity.

Recommended answer

I have a solution that works for your sample data. I'm not sure it handles every edge case, but it should be a good starting point.

I use the Spark SQL sequence function together with explode to generate the new rows between the current and the next timestamp. I use an interval of 100 ms to match your example; it can be adjusted.

I drop the overlapping records afterwards, because I couldn't handle that case inside the explode generator (in Spark we cannot use conditional expressions inside a generator).

# Assumes an active SparkSession is available as `spark`.
from pyspark.sql import Window
from pyspark.sql.functions import expr, lead, col, to_timestamp, when

data = [
    ("node1", 7777, "2023-10-28 14:22:41.9"),
    ("node1", 8888, "2023-10-28 14:22:42.5"),
    ("node1", 1111, "2023-10-28 14:22:42.7"),
    ("node2", 2222, "2023-10-28 14:22:41.2"),
    ("node2", 6666, "2023-10-28 14:22:41.5"),
]
columns = ["node", "value", "timestamp"]
df = spark.createDataFrame(data, columns)

# For each node, look ahead to the timestamp of the next row.
window_spec = Window.partitionBy("node").orderBy("timestamp")

df = df.withColumn(
    "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
)

# Explode a sequence of timestamps from the current row to the next one,
# stepping by 100 milliseconds. The last row of each node has no successor,
# so nvl() falls back to the row's own timestamp and yields a single element.
result_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 100 milliseconds))"
    ),
).withColumn(
    # The upper bound of sequence() is inclusive, so the generated row that
    # lands exactly on next_timestamp duplicates the next original row; flag it.
    "to_drop",
    when(col("timestamp") == col("next_timestamp"), True).otherwise(False),
)

result_df.filter(col("to_drop") == False).drop("next_timestamp", "to_drop").show(truncate=False)

My output:

+-----+-----+---------------------+
|node |value|timestamp            |
+-----+-----+---------------------+
|node1|7777 |2023-10-28 14:22:41.9|
|node1|7777 |2023-10-28 14:22:42  |
|node1|7777 |2023-10-28 14:22:42.1|
|node1|7777 |2023-10-28 14:22:42.2|
|node1|7777 |2023-10-28 14:22:42.3|
|node1|7777 |2023-10-28 14:22:42.4|
|node1|8888 |2023-10-28 14:22:42.5|
|node1|8888 |2023-10-28 14:22:42.6|
|node1|1111 |2023-10-28 14:22:42.7|
|node2|2222 |2023-10-28 14:22:41.2|
|node2|2222 |2023-10-28 14:22:41.3|
|node2|2222 |2023-10-28 14:22:41.4|
|node2|6666 |2023-10-28 14:22:41.5|
+-----+-----+---------------------+
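The question title asks for filling at every millisecond rather than every 100 ms. Assuming the same approach, only the step of the generated sequence needs to change; a minimal sketch of that variation (the name result_df_ms is mine, it reuses df with its next_timestamp column from above, and it will generate roughly 100x more rows per gap):

# Hypothetical variation: step the sequence by 1 millisecond instead of 100.
result_df_ms = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 1 millisecond))"
    ),
).withColumn(
    "to_drop",
    when(col("timestamp") == col("next_timestamp"), True).otherwise(False),
)
result_df_ms.filter(col("to_drop") == False).drop("next_timestamp", "to_drop").show(truncate=False)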
