假设我有以下代码:

import asyncio
import threading

queue = asyncio.Queue()

def threaded():
    import time
    while True:
        time.sleep(2)
        queue.put_nowait(time.time())
        print(queue.qsize())

@asyncio.coroutine
def async():
    while True:
        time = yield from queue.get()
        print(time)

loop = asyncio.get_event_loop()
asyncio.Task(async())
threading.Thread(target=threaded).start()
loop.run_forever()

这段代码的问题是,async协同程序中的循环永远不会完成第一次迭代,而queue的大小却在增加.

Why is this happening this way and what can I do to fix it?

我无法摆脱单独的线程,因为在我的实际代码中,我使用单独的线程与串行设备通信,而我还没有找到使用asyncio来实现这一点的方法.

推荐答案

asyncio.Queue is not thread-safe,所以不能从多个线程直接使用它.相反,您可以使用janus,这是一个提供线程感知asyncio队列的第三方库:

import asyncio
import threading
import janus

def threaded(squeue):
    import time
    while True:
        time.sleep(2)
        squeue.put_nowait(time.time())
        print(squeue.qsize())

@asyncio.coroutine
def async(aqueue):
    while True:
        time = yield from aqueue.get()
        print(time)

loop = asyncio.get_event_loop()
queue = janus.Queue(loop=loop)
asyncio.Task(asyncio.ensure_future(queue.async_q))
threading.Thread(target=threaded, args=(queue.sync_q,)).start()
loop.run_forever()

还有aioprocessing个(完全公开:我写的),它也提供了进程安全(作为一个副作用,线程安全)队列,但是如果你不try 使用multiprocessing,那就太过分了.

Edit

正如在其他答案中指出的,对于简单的用例,您也可以使用loop.call_soon_threadsafe来添加到队列中.

Python-3.x相关问答推荐

如何创建多个日志(log)文件

如何获得给定列表中所有可能的元素组合?

如何从拼图分区数据集中读取数据到Polar

通过 Pandas 通过用户定义函数重命名数据框列

如何通过 python 使用 auth no priv 获取 SNMPv3?

拆分列表的元素并将拆分后的元素包含到列表中

在字符串中查找正则表达式的所有模式

使用 pandas 进行多类分类的总体准确度

如何在 django 中没有循环的情况下获得前键的前键?

魔术8球txt文件列表

Python3 AttributeError:列表对象没有属性清除

理解 Keras 的 ImageDataGenerator 类中的 `width_shift_range` 和 `height_shift_range` 参数

python - 错误 R10(启动超时)-> Web 进程未能在启动后 60 秒内绑定到 $PORT

如何在元素列表中找到最大的数字,可能是非唯一的?

如何将 cv2.imread 匹配到 keras image.img_load 输出

python 3的蓝牙库

如何使用 Python 订阅 Websocket API 通道?

Selenium Python - 处理没有这样的元素异常

如何将文档字符串放在 Enums 上?

使用 Python 3 读取 CSV 文件