我有一个C++程序和一个Rust程序,我已经成功地让它们通过POSIX共享内存(C++rust)进行了交谈.

我现在试图做的是使它们同步.我已经设法使用原子布尔创建了一个工作但效率低下的原始系统(在 rust 面like this上创建了AtomicBool).

然而,我真的很想使用互斥/condvar在线程之间进行同步,这就是我被困住的地方.

我似乎能够初始化它的C++端,几乎逐字跟随this example个字.

我试着把它直接翻译成铁 rust :

    let raw_shm = shm.get_shm();

    let mut mtx_attrs = MaybeUninit::<nix::libc::pthread_mutexattr_t>::uninit();
    if unsafe { nix::libc::pthread_mutexattr_init(mtx_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create mtx_attrs");
    };
    let mtx_attrs = unsafe { mtx_attrs.assume_init() };

    let mut cond_attrs = MaybeUninit::<nix::libc::pthread_condattr_t>::uninit();
    if unsafe { nix::libc::pthread_condattr_init(cond_attrs.as_mut_ptr()) } != 0 {
        panic!("failed to create cond_attrs");
    };
    let cond_attrs = unsafe { cond_attrs.assume_init() };

    if unsafe {
        nix::libc::pthread_mutexattr_setpshared(
            &mtx_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set mtx as process shared");
    };

    if unsafe {
        nix::libc::pthread_condattr_setpshared(
            &cond_attrs as *const _ as *mut _,
            PTHREAD_PROCESS_SHARED,
        )
    } != 0
    {
        panic!("failed to set cond as process shared");
    };

    // I know that these offsets are correct, having used `offsetof` on the C++ side
    let mtx_start = unsafe { &raw_shm.as_slice()[3110416] };
    let mtx = unsafe { &*(mtx_start as *const _ as *const pthread_mutex_t) };
    let cond_start = unsafe { &raw_shm.as_slice()[3110440] };
    let cond = unsafe { &*(cond_start as *const _ as *const pthread_mutex_t) };

    if unsafe {
        nix::libc::pthread_mutex_init(&mtx as *const _ as *mut _, &mtx_attrs as *const _ as *mut _)
    } != 0
    {
        panic!("failed to init mtx");
    };
    if unsafe {
        nix::libc::pthread_cond_init(
            &cond as *const _ as *mut _,
            &cond_attrs as *const _ as *mut _,
        )
    } != 0
    {
        panic!("failed to init cond");
    };

所有这些都以返回值0传递...到目前一切尚好.

我现在可以通过以下两种方式之一进行测试:

  1. 我可以让这个简单的C++程序继续运行,并让它停止在condvar处等待:
if (pthread_mutex_lock(&shmp->mutex) != 0)
    throw("Error locking mutex");
if (pthread_cond_wait(&shmp->condition, &shmp->mutex) != 0)
    throw("Error waiting for condition variable");

在铁 rust 中:

let sig = unsafe { nix::libc::pthread_cond_signal(&cond as *const _ as *mut _) };
    dbg!(sig);

尽管返回0(即成功),但我的C++程序并未释放超过condvar;它仍在等待,就好像从未收到信号一样.

  1. 我可以设置另一个简单的C++程序,它在循环中无休止地发出条件变量的信号:
    for (unsigned int count = 0;; count++) {
        if (pthread_cond_signal(condition) != 0)
            throw("Error")
        // sleep for a bit
    }

然后在Rust 的时候,大概是这样:

    loop {
        if unsafe { nix::libc::pthread_mutex_lock(&mtx as *const _ as *mut _) } > 0 {
            panic!("Failed to acquire lock")
        };
        if unsafe {
            nix::libc::pthread_cond_wait(&cond as *const _ as *mut _, &mtx as *const _ as *mut _)
        } > 0
        {
            panic!("Failed to acquire lock")
        };
    }

这样做,锁定互斥锁的调用是成功的,但我在pthread_cond_wait定义的here上得到了EINVAL,我似乎无法纠正...

我觉得我快到了...你有什么 idea 让它发挥作用吗?(这主要是一个概念验证).

推荐答案

为了子孙后代,我设法让这一点发挥作用.

为了阐明程序是如何架构的,有两个二进制文件:一个是C++,一个是Ruust.Rust程序使用std::process::Command生成C++程序.

为简洁起见,省略了错误处理和导入.

  1. Rust程序启动并创建一个新的共享内存块(如果现有块存在,则将其删除,以确保程序始终以新状态启动).我使用shared_memory个 crate 为我处理细节,这也提供了有用的帮助程序,如access to a raw pointer到内存块的开头.

共享内存块的 struct 如下:

#[repr(c)]
struct SharedMemoryLayout {
    ready: std::sync::atomic::AtomicBool,
    mutex: libc::pthread_mutex_t,
    condition: libc::pthread_cond_t,
}

共享内存块是用零初始化的,因此ready从一开始就是false.

  1. Rust程序生成具有std::process::Command::spawn的C++程序,然后循环等待,直到readytrue.
let proc = Command::new("/path/to/c++/binary").spawn().unwrap();

let ptr: *mut u8 = // pointer to first byte of shared memory block;
let ready: &AtomicBool =  unsafe { &*(ptr as *mut bool as *const AtomicBool) };

loop {
    if ready.load(Ordering::SeqCst) {
        break
    } else {
        thread::sleep(Duration::from_secs(1));
    }
}
  1. C++程序打开共享内存块,并将其放入其本地地址空间.
struct SharedMemoryLayout
{
    std::atomic_bool ready;
    pthread_mutex_t mutex;
    pthread_cond_t condition;
};

int fd = shm_open("name_of_shared_memory_block", O_RDWR, S_IRUSR | S_IWUSR);
struct SharedMemoryLayout *sync = (SharedMemoryLayout *)mmap(NULL, sizeof(*sync), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
  1. C++程序继续并继续初始化mutexcondition,然后将内存块标记为就绪.
pthread_mutexattr_t mutex_attributes;
pthread_condattr_t condition_attributes;

pthread_mutexattr_init(&mutex_attributes);
pthread_condattr_init(&condition_attributes);

pthread_mutexattr_setpshared(&mutex_attributes, PTHREAD_PROCESS_SHARED);
pthread_condattr_setpshared(&condition_attributes, PTHREAD_PROCESS_SHARED);

pthread_mutex_init(&sync->mutex, &mutex_attributes);
pthread_cond_init(&sync->condition, &condition_attributes);

pthread_mutexattr_destroy(&mutex_attributes);
pthread_condattr_destroy(&condition_attributes);

std::atomic_bool *ready = &syncp->ready;
ready->store(true);

然后在以下条件下进入环路信令:

for (unsigned int count = 0;; count++) {
    // do something
    sleep(1);
    pthread_cond_signal(&sync->condition);
}
  1. 现在,在步骤2)中,铁 rust 程序将从循环中释放.具体化在步骤4)中初始化的互斥体和条件.
let mutex = unsafe {ptr.offset(4) as *mut pthread_mutex_t};
let condition = unsafe {ptr.offset(32) as *mut pthread_cond_t};

现在我们可以等待条件,得到c++程序的通知.

loop {
    unsafe {
        pthread_mutex_lock(mutex);
        pthread_cond_wait(condition, mutex);
        pthread_mutex_unlock(mutex);
        
        // Do something
    }
}

Rust相关问答推荐

值为可变对象的不可变HashMap

如何获取Serde struct 的默认实例

编译项目期间使用Cargo生成时出现rustc错误

为什么Rust不支持带关联常量的特征对象?

Rust编译器似乎被结果类型与anyhow混淆

随机函数不返回随机值

根据掩码将 simd 通道设置为 0 的惯用方法?

`use` 和 `crate` 关键字在 Rust 项目中效果不佳

如何从borrow 的异步代码运行阻塞代码?

Rust 异步循环函数阻塞了其他future 任务的执行

‘&T as *const T as *mut T’ 在 ‘static mut’ 项目中合适吗?

为什么需要同时为值和引用实现`From`?方法不应该自动解引用或borrow 吗?(2023-06-16)

打印 `format_args!` 时borrow 时临时值丢失

为什么 i32 Box 类型可以在 Rust 中向下转换?

试图理解 Rust 中的可变闭包

如何为返回正确类型的枚举实现 get 方法?

有没有更好的方法来为拥有 DIsplay 事物集合的 struct 实现 Display?

带有库+多个二进制文件的Cargo 项目,二进制文件由多个文件组成?

如何在 Rust 中构建一个 str

为什么 Bevy 的 Trait 边界不满足 Rapier 物理插件?