So far I have managed to solve this, but it's slower than I need. It takes about 1 hour per 500K samples; the whole dataset is ~100M samples, which would take ~200 hours.

Hardware/software specs: 8 GB RAM, Windows 11 64-bit, Python 3.8.8

The problem:
I have a dataset in .csv (~13 GB) where each sample has a value and a respective start-end period of a few months. I want to create a dataset where each sample has the same value but refers to one specific month.

For example:

From:

idx | start date | end date | month | year | value
0 | 20/05/2022 | 20/07/2022 | 0 | 0 | X

To:

0 | 20/05/2022 | 20/07/2022 | 5 | 2022 | X
1 | 20/05/2022 | 20/07/2022 | 6 | 2022 | X
2 | 20/05/2022 | 20/07/2022 | 7 | 2022 | X

Idea: find a way to do this in parallel (something Dask-like, but I'm not sure how to approach this task).

My implementation:
Chunk-read in pandas, augment in dictionaries, append to a CSV. I use a function that, given a df, calculates for each sample the months from start date to end date and creates a copy of the sample for each month, appending it to a dictionary. It then returns the final dictionary.

The calculations are all done in dictionaries, because they turned out to be much faster than doing them in pandas. I then iterate over the original CSV in chunks, apply the function to each chunk, and append the resulting augmented data to another CSV.
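To illustrate (a toy micro-benchmark of my own, separate from the actual pipeline): per-row access through .iloc is orders of magnitude slower than plain dict lookups on the same data:

import timeit
import pandas as pd

df = pd.DataFrame({'a': range(10_000)})
rows = df.to_dict(orient='records')  # the same data as a plain list of dicts

# per-row access through .iloc pays pandas' indexing overhead on every call...
t_iloc = timeit.timeit(lambda: [df.iloc[i].a for i in range(len(df))], number=1)
# ...while plain dict lookups do not
t_dict = timeit.timeit(lambda: [r['a'] for r in rows], number=1)
print(f"iloc: {t_iloc:.3f}s  dicts: {t_dict:.3f}s")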

The function:

import pandas as pd


def augment_to_monthly_dict(chunk):
    '''
    Takes a df (or sub-df) and returns the augmented dataset with monthly data,
    in dictionary form (for efficiency).
    '''
    aug = {}  # avoid shadowing the built-in `dict`
    l = 1
    for i in range(len(chunk)):  # iterate through every sample
        # Find the month/year period; dates are YYYYMMDD-style, so slice the string
        mst = int(str(chunk.iloc[i].start)[4:6])  # start month
        mend = int(str(chunk.iloc[i].end)[4:6])   # end month
        yst = int(str(chunk.iloc[i].start)[:4])   # start year
        yend = int(str(chunk.iloc[i].end)[:4])    # end year

        if yend == yst:
            months = list(range(mst, mend + 1))
            years = [yend] * len(months)
        elif yend == yst + 1:  # year changes within the sample
            months = list(range(mst, 13)) + list(range(1, mend + 1))
            years = [yst] * (13 - mst) + [yend] * mend
        else:
            continue  # periods spanning more than two calendar years are skipped
        # months is a list of each month in the sample's period and years is a
        # same-length list of the respective years,
        # e.g. months=[11,12,1,2], years=[2021,2022,2022,2022]

        for j in range(len(months)):  # iterate through the list of months
            # copy the original sample and make it a dictionary
            tmp = pd.DataFrame(chunk.iloc[i]).transpose().to_dict(orient='records')

            # set the month and year values (they were 0 at initiation)
            tmp[0]['month'] = months[j]
            tmp[0]['year'] = years[j]
            # More calcs could go here, e.g. drop irrelevant columns or change
            # datatypes to reduce size
            # -------------------------------------
            # Append the new row to the augmented data
            aug[l] = tmp[0]
            l += 1
    return aug
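For instance, on a toy one-row chunk (a sanity check of my own, assuming YYYYMMDD-style date fields, since the function slices [4:6] for the month and [:4] for the year):

demo = pd.DataFrame({'start': [20220520], 'end': [20220720],
                     'month': [0], 'year': [0], 'value': ['X']})
print(augment_to_monthly_dict(demo))
# {1: {'start': 20220520, 'end': 20220720, 'month': 5, 'year': 2022, 'value': 'X'},
#  2: {..., 'month': 6, 'year': 2022, ...},
#  3: {..., 'month': 7, 'year': 2022, ...}}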

Read the original dataset (.csv, ~13 GB) in chunks, augment it with the function, and append the results to a new .csv:

import csv

chunk_count = 0
for chunk in pd.read_csv('enc_star_logar_ek.csv', delimiter=';', chunksize=10000):

    chunk.index = chunk.reset_index().index  # give the chunk a fresh 0-based index

    aug_dict = augment_to_monthly_dict(chunk)  # work on the chunk as a dictionary, for speed
    chunk_count += 1

    if chunk_count == 1:  # get the column names, open the csv, write the header and 1st chunk

        # Find the dict's keys (the column names) from the first dict only (not reading all data)
        for kk in aug_dict.values():
            key_names = list(kk.keys())
            print(key_names)
            break  # break after the first dict

        # Open the csv file and write ';'-separated data
        with open('dic_to_csv2.csv', 'w', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=';', fieldnames=key_names)
            writer.writeheader()
            writer.writerows(aug_dict.values())

    else:  # save the rest of the data chunks
        print('added chunk: ', chunk_count)
        with open('dic_to_csv2.csv', 'a', newline='') as csvfile:
            writer = csv.DictWriter(csvfile, delimiter=';', fieldnames=key_names)
            writer.writerows(aug_dict.values())

Recommended answer

Pandas' efficiency shines when you need to manipulate columns of data, and to do that pandas reads the input row-by-row, building up a Series for every column; that's a lot of extra computation your problem doesn't benefit from, and in fact it just slows your solution down.
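A tiny illustration of that difference (a toy example of my own): pandas parses the input into typed, per-column Series, while the csv module just streams rows through as lists of strings:

import csv
import io

import pandas as pd

data = "a,b\n1,2\n3,4\n"

print(pd.read_csv(io.StringIO(data)).dtypes)  # two int64 Series, one per column
print(list(csv.reader(io.StringIO(data))))    # [['a', 'b'], ['1', '2'], ['3', '4']]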

You actually need to manipulate rows, and the fastest way to do that is with the standard csv module; all you need to do is read a row in, write the derived rows out, and repeat:

import csv
import sys

from datetime import datetime


def parse_dt(s):
    return datetime.strptime(s, r"%d/%m/%Y")


def get_dt_range(beg_dt, end_dt):
    """
    Returns a range of (month, year) tuples, from beg_dt up-to-and-including end_dt.
    """
    if end_dt < beg_dt:
        raise ValueError(f"end {end_dt} is before beg {beg_dt}")

    mo, yr = beg_dt.month, beg_dt.year

    dt_range = []
    while True:
        dt_range.append((mo, yr))
        if mo == 12:
            mo = 1
            yr = yr + 1
        else:
            mo += 1
        if (yr, mo) > (end_dt.year, end_dt.month):
            break

    return dt_range


fname = sys.argv[1]
with open(fname, newline="") as f_in, open("output_csv.csv", "w", newline="") as f_out:
    reader = csv.reader(f_in)
    writer = csv.writer(f_out)
    writer.writerow(next(reader))  # transfer header

    for row in reader:
        beg_dt = parse_dt(row[1])
        end_dt = parse_dt(row[2])
        for mo, yr in get_dt_range(beg_dt, end_dt):
            row[3] = mo
            row[4] = yr
            writer.writerow(row)
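As a quick sanity check of get_dt_range() (a snippet of my own, assuming the function above is in scope), a range that crosses a year boundary comes back as (month, year) tuples:

from datetime import datetime

print(get_dt_range(datetime(2022, 12, 20), datetime(2023, 1, 20)))
# [(12, 2022), (1, 2023)]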

Also, to compare against pandas in general, let's look at @abkey's specific pandas solution. I'm not sure whether there's a better pandas implementation, but this one more or less does the right thing:

import sys
import pandas as pd

fname = sys.argv[1]
df = pd.read_csv(fname)

df["start date"] = pd.to_datetime(df["start date"], format="%d/%m/%Y")
df["end date"] = pd.to_datetime(df["end date"], format="%d/%m/%Y")

df["month"] = df.apply(
    lambda x: pd.date_range(
        start=x["start date"], end=x["end date"] + pd.DateOffset(months=1), freq="M"
    ).month.tolist(),
    axis=1,
)
df["year"] = df["start date"].dt.year

out = df.explode("month").reset_index(drop=True)

out.to_csv("output_pd.csv")

But first things first, let's make sure the programs actually do the right thing. Given the following input:

idx,start date,end date,month,year,value
0,20/05/2022,20/05/2022,0,0,X
0,20/05/2022,20/07/2022,0,0,X
0,20/12/2022,20/01/2023,0,0,X

my program, ./main.py input.csv, produces:

idx,start date,end date,month,year,value
0,20/05/2022,20/05/2022,5,2022,X
0,20/05/2022,20/07/2022,5,2022,X
0,20/05/2022,20/07/2022,6,2022,X
0,20/05/2022,20/07/2022,7,2022,X
0,20/12/2022,20/01/2023,12,2022,X
0,20/12/2022,20/01/2023,1,2023,X

which I believe is what you're looking for.

The pandas solution, ./main_pd.py input.csv, produces:

,idx,start date,end date,month,year,value
0,0,2022-05-20,2022-05-20,5,2022,X
1,0,2022-05-20,2022-07-20,5,2022,X
2,0,2022-05-20,2022-07-20,6,2022,X
3,0,2022-05-20,2022-07-20,7,2022,X
4,0,2022-12-20,2023-01-20,12,2022,X
5,0,2022-12-20,2023-01-20,1,2022,X

Ignoring the extra column added for the frame's index, and the fact that the date format has changed (I'm pretty sure that can be fixed with some pandas directive I don't know about), it still does the right thing, creating new rows with the appropriate date ranges (though note that its year column comes from the start date, so the last row above shows 2022 where 2023 would be expected).
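For what it's worth, a hedged guess at that directive: to_csv accepts index and date_format arguments, so something like the following should drop the index column and restore the original date format:

out.to_csv("output_pd.csv", index=False, date_format="%d/%m/%Y")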

So, both do the right thing. Now, on to performance. I duplicated the initial one-row sample to 1_000_000 and 10_000_000 rows:

import sys

nrows = int(sys.argv[1])
with open(f"input_{nrows}.csv", "w") as f:
    f.write("idx,start date,end date,month,year,value\n")
    for _ in range(nrows):
        f.write("0,20/05/2022,20/07/2022,0,0,X\n")

I ran these on a 2020 M1 MacBook Air with a 2 TB SSD (the read/write speeds are very good):

             1M rows (sec, RAM)    10M rows (sec, RAM)
csv module   7.8s, 6MB             78s, 6MB
Pandas       75s, 569MB            750s, 5.8GB

You can see that both programs' runtimes grow linearly with the number of rows. The csv module's memory usage is virtually non-existent because it streams the data in and out, holding on to almost nothing; pandas' memory grows with the number of rows it has to hold so that it can do the actual date-range computations, again on whole columns. Also, not shown, but for the 10M-row test pandas spent nearly 2 minutes just writing the CSV - longer than the csv module approach took to complete the entire task.

Now, for all my knocking of pandas, the pandas solution is far less code and quite possibly free of bugs from the start. I did have a problem writing get_dt_range(), and had to spend about 5 minutes thinking about what it really needed to do and debugging it.

You can see my setup, with the small test harness and the results, here.
