我正在try 编写一个多线程的TCP服务器,它可以同时处理tokio
个连接.
我希望以事件驱动的方式构建它,可以将一个或多个闭包附加到特定事件(如新连接、收到的消息、客户端断开连接等).
例如:
server.on_message(|msg: String, stream: &mut TcpStream| {
async move {
println!("Recieved {:?}", msg);
stream.write_all(b"Hello\n").await;
}
}).await;
每个连接都将接收其自己的线程,该线程应该具有对Vec
个回调的读取访问权限.
pub async fn run(&mut self) {
let listener = TcpListener::bind("127.0.0.1:9090").await.unwrap();
loop {
let (mut socket, _) = listener.accept().await.unwrap();
let cb = self.on_message.clone();
tokio::spawn(async move {
Self::process(socket, cb).await;
});
}
}
不幸的是,我对铁 rust 的理解仍然非常粗浅,我一直在兜圈子:
- 查找要存储在VEC中的正确类型
- 为接受闭包回调的函数参数找到正确的类型
每当我觉得我在一个地方取得了进步,我就会意识到另一个地方搞砸了.这是我能做的最好的了,但它仍然不起作用.
type Callback<T> = dyn Fn(T, &mut TcpStream) -> Pin<Box<dyn Future<Output=()> + Send>> + Send + 'static;
unsafe impl<T> Send for TcpStreamCallbackList<T> {}
unsafe impl<T> Sync for TcpStreamCallbackList<T> {}
impl<T> TcpStreamCallbackList<T> {
pub fn new() -> Self {
Self { callbacks: Vec::new() }
}
pub fn push<G: Send + 'static>(&mut self, mut fun: impl Fn(T, &mut TcpStream) -> G + Send + 'static) where G: Future<Output=()> {
self.callbacks.push(Arc::new(Box::new(move |val:T, stream: &mut TcpStream| Box::pin(fun(val, stream)))));
}
pub async fn call(&self, val: T, stream: &mut TcpStream) where T: Clone {
for cb in self.callbacks.iter() {
let _cb = cb.clone();
_cb(val.clone(), stream).await; // B O O M
}
}
}
在删除call
函数中回调返回的.await
on the Future之前,上面的代码不会编译(这与目的不符).
error[E0277]: `dyn for<'a> Fn(String, &'a mut tokio::net::TcpStream) -> Pin<Box<dyn futures::Future<Output = ()> + std::marker::Send>> + std::marker::Send` cannot be shared between threads safely
--> src/main.rs:94:26
据我所知,问题是《返校的future 》没有发送.
note: required by a bound in `tokio::spawn`
--> /Users/lukasz/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.25.0/src/task/spawn.rs:163:21
|
163 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
我不知道我的类型是否有意义并且是线程安全的.我也不知道为什么编译器认为返回类型不是Send.我真的被困在这里了,如果有任何帮助,我将不胜感激.