让我们以 watchdog 为例:
这是一个很好的文件事件监听模块!但它基于线程,而且恰好没有异步的替代方案。
假设有人想把它与自己长期以来最喜欢的异步 API Trio 结合使用。
参考 asyncio 中的做法,人们自然会想到使用 Trio 的内存通道(memory channel)。
asyncio 中的做法如下:
"""
Watchdog + asyncio MRE
"""
import pathlib
import asyncio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Forward file-creation events from watchdog into an asyncio queue.

    The handler must be instantiated while an event loop is running, because
    the constructor captures the running loop so that events can later be
    handed over to it from another thread.
    """

    def __init__(self, queue: asyncio.Queue):
        """Store *queue* and the currently running event loop.

        Args:
            queue: Queue that receives the ``src_path`` of each created file.

        Raises:
            RuntimeError: If no event loop is running in the calling thread.
        """
        super().__init__()
        # asyncio.Queue is not thread-safe, so it is only ever touched through
        # put_safe(), which on_created() schedules onto the loop.
        self._non_threadsafe_queue = queue
        self._loop = asyncio.get_running_loop()

    def put_safe(self, val):
        """Try putting in, if it doesn't, drop value to relieve backpressure"""
        try:
            self._non_threadsafe_queue.put_nowait(val)
        except asyncio.QueueFull:
            # Deliberate best-effort: a full queue means the consumer is
            # lagging, so the event is dropped instead of blocking the loop.
            pass

    def on_created(self, event: FileSystemEvent) -> None:
        """Hand the created file's path over to the event loop.

        Uses ``call_soon_threadsafe`` so that the actual queue insertion
        happens on the loop rather than on the calling thread.
        """
        self._loop.call_soon_threadsafe(self.put_safe, event.src_path)
async def main():
    """Print the path of every file created in this script's directory.

    Watches the directory (non-recursively) with a watchdog observer and
    prints each path received through the queue. The observer is stopped
    and joined when the loop exits or an exception propagates.
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create queue
    queue = asyncio.Queue(1024)

    # prepare handler & schedule for file system event
    handler = SomeFileCreationDetector(queue)
    observer.schedule(handler, str(actual_cwd), recursive=False)

    # Start outside the try block: if start() itself fails there is no
    # running thread, and join() on a never-started thread would raise a
    # RuntimeError that hides the real error.
    observer.start()
    try:
        # NOTE: the loop also ends if a falsy item is ever dequeued.
        while str_path := await queue.get():
            # do some stuff with path...
            print(str_path)
    finally:
        # make sure we stop the observer
        observer.stop()
        observer.join()
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    asyncio.run(main())
这样确实可以正常工作:
X:\hina.png
X:\black.png
...
然而,在 Trio 中进行类似的尝试:
"""
Watchdog + Trio Minimum reproducible example
for listening file creation event
"""
import pathlib
import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Naive attempt at forwarding file-creation events into a Trio channel.

    This handler is part of a minimal reproduction: it calls the channel
    directly from the event callback, without handing the call over to the
    Trio run.
    """

    def __init__(self, send_ch: trio.MemorySendChannel):
        """Store the channel that should receive created-file paths."""
        super().__init__()
        self.send_ch = send_ch

    def on_created(self, event: FileSystemEvent) -> None:
        """Push the created file's path straight into the channel.

        NOTE: deliberately kept as-is. The callback does not run on Trio's
        own thread, and this direct call is the failing behaviour this
        reproduction exists to demonstrate.
        """
        self.send_ch.send_nowait(event.src_path)
