我正在try 用Rust编写一个union-find的实现.这在C等语言中实现起来非常简单,但仍然需要进行复杂的运行时分析.

I'm having trouble getting Rust's mutex semantics to allow iterative hand-over-hand locking.

这就是我现在的处境.

首先,这是我想要在C中实现的 struct 的一部分的一个非常简单的实现:

#include <stdlib.h>

struct node {
  struct node * parent;
};

struct node * create(struct node * parent) {
  struct node * ans = malloc(sizeof(struct node));
  ans->parent = parent;
  return ans;
}

struct node * find_root(struct node * x) {
  while (x->parent) {
    x = x->parent;
  }
  return x;
}

int main() {
  struct node * foo = create(NULL);
  struct node * bar = create(foo);
  struct node * baz = create(bar);
  baz->parent = find_root(bar);
}

请注意,指针的 struct 是inverted tree;多个指针可能指向一个位置,并且没有循环.

此时,没有路径压缩.

这是一个可靠的翻译.我 Select 使用Rust的引用计数指针类型来支持上面提到的倒树类型.

请注意,这个实现要详细得多,可能是因为Rust提供了更高的安全性,但可能是因为我对Rust缺乏经验.

use std::rc::Rc;

struct Node {
    parent: Option<Rc<Node>>
}

fn create(parent: Option<Rc<Node>>) -> Node {
    Node {parent: parent.clone()}
}

fn find_root(x: Rc<Node>) -> Rc<Node> {
    let mut ans = x.clone();
    while ans.parent.is_some() {
        ans = ans.parent.clone().unwrap();
    }
    ans
}

fn main() {
    let foo = Rc::new(create(None));
    let bar = Rc::new(create(Some(foo.clone())));
    let mut prebaz = create(Some(bar.clone()));
    prebaz.parent = Some(find_root(bar.clone()));
}

每次调用find_root时,路径压缩都会沿着根路径 for each node 重新设置父 node .要将此功能添加到C代码中,只需要两个新的小函数:

void change_root(struct node * x, struct node * root) {
  while (x) {
    struct node * tmp = x->parent;
    x->parent = root;
    x = tmp;
  }
}

struct node * root(struct node * x) {
  struct node * ans = find_root(x);
  change_root(x, ans);
  return ans;
}

函数change_root执行所有的重父化,而函数root只是一个包装器,用于使用find_root的结果来重父化根路径上的 node .

为了在Rust中实现这一点,我决定必须使用Mutex,而不仅仅是引用计数指针,因为Rc接口只允许在多个指向该项的指针处于活动状态时,通过写时复制进行可变访问.因此,所有代码都必须更改.在进入路径压缩部分之前,我挂断了find_root:

use std::sync::{Mutex,Arc};

struct Node {
    parent: Option<Arc<Mutex<Node>>>
}

fn create(parent: Option<Arc<Mutex<Node>>>) -> Node {
    Node {parent: parent.clone()}
}

fn find_root(x: Arc<Mutex<Node>>) -> Arc<Mutex<Node>> {
    let mut ans = x.clone();
    let mut inner = ans.lock();
    while inner.parent.is_some() {
        ans = inner.parent.clone().unwrap();
        inner = ans.lock();
    }
    ans.clone()
}

这会产生错误(0.12.0)

error: cannot assign to `ans` because it is borrowed
ans = inner.parent.clone().unwrap();

note: borrow of `ans` occurs here
let mut inner = ans.lock();

What I think I need here is hand-over-hand locking. For the path A -> B -> C -> ..., I need to lock A, lock B, unlock A, lock C, unlock B, ... Of course, I could keep all of the locks open: lock A, lock B, lock C, ... unlock C, unlock B, unlock A, but this seems inefficient.

然而,Mutex不提供解锁,而是使用RAII.How can I achieve hand-over-hand locking in Rust without being able to directly call 101?


EDIT:正如 comments 所指出的,我可以使用Rc<RefCell<Node>>而不是Arc<Mutex<Node>>.这样做会导致相同的编译器错误.

为了清楚地说明我试图通过使用手动锁定来避免什么,这里有一个RefCell版本,它编译但在路径长度上使用了空格线性.

fn find_root(x: Rc<RefCell<Node>>) -> Rc<RefCell<Node>> {
    let mut inner : RefMut<Node> = x.borrow_mut();
    if inner.parent.is_some() {
        find_root(inner.parent.clone().unwrap())
    } else {
        x.clone()
    }
}

推荐答案

我们可以很容易地进行完全的手动锁定,因为我们只需要使用unsafe的一点就可以遍历这个列表,这对于告诉借阅判断器我们知道的一些细节是必要的,但它不知道.

但首先,让我们明确地阐述问题:

  • 我们想要遍历一个链表,其 node 存储为Arc<Mutex<Node>>,以获得列表中的最后一个 node
  • 我们需要在前进的过程中锁定列表中的每个 node ,这样,另一个并发遍历必须严格地跟随在我们后面,并且不能干扰我们的进度.

在我们进入细节之前,让我们试着写下这个函数的签名:

fn find_root(node: Arc<Mutex<Node>>) -> Arc<Mutex<Node>>;

现在我们知道了我们的目标,我们可以开始实施了——这里是第一次try :

