在过go 的几天里,我一直在研究Rust中的并发/并行代码.我try 为不同的并发方法创建一个可执行的代码片断集合.仍然需要进行大量的实验.在这样做的时候,我遇到了一个关于东京的非常奇怪的问题.
话虽如此,我很确定有问题的代码不正确地使用Tokio.结果是代码不会中断,但也不能完全正确运行.虽然我已经找到了避免这个问题的方法,但我对导致这种行为的内部发生的事情非常感兴趣.
在内心深处,我找到了一种方法,让东京总是比它应该运行的任务少一个任务!根据我的测试,其余的运行得很好.问题是
- 如果这段代码真的是错的,为什么还要运行任何东西呢?
- 为什么总有一项任务不能执行?
#![allow(dead_code)]
// There are two ways to fix this code. They are marked with a "FIX" comment
use tokio;
const NUMBER_OF_THREADS: usize = 3;
fn main() {
futures::executor::block_on(with_tokio_channels_fail_call_breaking(NUMBER_OF_THREADS));
// FIX: use this line instead
// with_tokio_channels_fail_call_fixing(NUMBER_OF_THREADS);
}
// Calling this function runs the code fine
fn with_tokio_channels_fail_call_fixing(number_of_threads: usize) {
let rt = tokio::runtime::Runtime::new().unwrap();
// When we use block_on on this fake-future (no .await inside), it runs
// fine
rt.block_on(tokio_channels_fail(number_of_threads));
}
// Calling this function runs (NUMBER_OF_THREADS - 1) threads fine,
// but the last one is never scheduled, causing the program to never exit
async fn with_tokio_channels_fail_call_breaking(number_of_threads: usize) {
let rt = tokio::runtime::Runtime::new().unwrap();
// Spawning and awaiting the fake-future has a really odd result!
// I believe this is simply wrong usage of tokio. But WHY exactly?
// I am interested in what goes wrong internally when we do this
rt.spawn(tokio_channels_fail(number_of_threads)).await.unwrap();
}
// Note that this function is unnecessarily async
async fn tokio_channels_fail(number_of_threads: usize) {
async fn cb(index: usize, num: usize, sender: std::sync::mpsc::Sender<(usize, usize)>) {
let res = delay_thread_async_then_square(num, num).await;
sender.send((index, res)).unwrap();
}
let (sender, receiver) = std::sync::mpsc::channel::<(usize, usize)>();
let mut vec: Vec<usize> = (0..number_of_threads).into_iter().collect();
let _x = (0..number_of_threads)
.map(|num| {
let sender = sender.clone();
tokio::task::spawn(cb(num, num, sender))
})
.collect::<Vec<tokio::task::JoinHandle<()>>>();
// need to drop the sender, because the iterator below will only complete once all senders are dropped
drop(sender);
// FIX: when uncommenting this line, the code also runs fine, but this is not a general solution.
// But would it use the tokio runtime? Means: Would it be spread across multiple
// kernel level threads?
// futures::future::join_all(_x).await;
receiver.iter().for_each(|(index, res)| {
vec[index] = res;
});
println!("{:?}", vec);
}
async fn sleep_for(seconds: u64) {
tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
}
async fn delay_thread_async_then_square(thread_no: usize, to_be_squared: usize) -> usize {
let mut wait = 3;
println!("Async thread {thread_no} sleeping for {wait} seconds");
sleep_for(wait).await;
wait = 7;
println!("Async thread {thread_no} sleeping for {wait} seconds");
sleep_for(wait).await;
wait = 1;
println!("Async thread {thread_no} sleeping for {wait} seconds");
sleep_for(wait).await;
wait = 4;
println!("Async thread {thread_no} sleeping for {wait} seconds");
sleep_for(wait).await;
let res = to_be_squared.pow(2);
println!("Async thread {thread_no} done. Result is {res}");
res
}