我有一个线程可以启动工作线程,所有线程都将永远存在.每个辅助线程维护自己的Socket个线程列表.

有些操作要求遍历当前所有活动的套接字,但我在try 创建包含指向另一个列表所拥有的套接字的指针的套接字主列表时遇到了问题.

use std::{str, thread};
use std::thread::JoinHandle;
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use std::ops::DerefMut;
use std::sync::mpsc::{channel, Sender, Receiver, TryRecvError};
use self::socketlist::SocketList;
use self::mastersocketlist::MasterSocketList;

pub struct Socket {
    user: String,
    stream: TcpStream,
}

mod socketlist {
    use self::SocketList::{Node, End};
    use super::Socket;

    pub enum SocketList {
        Node(Socket, Box<SocketList>),
        End,
    }

    impl SocketList {
        pub fn new() -> SocketList {
            End
        }

        pub fn add(self, socket: Socket) -> SocketList {
            Node(socket, Box::new(self))
        }

        pub fn newest<'a>(&'a mut self) -> Result<&'a Socket, String> {
            match *self {
                Node(ref mut socket, ref mut next) => Ok(socket),
                End => Err("No socket available".to_string()),
            }
        }
    }
}

mod mastersocketlist {
    use self::MasterSocketList::{Node, End};
    use super::Socket;

    pub enum MasterSocketList<'a> {
        Node(Box<&'a Socket>, Box<MasterSocketList<'a>>),
        End,
    }

    impl<'a> MasterSocketList<'a> {
        pub fn new() -> MasterSocketList<'a> {
            End
        }

        pub fn add(self, socket: &'a Socket) -> MasterSocketList<'a> {
            MasterSocketList::Node(Box::new(&socket), Box::new(self))
        }
    }
}

pub struct SlotManager {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl SlotManager {
    pub fn new() -> SlotManager {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || SlotManager::event_loop(tx, rx));

        SlotManager {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn sender(&self) -> Sender<TcpStream> {
        self.prox_tx.clone()
    }

    fn event_loop(tx: Sender<TcpStream>, rx: Receiver<TcpStream>) {
        let socket_list = Arc::new(Mutex::new(MasterSocketList::new()));
        let mut slot = Slot::new(socket_list.clone());
        loop {
            match rx.try_recv() {
                Ok(stream) => slot.new_connection(stream),
                Err(e) => {}
            }
        }
    }
}

pub struct Slot {
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,
}

impl Slot {
    pub fn new(master_socket_list: Arc<Mutex<MasterSocketList>>) -> Slot {
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));

        Slot {
            prox: prox,
            prox_tx: tx_clone,
        }
    }

    pub fn new_connection(&self, stream: TcpStream) {
        self.prox_tx.send(stream);
    }

    fn event_loop(tx: Sender<TcpStream>,
                  rx: Receiver<TcpStream>,
                  master_socket_list: Arc<Mutex<MasterSocketList>>) {

        let mut sockets = SocketList::new();
        loop {
            // Check for new connections
            match rx.try_recv() {
                Ok(stream) => {
                    let mut socket = Socket {
                        user: "default".to_string(),
                        stream: stream,
                    };
                    sockets = sockets.add(socket);

                    let mut msl_guard = match master_socket_list.lock() {
                        Ok(guard) => guard,
                        Err(poisoned) => poisoned.into_inner(),
                    };
                    let mut msl_handle = msl_guard.deref_mut();
                    *msl_handle = msl_handle.add(sockets.newest().unwrap());
                }
                Err(e) => {}
            }
        }
    }
}

fn main() {
    let mut slot_manager = SlotManager::new();
    let listener = TcpListener::bind("127.0.0.1:1234").unwrap();
    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                let sender = slot_manager.sender();
                thread::spawn(move || {
                    sender.send(stream);
                    //process_new_connection(stream, sender)
                });
            }
            Err(e) => println!("Connection error: {}", e),
        }
    }
    drop(listener);
}

我收到的错误...

