如何解决在并发读写rust的过程中,线程共享数据(传输或套接字)并使用arcs导致死锁时,使用阻塞套接字在不同线程之间分离读写的问题?
这里有一个简单的例子.
应用程序客户端由RUST编写:
use std::env;
use std::io::{Error, Read, Write};
use std::net::{TcpStream, ToSocketAddrs};
use std::ptr::eq;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::time::sleep;
#[derive(Debug)]
pub struct TcpTransport {
pub conn: TcpStream,
}
impl TcpTransport {
fn send_packet(&mut self, data: &[u8]) -> Result<(), Error> {
self.conn.write_all(data)?;
Ok(())
}
fn read_packet(&mut self) -> Result<Vec<u8>, Error> {
let mut lenbuf = [0u8; 2];
self.conn.read_exact(&mut lenbuf)?;
let length = (lenbuf[0] as usize) << 8 | lenbuf[1] as usize;
let mut databuf = vec![0u8; length];
self.conn.read_exact(&mut databuf)?;
Ok(databuf)
}
fn close(&self) -> Result<(), Error> {
println!("close transport");
self.conn.shutdown(std::net::Shutdown::Both)?;
Ok(())
}
}
pub fn new_tcp_transport(host: &str, port: u16) -> Result<TcpTransport, Error> {
let socket_addr = (host, port).to_socket_addrs()?.next().unwrap();
let conn = TcpStream::connect_timeout(&socket_addr, Duration::from_secs(10))?;
Ok(TcpTransport { conn })
}
#[tokio::main]
async fn main() {
// tsport Arc<Mutex><dyn Transport+Send+sync>
let tsport = new_tcp_transport("127.0.0.1", 4000).unwrap();
let tsport = Arc::new(Mutex::new(tsport));
let tsport1 = tsport.clone();
let tsport2 = tsport.clone();
tokio::spawn(async move {
println!("read worker started");
'read_loop: loop {
// sleep(Duration::from_millis(1000)).await;
println!("read exec here===");
let packet = match tsport1.lock().unwrap().read_packet() {
Ok(packet) => packet,
Err(err) => {
eprintln!("Transport read packet error: {:?}", err);
break 'read_loop;
}
};
println!("read packet{:?}", packet);
}
println!("read worker stoped");
});
let _=tokio::spawn(async move {
println!("write worker started");
'writeloop: loop {
sleep(Duration::from_millis(1000)).await;
println!("write exec here===");
let mut ts = tsport2.lock().unwrap();
let data = vec![0x1,0x2,0x3,0x4];
if let Err(er) = ts.send_packet(&data) {
eprintln!("Transport write packet error: {:?}", er);
break 'writeloop;
}
println!("send packet success.");
}
println!("write worker stoped");
}).await;
}
NodeJS写的应用服务器:
var net = require('net');
net.createServer(function(socket){
socket.on('data', function(data){
console.log("server recv data:",data);
});
}).listen(4000);
console.log('server listen 127.0.0.1:4000');
Cargo.toml
[package]
name = "readwrite"
version = "0.1.0"
edition = "2021"
[dependencies]
tokio = { version = "1.28.1", features = ["full"] }
NodeJS编写的应用客户端:
var net = require('net');
var client = new net.Socket();
client.connect(4000, '127.0.0.1', function() {
console.log('Connected');
setInterval(() => {
var data = Buffer.from("hello server");
var datalen = data.length;
console.log('client write===');
client.write(Buffer.concat([Buffer.from([datalen >> 8, datalen % 256]), data]));
}, 1000);
});
client.on('data', function(data) {
console.log('client received: ',data);
});