我有4个EC2实例,我计划拥有一个分布式网络,因此每个实例都会向每个人(包括自己)发送数据.

我首先将IP地址从文件读取到变量ip_address_clone.

假设列表是这样的:

A.A.A.A
B.B.B.B
C.C.C.C
D.D.D.D

然后,我try 在线程中为所有实例运行服务器和客户端,以便在所有实例的实例中都有一个发送者和接收者工作者处于活动状态(同样也是为其自身).

thread::scope(|s| {
    s.spawn(|| {
        for _ip in ip_address_clone.clone() {
            let _result = newserver::handle_server(INITIAL_PORT + port_count);
        }
    });

    s.spawn(|| {
        let three_millis = time::Duration::from_millis(3);
        thread::sleep(three_millis);

        for ip in ip_address_clone.clone() {
            let self_ip_clone = self_ip.clone();

            let _result = newclient::match_tcp_client(
                [ip.to_string(), (INITIAL_PORT + port_count).to_string()].join(":"),
                self_ip_clone,
            );
        }
    });
});

服务器代码为:

use std::error::Error;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::tcp::ReadHalf;
use tokio::net::TcpListener;

#[tokio::main]
pub async fn handle_server(port: u32) -> Result<(), Box<dyn Error>> {
    let listener = TcpListener::bind(["0.0.0.0".to_string(), port.to_string()].join(":"))
        .await
        .unwrap(); // open connection

    let (mut socket, _) = listener.accept().await.unwrap(); // starts listening
    println!("---continue---");

    let (reader, mut writer) = socket.split(); // tokio socket split to read and write concurrently

    let mut reader: BufReader<ReadHalf> = BufReader::new(reader);
    let mut line: String = String::new();

    loop {
        //loop to get all the data from client until EOF is reached

        let _bytes_read: usize = reader.read_line(&mut line).await.unwrap();

        if line.contains("EOF")
        //REACTOR to be used here
        {
            println!("EOF Reached");

            writer.write_all(line.as_bytes()).await.unwrap();
            println!("{}", line);

            line.clear();

            break;
        }
    }

    Ok(())
}

客户端代码为:

use std::error::Error;
use tokio::io::AsyncWriteExt;
use tokio::net::TcpStream;

#[tokio::main]
pub async fn match_tcp_client(address: String, self_ip: String) -> Result<(), Box<dyn Error>> {
    // Connect to a peer
    let mut stream = TcpStream::connect(address.clone()).await?;
    // Write some data.
    stream.write_all(self_ip.as_bytes()).await?;
    stream.write_all(b"hello world!EOF").await?;
    // stream.shutdown().await?;
    Ok(())
}

问题是,我没有像我期望的那样得到交流.事实上,我运行的第一个实例(使用ssh)接收所有数据,第二个实例接收除第一个实例以外的所有数据,第三个实例接收除第一个和第二个实例以外的所有数据,依此类推.

以下是第一个实例的日志(log):

Starting
execution type
nok
launched
---continue---
EOF Reached
A.A.A.Ahello world!EOF
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

和二审日志(log):

Starting
execution type
nok
launched
---continue---
EOF Reached
B.B.B.Bhello world!EOF
---continue---
EOF Reached
C.C.C.Chello world!EOF
---continue---
EOF Reached
D.D.D.Dhello world!EOF

虽然我使用的是线程,但通信仍然保持同步,并且特定实例只能将数据从自身获取到ip_address_clone中的其他IPS. 您可以在第二个实例日志(log)中看到---continue---出现的次数,它的侦听器似乎不接受来自第一个实例的请求.

推荐答案

我认为," node 只将数据发送到自己"的证据高度表明,它正在将数据发送到自己的端口(仅),而不是其他端口(它们完全相同).在这里,我相信独特的端口应该可以解决您的问题.

  • 您正在为所有实例使用相同的端口号.当多个实例try 绑定到同一端口时,这可能会导致冲突.相反,您应该 for each 实例使用唯一的端口号.一种做法是通过向基端口号(3000、3001、...)添加偏移量来实现这一点.当每个实例绑定到唯一的端口号时,更适合于开发测试.
  • 您正在 for each 实例创建一个新线程,但每个线程只处理一个连接.这可能会降低效率,并且可能会限制程序可以处理的连接数量.相反,您可以使用Tokio的spawn函数 for each 连接派生一个任务.这允许您同时处理多个连接.
  • 此外,循环在移动到下一个IP地址之前不会等待线程完成.这可能会导致同步问题,并可能导致unexpected behavior.

On a personal note, testing asynchronous communication between distributed nodes is hard; especially when we have multiple threads, and they don't work.

Rust相关问答推荐

Tauri tauri—apps/plugin—store + zustand

MutexGuard中的过滤载体不需要克隆

使用铁 rust S还原对多个数组执行顺序kronecker积

如何从ruust中的fig.toml中读取?

如何在嵌套的泛型 struct 中调用泛型方法?

对于rustc编译的RISC-V32IM二进制文件,llvm objdump没有输出

在使用粗粒度锁访问的数据 struct 中使用 RefCell 是否安全?

UnsafeCell:它如何通知 rustc Select 退出基于别名的优化?

Rust 中的内存管理

str 和 String 的 Rust 生命周期

有没有办法通过命令获取 Rust crate 的可安装版本列表?

具有在宏扩展中指定的生命周期的枚举变体数据类型

If let expression within .iter().any

如何在 Rust Polars 中可靠地连接 LazyFrames

Rust,使用枚举从 HashMap 获取值

用逗号分隔字符串,但在标记中使用逗号

实现不消费的迭代器

隐式类型闭包的错误生命周期推断

基于名称是否存在的条件编译

您不能borrow 对只读值的可变引用