让我们举个例子:

这是一个很好的事件监听模块!但这使用了线程,而且碰巧没有异步替代方案!

其中一位决定与他们长期以来最喜欢的异步API Trio混合使用.

人们可以想出使用Trio's Memory channels的 idea ,记住asyncio是如何做到这一点的.

"""
Watchdog + asyncio MRE
"""

import pathlib

import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):

    def __init__(self, queue: asyncio.Queue):
        self._non_threadsafe_queue = queue
        self._loop = asyncio.get_running_loop()

    def put_safe(self, val):
        """Try putting in, if it doesn't, drop value to relieve backpressure"""
        try:
            self._non_threadsafe_queue.put_nowait(val)
        except asyncio.QueueFull:
            pass

    def on_created(self, event: FileSystemEvent) -> None:
        self._loop.call_soon_threadsafe(
            self.put_safe,
            event.src_path
        )


async def main():
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create queue
    queue = asyncio.Queue(1024)

    # prepare handler & schedule for file system event
    handler = SomeFileCreationDetector(queue)
    observer.schedule(handler, str(actual_cwd), recursive=False)

    try:
        observer.start()

        while str_path := await queue.get():
            # do some stuff with path...
            print(str_path)
    finally:
        # make sure we stop the observer
        observer.stop()
        observer.join()


if __name__ == '__main__':
    asyncio.run(main())

这确实很管用.

X:\hina.png
X:\black.png
...



然而,以类似的try :

"""
Watchdog + Trio Minimum reproducible example
for listening file creation event
"""

import pathlib

import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):

    def __init__(self, send_ch: trio.MemorySendChannel):
        self.send_ch = send_ch

    def on_created(self, event: FileSystemEvent) -> None:
        self.send_ch.send_nowait(event.src_path)


async def main():
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        try:
            observer.start()

            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)
        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()


if __name__ == '__main__':
    trio.run(main)

...似乎是不可能的,因为另一个线程不是三人组的线程.

...
RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...

在异步上下文之外产生的线程也不能工作(当没有Trio的线程时忽略非线程安全特性时),这是基于线程的事件侦听器库的情况.

"""
Trio's behavior comparison between threads spawn location

Demonstration of non-async context spawned thread failing to call trio's methods
"""


import time
import threading
import trio


def trio_thread(send_ch: trio.MemorySendChannel):
    trio.from_thread.run_sync(send_ch.send_nowait, "Trio Thread: Yay!")


def thread_threadsafe_ignored(called_from: str):
    # delay execution until channel is ready
    while SomeNameSpace.send_ch is None:
        time.sleep(0.1)

    try:
        SomeNameSpace.send_ch.send_nowait(f"Non-Trio Thread in {called_from} context: Yay!")
    except Exception:
        print(f"Non-Trio Thread in {called_from} context: ded")
        raise


class SomeNameSpace:
    send_ch = None


# start thread outside async context
thread_spawned_in_sync_context = threading.Thread(target=thread_threadsafe_ignored, args=["non-async"])
thread_spawned_in_sync_context.start()


async def async_context():
    send_ch, recv_ch = trio.open_memory_channel(1024)
    SomeNameSpace.send_ch = send_ch

    async with send_ch, send_ch, trio.open_nursery() as nursery:

        # schedule trio thread
        nursery.start_soon(trio.to_thread.run_sync, trio_thread, send_ch)

        # start thread inside async context
        thread_spawned_in_async_context = threading.Thread(target=thread_threadsafe_ignored, args=["async"])
        thread_spawned_in_async_context.start()

        async for val in recv_ch:
            print(val)


trio.run(async_context)
Non-Trio Thread in async context: Yay!
Trio Thread: Yay!
Non-Trio Thread in non-async context: ded
Exception in thread Thread-1 (thread_threadsafe_ignored):
Traceback (most recent call last):
...
  File "C:\Users\jupiterbjy\AppData\Local\Programs\Python\Python312\Lib\site-packages\trio\_core\_generated_run.py", line 126, in reschedule
    raise RuntimeError("must be called from async context") from None
RuntimeError: must be called from async context


当然,这可以通过使用线程安全的queue.Queue和轮询直到它不为空来解决,但是出于好奇心,我对这类问题可能是最像Trio的解决方案感兴趣.



编辑:

三重奏实现的完整代码应用Arthur Tacca‘S详细回答如下,为那些可能需要一个完整的工作玩具代码.

(仅有几行更改)

"""
Watchdog + Trio toy code for listening file creation event
"""

import pathlib

import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent


class SomeFileCreationDetector(FileSystemEventHandler):

    def __init__(self, send_ch: trio.MemorySendChannel):
        self.send_ch = send_ch
        self.trio_token = trio.lowlevel.current_trio_token()

    def on_created(self, event: FileSystemEvent) -> None:
        self.trio_token.run_sync_soon(self.send_ch.send_nowait, event.src_path)


