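How can I split reads and writes on a blocking socket across threads in Rust, when sharing the transport (or socket) between threads through an Arc leads to a deadlock during concurrent reading and writing?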

Here is a simple example.

The application client, written in Rust:

use std::io::{Error, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;

#[derive(Debug)]
pub struct TcpTransport {
    pub conn: TcpStream,
}

impl TcpTransport {
    fn send_packet(&mut self, data: &[u8]) -> Result<(), Error> {
        self.conn.write_all(data)?;
        Ok(())
    }

    fn read_packet(&mut self) -> Result<Vec<u8>, Error> {
        let mut lenbuf = [0u8; 2];
        self.conn.read_exact(&mut lenbuf)?;

        let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
        let mut databuf = vec![0u8; length];
        self.conn.read_exact(&mut databuf)?;

        Ok(databuf)
    }

    fn close(&self) -> Result<(), Error> {
        println!("close transport");
        self.conn.shutdown(std::net::Shutdown::Both)?;
        Ok(())
    }
}

pub fn new_tcp_transport(host: &str, port: u16) -> Result<TcpTransport, Error> {
    let socket_addr = (host, port).to_socket_addrs()?.next().unwrap();
    let conn = TcpStream::connect_timeout(&socket_addr, Duration::from_secs(10))?;
    Ok(TcpTransport { conn })
}

#[tokio::main]
async fn main() {
    // tsport Arc<Mutex><dyn Transport+Send+sync>
    let tsport = new_tcp_transport("127.0.0.1", 4000).unwrap();
    let tsport = Arc::new(Mutex::new(tsport));
    let tsport1 = tsport.clone();
    let tsport2 = tsport.clone();
    tokio::spawn(async move {
        println!("read worker started");
        'read_loop: loop {
            // sleep(Duration::from_millis(1000)).await;
            println!("read exec here===");
            let packet = match tsport1.lock().unwrap().read_packet() {
                Ok(packet) => packet,
                Err(err) => {
                    eprintln!("Transport read packet error: {:?}", err);
                    break 'read_loop;
                }
            };
            println!("read packet: {:?}", packet);
        }
        println!("read worker stopped");
    });
    let _ = tokio::spawn(async move {
        println!("write worker started");
        'writeloop: loop {
            sleep(Duration::from_millis(1000)).await;
            println!("write exec here===");
            let mut ts = tsport2.lock().unwrap();
            let data = vec![0x1, 0x2, 0x3, 0x4];
            if let Err(er) = ts.send_packet(&data) {
                eprintln!("Transport write packet error: {:?}", er);
                break 'writeloop;
            }
            println!("send packet success.");
        }
        println!("write worker stopped");
    }).await;
}
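
A note on why this client stalls: the read worker calls `read_packet()` on the locked guard, so the `Mutex` stays locked for as long as `read_exact` blocks. The Node.js server never writes to the socket, so that read never returns and the write worker waits on `lock()` indefinitely. The sketch below reproduces the effect without a network. The function name, the `()` payload, and the channel that stands in for the blocking read are all assumptions made for illustration, and `try_lock()` replaces `lock()` only so that the demo terminates.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns true if the writer could take the lock while the reader was
/// still "blocked in a read". Hypothetical helper, not part of the client.
fn writer_can_lock_during_read() -> bool {
    // The unit value stands in for the shared transport.
    let transport = Arc::new(Mutex::new(()));
    let reader_transport = Arc::clone(&transport);
    let (locked_tx, locked_rx) = mpsc::channel::<()>();
    let (data_tx, data_rx) = mpsc::channel::<()>();

    let reader = thread::spawn(move || {
        let _guard = reader_transport.lock().unwrap();
        // Tell the writer that the lock is now held.
        locked_tx.send(()).unwrap();
        // Stand-in for a blocking read_exact: wait until "data arrives".
        data_rx.recv().unwrap();
        // _guard is dropped here, only after the "read" has finished.
    });

    // Wait until the reader holds the lock, then try to take it as the writer.
    locked_rx.recv().unwrap();
    let acquired = transport.try_lock().is_ok();

    // Let the reader finish so the thread can be joined.
    data_tx.send(()).unwrap();
    reader.join().unwrap();
    acquired
}
```

Calling `writer_can_lock_during_read()` returns `false`: while the reader is parked inside the simulated read, the writer cannot acquire the lock.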


The application server, written in Node.js:

var net = require('net');
net.createServer(function(socket){
    socket.on('data', function(data){
        console.log("server recv data:",data);
    });
}).listen(4000);

console.log('server listen 127.0.0.1:4000');

Cargo.toml

[package]
name = "readwrite"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio = { version = "1.28.1", features = ["full"] }

An application client written in Node.js:

var net = require('net');

var client = new net.Socket();
client.connect(4000, '127.0.0.1', function() {
    console.log('Connected');
    setInterval(() => {
        var data = Buffer.from("hello server");
        var datalen = data.length;
        console.log('client write===');
        client.write(Buffer.concat([Buffer.from([datalen >> 8, datalen % 256]), data]));
    }, 1000);

});

client.on('data', function(data) {
    console.log('client received: ',data);
});

Recommended answer

You can use the fact that &TcpStream (a reference to a TcpStream) also implements Read and Write:

impl TcpTransport {
    fn send_packet(&self, data: &[u8]) -> Result<(), Error> {
        (&self.conn).write_all(data)?;
        Ok(())
    }

    fn read_packet(&self) -> Result<Vec<u8>, Error> {
        let mut lenbuf = [0u8; 2];
        (&self.conn).read_exact(&mut lenbuf)?;

        let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
        let mut databuf = vec![0u8; length];
        (&self.conn).read_exact(&mut databuf)?;

        Ok(databuf)
    }
}

#[tokio::main]
async fn main() {
    let tsport = new_tcp_transport("127.0.0.1", 4000).unwrap();
    let tsport = Arc::new(tsport);
    let tsport1 = tsport.clone();
    let tsport2 = tsport.clone();
    tokio::spawn(async move {
        println!("read worker started");
        'read_loop: loop {
            // sleep(Duration::from_millis(1000)).await;
            let packet = match tsport1.read_packet() {
                Ok(packet) => packet,
                Err(err) => {
                    eprintln!("Transport read packet error: {:?}", err);
                    break 'read_loop;
                }
            };
            println!("read packet: {:?}", packet);
        }
        println!("read worker stopped");
    });
    let _ = tokio::spawn(async move {
        println!("write worker started");
        'writeloop: loop {
            // sleep(Duration::from_millis(1000)).await;
            let data = vec![0x1, 0x2, 0x3, 0x4];
            if let Err(er) = tsport2.send_packet(&data) {
                eprintln!("Transport write packet error: {:?}", er);
                break 'writeloop;
            }
            println!("send packet success.");
        }
        println!("write worker stopped");
    })
    .await;
}
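
The same idea can be sketched as a self-contained program on plain OS threads, with no `Mutex` around the socket. This is a minimal sketch, not the asker's transport: the in-process echo server, the `run_split_demo` name, and the 3-byte payloads are assumptions made for illustration. Unlike the original `send_packet`, this one also writes the 2-byte big-endian length prefix that `read_packet` expects, so the echoed bytes can be read back as packets.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::Arc;
use std::thread;

/// Writes one packet: a 2-byte big-endian length prefix, then the payload.
fn send_packet(mut conn: &TcpStream, data: &[u8]) -> io::Result<()> {
    assert!(data.len() <= u16::MAX as usize, "payload too large for a 2-byte prefix");
    conn.write_all(&(data.len() as u16).to_be_bytes())?;
    conn.write_all(data)
}

/// Reads one packet framed the same way.
fn read_packet(mut conn: &TcpStream) -> io::Result<Vec<u8>> {
    let mut lenbuf = [0u8; 2];
    conn.read_exact(&mut lenbuf)?;
    let mut databuf = vec![0u8; u16::from_be_bytes(lenbuf) as usize];
    conn.read_exact(&mut databuf)?;
    Ok(databuf)
}

/// Hypothetical demo: sends `count` packets through a local echo server
/// and returns the packets that the reader thread received.
fn run_split_demo(count: u8) -> io::Result<Vec<Vec<u8>>> {
    // Echo server on an ephemeral local port (stand-in for a real peer).
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || -> io::Result<()> {
        let (sock, _) = listener.accept()?;
        // Copy every received byte back until the client shuts down.
        io::copy(&mut &sock, &mut &sock)?;
        Ok(())
    });

    // One socket, shared through Arc only; no lock is involved.
    let conn = Arc::new(TcpStream::connect(addr)?);
    let reader_conn = Arc::clone(&conn);
    let writer_conn = Arc::clone(&conn);

    // The reader may block in read_exact without stopping the writer.
    let reader = thread::spawn(move || -> io::Result<Vec<Vec<u8>>> {
        (0..count).map(|_| read_packet(&reader_conn)).collect()
    });
    let writer = thread::spawn(move || -> io::Result<()> {
        for i in 0..count {
            send_packet(&writer_conn, &[i, i, i])?;
        }
        Ok(())
    });

    writer.join().expect("writer thread panicked")?;
    let packets = reader.join().expect("reader thread panicked")?;
    conn.shutdown(Shutdown::Both)?;
    server.join().expect("server thread panicked")?;
    Ok(packets)
}
```

Calling `run_split_demo(3)` from `main` returns three packets, one per write. The reader thread starts blocking before any data exists, yet the writer still proceeds, because each thread only needs a shared `&TcpStream`.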

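However, your code is incorrect, because you must never block in an async context, and std's TcpStream is blocking. You need to use tokio's TcpStream instead. It does not implement AsyncRead and AsyncWrite for references, but tokio provides tokio::io::split() to split an AsyncRead + AsyncWrite into a read half and a write half: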

use std::io::Error;
use std::time::Duration;

use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf};
use tokio::net::TcpStream;

pub async fn new_tcp_transport(host: &str, port: u16) -> Result<TcpStream, Error> {
    let connect_fut = TcpStream::connect((host, port));
    let conn = tokio::time::timeout(Duration::from_secs(10), connect_fut).await??;
    Ok(conn)
}

async fn read_packet(conn: &mut ReadHalf<TcpStream>) -> Result<Vec<u8>, Error> {
    let mut lenbuf = [0u8; 2];
    conn.read_exact(&mut lenbuf).await?;

    let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
    let mut databuf = vec![0u8; length];
    conn.read_exact(&mut databuf).await?;

    Ok(databuf)
}

#[tokio::main]
async fn main() {
    // tsport Arc<Mutex><dyn Transport+Send+sync>
    let tsport = new_tcp_transport("127.0.0.1", 4000).await.unwrap();
    let (mut read, mut write) = tokio::io::split(tsport);
    tokio::spawn(async move {
        println!("read worker started");
        'read_loop: loop {
            // sleep(Duration::from_millis(1000)).await;
            let packet = match read_packet(&mut read).await {
                Ok(packet) => packet,
                Err(err) => {
                    eprintln!("Transport read packet error: {:?}", err);
                    break 'read_loop;
                }
            };
            println!("read packet: {:?}", packet);
        }
        println!("read worker stopped");
    });
    let _ = tokio::spawn(async move {
        println!("write worker started");
        'writeloop: loop {
            // sleep(Duration::from_millis(1000)).await;
            let data = vec![0x1, 0x2, 0x3, 0x4];
            if let Err(er) = write.write_all(&data).await {
                eprintln!("Transport write packet error: {:?}", er);
                break 'writeloop;
            }
            println!("send packet success.");
        }
        println!("write worker stopped");
    })
    .await;
}
