我正在try 编写一个多线程的TCP服务器,它可以同时处理tokio个连接.

我希望以事件驱动的方式构建它,可以将一个或多个闭包附加到特定事件(如新连接、收到的消息、客户端断开连接等).

例如:

    server.on_message(|msg: String, stream: &mut TcpStream| {
       async move {
            println!("Recieved {:?}", msg);
            stream.write_all(b"Hello\n").await;
       }
    }).await;

每个连接都将接收其自己的线程,该线程应该具有对Vec个回调的读取访问权限.

    pub async fn run(&mut self) {
        let listener = TcpListener::bind("127.0.0.1:9090").await.unwrap();

        loop {
            let (mut socket, _) = listener.accept().await.unwrap();

            let cb = self.on_message.clone();
            tokio::spawn(async move {
                Self::process(socket, cb).await;
            });
        }
    }

不幸的是,我对铁 rust 的理解仍然非常粗浅,我一直在兜圈子:

  • 查找要存储在VEC中的正确类型
  • 为接受闭包回调的函数参数找到正确的类型

每当我觉得我在一个地方取得了进步,我就会意识到另一个地方搞砸了.这是我能做的最好的了,但它仍然不起作用.

type Callback<T> = dyn Fn(T, &mut TcpStream) -> Pin<Box<dyn Future<Output=()> + Send>> + Send + 'static;

unsafe impl<T> Send for TcpStreamCallbackList<T> {}
unsafe impl<T> Sync for TcpStreamCallbackList<T> {}

impl<T> TcpStreamCallbackList<T> {

    pub fn new() -> Self {
        Self { callbacks: Vec::new() }
    }

    pub fn push<G: Send + 'static>(&mut self, mut fun: impl Fn(T, &mut TcpStream) -> G + Send + 'static) where G: Future<Output=()> {
        self.callbacks.push(Arc::new(Box::new(move |val:T, stream: &mut TcpStream| Box::pin(fun(val, stream)))));
    }

    pub async fn call(&self, val: T, stream: &mut TcpStream) where T: Clone {
        for cb in self.callbacks.iter() {
            let _cb = cb.clone();
            _cb(val.clone(), stream).await; // B O O M

        }
    }
}

在删除call函数中回调返回的.await on the Future之前,上面的代码不会编译(这与目的不符).

error[E0277]: `dyn for<'a> Fn(String, &'a mut tokio::net::TcpStream) -> Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>> + std::marker::Send` cannot be shared between threads safely
   --> src/main.rs:94:26

据我所知,问题是《返校的future 》没有发送.

note: required by a bound in `tokio::spawn`
   --> /Users/lukasz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

我不知道我的类型是否有意义并且是线程安全的.我也不知道为什么编译器认为返回类型不是Send.我真的被困在这里了,如果有任何帮助,我将不胜感激.

推荐答案

我已经组装了一个稍微简单一点的,但可以产生(playground)个.关键是,Callback必须是Sync,才能使&self成为Send.我试着使用this comment中提到的技巧,但它在这里似乎不起作用,做call也不需要&mut self.我在that answer上写了更多关于SendSync的信息.

use std::future::Future;
use std::pin::Pin;

type CallbackFuture<O> = Pin<Box<dyn Future<Output = O> + Send>>;
type Callback<T> = dyn (Fn(T) -> CallbackFuture<()>) + Send + Sync;

pub struct CallbackList<T> {
    list: Vec<Box<Callback<T>>>,
}

impl<T> CallbackList<T> {
    pub fn new() -> Self {
        Self { list: Vec::new() }
    }

    pub fn push<F>(&mut self, f: F)
    where
        F: Fn(T) -> CallbackFuture<()>,
        F: Send + Sync + 'static,
    {
        self.list.push(Box::new(f))
    }

    pub async fn call(&self, t: T)
    where
        T: Clone,
    {
        for f in &self.list {
            f(t.clone()).await;
        }
    }
}

#[tokio::main]
async fn main() {
    let mut calls = CallbackList::new();
    calls.push(|i| {
        Box::pin(async move {
            println!("{i}");
        })
    });

    calls.push(|i| {
        Box::pin(async move {
            println!("{}", i + 1);
        })
    });

    let handle = tokio::spawn(async move {
        calls.call(34).await;
    });

    handle.await.unwrap();
}

我已经删除了尽可能多的特征界限、'static和包装器,但您可能需要添加一些回来,这取决于您对它做了什么.现在它需要T,但应该可以将其分为T&mut TcpStream.如果您使用使用所有元素的main函数更新您的问题,我可以更改我的函数以匹配.如果所有其他方法都失败了,您可以使用(_, Arc<Mutex<TcpStream>>)作为T.

Rust相关问答推荐

为什么`Vec i64`的和不知道是`Option i64`?

无法从流中读取Redis请求

在rust sqlx中使用ilike和push bind

使用Py03从Rust调用Python函数时的最佳返回类型

我可以在不收集或克隆的情况下,将一个带有Item=(key,val)的迭代器拆分成单独的key iter和val iter吗?

用于判断整数块是否连续的SIMD算法.

什么时候使用FuturesOrdered?

在运行特定测试时,如何 suppress cargo test 的空输出?

如何迭代属性以判断相等性?

处理带有panic 的 Err 时,匹配臂具有不兼容的类型

如何将带有嵌套borrow /NLL 的 Rust 代码提取到函数中

确保参数是编译时定义的字符串文字

信号量释放后 Rust 输出挂起线程

Rust 重写函数参数

Rust ECDH 不会产生与 NodeJS/Javascript 和 C 实现相同的共享密钥

为什么这段 Rust 代码会在没有递归或循环的情况下导致堆栈溢出?

Rust 异步和 AsRef 未被发送

在使用大型表达式时(8k 行需要一小时编译),是否可以避免 Rust 中的二次编译时间?

如何在不设置精度的情况下打印浮点数时保持尾随零?

为什么 u64::trailing_zeros() 在无分支工作时生成分支程序集?