我有4个EC2实例,我计划拥有一个分布式网络,因此每个实例都会向每个人(包括自己)发送数据.
我首先将IP地址从文件读取到变量ip_address_clone
.
假设列表是这样的:
A.A.A.A
B.B.B.B
C.C.C.C
D.D.D.D
然后,我try 在线程中为所有实例运行服务器和客户端,以便在所有实例的实例中都有一个发送者和接收者工作者处于活动状态(同样也是为其自身).
thread::scope(|s| {
s.spawn(|| {
for _ip in ip_address_clone.clone() {
let _result = newserver::handle_server(INITIAL_PORT + port_count);
}
});
s.spawn(|| {
let three_millis = time::Duration::from_millis(3);
thread::sleep(three_millis);
for ip in ip_address_clone.clone() {
let self_ip_clone = self_ip.clone();
let _result = newclient::match_tcp_client(
[ip.to_string(), (INITIAL_PORT + port_count).to_string()].join(":"),
self_ip_clone,
);
}
});
});
服务器代码为:
use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::ReadHalf;
use tokio::net::TcpListener;
#[tokio::main]
pub async fn handle_server(port: u32) -> Result<(), Box<dyn Error>> {
let listener = TcpListener::bind(["0.0.0.0".to_string(), port.to_string()].join(":"))
.await
.unwrap(); // open connection
let (mut socket, _) = listener.accept().await.unwrap(); // starts listening
println!("---continue---");
let (reader, mut writer) = socket.split(); // tokio socket split to read and write concurrently
let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
let mut line: String = String::new();
loop {
//loop to get all the data from client until EOF is reached
let _bytes_read: usize = reader.read_line(&mut line).await.unwrap();
if line.contains("EOF")
//REACTOR to be used here
{
println!("EOF Reached");
writer.write_all(line.as_bytes()).await.unwrap();
println!("{}", line);
line.clear();
break;
}
}
Ok(())
}
客户端代码为:
use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;
#[tokio::main]
pub async fn match_tcp_client(address: String, self_ip: String) -> Result<(), Box<dyn Error>> {
// Connect to a peer
let mut stream = TcpStream::connect(address.clone()).await?;
// Write some data.
stream.write_all(self_ip.as_bytes()).await?;
stream.write_all(b"hello world!EOF").await?;
// stream.shutdown().await?;
Ok(())
}
问题是,我没有像我期望的那样得到交流.事实上,我运行的第一个实例(使用ssh)接收所有数据,第二个实例接收除第一个实例以外的所有数据,第三个实例接收除第一个和第二个实例以外的所有数据,依此类推.
以下是第一个实例的日志(log):
Starting
execution type
nok
launched
---continue---
EOF Reached
A.A.A.Ahello world!EOF
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF
和二审日志(log):
Starting
execution type
nok
launched
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF
虽然我使用的是线程,但通信仍然保持同步,并且特定实例只能将数据从自身获取到ip_address_clone
中的其他IPS.
您可以在第二个实例日志(log)中看到---continue---
出现的次数,它的侦听器似乎不接受来自第一个实例的请求.