请考虑下面的代码.当您try 使用println!("why?");
个消费者线程时,它将永远不会结束.在没有输出的情况下线程成功加入.您也可以在信号量释放之前打印一些字符串,但不能在它之后打印.为什么?
use std::collections::VecDeque;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc, Condvar, Mutex,
};
use std::thread;
use std::time::Duration;
pub struct Semaphore {
lock: Mutex<isize>,
cvar: Condvar,
}
impl Semaphore {
pub fn new(count: isize) -> Semaphore {
Semaphore {
lock: Mutex::new(count),
cvar: Condvar::new(),
}
}
pub fn acquire(&self) {
let mut count = self.lock.lock().unwrap();
while *count <= 0 {
count = self.cvar.wait(count).unwrap();
}
*count -= 1;
}
pub fn release(&self) {
*self.lock.lock().unwrap() += 1;
self.cvar.notify_one();
}
}
fn main() {
const MAX_QUEUE_LENGTH: isize = 5;
const PRODUCER_NUMBER: u32 = 10;
const CONSUMER_NUMBER: u32 = 10;
let item_queue = Arc::new(Mutex::new(VecDeque::new()));
let queue_length = Arc::new(Semaphore::new(0));
let empty_number = Arc::new(Semaphore::new(MAX_QUEUE_LENGTH));
let is_running = Arc::new(AtomicBool::new(true));
let mut producers = Vec::new();
for _ in 0..PRODUCER_NUMBER {
producers.push(thread::spawn({
let item_queue = item_queue.clone();
let queue_length = queue_length.clone();
let empty_number = empty_number.clone();
let is_running = is_running.clone();
move || producer(item_queue, queue_length, empty_number, is_running)
}));
}
let mut consumers = Vec::new();
for _ in 0..CONSUMER_NUMBER {
consumers.push(thread::spawn({
let item_queue = item_queue.clone();
let queue_length = queue_length.clone();
let empty_number = empty_number.clone();
let is_running = is_running.clone();
move || consumer(item_queue, queue_length, empty_number, is_running)
}));
}
thread::sleep(Duration::from_secs(1));
is_running.store(false, Ordering::Relaxed);
for prod in producers {
let _ = prod.join();
}
for cons in consumers {
let _ = cons.join();
}
}
struct Item {}
fn producer(
item_queue: Arc<Mutex<VecDeque<Item>>>,
queue_length: Arc<Semaphore>,
empty_number: Arc<Semaphore>,
is_running: Arc<AtomicBool>,
) {
while is_running.load(Ordering::Relaxed) {
let item = Item {};
empty_number.acquire();
{
let mut item_queue = item_queue.lock().unwrap();
item_queue.push_back(item);
}
queue_length.release();
}
}
fn consumer(
item_queue: Arc<Mutex<VecDeque<Item>>>,
queue_length: Arc<Semaphore>,
empty_number: Arc<Semaphore>,
is_running: Arc<AtomicBool>,
) {
while is_running.load(Ordering::Relaxed) {
let item;
queue_length.acquire();
{
let mut item_queue = item_queue.lock().unwrap();
item = item_queue.pop_front().unwrap();
}
empty_number.release();
println!("why?");
}
}