在过go 的几天里,我一直在研究Rust中的并发/并行代码.我try 为不同的并发方法创建一个可执行的代码片断集合.仍然需要进行大量的实验.在这样做的时候,我遇到了一个关于东京的非常奇怪的问题.

话虽如此,我很确定有问题的代码不正确地使用Tokio.结果是代码不会中断,但也不能完全正确运行.虽然我已经找到了避免这个问题的方法,但我对导致这种行为的内部发生的事情非常感兴趣.

在内心深处,我找到了一种方法,让东京总是比它应该运行的任务少一个任务!根据我的测试,其余的运行得很好.问题是

  • 如果这段代码真的是错的,为什么还要运行任何东西呢?
  • 为什么总有一项任务不能执行?

Playground

#![allow(dead_code)]

// There are two ways to fix this code. They are marked with a "FIX" comment

use tokio;

const NUMBER_OF_THREADS: usize = 3;

fn main() {
    futures::executor::block_on(with_tokio_channels_fail_call_breaking(NUMBER_OF_THREADS));
    
    // FIX: use this line instead
    // with_tokio_channels_fail_call_fixing(NUMBER_OF_THREADS);
}

// Calling this function runs the code fine
fn with_tokio_channels_fail_call_fixing(number_of_threads: usize) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // When we use block_on on this fake-future (no .await inside), it runs
    // fine
    rt.block_on(tokio_channels_fail(number_of_threads));
}

// Calling this function runs (NUMBER_OF_THREADS - 1) threads fine,
// but the last one is never scheduled, causing the program to never exit
async fn with_tokio_channels_fail_call_breaking(number_of_threads: usize) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // Spawning and awaiting the fake-future has a really odd result!
    // I believe this is simply wrong usage of tokio. But WHY exactly?
    // I am interested in what goes wrong internally when we do this
    rt.spawn(tokio_channels_fail(number_of_threads)).await.unwrap();
}

// Note that this function is unnecessarily async
async fn tokio_channels_fail(number_of_threads: usize) {
    async fn cb(index: usize, num: usize, sender: std::sync::mpsc::Sender<(usize, usize)>) {
        let res = delay_thread_async_then_square(num, num).await;
        sender.send((index, res)).unwrap();
    }

    let (sender, receiver) = std::sync::mpsc::channel::<(usize, usize)>();
    let mut vec: Vec<usize> = (0..number_of_threads).into_iter().collect();

    let _x = (0..number_of_threads)
        .map(|num| {
            let sender = sender.clone();
            tokio::task::spawn(cb(num, num, sender))
        })
        .collect::<Vec<tokio::task::JoinHandle<()>>>();
    // need to drop the sender, because the iterator below will only complete once all senders are dropped
    drop(sender);

    // FIX: when uncommenting this line, the code also runs fine, but this is not a general solution.
    // But would it use the tokio runtime? Means: Would it be spread across multiple
    // kernel level threads?
    // futures::future::join_all(_x).await;

    receiver.iter().for_each(|(index, res)| {
        vec[index] = res;
    });

    println!("{:?}", vec);
}

async fn sleep_for(seconds: u64) {
    tokio::time::sleep(std::time::Duration::from_secs(seconds)).await;
}

async fn delay_thread_async_then_square(thread_no: usize, to_be_squared: usize) -> usize {
    let mut wait = 3;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 7;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 1;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    wait = 4;
    println!("Async thread {thread_no} sleeping for {wait} seconds");
    sleep_for(wait).await;
    let res = to_be_squared.pow(2);
    println!("Async thread {thread_no} done. Result is {res}");
    res
}

推荐答案

在异步执行器中运行阻塞代码时,可能会发生TL;DR:种奇怪的事情


总而言之,您已经完成了一个阻塞任务,该阻塞任务直到它自己的.spawn()ed任务完成(通过通道)才会完成.

在Tokio中,派生的任务可能最终在不同的线程上运行,但是,您已经注意到,始终没有运行一个线程.这是因为它将始终try 在当前被阻止的同一线程上运行.

Tokio执行器有一种称为"后进先出槽"的优化机制,它只容纳单个任务.它背后的思想是,如果一个任务唤醒了另一个任务,那么被唤醒的任务几乎总是可以取得进展,从而在同一个Executor线程上"插队".这是如何工作的当对多个任务执行.spawn()时,第一个任务将进入槽中,但是下一个产生的任务将"踢出"已经在槽中的任务,依此类推.这就是(在您的例子中是NUMBER_OF_THREADS = 3)任务0和1总是被踢出到另一个线程,而任务2将留在槽中的原因.

不幸的是,后进先出槽不参与Tokio的"工作窃取"方案,因此等待在那里的任务永远不会移动到另一个线程.因此,在您的用例中,您已经锁定了自己,当前在Executor上运行的任务在另一个任务完成之前不会屈服,但是除非当前任务屈服,否则另一个任务无法完成.

现在,让我们来说明为什么"修复"可以解决这个问题.

  1. 显然,如果您添加futures::future::join_all(_x).await,那么您的任务将不再阻塞,直到所有其他.spawn()-ed任务完成为止.因此,执行器实际上可以运行最后一个.spawn()-ed任务.这也可以通过使用像从tokio::sync::mpsc开始的异步通道来补救.

  2. .spawn()开始的任务和从.block_on()开始的任务的区别实际上很有趣.发生的情况是,.block_on正在运行的任务实际上并不是在Tokio Executor上运行的,它是在自己的小环境中独自运行的.

    block_on的文档中写道:

    Non-worker future

    请注意,此函数所需的future 不是以工作者的身份运行的.人们的期望是,其他任务将由这里的future 催生.等待这里提供的来自future 的其他future 将不会像那些作为工人而产生的那样快.

    因此,它没有后进先出槽来导致相同的行为.

  3. 作为单独的实验,我在构建Runtiime时使用了disable_lifo_slot方法.禁用该机制允许所有任务通过单独的线程执行.

Rust相关问答推荐

为什么基于高山Linux的Docker镜像不能在绝对路径下找到要执行的命令?

正则表达式中的重叠匹配?(铁 rust 正则式发动机)

在生存期内将非静态可变引用转换为范围内的静态可变引用

在文件链实施中绕过borrow 判断器

Rust从关联函数启动线程

将特征与具有生命周期的关联类型一起使用时的生命周期方差问题

如何在 `connect_activate()` 之外创建一个 `glib::MainContext::channel()` 并将其传入?

从字节数组转换为字节元组和字节数组时,为什么 Transmute 会对字节重新排序?

如何正确使用git2::Remote::push?

仅在使用 &mut 或线程时borrow 的数据在闭包之外转义?

`移动||异步移动{...}`,如何知道哪个移动正在移动哪个?

切片不能被 `usize` 索引?

如何获取函数中borrow 的切片的第一部分?

使用 `clap` 在 Rust CLI 工具中设置布尔标志

你能告诉我如何在 Rust 中使用定时器吗?

为什么 Rust 编译器在移动不可变值时执行复制?

从函数返回 u32 的数组/切片

`if let` 只是另一种编写其他 `if` 语句的方式吗?

如何制作具有关联类型的特征的类型擦除版本?

为什么 Rust 中的关联类型需要明确的生命周期注释?