我正在try 编写一个迭代器,它在等待IO绑定任务的同时,继续进行迭代的下一步.大致演示我在代码中要做的事情

for i in iterable:
    await io_bound_task()   # move on to next step in iteration
    # do more stuff when task is complete

我最初试着用一个简单的for循环运行,用sleep模拟一个IO绑定的任务

import asyncio
import random


async def main() -> None:
    for i in range(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

在这里,代码同步运行并输出

starting task 0
finished task 0
starting task 1
finished task 1
starting task 2
finished task 2

我想这是因为for循环阻塞了.所以我认为异步for循环是继续的方式?所以我try 使用异步迭代器

from __future__ import annotations

import asyncio
import random


class AsyncIterator:
    def __init__(self, max_value: int) -> None:
        self.max_value = max_value
        self.count = 0

    def __aiter__(self) -> AsyncIterator:
        return self

    async def __anext__(self) -> int:
        if self.count == self.max_value:
            raise StopAsyncIteration
        self.count += 1
        return self.count


async def main() -> None:
    async for i in AsyncIterator(3):
        print(f"starting task {i}")
        result = await io_bound_task(i)
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

但这似乎也是同步运行的,并导致输出

starting task 1
finished task 1
starting task 2
finished task 2
starting task 3
finished task 3

每一次.所以我认为异步迭代器没有做我认为它会做的事情?在这一点上,我被卡住了.我对异步迭代器的理解有问题吗?有人能给我一些建议,告诉我如何实现我的目标吗?

我刚接触async,所以如果我做了一些愚蠢的事情,我深表歉意.感谢您的帮助.谢谢

如果这是一个相关的细节,我是python 3.8.10.

推荐答案

你正在寻找的东西叫做任务,可以使用asyncio.create_task函数创建.您try 的所有方法都涉及到等待Corroutine io_bound_task(i),而wait的意思是"在继续之前等待这个过程完成".如果您将协同程序包装在一个任务中,那么它将在后台运行,而不必等待它完成后再继续.

以下是使用任务的代码版本:

import asyncio
import random


async def main() -> None:
    tasks = []
    for i in range(3):
        print(f"starting task {i}")
        tasks.append(asyncio.create_task(io_bound_task(i)))

    for task in tasks:
        result = await task
        print(f"finished task {result}")


async def io_bound_task(i: int) -> int:
    await asyncio.sleep(random.random())
    return i


asyncio.run(main())

输出:

starting task 0
starting task 1
starting task 2
finished task 0
finished task 1
finished task 2

您还可以使用asyncio.gather(如果继续之前需要所有结果)或asyncio.wait来等待多个任务,而不是循环.例如,如果任务2在任务0之前完成,并且您不想等待任务0,则可以执行以下操作:

async def main() -> None:
    pending = []
    for i in range(3):
        print(f"starting task {i}")
        pending.append(asyncio.create_task(io_bound_task(i)))

    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            result = await task
            print(f"finished task {result}")

Python相关问答推荐

使用GEKKO在简单DTE系统中进行一致初始化

如何在Python中使用时区夏令时获取任何给定本地时间的纪元值?

如何通过多2多字段过滤查询集

在函数内部使用eval(),将函数的输入作为字符串的一部分

在Python中对分层父/子列表进行排序

删除所有列值,但判断是否存在任何二元组

将图像拖到另一个图像

如何让Flask 中的请求标签发挥作用

如何使用根据其他值相似的列从列表中获取的中间值填充空NaN数据

Python逻辑操作作为Pandas中的条件

如何启动下载并在不击中磁盘的情况下呈现响应?

如何在Python中获取`Genericums`超级类型?

如何使用使用来自其他列的值的公式更新一个rabrame列?

如何创建引用列表并分配值的Systemrame列

并行编程:同步进程

在Python中控制列表中的数据步长

裁剪数字.nd数组引发-ValueError:无法将空图像写入JPEG

如何用FFT确定频变幅值

Python OPCUA,modbus通信代码运行3小时后出现RuntimeError

为什么我只用exec()函数运行了一次文件,而Python却运行了两次?