假设我们有很多链接要下载,每个链接可能需要不同的下载时间.我只允许使用最多3个连接进行下载.现在,我想确保使用asyncio有效地完成这项工作.

以下是我试图实现的目标:在任何时间点,try 确保至少有3次下载正在运行.

Connection 1: 1---------7---9---
Connection 2: 2---4----6-----
Connection 3: 3-----5---8-----

数字代表下载链接,连字符代表等待下载.

这是我现在使用的代码

from random import randint
import asyncio

count = 0


async def download(code, permit_download, no_concurrent, downloading_event):
    global count
    downloading_event.set()
    wait_time = randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))
    count -= 1
    if count < no_concurrent and not permit_download.is_set():
        permit_download.set()


async def main(loop):
    global count
    permit_download = asyncio.Event()
    permit_download.set()
    downloading_event = asyncio.Event()
    no_concurrent = 3
    i = 0
    while i < 9:
        if permit_download.is_set():
            count += 1
            if count >= no_concurrent:
                permit_download.clear()
            loop.create_task(download(i, permit_download, no_concurrent, downloading_event))
            await downloading_event.wait()  # To force context to switch to download function
            downloading_event.clear()
            i += 1
        else:
            await permit_download.wait()
    await asyncio.sleep(9)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

yields 如预期:

downloading 0 will take 2 second(s)
downloading 1 will take 3 second(s)
downloading 2 will take 1 second(s)
downloaded 2
downloading 3 will take 2 second(s)
downloaded 0
downloading 4 will take 3 second(s)
downloaded 1
downloaded 3
downloading 5 will take 2 second(s)
downloading 6 will take 2 second(s)
downloaded 5
downloaded 6
downloaded 4
downloading 7 will take 1 second(s)
downloading 8 will take 1 second(s)
downloaded 7
downloaded 8

但以下是我的问题:

  1. 目前,我只需等待9秒钟,以保持主功能运行,直到下载完成.在退出main功能之前,是否有一种有效的方法来等待最后一次下载完成?(我知道有asyncio.wait个,但我需要存储所有的任务引用,以便它工作)

  2. 什么样的图书馆能完成这种任务?我知道javascript有很多异步库,但是Python呢?

编辑:

推荐答案

在阅读本答案的其余部分之前,请注意,使用asyncio限制并行任务数量的惯用方法是使用asyncio.Semaphore,如Mikhail's answer所示,并在Andrei's answer中优雅地抽象.这个答案包含了一些可行的方法,但实现同样目标的方法要复杂一些.我留下答案是因为在某些情况下,这种方法可能比信号量有优势,特别是当要完成的工作非常大或无限大,并且您无法提前创建所有的协同路由时.在这种情况下,第二个(基于队列的)解决方案是,这个答案就是您想要的.但在大多数常规情况下,比如通过aiohttp并行下载,应该使用信号量.


你基本上需要固定大小的pool个下载任务.asyncio不附带预先制作的任务池,但创建一个任务池很容易:只需保留一组任务,不要让其增长超过限制.虽然问题表明你不愿意走这条路,但代码最终要优雅得多:

import asyncio, random

async def download(code):
    wait_time = random.randint(1, 3)
    print('downloading {} will take {} second(s)'.format(code, wait_time))
    await asyncio.sleep(wait_time)  # I/O, context will switch to main function
    print('downloaded {}'.format(code))

async def main(loop):
    no_concurrent = 3
    dltasks = set()
    i = 0
    while i < 9:
        if len(dltasks) >= no_concurrent:
            # Wait for some download to finish before adding a new one
            _done, dltasks = await asyncio.wait(
                dltasks, return_when=asyncio.FIRST_COMPLETED)
        dltasks.add(loop.create_task(download(i)))
        i += 1
    # Wait for the remaining downloads to finish
    await asyncio.wait(dltasks)

另一种方法是创建一个固定数量的协同程序进行下载,就像一个固定大小的线程池,并使用asyncio.Queue个线程为它们提供工作.这样就不需要手动限制下载次数,下载次数将自动受到调用download():

# download() defined as above

async def download_worker(q):
    while True:
        code = await q.get()
        await download(code)
        q.task_done()

async def main(loop):
    q = asyncio.Queue()
    workers = [loop.create_task(download_worker(q)) for _ in range(3)]
    i = 0
    while i < 9:
        await q.put(i)
        i += 1
    await q.join()  # wait for all tasks to be processed
    for worker in workers:
        worker.cancel()
    await asyncio.gather(*workers, return_exceptions=True)

至于你的另一个问题,显而易见的 Select 是aiohttp.

Python-3.x相关问答推荐

根据收件箱内部的值以行降序的特定顺序重新排序列

如何创建多个日志(log)文件

按小时和日期对Pandas 数据帧进行分组

在Python代码中包含NAN值时,以两个矩阵计算RMSE

我正在try 从 10*3 矩阵中删除随机值并将其变为 10*2 矩阵

平移数组

如何从形状汇总图中提取实际值

matplotlib.pyplot 多边形,具有相同的纵横比和紧凑的布局

通过附加/包含多个列表来创建 nDimensional 列表

在python中循环处理时并行写入文件

ImportError:没有名为资源的模块

Python:pprint的模块错误,打印没有错误

如何从另一个目录导入 python 包?

Python 3 变量名中接受哪些 Unicode 符号?

Python过滤器函数 - 单个结果

如何等待 create_task() 创建的任务完成?

混合全局/参数和名为top的函数的奇怪python行为

如何使用已打开并使用登录凭据登录的浏览器

计数大于Pandas groupby 中的值的项目

pdfminer python 3.5