我正在努力理解以下代码片段的行为:
use tokio::sync::mpsc::{error::SendError, unbounded_channel, UnboundedSender};
use std::sync::OnceLock;
static GSENDER: OnceLock<UnboundedSender<&'static str>> = OnceLock::new();
fn main() {
let (sender, mut channel) = unbounded_channel();
GSENDER.set(sender).unwrap();
tokio::runtime::Builder::new_multi_thread()
.worker_threads(1) // on a new thread
.enable_all()
.build()
.unwrap()
.spawn(async move {
println!("[{:?}] Starting channel", chrono::Utc::now());
while let Some(msg) = channel.recv().await {
println!("[{:?}] Recvd: {msg}", chrono::Utc::now());
}
println!("[{:?}] Closing channel", chrono::Utc::now());
});
// Does not help, as it shouldn't anyway
// std::thread::sleep(std::time::Duration::from_secs(1));
if let Some(channel_in) = GSENDER.get() {
if let Err(SendError(_)) = channel_in.send("test") {
println!("[{:?}] Channel down", chrono::Utc::now());
}
} else {
unreachable!()
}
}
Link to playground to reproduce个
新的运行时被创建,future 被催生.
然后,recv
人正在接受民意调查.
与此同时,我找到了发送者的另一半,并试图发送一条消息.
在这一点上,接收器要么被移到future ,要么(加上睡眠)它甚至轮询recv
.
为什么发件人报告通道已关闭?