请考虑以下代码:

import asyncio

sem: asyncio.Semaphore = asyncio.Semaphore(2)


async def async_run() -> None:
    async def async_task() -> None:
        async with sem:
            await asyncio.sleep(1)
            print('spam')

    await asyncio.gather(*[async_task() for _ in range(3)])


asyncio.run(async_run())

运行在Python3.10.6(Fedora 35)上,它的工作原理就像教科书中的一样.

然而,当我使用Python3.8.10(Ubuntu 20.04)运行它时,我得到以下错误:

Traceback (most recent call last):
  File "main.py", line 21, in <module>
    asyncio.run(async_run())
  File "/usr/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "main.py", line 18, in async_run
    print(future_entry_index, await future_entry)
  File "/usr/lib/python3.8/asyncio/tasks.py", line 619, in _wait_for_one
    return f.result()  # May raise f.exception().
  File "main.py", line 11, in async_task
    async with sem:
  File "/usr/lib/python3.8/asyncio/locks.py", line 97, in __aenter__
    await self.acquire()
  File "/usr/lib/python3.8/asyncio/locks.py", line 496, in acquire
    await fut
RuntimeError: Task <Task pending name='Task-4' coro=<async_run.<locals>.async_task() running at main.py:11> cb=[as_completed.<locals>._on_completion() at /usr/lib/python3.8/asyncio/tasks.py:606]> got Future <Future pending> attached to a different loop

导致错误的是async with sem行和Semaphore对象.如果没有它,一切都可以正常工作,但不是我想要的方式.

我不能在任何地方提供loop参数,因为即使在允许的地方,它也从Python3.8开始就被弃用了,并在Python3.10中被删除了.

如何让代码在Python3.8上运行?

Update.瞥一眼asyncio代码就会发现,这两个版本的Python有很大的不同.然而,信号量不能只在3.8中被打破,对吗?

推荐答案

正如在this answer中所讨论的,Python3.10之前的信号量基于当前运行的循环将其循环设置为__init__,而asyncio.run则开始一个新的循环.因此,当您try 和您的CORO async.run时,您使用的循环与您的Semaphore定义的循环不同,其正确的错误消息实际上是 got Future <Future pending> attached to a different loop.

幸运的是,让代码在两个版本上运行并不太难:

Solution 1

不要创建新的循环,使用现有的循环运行您的函数:

import asyncio

sem: asyncio.Semaphore = asyncio.Semaphore(value=2)


async def async_task() -> None:
    async with sem:
        await asyncio.sleep(1)
        print(f"spam {sem._value}")

async def async_run() -> None:
    await asyncio.gather(*[async_task() for _ in range(3)])

loop = asyncio.get_event_loop()
loop.run_until_complete(async_run())
loop.close()

Solution 2

在由asyncio.run创建的循环中初始化信号量对象:

import asyncio

async def async_task2(sem) -> None:
    async with sem:
        await asyncio.sleep(1)
        print(f"spam {sem._value}")

async def async_run2() -> None:
    sem = asyncio.Semaphore(2)
    await asyncio.gather(*[async_task2(sem) for _ in range(3)])


asyncio.run(async_run2())

这两个代码片段都可以在python3.8和python3.10上运行.大概就是因为这样的怪异,他们才有了removed the loop parameter from most of asyncio in python 3.10.

比较信号量从3.8到3.10的__init__:

Python 3.8


class Semaphore(_ContextManagerMixin):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        if loop is None:
            self._loop = events.get_event_loop()
        else:
            self._loop = loop
            warnings.warn("The loop argument is deprecated since Python 3.8, "
                          "and scheduled for removal in Python 3.10.",
                          DeprecationWarning, stacklevel=2)

Python 3.10:

class Semaphore(_ContextManagerMixin, mixins._LoopBoundMixin):
    """A Semaphore implementation.
    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other thread calls release().
    Semaphores also support the context management protocol.
    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=mixins._marker):
        super().__init__(loop=loop)
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        self._value = value
        self._waiters = collections.deque()
        self._wakeup_scheduled = False

Python相关问答推荐

七段显示不完整

如何在where或过滤器方法中使用SQLAlchemy hybrid_Property?

这些变量是否相等,因为它们引用相同的实例,尽管它们看起来应该具有不同的值?

Python(Polars):使用之前的变量确定当前解决方案的Vector化操作

如何在Pandas 中存储二进制数?

如何从格式为note:{neighbor:weight}的字典中构建networkx图?

遵循轮廓中对象方向的计算线

通过交换 node 对链接列表进行 Select 排序

计算相同形状的两个张量的SSE损失

如何使用Google Gemini API为单个提示生成多个响应?

我必须将Sigmoid函数与r2值的两种类型的数据集(每种6个数据集)进行匹配,然后绘制匹配函数的求导.我会犯错

Python上的Instagram API:缺少client_id参数"

带条件计算最小值

处理(潜在)不断增长的任务队列的并行/并行方法

在Pandas DataFrame操作中用链接替换'方法的更有效方法

2D空间中的反旋算法

Telethon加入私有频道

通过pandas向每个非空单元格添加子字符串

寻找Regex模式返回与我当前函数类似的结果

在pandas/python中计数嵌套类别