I've been trying to get my code working for days.

I have a 9 GB text file encoded in "latin-1" -> 737 022 387 lines, each line containing a single string.

I want to read each line and send it in an HTTP PUT request that waits for the response, and return TRUE or FALSE depending on whether the response is 200 or 400.
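
Eventually each line will be sent with something like this (a rough sketch using the requests library; the endpoint URL is only a placeholder):

import requests

def send_put(line):
    # PUT one line to the (placeholder) endpoint; True on 200, False otherwise
    response = requests.put('http://example.com/endpoint', data=line.encode('latin-1'))
    return response.status_code == 200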

First, I simulate my PUT request with a sleep of 3 seconds.

This code splits my string into characters, and I don't know why...

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon = open(r'D:\txtFile', encoding="latin-1")
    for line in peon:
        res = pool.map(process_line, line)
        print(res)

This one gives the error: TypeError: process_line() takes 1 positional argument but 17 were given

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    with open(r"d:\txtFile", encoding="latin-1") as file:
        res = pool.apply(process_line, file.readline())
        print(res)

And this one makes my computer crash:

from multiprocessing import Pool
from time import sleep


def process_line(line):
    sleep(3)
    print(line)
    return True

if __name__ == "__main__":
    pool = Pool(2)
    peon = open(r'D:\txtFile', encoding="latin-1")
    for line in peon:
        res = pool.map(process_line, peon)
        print(res)

Recommended answer

Though, this problem seems unrealistic: shooting 737022387 requests! Do the math on how long that would take from one computer (at 3 s per request with 2 workers, 737022387 × 3 / 2 ≈ 1.1 billion seconds, i.e. roughly 35 years)!!
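
About the three attempts themselves: Pool.map iterates over its second argument, and iterating a string yields single characters, which is why the first snippet splits each line into chars. Pool.apply expects its arguments as a tuple, so passing a 17-character line unpacks it into 17 positional arguments, hence the TypeError; pool.apply(process_line, (file.readline(),)) would call it correctly. And pool.map(process_line, peon) first materializes the entire 9 GB file as a list before dispatching anything, which is what brings the machine down. If you want to stay with a plain Pool, a lazy variant avoids that (a sketch with the same simulated 3-second workload):

from multiprocessing import Pool
from time import sleep

def process_line(line):
    sleep(3)
    return True

if __name__ == "__main__":
    # imap_unordered pulls lines from the file lazily, so the 9 GB file
    # is never loaded into memory all at once
    with Pool(2) as pool, open(r'D:\txtFile', encoding="latin-1") as peon:
        for res in pool.imap_unordered(process_line, peon, chunksize=100):
            print(res)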

Still, a better way to perform this task is to read the file line by line in a separate thread that inserts the lines into a queue, and then consume the queue with multiple processes.

Solution 1:

from multiprocessing import Queue, Process
from threading import Thread
from time import sleep

urls_queue = Queue()
max_process = 4

def read_urls():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            urls_queue.put(url.strip())
            print('put url: {}'.format(url.strip()))

    # put DONE to tell send_request_processor to exit
    for i in range(max_process):
        urls_queue.put("DONE")


def send_request(url):
    print('send request: {}'.format(url))
    sleep(1)
    print('recv response: {}'.format(url))


def send_request_processor():
    print('start send request processor')
    while True:
        url = urls_queue.get()
        if url == "DONE":
            break
        else:
            send_request(url)


def main():
    file_reader_thread = Thread(target=read_urls)
    file_reader_thread.start()

    procs = []
    for i in range(max_process):
        p = Process(target=send_request_processor)
        procs.append(p)
        p.start()

    for p in procs:
        p.join()

    print('all done')
    # the reader thread has finished producing by now; join it for a clean shutdown
    file_reader_thread.join()


if __name__ == '__main__':
    main()

Demo: https://onlinegdb.com/Elfo5bGFz
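
One caveat with Solution 1: the worker processes find urls_queue as a module-level global, which works with the default fork start method on Linux (as in the demo). On Windows, where child processes are spawned rather than forked, pass the queue explicitly instead, e.g. Process(target=send_request_processor, args=(urls_queue,)) with a matching parameter on the function.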

Solution 2:

You can use tornado, an asynchronous networking library:

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.queues import Queue

q = Queue(maxsize=2)

async def consumer():
    async for item in q:
        try:
            print('Doing work on %s' % item)
            await gen.sleep(0.01)
        finally:
            q.task_done()

async def producer():
    with open('urls_file.txt', 'r') as f:
        for url in f:
            await q.put(url)
            print('Put %s' % url)

async def main():
    # Start consumer without waiting (since it never finishes).
    IOLoop.current().spawn_callback(consumer)
    await producer()     # Wait for producer to put all tasks.
    await q.join()       # Wait for consumer to finish all tasks.
    print('Done')
    # producer and consumer can run in parallel

IOLoop.current().run_sync(main)
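
To issue the real PUT instead of sleeping, the consumer could use tornado's own asynchronous HTTP client, roughly like this (a sketch; the endpoint URL is only a placeholder):

from tornado.httpclient import AsyncHTTPClient

async def send_put(line):
    client = AsyncHTTPClient()
    # raise_error=False makes a 400 come back as a normal response
    # instead of raising tornado.httpclient.HTTPClientError
    response = await client.fetch('http://example.com/endpoint',
                                  method='PUT', body=line,
                                  raise_error=False)
    return response.code == 200  # True on 200, False otherwise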
