我有一个工具,每当它运行时,它都会生成大量的文件(从几十万到几百万).所有这些文件都可以相互独立地读取.我需要分析它们并总结信息.

生成的文件的虚拟示例:

File1:
NAME=John AGE=25 ADDRESS=123 Fake St
NAME=Jane AGE=25 ADDRESS=234 Fake St

File2:
NAME=Dan AGE=30 ADDRESS=123 Fake St
NAME=Lisa AGE=30 ADDRESS=234 Fake St

摘要-统计地址在所有文件中出现的次数:

123 Fake St - 2
234 Fake St - 2

我想使用并行化来读取它们,所以我会想到multiprocessingasyncio(I/O密集型操作).我计划在将 for each 文件并行调用的单个单元/函数中执行以下操作:

  1. 打开文件,逐行打开
  2. 填充包含此文件专门提供的信息的唯一词典
  3. 关闭该文件

一旦我完成了所有文件的并行读取,并且每个文件有一个词典,我现在就可以循环每个词典并根据需要进行总结.

我认为我需要这个两步过程的原因是,我不能对该函数进行多个并行调用,直接汇总并写入一个通用汇总字典.这会把事情搞砸的.

但这意味着我将消耗大量内存(因为内存中保存了数十万到数百万个词典).

为了达到这一目标,怎样才能充分利用运行时和内存消耗这两个方面呢?

推荐答案

根据 comments ,这里的示例使用multiprocessing.Pool.

每个进程逐行读取一个文件,并将结果发送回主进程进行收集.

import re
import multiprocessing
from collections import Counter

pat = re.compile(r"([A-Z]{2,})=(.+?)(?=[A-Z]{2,}=|$)")


def process_file(filename):
    c = Counter()
    with open(filename, "r") as f_in:
        for line in f_in:
            d = dict(pat.findall(line))
            if "ADDRESS" in d:
                c[d["ADDRESS"]] += 1

    # send partial result back to main process:
    return c


if __name__ == "__main__":

    # you can get filelist for example from `glob` module:
    files = ["file1.txt", "file2.txt"]

    final_counter = Counter()

    with multiprocessing.Pool(processes=4) as pool:
        # iterate over files and update the final Counter:
        for result in pool.imap_unordered(process_file, files):
            final_counter.update(result)

    # print final Counter:
    for k, v in final_counter.items():
        print("{:<20} {}".format(k, v))

打印:

123 Fake St          2
234 Fake St          2

注:您可以使用tqdm模块来获得漂亮的进度条,例如:

...
from tqdm import tqdm
...

for result in pool.imap_unordered(process_file, tqdm(files)):

...

Python相关问答推荐

在Pandas DataFrame操作中用链接替换'方法的更有效方法

如何过滤包含2个指定子字符串的收件箱列名?

如何在solve()之后获得症状上的等式的值

如果条件不满足,我如何获得掩码的第一个索引并获得None?

NumPy中条件嵌套for循环的向量化

形状弃用警告与组合多边形和多边形如何解决

Polars将相同的自定义函数应用于组中的多个列,

如何防止Pandas将索引标为周期?

Python Pandas—时间序列—时间戳缺失时间精确在00:00

为什么常规操作不以其就地对应操作为基础?

如何将数据帧中的timedelta转换为datetime

Django.core.exceptions.SynchronousOnlyOperation您不能从异步上下文中调用它-请使用线程或SYNC_TO_ASYNC

为什么我只用exec()函数运行了一次文件,而Python却运行了两次?

使用pythonminidom过滤XML文件

如何在不不断遇到ChromeDriver版本错误的情况下使用Selify?

如何使用count()获取特定日期之间的项目

从`end_date`回溯,如何计算以极为单位的滚动统计量?

PANDA:如何将多选列转换为索引/列

如何让doctest在mkdocs的标记代码块中运行示例?

try 理解PyTorch运行错误:try 再次向后遍历图表