async def main():
    """Print the path of every file created in this script's directory.

    Opens a Trio memory channel, registers a handler that feeds it, and
    prints every path received. The observer is stopped and joined when the
    receive loop exits or an exception propagates.
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        # Start outside the try block: if start() itself fails there is no
        # running thread, and join() on a never-started thread would raise a
        # RuntimeError that hides the real error.
        observer.start()
        try:
            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)
        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()
# Run the reproduction only when executed as a script, not when imported.
if __name__ == '__main__':
    trio.run(main)
……似乎行不通,因为另一个线程并不是由 Trio 创建的线程。
...
RuntimeError: this thread wasn't created by Trio, pass kwarg trio_token=...
即使不使用 Trio 的线程接口、并且无视内存通道并非线程安全这一点,在异步上下文之外创建的线程同样无法工作——而基于线程的事件监听库恰恰属于这种情况。
"""
Trio's behavior comparison between threads spawn location
Demonstration of non-async context spawned thread failing to call trio's methods
"""
import time
import threading
import trio
def trio_thread(send_ch: trio.MemorySendChannel) -> None:
    """Send a greeting through *send_ch* from a Trio-managed worker thread.

    Intended to be run via ``trio.to_thread.run_sync``; the actual
    ``send_nowait`` call is handed back to the Trio run with
    ``trio.from_thread.run_sync`` instead of being made on this thread.

    Args:
        send_ch: Channel that receives the greeting string.
    """
    trio.from_thread.run_sync(send_ch.send_nowait, "Trio Thread: Yay!")
def thread_threadsafe_ignored(called_from: str) -> None:
    """Send a greeting into the shared channel directly from a plain thread.

    Waits until ``SomeNameSpace.send_ch`` has been published, then calls
    ``send_nowait`` on it from the current thread, ignoring thread-safety on
    purpose. On failure a short marker line is printed and the exception is
    re-raised so the traceback stays visible.

    Args:
        called_from: Label describing where the thread was spawned; it is
            embedded in the messages.
    """
    # delay execution until channel is ready
    while SomeNameSpace.send_ch is None:
        time.sleep(0.1)

    try:
        SomeNameSpace.send_ch.send_nowait(f"Non-Trio Thread in {called_from} context: Yay!")
    except Exception:
        print(f"Non-Trio Thread in {called_from} context: ded")
        raise
class SomeNameSpace:
    """Module-wide holder used to share the send channel between threads."""

    # None until the channel has been created; threads poll this attribute
    # to learn when the channel is ready.
    send_ch = None
# start thread outside async context
# (created and started at module level, before trio.run() is called; the
# target polls SomeNameSpace.send_ch until the channel has been published)
thread_spawned_in_sync_context = threading.Thread(target=thread_threadsafe_ignored, args=["non-async"])
thread_spawned_in_sync_context.start()
async def async_context():
    """Create the shared channel, launch both kinds of threads, print results.

    Publishes the send channel through ``SomeNameSpace`` so the thread that
    was started at module level can pick it up, starts one Trio-managed
    worker thread and one plain thread from inside the async context, then
    prints every value received on the channel.
    """
    send_ch, recv_ch = trio.open_memory_channel(1024)
    SomeNameSpace.send_ch = send_ch

    # Manage both halves of the channel (the send half used to be listed
    # twice, which left the receive half unmanaged).
    async with send_ch, recv_ch, trio.open_nursery() as nursery:
        # schedule trio thread
        nursery.start_soon(trio.to_thread.run_sync, trio_thread, send_ch)

        # start thread inside async context
        thread_spawned_in_async_context = threading.Thread(target=thread_threadsafe_ignored, args=["async"])
        thread_spawned_in_async_context.start()

        # The send half stays open for the whole loop, so this keeps
        # receiving until the run is interrupted.
        async for val in recv_ch:
            print(val)
# Start the Trio run that drives the comparison; the thread created at module
# level above is already polling for the channel at this point.
trio.run(async_context)
Non-Trio Thread in async context: Yay!
Trio Thread: Yay!
Non-Trio Thread in non-async context: ded
Exception in thread Thread-1 (thread_threadsafe_ignored):
Traceback (most recent call last):
...
File "C:\Users\jupiterbjy\AppData\Local\Programs\Python\Python312\Lib\site-packages\trio\_core\_generated_run.py", line 126, in reschedule
raise RuntimeError("must be called from async context") from None
RuntimeError: must be called from async context
当然,这可以通过使用线程安全的 queue.Queue
并轮询直到它非空来解决;但出于好奇,我想知道针对这类问题最符合 Trio 风格的解决方案是什么。
编辑:
下面是应用了 Arthur Tacca 的详细回答之后的完整 Trio 实现,供可能需要完整可运行示例代码的读者参考。
(只改动了几行)
"""
Watchdog + Trio toy code for listening file creation event
"""
import pathlib
import trio
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, FileSystemEvent
class SomeFileCreationDetector(FileSystemEventHandler):
    """Forward file-creation events from watchdog into a Trio memory channel.

    The handler must be instantiated inside a running Trio program, because
    the constructor captures the current Trio token so that events can later
    be handed over to the Trio run from another thread.
    """

    def __init__(self, send_ch: trio.MemorySendChannel):
        """Store *send_ch* and the token of the current Trio run.

        Args:
            send_ch: Channel that receives the ``src_path`` of each created
                file.

        Raises:
            RuntimeError: If not called from inside a Trio run.
        """
        super().__init__()
        self.send_ch = send_ch
        self.trio_token = trio.lowlevel.current_trio_token()

    def put_safe(self, val):
        """Try putting in, if it doesn't, drop value to relieve backpressure.

        Executed inside the Trio run via ``run_sync_soon``. An exception
        escaping such a callback would crash the whole run, so a full or
        already-closed channel must not raise here.
        """
        try:
            self.send_ch.send_nowait(val)
        except (trio.WouldBlock, trio.BrokenResourceError, trio.ClosedResourceError):
            # Deliberate best-effort: the buffer is full or the channel is
            # shutting down, so the event is dropped.
            pass

    def on_created(self, event: FileSystemEvent) -> None:
        """Hand the created file's path over to the Trio run.

        ``run_sync_soon`` is safe to call from any thread; the channel
        itself is only touched by put_safe() on the Trio side.
        """
        try:
            self.trio_token.run_sync_soon(self.put_safe, event.src_path)
        except trio.RunFinishedError:
            # The Trio run has already ended; nobody is left to receive it.
            pass
async def main():
    """Print the path of every file created in this script's directory.

    Opens a Trio memory channel, registers a handler that feeds it, and
    prints every path received. A KeyboardInterrupt ends the loop with a
    short message; in every case the observer is stopped and joined before
    returning.
    """
    observer = Observer()
    actual_cwd = pathlib.Path(__file__).parent.absolute()

    # create channel
    send_ch, recv_ch = trio.open_memory_channel(1024)

    async with recv_ch, send_ch:
        # prepare handler & schedule for file system event
        handler = SomeFileCreationDetector(send_ch)
        observer.schedule(handler, str(actual_cwd), recursive=False)

        # Start outside the try block: if start() itself fails there is no
        # running thread, and join() on a never-started thread would raise a
        # RuntimeError that hides the real error.
        observer.start()
        try:
            print("Observer started!")

            async for str_path in recv_ch:
                # do some stuff with path...
                print(str_path)
        except KeyboardInterrupt:
            print("Terminating!")
        finally:
            # make sure we stop the observer
            observer.stop()
            observer.join()
            print("Observer thread stopped!")
# Run the toy program only when executed as a script, not when imported.
if __name__ == '__main__':
    trio.run(main)
❯ py X:\scratch.py
Observer started!
X:\hina.png
X:\Horizon Claire cover.png
Terminating!
Observer thread stopped!