我在为某个特定问题创建安全高效的锁定机制时遇到了麻烦.以下是对这种情况的过于简单化的描述.

我有两个名为A和B的表,每个表都包含(比方说)10个条目的空间.在这10个条目中,其中9个是指向另一个表的指针,其余条目为空.在程序开始时,非空条目以双射对应开始.例如,如果我沿着一个链接从表A到表B,然后再返回,我就会到达A中的原始位置.同样,如果我沿着一个链接从表B到表A,然后再回来,我就会到达我在B中的原始位置.有一些全局变量用于跟踪空值的位置.

在设置了这个双射对应之后,程序产生了两个线程.第一个线程("线程A")重复执行以下操作:它随机 Select A的条目,将其交换到空点,然后继续调整B中的相应指针,以保持双射对应.另一个线程("线程B")执行完全相同的操作,但从另一个表开始.因此,它随机 Select B中的条目,将它们交换到空点,并调整A中的相应指针.

Question:我们如何在不锁定整个表的情况下使该系统线程安全,最好只使用每个指针的底部4位?

显然,我关心的实际情况涉及更大的表,而且每个条目都附加了(少量)数据.而且,在现实生活中,动作并不完全是随机的;它们是有目的的.尽管我认为这些细节与解决实际的多线程问题没有太大关系,但如果您想了解更多关于我正在try 做的事情的详细信息,请随时询问.

Addendum.我刚刚注意到这个问题有多个版本.最简单的版本是,如果我们 Select 了一个条目,并意识到它不能移动,这是完全可以的-我们只是随机 Select 另一个条目,并移动它.中等难度的版本说我们只允许对A做这个.这意味着在表B中,我们必须阻止并等待,直到移动成为可能,而不能简单地 Select 另一个条目.这个问题最难的版本是,我们被剥夺了放弃和重新随机化两张表的权利.FWIW,我对这个问题的所有版本都感兴趣;因此,即使你只能解决"简单"的版本,我仍然重视你的答案.

这里有一些针对x86-64/Linux的示例代码,尽管没有任何线程安全机制.由于qword的打包方式,目前的设计每个指针只有3位,但如果需要,我们可以通过使用128位条目将其升级为4位.

#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

// Clear the bottom 3 bits of a 64-bit value
#define GET_POINTER(val) (uint64_t*) ((val) & ~0x7ULL)

// Concatenates the top 61 bits of new_ptr with the bottom 3 bits of old_val
// so long as the bottom 3 bits of new_ptr are 0's
#define MODIFY_VALUE(new_ptr, old_val) ((uint64_t) (new_ptr)) ^ ((old_val) & 0x7ULL) 

// Declare globals
uint64_t A[10], B[10];
int index_of_NULL_value_A, index_of_NULL_value_B;

