我正在用Python 3.4.2学习asyncio,我用它在IPC总线上连续监听,而gbulb在DBU上监听.

我创建了一个函数listen_to_ipc_channel_layer,它持续监听IPC通道上的传入消息,并将消息传递给message_handler.

我也在听SIGTERM和SIGINT.当我向运行您在底部找到的代码的python进程发送SIGTERM时,脚本应该会优雅地终止.

我遇到的问题是以下警告:

got signal 15: exit
Task was destroyed but it is pending!
task: <Task pending coro=<listen_to_ipc_channel_layer() running at /opt/mainloop-test.py:23> wait_for=<Future cancelled>>

Process finished with exit code 0

…使用以下代码:

import asyncio
import gbulb
import signal
import asgi_ipc as asgi

def main():
    asyncio.async(listen_to_ipc_channel_layer())
    loop = asyncio.get_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    # Start listening on the Linux IPC bus for incoming messages
    loop.run_forever()
    loop.close()

@asyncio.coroutine
def listen_to_ipc_channel_layer():
    """Listens to the Linux IPC bus for messages"""
    while True:
        message_handler(message=channel_layer.receive(["my_channel"]))
        try:
            yield from asyncio.sleep(0.1)
        except asyncio.CancelledError:
            break

def ask_exit():
    loop = asyncio.get_event_loop()
    for task in asyncio.Task.all_tasks():
        task.cancel()
    loop.stop()


if __name__ == "__main__":
    gbulb.install()
    # Connect to the IPC bus
    channel_layer = asgi.IPCChannelLayer(prefix="my_channel")
    main()

我仍然对asyncio知之甚少,但我想我知道发生了什么.在等待yield from asyncio.sleep(0.1)的过程中,信号处理器捕捉到SIGTERM,并在该过程中调用task.cancel().

这不应该触发while True:循环中的CancelledError吗?(因为不是,但这就是我对"Calling cancel() will throw a CancelledError to the wrapped coroutine"的理解).

最终调用loop.stop(),它停止循环,而不等待yield from asyncio.sleep(0.1)返回结果,甚至不等待整个协程listen_to_ipc_channel_layer.

如果我错了,请纠正我.

我认为我需要做的唯一一件事就是让我的程序等待yield from asyncio.sleep(0.1)返回结果and/or协同程序,以打破while循环并完成.

我相信我混淆了很多事情.请帮我弄清楚这些事情,这样我就可以想出如何在没有警告的情况下优雅地关闭事件循环.

推荐答案

问题在于取消任务后立即关闭循环.作为cancel() docs state

"这将安排通过事件循环将取消的错误抛出到the next cycle上包装的协同程序中."

以这段代码为例:

import asyncio
import signal


async def pending_doom():
    await asyncio.sleep(2)
    print(">> Cancelling tasks now")
    for task in asyncio.Task.all_tasks():
        task.cancel()

    print(">> Done cancelling tasks")
    asyncio.get_event_loop().stop()


def ask_exit():
    for task in asyncio.Task.all_tasks():
        task.cancel()


async def looping_coro():
    print("Executing coroutine")
    while True:
        try:
            await asyncio.sleep(0.25)
        except asyncio.CancelledError:
            print("Got CancelledError")
            break

        print("Done waiting")

    print("Done executing coroutine")
    asyncio.get_event_loop().stop()


def main():
    asyncio.async(pending_doom())
    asyncio.async(looping_coro())

    loop = asyncio.get_event_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, ask_exit)

    loop.run_forever()

    # I had to manually remove the handlers to
    # avoid an exception on BaseEventLoop.__del__
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)


if __name__ == '__main__':
    main()

注意,ask_exit取消任务,但不执行循环,在下一个循环中,looping_coro()停止循环.如果取消,则输出为:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
^CGot CancelledError
Done executing coroutine

请注意pending_doom如何取消和停止循环immediately after.如果你让它一直运行到pending_doom个协同程序从睡眠中醒来,你会看到同样的警告:

Executing coroutine
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
Done waiting
>> Cancelling tasks now
>> Done cancelling tasks
Task was destroyed but it is pending!
task: <Task pending coro=<looping_coro() running at canceling_coroutines.py:24> wait_for=<Future cancelled>>

Python-3.x相关问答推荐

PythonPandas 创建一个列并添加到DataFrame

按长度和字母数字对Pandas 数据帧列进行排序

如何将项目添加到Python中具有固定大小的列表列表中

SQL Server 2022和Python3.10脚本错误

使用Python按照其组/ID的紧密值的递增顺序映射数据框的两列

数据框中从每个组/ID的底部删除行

如何查找以开头并替换的字符串

Sunburst 折线图可视化

在 Python 中实现 COM 接口

Python3:是否可以将变量用作函数调用的一部分

如何通过 GitLab V4 api 列出 gitlab 项目中的所有项目变量

如何使用 regex sub 根据列表中的变量替换字符

Python 3 `str.__getitem__` 的计算复杂度是多少?

使用 Python 解析 JSON 嵌套字典

如何注释一个以另一个函数作为参数的函数?

virtualenv virtualenvwrapper virtualenv:错误:无法识别的参数:--no-site-packages

TypeError:列表索引必须是整数或切片,而不是列表

为什么在 Python 3 中实例的 __dict__ 的大小要小得多?

带有 Emacs 的 Python 3

如何使用异步 for 循环遍历列表?