我试图使用databricks pyspark将多个文档加载到MongoDb集合中,在加载时我也使用了updateDate文件,但是加载后我可以看到updateDate字段的数据类型是字符串而不是日期数据类型.

这里我使用的是时间戳的代码.

import datetime

current_timestamp_utc = datetime.datetime.now(datetime.timezone.utc)
formatted_timestamp = current_timestamp_utc.strftime("%Y-%m-%dT%H:%M:%S")
timezone_offset = current_timestamp_utc.strftime("%z")
formatted_timestamp = formatted_timestamp + ".000" + timezone_offset[:-2] + ":" + 
timezone_offset[-2:]

print(formatted_timestamp)

result : 2024-04-03T07:33:52.000+00:00

结果看起来很好,但加载到MongoDb后,它显示为String而不是Date.

所以,请你帮助我如何加载与日期数据类型的文件. 我已经使用UpdateMany()方法将字符串更改为日期数据类型,这是要继续的写方法吗,还是使用updateMany()方法时会有任何I/O或性能影响.请建议

推荐答案

您可以直接使用spark SQL日期—时间函数获取当前时间,如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp

spark = SparkSession.builder.getOrCreate()

spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx") as updateDate""").show(truncate=False)

Output:
+-----------------------------+
|updateDate                   |
+-----------------------------+
|2024-04-04T09:04:35.865+00:00|
+-----------------------------+

Schema:
root
 |-- updateDate: string (nullable = false)

如果你注意到了这个模式,updateDate是一个字符串,你可以用to_timestamp()将它转换为时间戳,如下所示:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp, to_timestamp

spark = SparkSession.builder.getOrCreate()

spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx")) as updateDate""").show(truncate=False)

Output:
+-----------------------+
|updateDate             |
+-----------------------+
|2024-04-04 09:04:12.703|
+-----------------------+

Schema:
root
 |-- updateDate: timestamp (nullable = true)

现在,updateDate是一个时间戳,调整为spark会话的时区(这就是为什么+00:00偏移现在消失了)—顺便说一句,你可以使用spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>")来更新.

如果你想把它作为一个新的列添加到一个现有的框架中,你可以这样做:

from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe

df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:MM:ss.SSSxxx").cast("timestamp"))

df.show(truncate=False)
df.printSchema()

Output:
+------+-----------------------+
|rownum|updateDate             |
+------+-----------------------+
|1     |2024-04-04 09:04:48.473|
|2     |2024-04-04 09:04:48.473|
+------+-----------------------+

Schema:
root
 |-- rownum: long (nullable = true)
 |-- updateDate: timestamp (nullable = true)

您正在查找的数据类型是一个时区在Spark中的时间戳.现在,您可以try 使用这个模式将数据集加载到MongoDB中.

Mongodb相关问答推荐

如何在MongoDB中对两个数组进行分组?

为什么 mongoose 在 mongodb 中找不到我的数据

MongoDB - 将数组元素转换为新字段

从具有多个数组匹配 MongoDB 的两个集合中采样数据

查询有关 Go 项目中对象数组的 MongoDb 集合

使用golang的MongoDB错误无法访问服务器

在mongodb中,如何使用聚合来获取字段之间的对应关系

使用 AngularJs 和 MongoDB/Mongoose

哪个库最适合用于带有 Scala 的 MongoDB?

你如何在 Kubernetes 上设置 Mongo 副本集?

Node.js 数据库的抽象层

从命令行创建 MongoDB 用户

如何在 mongodb 本机驱动程序中对 find() 进行字段 Select ?

如何使用 MongoDB 和 Mongoid 在 Rails 3 上进行适当的数据库测试 (TDD)

Cannot connect to MongoDB errno:61

Mongoimport json 文件更新或覆盖..?

MongoError: The dollar ($) prefixed field '$push' in '$push' is not valid for storage

MongoDB 中的多个 $inc 更新

120 个 mongodb 集合与单个集合 - 哪个更有效?

如何在第一个文档中恢复 MongoDB ChangeStream 而不仅仅是在我开始收听后更改