我在想,我的 idea 是否可以实现:一个完全支持异步 rust 蚀的dynamic library plugin manager.

实施分为三个主要部分(github repo):

  1. my-interfaceasync_trait(具有"正常"特征的方法似乎工作得很好,不知道在多大程度上)

    use async_trait::async_trait;
    #[async_trait]
    pub trait SayHelloService {
        async fn say_hello(&self);
    }
    
  2. my-master,加载.dll/.so并调用库中的创建函数,该函数在给定tokio::runtime::Handle的情况下返回Box<dyn SayHelloService>

    use my_interface::SayHelloService;
    use tokio::{self, runtime::Handle};
    
    #[tokio::main]
    async fn main() {
        let lib = libloading::Library::new("target/debug/libmy_plugin.so").expect("load library");
        let new_service: libloading::Symbol<fn(Handle) -> Box<dyn SayHelloService>> =
            unsafe { lib.get(b"new_service") }.expect("load symbol");
        let service1 = new_service(Handle::current());
        let service2 = new_service(Handle::current());
        let _ = tokio::join!(service1.say_hello(), service2.say_hello());
    }
    
  3. my-plugin,实现SayHelloService,这是阻止我的代码

    use async_trait::async_trait;
    use my_interface::SayHelloService;
    use tokio::{self, runtime::Handle};
    
    #[no_mangle]
    pub fn new_service(handle: Handle) -> Box<dyn SayHelloService> {
        Box::new(PluginSayHello::new(handle))
    }
    
    pub struct PluginSayHello {
        id: String,
        handle: Handle,
    }
    
    impl PluginSayHello {
        fn new(handle: Handle) -> PluginSayHello {
            let id = format!("{:08x}", rand::random::<u32>());
            println!("[{}] Created instance!", id);
            PluginSayHello { id, handle }
        }
    }
    
    #[async_trait]
    impl SayHelloService for PluginSayHello {
        // this errors with "future cannot be sent between threads safely"
        async fn say_hello(&self) {
            // this should enable you to call tokio::sleep but EnterGuard is not Send :(
            // https://docs.rs/tokio/latest/tokio/runtime/struct.Handle.html#method.enter
            let _guard = self.handle.enter();
            println!("[{}] Hello from plugin!", self.id);
            let _ = tokio::spawn(async move {
                let _ = tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                println!("sleep 1");
            })
            .await;
        }
    }
    

Not being able to call self.handle.enter() creates all sorts of strange behavior for example:
(if you want more info I'll attach the crash log)

#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn_blocking(move || {
                println!("[{}] Hello from plugin!", id);
                // internal code of reqwest just crashes
                let body = reqwest::get("https://www.rust-lang.org")
                    .await?
                    .text()
                    .await?;
                println!("body = {:?}", body);
            })
            .await;
    }
}

我还实现了PluginSayHello的工作实现,但对我来说这并不像是一个完全的胜利.

#[async_trait]
impl SayHelloService for PluginSayHello {
    async fn say_hello(&self) {
        let id = self.id.clone();
        let _ = self
            .handle
            .spawn(async move {
                println!("[{}] Hello from plugin!", id);
                // calling tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                // errors with "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
                let _ = sleep(Duration::new(1, 0));
                println!("slept 1");
                println!("[{}] Hello again from plugin!", id);
            })
            .await;
    }
}

要总结这些内容,请执行以下操作:

  • 这种方法在Rust 的情况下可行吗?
  • 你在我的逻辑中发现了什么错误?(我非常肯定有一大堆)
  • 你能给我指出一个解决方案或一个完全不同的方法吗?

推荐答案

这是一个看起来并不平凡的场景.

首先,让我们退后一步,解释为什么事情会是这样的.I/O和计时器的异步原语在一起管理时工作得最好,因为需要一些线程来处理操作系统事件.Tokio Runtime当然提供了这一点,但为了避免手动传递句柄来向运行时注册计时器和I/O调用,有一个线程本地"current"变量来存储正在执行当前任务的运行时的句柄.因此,当您执行特定于Tokio的操作(如tokio::time::sleep)时,它将使用该线程本地变量来访问运行时的时间管理器.如果没有"CURRENT",则会出现"THEAT NOT RECTOR RUNNING"错误.

