我试图用人造丝par_iter()优化我的功能.

单线程版本类似于:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.iter().map(|tx| {

         tx.verify_and_store(store)

    }).collect();

    ...
}

每个Store实例只能由一个线程使用,但Store的多个实例可以同时使用,因此我可以通过clonestore实现多线程:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {

    let result = txs.par_iter().map(|tx| {

         let mut local_store = store.clone();

         tx.verify_and_store(&mut local_store)

    }).collect();

    ...
}

然而,这克隆了storeonevery迭代,这太慢了.我希望每个线程使用一个存储实例.

人造丝可以吗?或者我应该求助于手动线程和工作队列?

推荐答案

可以使用线程局部变量来确保在给定线程中不会多次创建local_store.

例如,它编译(full source):

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::cell::RefCell;
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

    let mut result = Vec::new();

    txs.par_iter().map(|tx| {
        STORE.with(|cell| {
            let mut local_store = cell.borrow_mut();
            if local_store.is_none() {
                *local_store = Some(store.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into(&mut result);
}

然而,这段代码有两个问题.第一,如果store的克隆在par_iter()完成时需要做一些事情,比如刷新缓冲区,那就不会发生——只有在人造丝的工作线程退出时,才会调用Drop,甚至is not guaranteed.

第二个问题,也是更严重的问题,是每个工作线程恰好创建store个克隆.如果Rayon缓存了它的线程池(我相信是的),这意味着稍后对verify_and_store的一个无关调用将继续处理最后已知的store个克隆,这可能与当前存储无关.

这可以通过使代码复杂化来纠正:

  • 将克隆的变量存储在Mutex<Option<...>>而不是Option中,以便调用par_iter()的线程可以访问它们.这将在每次访问时产生一个互斥锁,但该锁将是无竞争的,因此价格低廉.

  • 在互斥体周围使用Arc,以收集对向量中创建的存储克隆的引用.该向量用于在迭代完成后通过将存储重置为None来清理存储.

  • 将整个通话包装在一个不相关的互斥对象中,这样两个并行的verify_and_store次通话就不会看到对方的store 克隆.(如果在迭代之前创建并安装了新的线程池,这可能是可以避免的.)希望这种序列化不会影响verify_and_store的性能,因 for each 调用都将利用整个线程池.

结果并不完美,但它可以编译,只使用安全代码,并且似乎可以工作:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::sync::{Arc, Mutex};
    type SharedStore = Arc<Mutex<Option<Store>>>;

    lazy_static! {
        static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
        static ref NO_REENTRY: Mutex<()> = Mutex::new(());
    }
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

    let mut result = Vec::new();
    let _no_reentry = NO_REENTRY.lock();

    txs.par_iter().map({
        |tx| {
            STORE.with(|arc_mtx| {
                let mut local_store = arc_mtx.lock().unwrap();
                if local_store.is_none() {
                    *local_store = Some(store.clone());
                    STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
                }
                tx.verify_and_store(local_store.as_mut().unwrap())
            })
        }
    }).collect_into(&mut result);

    let mut store_clones = STORE_CLONES.lock().unwrap();
    for store in store_clones.drain(..) {
        store.lock().unwrap().take();
    }
}

Rust相关问答推荐

as操作符如何将enum转换为int?

为什么拥有的trait对象的相等运算符移动了正确的操作数?

使用模块中的所有模块,但不包括特定模块

MPSC频道在接收器处阻塞

你是如何在铁 rust 一侧的金牛座获得应用程序版本的?

如何正确地将App handler传递给Tauri中的其他模块?

为什么铁 rust S似乎有内在的易变性?

字段类型为Boxed的 struct 的生存期必须超过static

为什么BufReader实际上没有缓冲短寻道?

有没有一种方法可以创建一个闭包来计算Rust中具有随机系数的n次多项式?

期望一个具有固定大小 x 元素的数组,找到一个具有 y 元素的数组

如何从宏调用闭包?

可选包装枚举的反序列化

我可以禁用发布模式的开发依赖功能吗?

如何使用泛型满足 tokio 异步任务中的生命周期界限

为什么拥有 i32 所有权的函数需要它是可变的?

为什么指定生命周期让我返回一个引用?

在空表达式语句中移动的值

为什么 match 语句对引用类型比函数参数更挑剔?

为什么分配对变量的引用使我无法返回它