error[E0477]: the type `[closure@src/main.rs:107:34: 107:86 tx:std::sync::mpsc::Sender<std::net::TcpStream>, rx:std::sync::mpsc::Receiver<std::net::TcpStream>, master_socket_list:std::sync::Arc<std::sync::Mutex<mastersocketlist::MasterSocketList<'_>>>]` does not fulfill the required lifetime
   --> src/main.rs:107:20
    |
107 |         let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));
    |                    ^^^^^^^^^^^^^
    |
    = note: type must outlive the static lifetime

我甚至不知道我所try 的是否是安全代码.

我希望mastersocketlist包含一个指向套接字的指针,其中套接字的生存期由创建它的线程定义.我相信这就是所有这些错误的含义,但我不知道如何提供适当的生命周期注释来修复它.

推荐答案

Rust的一个优点是,跨函数的类型判断完全由函数签名完成.这意味着您可以用unimplemented!()替换大部分函数体,并保留类型判断错误.

重复这个过程几次,你最终没有调用很多函数——删除那些函数.内联模块和减少 struct /枚举也有帮助.

在某个时刻,你的错误会消失——这是问题的第一条线索!坚持下go ,你会得到一个小小的复制品:

use std::sync::{Arc, Mutex};
use std::thread;

pub enum MasterSocketList<'a> {
    One(&'a u8),
}

pub struct Slot;

impl Slot {
    pub fn new<'a>(master_socket_list: Arc<Mutex<MasterSocketList<'a>>>) -> Slot {
        thread::spawn(move || {
            master_socket_list;
        });
        unimplemented!();
    }
}

fn main() {}

判断错误时,它仍然匹配:

error[E0477]: the type `[closure@src/main.rs:12:23: 14:10 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` does not fulfill the required lifetime
  --> src/main.rs:12:9
   |
12 |         thread::spawn(move || {
   |         ^^^^^^^^^^^^^
   |
   = note: type must satisfy the static lifetime

让我们判断一下thread::spawn的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static, 

这里的关键点是F: Send + 'static——你给spawn must only contain references that last the entire life of the program的结束.这是因为spawn可以创建变成detached的线程.一旦分离,线程就可以永远存在,所以所有引用都必须至少存在那么长的时间,否则会得到dangling references,这是一件坏事! rust 又一次拯救了这一天!

如果要保证线程将在已知点终止,可以使用scoped threads,例如scoped-threadpoolcrossbeam提供的.

如果你的代码中没有一个生命周期内的变量,那么使用某种类型的共享所有权,比如Arc,再加上一些可以确保只有一个线程可以变异变量的东西,比如Mutex,就足够了.这允许每个线程拥有共享值,最后在最后一个线程退出时删除它.详情见How do I share a mutable object between threads?.

Rust相关问答推荐

有没有方法处理rust中嵌套的ok_or()?

在Rust中创建可变片段的可变片段的最有效方法是什么?

计算具有相邻调换且没有插入或删除的序列的距离

为什么是!为Rust中的RwLockReadGuard和RwLockWriteGuard实现的发送特征?

创建包含缺失值的框架

在Rust中宏的表达式中提取对象

变量需要parse()中的显式类型

AXUM一路由多个不包括URL的参数类型

应为关联类型,找到类型参数

使用 select 处理 SIGINT 和子等待!无阻塞

Windows 上 ndarray-linalg 与 mkl-stats 的链接时间错误

随机函数不返回随机值

使用 pyo3 将 Rust 转换为 Python 自定义类型

从 rust 函数返回 &HashMap

Button.set_hexpand(false) 不会阻止按钮展开

‘&T as *const T as *mut T’ 在 ‘static mut’ 项目中合适吗?

在线程中运行时,TCPListener(服务器)在 ip 列表中的服务器实例之前没有从客户端接受所有客户端的请求

在运行时在 Rust 中加载字体

产生拥有值的迭代器的 Rust 可变borrow 在循环中失败

使用方法、关联函数和自由函数在 Rust 中初始化函数指针之间的区别