async def main():
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        try:
            observer.start()
            print("Observer started!")

            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)

        except KeyboardInterrupt:
            print("Terminating!")

        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()
            print("Observer thread stopped!")


if __name__ == '__main__':
    trio.run(main)
❯ py X:\scratch.py
Observer started!
X:\hina.png
X:\Horizon Claire cover.png
Terminating!
Observer thread stopped!

推荐答案

使trio.from_thread.run()正常工作的线索在您发布的错误消息中:"运行错误:此线程不是由Trio创建的,传递kwarg trio_Token=...".如果您通过在Trio文档中搜索trio_token(实际上这个类是TrioToken)来跟踪面包屑,您就可以使该函数工作.docs for trio.from_thread.run() 中包含一个"定位TrioToken"部分,但令人恼火的是,它并没有直接指向您需要的trio.lowlevel.current_trio_token()函数.

直接使用trio.TrioToken.run_sync_soon()实际上更简单(这是trio.from_thread.run()在幕后使用的),因为函数MemorySendChannel.send_nowait()无论如何都是同步的.下面是一个简单的示例,它将各个部分组合在一起:

class MySimpleWorker:
    def __init__(self, *worker_args):
        self._worker_args = worker_args
        self._trio_token = trio.lowlevel.current_trio_token()
        self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()

    def _thread_main(self):
        worker_object = ThreadedWorker(*self._worker_args)
        while True:
            # TODO (if needed): exception handling
            next_val = worker_object.get_next()
            self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)
        self._trio_token.run_sync_soon(self._send_channel.close)


async def use_simple_worker():
    async with trio.open_nursery() as n:
        # Can use n.start_soon to start unrelated tasks in the background
        w = MySimpleWorker()
        async for data in w.receive_channel:
            process(data)

在实际的应用程序中,您可能还希望在关闭异步应用程序时关闭工作线程中的对象,这会稍微麻烦一些.您需要在托儿所中产生另一个任务,该任务在线程被取消时关闭该线程,并等待其完成.因为不等待通道关闭就不可能不等待其中的值,所以我使用了单独的trio.Event来表示关闭完成.虽然不是绝对必要的,但在这个新 routine 中产生线程也是有意义的.下面是它会是什么样子:

class MyWorker:
    def __init__(self, *worker_args):
        self._trio_token = trio.lowlevel.current_trio_token()
        self._stopped_event = trio.Event()
        self._worker_object = ThreadedWorker(*worker_args)
        self._send_channel, self.receive_channel = trio.open_memory_channel(max_buffer_size=math.inf)
        self._thread = None

    async def run_background_thread(self):
        """Ensures that the background thread is closed when nursery is shutdown."""
        assert not self._thread is None
        self._thread = threading.Thread(target=self._thread_main)
        self._thread.start()
        try:
            await trio.sleep_forever()
        finally:
            self._worker_object.stop()  # Needs to be thread safe stop method
            with trio.CancelScope(shield=True):
                await self._stopped_event.wait()
                self._thread.join()

    def _thread_main(self):
        while True:
            # TODO (if needed): exception handling (could capture and inject into run_background_thread)
            next_val = self._worker_object.get_next()
            if next_val is None: # Or however else the object signifies shutdown
                self._trio_token.run_sync_soon(self._send_channel.close)
                self._trio_token.run_sync_soon(self._stopped_event.set)
            else:
                self._trio_token.run_sync_soon(self._send_channel.send_nowait, next_val)


async def use_worker():
    async with trio.open_nursery() as n:
        # Can use n.start_soon to start unrelated tasks in the background
        w = MyWorker()
        n.start_soon(w.run_background_thread)
        async for data in w.receive_channel:
            process(data)

Python-3.x相关问答推荐

错误2没有这样的文件或目录website_content.txt""

只有在Chrome尚未打开的情况下,打开Chrome后,PySimpleGUI窗口才会崩溃

安装grpcio时出现错误DeproationWarning:pkg_resource

Django 3.2/Django-cms 3.11:查找错误:型号帐户.客户用户未注册

为什么我的Selenium脚本在密码元素上失败?

无法使用xpath关闭selenium中的弹出窗口

Pandas groupby 然后 for each 组添加新行

在 Python 中实现 COM 接口

numpy.ndarray 与 pandas.DataFrame

'~'(波浪号)运算符在 Python 中的应用

全局捕获快速 api 中的异常

Python中的依赖倒置

如何调试垂死的 Jupyter Python3 内核?

Anaconda 中的依赖项和包冲突?

Python的max函数有多高效

如何模拟 open(...).write() 而不会出现没有这样的文件或目录错误?

如何将二进制(字符串)转换为浮点值?

如何在 Python 3.2 中退出?

有没有一种标准方法来确保 python 脚本将由 python2 而不是 python3 解释?

print(... sep='', '\t' ) 是什么意思?