// Initialize tables
void init_globals() {

    // Initialize A[0] and B[0] to NULL pointers
    A[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;
    B[0] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0;

    // Record the initial indexes of the NULL values
    index_of_NULL_value_A = 0;
    index_of_NULL_value_B = 0;

    // Create pointers from A to B
    for (int i = 1; i < 10; ++i) {
        A[i] = (uint64_t) &B[i];
    }

    // Create pointers from B to A
    for (int i = 1; i < 10; ++i) {
        B[i] = (uint64_t) &A[i];
    }

}

void verify_integrity_of_table(uint64_t* A, int index_of_NULL_value_A) {

    // Verify the forward direction
    for (int i = 0; i < 10; ++i) {

        if (i == index_of_NULL_value_A) {

            // Check that the NULL value is at this spot
            if (A[i] != (uint64_t) NULL) {
                fprintf(stderr, "Integrity check A failed! Missing NULL at i: %d\n", i);
                exit(1);
            }

        } else {

            // Check link integrity
            if (&A[i] != GET_POINTER(*GET_POINTER(A[i]))) {
                fprintf(stderr, "Integrity check A failed! Dodgy link at i: %d\n", i);
                exit(1);
            }

        }

    }

}

void verify_integrity() {

    // Verify the forward direction
    verify_integrity_of_table(A, index_of_NULL_value_A);

    // Verify the backward direction
    verify_integrity_of_table(B, index_of_NULL_value_B);

}

typedef void *(*start_routine_t)(void *);
pthread_t pthread_create_or_exit(start_routine_t start_routine) {

    // Declare variables
    pthread_t thread_id;
    int result;

    // Create the thread
    result = pthread_create(&thread_id, NULL, start_routine, NULL);
    if (result != 0) {
        perror("Failed to create thread!\n");
        exit(EXIT_FAILURE);
    }

    // Return the thread id
    return thread_id;

}

void do_a_random_swap(uint64_t* A, int* addr_of_index_of_NULL_value) {
    
    // Get the index of the NULL value
    int index_of_NULL = *addr_of_index_of_NULL_value;

    // Choose a random index
    int i = rand() % 10;
    while (i == index_of_NULL) {
        i = rand() % 10;
    }

    // Update the backpointer
    uint64_t* new_ptr = &A[index_of_NULL];
    uint64_t  old_val = *(GET_POINTER(A[i]));
    *GET_POINTER(A[i]) = MODIFY_VALUE(new_ptr, old_val);

    // Copy the item into the NULL spot and clear the old spot
    A[index_of_NULL] = A[i];
    A[i] = (uint64_t) NULL; // Note that (uint64_t) NULL == 0

    // Update the NULL index tracker
    *addr_of_index_of_NULL_value = i;

}

void* fst_start_routine(void* arg) {

    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever
    while (1) {
        if (time(NULL) % 2 == 0) {
            do_a_random_swap(A, &index_of_NULL_value_A);
        } else {
            sleep(0.1);
        }
    }

    // We never get here
    return NULL;

}

void* snd_start_routine(void* arg) {
    
    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever
    while (1) {
        if (time(NULL) % 2 == 0) {
            do_a_random_swap(B, &index_of_NULL_value_B);
        } else {
            sleep(0.1);
        }
    }

    // We never get here
    return NULL;

}

void* integrity_checker_start_routine(void* arg) {

    // Tell the compiler not to panic about the fact that we're not using this argument
    (void) arg;

    // Loop forever, checking the integrity of the system during odd seconds
    for (;; sleep(0.1)) {
        if (time(NULL) % 2 == 1) {
            verify_integrity();
        }
    }

    // We never get here
    return NULL;

}

int main() {

    // Initialize random seed
    srand(time(NULL));

    // Initialize table and NULL-index trackers
    init_globals();

    // Verify integrity of the initialized values
    verify_integrity();

    // Spawn some threads
    pthread_create_or_exit(fst_start_routine);
    pthread_create_or_exit(snd_start_routine);
    pthread_create_or_exit(integrity_checker_start_routine);

    // Loop forever
    while (1) {
        sleep(1);
    }

    // Return successfully (we never get here)
    return 0;

}

推荐答案

高级语言中的锁定原语不起作用.https://jakob.engbloms.se/archives/65您必须使用提供锁定或互锁/原子功能的标准库函数(如果可用),或者如果您不能从标准库函数合成一个,则需要拆分汇编.

假设您不是在某个奇怪的嵌入式环境中运行,可以使用原子方法来完成.

如果我有以下汇编函数,我就可以做到:

EXECUTE_SWAP_A如下所示:

struct table_entry {
   _Atomic struct table_entry *ptr;
   void *attached;
}

const uintptr_t bitmask = ~3ULL;

void perform_swap_a(size_t index)
{
   // Lock our side
   uintptr_t selected;
   _Atomic struct table_entry *expected;
   do {
      selected = (uintptr_t)atomic_load_explicit(&A[index].ptr,memory_order_acquire);
      expected = (struct table_entry *)(selected & bitmask);
      // Spin until we locked this one
   } while (atomic_compare_exchange_weak(&A[index].ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
   // Lock other side
   struct table_entry *bptr = ((struct table_entry *)((uintptr_t)atomic_load_explicit(&A[index].ptr,memory_order_acquire) ~3);
   do {
      selected = (uintptr_t)atomic_load_explicit(&bptr->ptr,memory_order_acquire);
      expected = (struct table_entry *)(selected & bitmask);
      // Spin until we locked this one
   } while (atomic_compare_exchange_weak(&bptr->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1)); 
   // Perform swap
   struct table_entry newentry;
   newentry = A[index];
   A[index_of_NULL_value_A] = A[index]
   A[index] = newentry;
   size_t tmp = index;
   index = index_of_NULL_value_A;
   index_of_NULL_value_A = tmp;
   // Unlock other side
   atomic_store_explicit(&bptr->ptr,(struct table_entry *)((uintptr_t)bptr->ptr & bitmask),  memory_order_release);
   bptr->ptr = (table_entry *)(read_ptr(bptr) & bitmask);
   // Unlock our side
   atomic_store_explicit(&A[index]->ptr, (struct table_entry *)((uintptr_t)bptr->ptr & bitmask),  memory_order_release);
}

Perform_swap_b类似于:

void perform_swap_b(size_t index)
{
   // Lock other side
   struct table_entry *aptr;
   uintptr_t selected;
   struct table_entry * expected;
   do {
      // Every time we go around this loop, the pointer in index can change so we must reload.
      aptr = atomic_load_explicit(&B[index].ptr,memory_order_acquire);
      selected = atomic_load_explicit(&aptr->ptr,memory_order_acquire);
      expected = (struct table_entry *)(selected & bitmask);
      // Spin until we locked this one
   } while (aptr == NULL ||
        atomic_compare_exchange_weak(&aptr->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
   // Lock our side
   do {
       selected = atomic_load_explicit(&B[index].ptr,memory_order_acquire);
      expected = (struct table_entry *)(selected & bitmask);
      // Spin until we locked this one
   } while (atomic_compare_exchange_weak(&B[index]->ptr, &expected, (struct table_entry *)((selected & bitmask) + 1));
   // Perform swap
   struct table_entry newentry;
   newentry = B[index];
   B[index_of_NULL_value_A] = B[index]
   B[index] = newentry;
   size_t tmp = index;
   index = index_of_NULL_value_B;
   index_of_NULL_value_B = tmp;
   // Unlock our side
   atomic_store_explicit(&B[index]->ptr, (struct table_entry *)((uintptr_t)B[index]->ptr & bitmask), memory_order_release);
   // Unlock other side
   atomic_store_explicit(&aptr->ptr, (struct table_entry *)((uintptr_t)(aptr->ptr) & bitmask);
}

整个算法只使用了指针中的1位,因此如果需要,可以将常量3更改为1.

C++相关问答推荐

测量ARM MCU中断延迟的问题

Can函数指针指向C++中具有不同参数连续性的函数

以下声明和定义之间的区别

在循环中复制与删除相同条件代码的性能

tick.q中的Kdb+键控表语法

如何使用C++在控制台中以彩色打印被阻止的客户端

<;unistd.h>;和<;sys/unistd.h>;之间有什么区别?

Cairo STM32MP1 cairo_Surface_WRITE_TO_PNG始终返回CAROLIO_STATUS_WRITE_ERROR

类型定义 struct 与简单的类型定义 struct

这段代码用于在C中以相反的顺序打印数组,但它不起作用

宏观;S C调深度

分配给静态变量和动态变量的位置之间有区别吗?

'printf("%s", user_input)' 危险吗?

C Makefile - 如何避免重复提及文件名

为什么程序在打印每个数字之前要等待所有输入?

为什么简单的 ELF 二进制文件中存在重叠和未对齐的段?

在C语言中实现defer关键字

如何使用另一个 struct 变量初始化C struct 数组成员

VirtualAlloc在0x000'00000000处.

为什么 strtol() 返回 0x7fffffff 而不是预期的 0xAABBCCDD?