这在您的插件系统中成为一个问题,因为在您的每个编译的二进制文件(主程序和插件)中将静态链接tokio个,这意味着它们将有different个线程本地变量:即插件中的"当前"与主程序的"当前"不同,即使使用相同的线程.因此,您似乎试图手动传递句柄并使用.enter(),但此解决方案不太正确.

你遇到的一个问题是,这会导致你的异步函数是!Send,如果你不需要它们是Send,可以通过使用#[async_trait(?Send)]来解决,但我假设你需要.如果任务可以在线程之间移动,则另一个问题handle.enter()仅设置当前运行时of the current thread,并且您的任务可能在某个点被移动到不同的线程.

您需要的是您的async任务具有"当前"运行时only while it is executing.您可以使用Future包装器来实现这一点:

use std::task::{Context, Poll};
use std::pin::Pin;
use std::future::Future;

use tokio::runtime::Handle;
use pin_project_lite::pin_project;

pin_project! {
    struct WithRuntime<F> {
        runtime: Handle,
        #[pin]
        fut: F,
    }
}

impl<F> WithRuntime<F> {
    fn new(runtime: Handle, fut: F) -> Self {
        Self { runtime, fut }
    }
}

impl<F> Future for WithRuntime<F>
where
    F: Future
{
    type Output = F::Output;
    fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let _guard = this.runtime.enter();
        this.fut.poll(ctx)
    }
}

然后您可以使用WithRuntime::new(handle, async { ... }).await(完整代码here).将其与你目前的特点相结合的确切方式可能会令人尴尬,但我认为无论如何都需要改变.

您当前设置的另一个问题是,您假设一个运行时将始终调用您的服务.这可能不是真的;在Actix-Web中有这样的配置,其中有多个单线程运行时.我认为您的代码仍然可以工作,但它可能不是最优的.因此,您可能应该在每次异步方法调用时传递"Current"句柄.您可能会制作一个宏,为插件编写者透明地处理这一点.

最后,你应该被警告,铁 rust 的ABI并不稳定.您所拥有的不能保证在不同的编译器版本之间工作(或者甚至在相同的编译器上工作?).它将会有likely,mostly的效果,但仍然不能保证.通常你会设计一个合适的FFI(外部函数接口)层来进行通信,但是Rust所需的类型(PollContextBox<dyn Future>)不是FFI安全的,所以你需要一个FFI安全的填充程序(比如从stabby开始,你应该完全判断出来)才能真正工作(尽管你仍然不确定Handle会做什么).

而且可能还有其他东西我错过了,因为我不确定为什么handle.spawn(async { ... })不起作用.

Rust相关问答推荐

PyReadonlyArray2到Vec T<>

将JSON密钥转换为Polars DataFrame

关联类型(类型参数)命名约定

Arrow RecordBatch as Polars DataFrame

如何在Bevy/Rapier3D中获得碰撞机的计算质量?

铁 rust ,我的模块介绍突然遇到了一个问题

我应该将哪些文件放入我的GitHub存储库

无符号整数的Rust带符号差

如何在AVX2中对齐/旋转256位向量?

Rust ndarray:如何从索引中 Select 数组的行

我应该如何表达具有生命周期参数的类型的总排序,同时允许与不同生命周期进行比较?

如何从宏调用闭包?

使用 pyo3 将 Rust 转换为 Python 自定义类型

仅发布工作区的二进制 crate

通过写入 std::io::stdout() 输出不可见

如何从 rust 中的同一父目录导入文件

改变不实现克隆的 dioxus UseState struct

在空表达式语句中移动的值

以下打印数组每个元素的 Rust 代码有什么问题?

为什么这里需要类型注解?