fn find_root(incoming: Arc<Mutex<Node>>) -> Arc<Mutex<Node>> {
    // We have to separate this from incoming since the lock must
    // be borrowed from incoming, not this local node.  
    let mut node = incoming.clone();
    let mut lock = incoming.lock();
    
    // Could use while let but that leads to borrowing issues.
    while lock.parent.is_some() {
       node = lock.parent.as_ref().unwrap().clone(); // !! uh-oh !!
       lock = node.lock();
    }

    node
} 

如果我们试图编译它,rustc将在标记为!! uh-oh !!的行上出错,告诉我们当lock仍然存在时,我们不能移出 node ,因为lock是borrow node.这不是一个虚假的错误!lock中的数据可能会随着node的消失而消失——这只是因为我们知道,即使我们移动node,我们也可以将lock中的数据保持在同一个有效的内存位置,从而解决这个问题.

这里的关键洞察是,Arc中包含的数据的生命周期是动态的,借阅判断器很难推断Arc中数据的有效期.

这种情况每隔一段时间就会发生一次;与rustc相比,您对数据的生命周期和组织有更多的了解,您希望能够向编译器表达这些知识,有效地说"相信我".输入:unsafe——我们告诉编译器我们知道的比它多的方式,应该允许我们告知它我们知道但它不知道的保证.

在这种情况下,保证非常简单——我们将在锁仍然存在时替换 node ,但我们不会确保锁中的数据在 node 消失后仍然有效.为了表达这种保证,我们可以使用mem::transmute,这是一个允许我们重新解释任何变量类型的函数,只需使用它将 node 返回的锁的生存期更改为略长于实际值即可.

为了确保我们信守promise ,我们将在重新分配锁时使用另一个切换变量来保持 node -尽管这会移动 node (更改其地址)并且borrow 判断器会对我们生气,但我们知道这没关系,因为lock不指向 node ,它指向node内的数据,他的地址(在本例中,因为它位于Arc后面)不会改变.

在我们得到解决方案之前,需要注意的是,我们在这里使用的技巧是only有效的,因为我们使用的是Arc.借阅判断器正在警告我们一个可能严重的错误——如果Mutex保持在内联状态,而不是在Arc中,那么这个错误将正确防止在释放后使用,在lock中保持的MutexGuard将try 解锁一个已经被丢弃的Mutex,或者至少移动到另一个内存位置.

use std::mem;
use std::sync::{Arc, Mutex};

fn find_root(incoming: Arc<Mutex<Node>>) -> Arc<Mutex<Node>> {
    let mut node = incoming.clone();
    let mut handoff_node;
    let mut lock = incoming.lock().unwrap();
    
    // Could use while let but that leads to borrowing issues.
    while lock.parent.is_some() {
       // Keep the data in node around by holding on to this `Arc`.
       handoff_node = node;

       node = lock.parent.as_ref().unwrap().clone();

       // We are going to move out of node while this lock is still around,
       // but since we kept the data around it's ok.
       lock = unsafe { mem::transmute(node.lock().unwrap()) };
    }

    node
} 

就这样,rustc很高兴,我们有了手动锁,因为只有在我们获得了新锁之后,最后一把锁才会被释放!

在这个实现中有一个尚未回答的问题,我还没有得到答案,那就是旧值的删除和新值的赋值是否保证是原子的——如果不是,则存在一个竞争条件,即旧锁在赋值lock中获得新锁之前被释放.解决这个问题非常简单,只需在重新分配holdover_lock变量之前将旧锁移到其中,然后在重新分配lock之后将其丢弃.

希望这能充分解决您的问题,并展示当您真正了解更多信息时,如何使用unsafe解决借阅判断器中的"缺陷".我仍然希望,你知道的比借阅判断器更多的情况是罕见的,而转化生命周期不是"通常"的行为.

如你所见,以这种方式使用Mutex是相当复杂的,你必须处理manymany,可能的比赛条件来源,我甚至可能没有捕捉到所有这些!除非您真的需要从许多线程访问这个 struct ,否则如果需要的话,最好只使用RcRefCell,因为这会使事情变得更容易.

Rust相关问答推荐

如何优化小型固定大小数组中的搜索?

将JSON密钥转换为Polars DataFrame

在没有引用计数或互斥锁的情况下,可以从Rust回调函数内的封闭作用域访问变量吗?

MutexGuard中的过滤载体不需要克隆

JSON5中的变量类型(serde)

为什么&;mut buf[0..buf.len()]会触发一个可变/不可变的borrow 错误?

`actix-web` 使用提供的 `tokio` 运行时有何用途?

为什么某些类型参数仅在特征边界中使用的代码在没有 PhantomData 的情况下进行编译?

为什么切片时需要参考?

Rust 如何将链表推到前面?

一旦令牌作为文字使用,声明宏不匹配硬编码值?

在多核嵌入式 Rust 中,我可以使用静态 mut 进行单向数据共享吗?

当你删除一个存在于堆栈中的值时,为什么 rust 不会抱怨

为什么1..=100返回一个范围而不是一个整数?

如何在 Rust 中创建最后一个元素是可变长度数组的 struct ?

为什么分配对变量的引用使我无法返回它

Rust 跨同一文件夹中文件的可见性

通用类型,不同于输入类型,作为函数的返回值

为什么我不能将元素写入 Rust 数组中移动的位置,但我可以在元组中完成

为什么 `ref` 会导致此示例*取消引用*一个字段?