我有一个需要处理的项目列表,我希望能够并行处理它们以提高效率.但在处理一个项目的过程中,我可能会发现更多项目需要添加到列表中才能处理.

我查看过multiprocessingconcurrent库,但找不到此类队列的功能,可以在运行时或传递到池后进行修改.有没有满足我愿望的解决方案?

这里有一些代码演示了我想要的东西.

i = 0
jobs_to_be_processed = [f'job{(i:=i+1)}' for _ in range(5)]

def process_job(job):
    if int(job[-1]) % 3 == 0:
        jobs_to_be_processed.append(f'new job{(i:=i+1)}')
    # do process job ...
    pass

# Add jobs to a pool that allows `jobs_to_be_processed`
# to have jobs added while processing
pool = AsyncJobPool(jobs_to_be_processed)
pool.start()
pool.join()

推荐答案

IIUC您可以在放置要处理的物品时使用asyncio.Queue,例如:

import asyncio


async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()

        if item == 'spawn more jobs':
            print('Spawning more jobs!')
            queue.put_nowait('other1')
            queue.put_nowait('other2')
        else:
            await asyncio.sleep(1)
            print(f'Processed job item: {item}')

        queue.task_done()

async def main():
    q = asyncio.Queue()

    # we have pool of 2 workers that work concurrently:
    workers = [asyncio.create_task(worker(q)) for i in range(2)]

    # initially we have 4 job items (one item spawns 2 more jobs):
    for job in ['job1', 'job2', 'spawn more jobs', 'job3']:
        q.put_nowait(job)

    await q.join()

    for w in workers:
        w.cancel()

asyncio.run(main())

这打印:

Processed job item: job1
Spawning more jobs!
Processed job item: job2
Processed job item: job3
Processed job item: other1
Processed job item: other2

Python相关问答推荐

Python -根据另一个数据框中的列编辑和替换数据框中的列值

如何在Deliveryter笔记本中从同步上下文正确地安排和等待Delivercio代码中的结果?

返回nxon矩阵的diag元素,而不使用for循环

如何根据参数推断对象的返回类型?

try 将一行连接到Tensorflow中的矩阵

不允许访问非IPM文件夹

Plotly Dash Creating Interactive Graph下拉列表

如何合并两个列表,并获得每个索引值最高的列表名称?

如何使用Numpy. stracards重新编写滚动和?

在极中解析带有数字和SI前缀的字符串

Django Table—如果项目是唯一的,则单行

使用嵌套对象字段的Qdrant过滤

Pandas在rame中在组内洗牌行,保持相对组的顺序不变,

使用SQLAlchemy从多线程Python应用程序在postgr中插入多行的最佳方法是什么?'

提取最内层嵌套链接

Regex用于匹配Python中逗号分隔的AWS区域

为什么我只用exec()函数运行了一次文件,而Python却运行了两次?

Django抛出重复的键值违反唯一约束错误

将数据从一个单元格保存到Jupyter笔记本中的下一个单元格

如何在不不断遇到ChromeDriver版本错误的情况下使用Selify?