请考虑下面的代码.当您try 使用println!("why?");个消费者线程时,它将永远不会结束.在没有输出的情况下线程成功加入.您也可以在信号量释放之前打印一些字符串,但不能在它之后打印.为什么?

use std::collections::VecDeque;
use std::sync::{
    atomic::{AtomicBool, Ordering},
    Arc, Condvar, Mutex,
};
use std::thread;
use std::time::Duration;

pub struct Semaphore {
    lock: Mutex<isize>,
    cvar: Condvar,
}

impl Semaphore {
    pub fn new(count: isize) -> Semaphore {
        Semaphore {
            lock: Mutex::new(count),
            cvar: Condvar::new(),
        }
    }

    pub fn acquire(&self) {
        let mut count = self.lock.lock().unwrap();
        while *count <= 0 {
            count = self.cvar.wait(count).unwrap();
        }
        *count -= 1;
    }

    pub fn release(&self) {
        *self.lock.lock().unwrap() += 1;
        self.cvar.notify_one();
    }
}

fn main() {
    const MAX_QUEUE_LENGTH: isize = 5;
    const PRODUCER_NUMBER: u32 = 10;
    const CONSUMER_NUMBER: u32 = 10;

    let item_queue = Arc::new(Mutex::new(VecDeque::new()));
    let queue_length = Arc::new(Semaphore::new(0));
    let empty_number = Arc::new(Semaphore::new(MAX_QUEUE_LENGTH));
    let is_running = Arc::new(AtomicBool::new(true));

    let mut producers = Vec::new();
    for _ in 0..PRODUCER_NUMBER {
        producers.push(thread::spawn({
            let item_queue = item_queue.clone();
            let queue_length = queue_length.clone();
            let empty_number = empty_number.clone();
            let is_running = is_running.clone();
            move || producer(item_queue, queue_length, empty_number, is_running)
        }));
    }

    let mut consumers = Vec::new();
    for _ in 0..CONSUMER_NUMBER {
        consumers.push(thread::spawn({
            let item_queue = item_queue.clone();
            let queue_length = queue_length.clone();
            let empty_number = empty_number.clone();
            let is_running = is_running.clone();
            move || consumer(item_queue, queue_length, empty_number, is_running)
        }));
    }

    thread::sleep(Duration::from_secs(1));

    is_running.store(false, Ordering::Relaxed);

    for prod in producers {
        let _ = prod.join();
    }

    for cons in consumers {
        let _ = cons.join();
    }
}

struct Item {}

fn producer(
    item_queue: Arc<Mutex<VecDeque<Item>>>,
    queue_length: Arc<Semaphore>,
    empty_number: Arc<Semaphore>,
    is_running: Arc<AtomicBool>,
) {
    while is_running.load(Ordering::Relaxed) {
        let item = Item {};

        empty_number.acquire();
        {
            let mut item_queue = item_queue.lock().unwrap();
            item_queue.push_back(item);
        }
        queue_length.release();
    }
}

fn consumer(
    item_queue: Arc<Mutex<VecDeque<Item>>>,
    queue_length: Arc<Semaphore>,
    empty_number: Arc<Semaphore>,
    is_running: Arc<AtomicBool>,
) {
    while is_running.load(Ordering::Relaxed) {
        let item;

        queue_length.acquire();
        {
            let mut item_queue = item_queue.lock().unwrap();
            item = item_queue.pop_front().unwrap();
        }
        empty_number.release();

        println!("why?");
    }
}

推荐答案

找到了答案.我需要实现acquire_timeout方法,因为当我们已经在循环中时,is_running可以变成false:将没有可用的项目,但我们将等待它们,因此需要超时.

首先,我错误地实现了它,因此在我的代码中替换为标准acquire.但后来我正确地写了它,它解决了问题. 下面是该方法的代码:

pub fn acquire_timeout(&self, dur: Duration) -> bool {
    let mut count = self.lock.lock().unwrap();
    match self.cvar.wait_timeout(count, dur) {
        Ok((new_count, timeout)) if !timeout.timed_out() => {
                count = new_count;
                if *count > 0 {
                    *count -= 1;
                    true
                } else {
                    false
                }
            },
        _ => false,
    }
}

然后我用这个:

while !queue_length.acquire_timeout(Duration::from_millis(50)) {
    if !is_running.load(Ordering::Relaxed) && queue_length.get_value() == 0 {
        return;
    }
}

而不是这样:

queue_length.acquire();

方法get_value返回信号量的当前值.

Rust相关问答推荐

如何访问Rust存储值的内存地址

如何创建引用构造函数拥有的变量的对象?

使用pyo3::Types::PyIterator的无限内存使用量

integer cast as pointer是什么意思

有没有一种惯用的方法来判断VEC中是否存在变体?

在IntoIter上调用.by_ref().Take().rev()时会发生什么情况

无符号整数的Rust带符号差

如何实现Serde::Ser::Error的调试

获取已知数量的输入

Rust proc_macro 和 syn:解析空格

中文优化标题:跳出特定循环并返回一个值

方法可以被误认为是标准特性方法

为什么Rust中无法推断生命周期?

borrow 匹配手臂内部的可变

判断 is_ok 后重用结果

我如何将特征作为 struct 的拥有字段?

有没有办法使用 NASM 语法进行内联汇编?

如何在 Rust 的内置函数上实现特征?

为什么 u64::trailing_zeros() 在无分支工作时生成分支程序集?

为什么 Bevy 的 Trait 边界不满足 Rapier 物理插件?