我正在努力理解以下代码片段的行为:

use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();
    
    GSENDER.set(sender).unwrap();

    tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap()
        .spawn(async move {
            println!("[{:?}] Starting channel", chrono::Utc::now());

            while let Some(msg) = channel.recv().await {
                println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
            }
            
            println!("[{:?}] Closing channel", chrono::Utc::now());
        });
       
    // Does not help, as it shouldn't anyway
    // std::thread::sleep(std::time::Duration::from_secs(1));
        
    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

Link to playground to reproduce

新的运行时被创建,future 被催生. 然后,recv人正在接受民意调查. 与此同时,我找到了发送者的另一半,并试图发送一条消息. 在这一点上,接收器要么被移到future ,要么(加上睡眠)它甚至轮询recv.

为什么发件人报告通道已关闭?

推荐答案

tokio运行时内产生的所有任务在运行时被删除时关闭(在下一个.await点).这里的运行时是临时的,所以它在语句的末尾被删除.该任务将只运行到第一个.await点.

让运行时成为一个活变量,它就会起作用:

use std::sync::OnceLock;
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};

static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();

fn main() {
    let (sender, mut channel) = unbounded_channel();

    GSENDER.set(sender).unwrap();

    let runtime = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(1) // on a new thread
        .enable_all()
        .build()
        .unwrap();
    runtime.spawn(async move {
        println!("[{:?}] Starting channel", chrono::Utc::now());

        while let Some(msg) = channel.recv().await {
            println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
        }

        println!("[{:?}] Closing channel", chrono::Utc::now());
    });

    if let Some(channel_in) = GSENDER.get() {
        if let Err(SendError(_)) = channel_in.send("test") {
            println!("[{:?}] Channel down", chrono::Utc::now());
        }
    } else {
        unreachable!()
    }
}

Playground.

Rust相关问答推荐

trait声明中的生命周期参数

如何仅使用http机箱发送http请求?

有没有办法在Rust中配置常量变量的值?

如何格式化传入Rust中mysql crate的Pool::new的字符串

无法定义名为&new&的关联函数,该函数的第一个参数不是self

有没有可能让泛型Rust T总是堆分配的?

为什么Deref类特征不构成?

什么时候使用FuturesOrdered?

Rust wasm 中的 Closure::new 和 Closure::wrap 有什么区别

如何对一个特征的两个实现进行单元测试?

UnsafeCell:它如何通知 rustc Select 退出基于别名的优化?

如何获取模块树?

为什么Rust编译器会忽略模板参数应具有静态生命周期?

如何将 Rust 字符串转换为 i8(c_char) 数组?

Rust 将特性传递给依赖项

是否可以预测堆栈溢出?

试图理解 Rust 中的可变闭包

如何重写这个通用参数?

如何在不设置精度的情况下打印浮点数时保持尾随零?

为什么我不能将元素写入 Rust 数组中移动的位置,但我可以在元组中完成