I'm having a big problem trying to run this the way it's supposed to. I have a store function, which is supposed to simulate writing to a database. Currently I'm writing to a text file instead. To do that, I have a boolean which is used to introduce a delay.

import asyncio
import os
import time

async def store(text, delay: bool = False):
    mode = "a" if os.path.exists(DB_FILE) else "w"
    if delay:
        await asyncio.sleep(5)
    with open(DB_FILE, mode=mode) as f:
        timestamp = str(time.time())
        row = f"{text}\t{timestamp}\n"
        f.write(row)
        f.flush()
        print("done saving")

I then have a function that simulates writing several changes to that database, and then gets the latest. If there is a delay in the last store function, then it wont count as "latest".

texts = [("how", False), ("are", False), ("you", True)]

async def run_tests():
    initial_text = "hi"
    await store(initial_text)
    curr_text = initial_text
    last_finished = initial_text
    for text, delay in texts:
        await store(text, delay=delay)
        if not delay:  # if finished
            last_finished = curr_text
    assert last_finished == "are"


if __name__ == "__main__":
    asyncio.run(run_tests())

I tried running this with both asyncio.run() and loop.run_until_complete(), but neither is working. asyncio.sleep still behaves like time.sleep.

Ideally I want the line

await store(text, delay=delay)

to execute asynchronously while the rest of the function run_tests keeps going.

What am i doing wrong?

推荐答案

It is not asyncio.sleep that is blocking you here. It is that you await each coroutine store in your for-loop. That means literally you wait for it to execute completely before continuing with your iteration.

If you want to schedule each coroutine immediately, but not wait for it to finish, you can use asyncio.create_task on them. Just make sure to await all the tasks at some point.

Here is a simplified example:

from asyncio import create_task, gather, run, sleep


async def store(text: str, delay: bool = False) -> None:
    print(text)
    if delay:
        await sleep(5)


texts = [
    ("hi", False),
    ("how", False),
    ("are", False),
    ("you", True),
]


async def run_tests() -> None:
    last_finished: str | None = None
    tasks = []
    for text, delay in texts:
        tasks.append(create_task(store(text, delay=delay)))
        if not delay:
            last_finished = text
    await gather(*tasks)
    assert last_finished == "are"


if __name__ == "__main__":
    run(run_tests())

I used asyncio.gather here for convenience to await all the tasks at the end.

If you are on Python >=3.11, you can use a asyncio.TaskGroup context manager instead:

from asyncio import TaskGroup, run, sleep

...

async def run_tests() -> None:
    last_finished: str | None = None
    async with TaskGroup() as task_group:
        for text, delay in texts:
            task_group.create_task(store(text, delay=delay))
            if not delay:
                last_finished = text
    assert last_finished == "are"

But I have to say the variable naming you use is a bit confusing because last_finished suggests that the coroutine that was scheduled last (with the "are" text) is also the one that is done last. That is not true in general!

Once a coroutine is scheduled, there are no inherent guarantees that it will finish before or after some other scheduled coroutine, no matter which was scheduled first. To ensure that, you would have to make use of some sort of synchronization primitive.

Depending on the actual context, this may be especially important because you were talking about writing things to a database. But in this super simple example the order is likely to hold true.

Python相关问答推荐

customtkinter中使用的这个小部件的名称是什么

如何在超时的情况下同步运行Matplolib服务器端?该过程随机挂起

Twilio:CallInstance对象没有来自_的属性'

如何在Deliveryter笔记本中从同步上下文正确地安排和等待Delivercio代码中的结果?

如何在BeautifulSoup中链接Find()方法并处理无?

从webhook中的短代码(而不是电话号码)接收Twilio消息

将整组数组拆分为最小值与最大值之和的子数组

如何标记Spacy中不包含特定符号的单词?

从numpy数组和参数创建收件箱

当独立的网络调用不应该互相阻塞时,'

为什么NumPy的向量化计算在将向量存储为类属性时较慢?'

什么是最好的方法来切割一个相框到一个面具的第一个实例?

使用groupby方法移除公共子字符串

Plotly Dash Creating Interactive Graph下拉列表

CommandeError:模块numba没有属性generated_jit''''

使用Python从rotowire中抓取MLB每日阵容

python panda ExcelWriter切换动态公式到数组公式

为用户输入的整数查找根/幂整数对的Python练习

如何使用matplotlib查看并列直方图

使用SQLAlchemy从多线程Python应用程序在postgr中插入多行的最佳方法是什么?'