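NumPy functions are generally better optimized, so you could try numpy.repeat(). I have adjusted the code below so that it generates one date per day within a range, and sizes IDs and dStates to match the length of timeList: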
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']
# Generate data based on params
# one timestamp per day from 2022-04-01 up to, but not including, 2022-07-01
timeList = np.arange(datetime(2022, 4, 1), datetime(2022, 7, 1), timedelta(days=1)).astype(datetime)
stateList = np.repeat(dStates, len(timeList)//len(dStates))
stateList = np.append(stateList, dStates[:len(timeList)%len(dStates)]) # this ensures the lengths remain the same
nodeList = np.repeat(IDs, len(timeList)//len(IDs))
nodeList = np.append(nodeList, IDs[:len(timeList)%len(IDs)])
# combine
tseries = pd.DataFrame({
'monthlyTrend': timeList.astype(str),
'FromState': stateList,
'ID': nodeList
})
df = spark.createDataFrame(tseries)  # assumes an active SparkSession named spark
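To make the length arithmetic above easier to check, here is a minimal plain-Python sketch (no NumPy or Spark needed) of the same "repeat each value, then pad with the remainder" idea. The helper name repeat_to_length is hypothetical and only used for illustration; it is meant to mirror what np.repeat followed by np.append does in the code above, not to replace it.

```python
from datetime import date

# same params as above
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']

# number of daily timestamps in the half-open range [2022-04-01, 2022-07-01)
n_days = (date(2022, 7, 1) - date(2022, 4, 1)).days


def repeat_to_length(values, target_len):
    """Hypothetical helper: repeat each value in a block, then pad with the first few values."""
    block = target_len // len(values)      # how often each value is repeated
    remainder = target_len % len(values)   # how many entries are still missing
    repeated = [v for v in values for _ in range(block)]
    return repeated + values[:remainder]


state_list = repeat_to_length(dStates, n_days)
node_list = repeat_to_length(IDs, n_days)

print(n_days, len(state_list), len(node_list))
```

Note that, like np.repeat, this produces blocks (all the 'A' rows first, then all the 'B' rows, and so on) rather than a cycling A, B, C, D pattern.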
Update
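Here is another approach that achieves the same result using only pyspark functions, namely explode() and array_repeat. We first create a DataFrame that is as long as the longest list of params (in this example, IDs), and then expand it with pyspark functions.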
from pyspark.sql import functions as F
import pyspark.pandas as ps
# params
start_time = '2022-04-01'
end_time = '2022-07-01'
# month difference between end and start (not used further below)
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').tolist()
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']
# create a minimum length DF aligned to the longest list of params
longest_list = IDs
timeSet = ps.concat([ps.Series(timeSet * (len(longest_list)//len(timeSet))), ps.Series(timeSet[:len(longest_list)%len(timeSet)])], ignore_index=True)
stateSet = ps.concat([ps.Series(dStates * (len(longest_list)//len(dStates))), ps.Series(dStates[:len(longest_list)%len(dStates)])], ignore_index=True)
nodeSet = ps.Series(IDs)
# combine
df_tseries = ps.DataFrame({
'monthlyTrend': timeSet,
'FromState': stateSet,
'ID': nodeSet}).to_spark()
# expand the df with explode and array_repeat
no_of_repeats = 10
df_tseries = df_tseries.withColumn("ID", F.explode(F.array_repeat("ID", no_of_repeats)))
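If it is unclear what the last step does to the rows, the following plain-Python model may help. It is not Spark code: it only imitates the effect of exploding an array that holds no_of_repeats copies of ID, which is that every input row appears no_of_repeats times in the output. The month strings and variable names here are assumptions chosen to match the params above, and unlike this list, Spark does not guarantee any particular row order.

```python
# plain-Python model of explode(array_repeat("ID", n)): each row is duplicated n times
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']
months = ['2022-04-01', '2022-05-01', '2022-06-01', '2022-07-01']

# minimum-length table aligned to the longest list (IDs): shorter lists are cycled
time_set = months * (len(IDs) // len(months))
state_set = dStates * (len(IDs) // len(dStates))
rows = [
    {'monthlyTrend': t, 'FromState': s, 'ID': i}
    for t, s, i in zip(time_set, state_set, IDs)
]

no_of_repeats = 10
# array_repeat builds [ID] * n per row; explode emits one output row per array element
expanded = [
    {**row, 'ID': repeated_id}
    for row in rows
    for repeated_id in [row['ID']] * no_of_repeats
]

print(len(rows), len(expanded))
```

So the 8 base rows become 80 rows, with the other columns carried along unchanged.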