我需要在PySpark中生成一些数据,我目前正在使用PySparkPandas.我发现,当我想用.repeat()来扩展我的数据生成过程时,它非常非常慢(几十分钟).

有没有其他替代方法可以用来生成如下排序的数据帧?

import pyspark.pandas as ps

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8, ...]
dStates = ['A', 'B', 'C', 'D', ....]

# delta time
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)

# create DF
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').repeat(  len(dStates) * len(IDs)  )
stateSet = ps.Series( dStates * ( delta_time + 1 ) * len(IDs) )
nodeSet = ps.Series(IDs).repeat( len(dStates) * ( delta_time + 1 ) ).reset_index(drop=True)

# combine
tseries = ps.DataFrame({'monthlyTrend': timeSet.astype(str),
                   'FromState': stateSet,
                  'ID': nodeSet})

推荐答案

通常numpy函数更优化,所以可以try 使用numpy.repeat().我已经调整了以下代码,每天在一个范围内生成日期,并根据timeList的长度调整IDsdStates:

import pandas as pd
import numpy as np
from datetime import datetime, timedelta

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']

# Generate data based on params
timeList = np.arange(datetime(2022, 4, 1), datetime(2022, 7, 1), timedelta(days=1)).astype(datetime)

stateList = np.repeat(dStates, len(timeList)//len(dStates))
stateList = np.append(stateList, dStates[:len(timeList)%len(dStates)]) # this ensures the lengths remain the same

nodeList = np.repeat(IDs, len(timeList)//len(IDs))
nodeList = np.append(nodeList, IDs[:len(timeList)%len(IDs)])

# combine
tseries = pd.DataFrame({
    'monthlyTrend': timeList.astype(str),
    'FromState': stateList,
     'ID': nodeList
})

df = spark.createDataFrame(tseries)

Update

这里是另一种方法,使用explode()array_repeat仅使用pyspark函数实现上述功能.我们首先创建一个数据帧,它与最长的params个列表一样长(在本例中是IDs个).然后使用pyspark函数对其进行扩展.

from pyspark.sql import functions as F
import pyspark.pandas as ps

# params
start_time = '2022-04-01'
end_time = '2022-07-01'
delta_time = (ps.to_datetime(end_time).month - ps.to_datetime(start_time).month)
timeSet = ps.date_range(start=start_time, end=end_time, freq='MS').tolist()

IDs = [1, 2, 3, 4, 5, 6, 7, 8]
dStates = ['A', 'B', 'C', 'D']

# create a minimum length DF aligned to the longest list of params
longest_list = IDs
timeSet = ps.concat([ps.Series(timeSet * (len(longest_list)//len(timeSet))), ps.Series(timeSet[:len(longest_list)%len(timeSet)])], ignore_index=True)
stateSet = ps.concat([ps.Series(dStates * (len(longest_list)//len(dStates))), ps.Series(dStates[:len(longest_list)%len(dStates)])], ignore_index=True)
nodeSet = ps.Series(IDs)

# combine
df_tseries = ps.DataFrame({
    'monthlyTrend': timeSet,
    'FromState': stateSet,
    'ID': nodeSet}).to_spark()

# expand the df with explode and array_repeat
no_of_repeats = 10
df_tseries = df_tseries.withColumn("ID", F.explode(F.array_repeat("ID", no_of_repeats)))

Python相关问答推荐

如何将uint 16表示为float 16

Docker-compose:为不同项目创建相同的容器

仅对matplotlib的条标签中的一个条标签应用不同的格式

了解shuffle在NP.random.Generator.choice()中的作用

自定义新元未更新参数

使用多个性能指标执行循环特征消除

按照行主要蛇扫描顺序对点列表进行排序

将轨迹优化问题描述为NLP.如何用Gekko解决这个问题?当前面临异常:@错误:最大方程长度错误

使用pandas、matplotlib和Yearbox绘制时显示错误的年份

比较两个二元组列表,NP.isin

比较两个数据帧并并排附加结果(获取性能警告)

替换字符串中的多个重叠子字符串

切片包括面具的第一个实例在内的眼镜的最佳方法是什么?

为什么NumPy的向量化计算在将向量存储为类属性时较慢?'

在ubuntu上安装dlib时出错

不能使用Gekko方程'

* 动态地 * 修饰Python中的递归函数

numpy.unique如何消除重复列?

基于另一列的GROUP-BY聚合将列添加到Polars LazyFrame

GPT python SDK引入了大量开销/错误超时