我有一个很大的问题,试图以它应该是的方式来运作. 我有一个store函数,它应该模拟对数据库的写入.现在我写的是一个文本文件.为此,我有一个用于引入延迟的布尔值.

import asyncio
import os
import time

async def store(text, delay: bool = False):
    mode = "a" if os.path.exists(DB_FILE) else "w"
    if delay:
        await asyncio.sleep(5)
    with open(DB_FILE, mode=mode) as f:
        timestamp = str(time.time())
        row = f"{text}\t{timestamp}\n"
        f.write(row)
        f.flush()
        print("done saving")

然后,我有一个函数来模拟向该数据库写入几个更改,然后获取最新的更改.如果上一个存储函数中有延迟,则不会被算作"最新".

texts = [("how", False), ("are", False), ("you", True)]

async def run_tests():
    initial_text = "hi"
    await store(initial_text)
    curr_text = initial_text
    last_finished = initial_text
    for text, delay in texts:
        await store(text, delay=delay)
        if not delay:  # if finished
            last_finished = curr_text
    assert last_finished == "are"


if __name__ == "__main__":
    asyncio.run(run_tests())

我试着用asyncio.run()loop.run_until_complete()运行这个程序,但都不起作用.asyncio.sleep的行为仍然像time.sleep.

理想情况下,我想要这条线.

await store(text, delay=delay)

以在功能run_tests的其余部分继续运行的同时异步执行.

我做错了什么?

推荐答案

在这里阻挡你的不是asyncio.sleep.那就是你们在你们的for循环中各await个协程store.这意味着literally在继续迭代之前,您要等待它完全执行.

如果您想安排每个协程immediately,而不是等待到finish,您可以在它们上使用asyncio.create_task.只要确保在some点等待所有任务即可.

以下是一个简化的示例:

from asyncio import create_task, gather, run, sleep


async def store(text: str, delay: bool = False) -> None:
    print(text)
    if delay:
        await sleep(5)


texts = [
    ("hi", False),
    ("how", False),
    ("are", False),
    ("you", True),
]


async def run_tests() -> None:
    last_finished: str | None = None
    tasks = []
    for text, delay in texts:
        tasks.append(create_task(store(text, delay=delay)))
        if not delay:
            last_finished = text
    await gather(*tasks)
    assert last_finished == "are"


if __name__ == "__main__":
    run(run_tests())

为了方便起见,我在这里用了asyncio.gather来等待最后的所有任务.

如果您使用的是Python>=3.11,则可以改用asyncio.TaskGroup上下文管理器:

from asyncio import TaskGroup, run, sleep

...

async def run_tests() -> None:
    last_finished: str | None = None
    async with TaskGroup() as task_group:
        for text, delay in texts:
            task_group.create_task(store(text, delay=delay))
            if not delay:
                last_finished = text
    assert last_finished == "are"

但我不得不说,您使用的变量命名有点令人困惑,因为last_finished表明最后是scheduled的协程(与"are"文本一起)也是最后done的协程.大体是not true个!

一旦调度了协同 routine ,就不能保证它将在其他调度的协同 routine 之前或之后完成,无论先调度的是哪一个.为了确保这一点,你必须使用某种形式的synchronization primitive.

根据实际环境,这可能特别重要,因为您谈论的是将内容写入数据库.但在这个超级简单的例子中,顺序很可能是正确的.

Python相关问答推荐

如何在Power Query中按名称和时间总和进行分组

从Python调用GMP C函数时的分段错误和内存泄漏

Python中MongoDB的BSON时间戳

将HLS纳入媒体包

如何根据另一列值用字典中的值替换列值

如何在BeautifulSoup中链接Find()方法并处理无?

更改matplotlib彩色条的字体并勾选标签?

Python多处理:当我在一个巨大的pandas数据框架上启动许多进程时,程序就会陷入困境

点到面的Y距离

如何在箱形图中添加绘制线的传奇?

重新匹配{ }中包含的文本,其中文本可能包含{{var}

try 在树叶 map 上应用覆盖磁贴

django禁止直接分配到多对多集合的前端.使用user.set()

如何请求使用Python将文件下载到带有登录名的门户网站?

优化器的运行顺序影响PyTorch中的预测

NumPy中条件嵌套for循环的向量化

Django RawSQL注释字段

Python Pandas获取层次路径直到顶层管理

基于行条件计算(pandas)

Python Pandas—时间序列—时间戳缺失时间精确在00:00