You can get the current time directly with Spark SQL's date-time functions, as follows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp
spark = SparkSession.builder.getOrCreate()
# Pattern letters are case-sensitive: MM is month-of-year, mm is minute-of-hour.
spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx") as updateDate""").show(truncate=False)
Output:
+-----------------------------+
|updateDate                   |
+-----------------------------+
|2024-04-04T09:04:35.865+00:00|
+-----------------------------+
Schema:
root
|-- updateDate: string (nullable = false)
As the schema shows, updateDate is a string. You can convert it to a timestamp with to_timestamp(), as follows:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp, to_timestamp
spark = SparkSession.builder.getOrCreate()
# Use mm (minutes) in the time part; MM would repeat the month there.
spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx")) as updateDate""").show(truncate=False)
Output:
+-----------------------+
|updateDate             |
+-----------------------+
|2024-04-04 09:04:12.703|
+-----------------------+
Schema:
root
|-- updateDate: timestamp (nullable = true)
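Now updateDate is a timestamp, rendered in the Spark session's time zone (which is why the +00:00 offset is no longer shown). Incidentally, you can change the session time zone with spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>").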
If you want to add it as a new column to an existing DataFrame, you can do this:
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, current_timestamp
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe
# Use mm (minutes) in the time part; MM would repeat the month there.
df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx").cast("timestamp"))
df.show(truncate=False)
df.printSchema()
Output:
+------+-----------------------+
|rownum|updateDate             |
+------+-----------------------+
|1     |2024-04-04 09:04:48.473|
|2     |2024-04-04 09:04:48.473|
+------+-----------------------+
Schema:
root
|-- rownum: long (nullable = true)
|-- updateDate: timestamp (nullable = true)
The data type you are looking for is a timestamp with time zone in Spark. You can now try loading the dataset into MongoDB with this schema.