首先,感谢您的阅读和帮助!

这是一个用C语言编写的多进程多线程代码,它涉及创建多个工作进程和一个通过未命名管道与工作进程通信的调度程序线程.

以下是代码的说明:

struct

该程序定义了两种 struct 类型:

  • worker_data,包含两个字段,即整数id和状态字符串.
  • dispatcher_args,其字段dispatcher_pipe是指向2D整数数组的指针.

共享内存和信号量

该程序设置共享内存和信号量.共享存储器用于存储关于可由多个进程访问的工作进程的数据(worker_data).信号量用于进程/线程之间的同步.

全局变量

声明了几个全局变量,包括共享内存变量、信号量变量、工作进程的计数和存储工作进程的ID的array.

调度器功能

调度程序是一个无限期循环的线程函数.在每次迭代中,它判断共享内存中每个工作进程的状态.如果工人的状态为"Ready",则调度程序将状态更改为"Working",向工人的管道中写入一条消息,然后关闭管道.

Worker函数

每个工作进程都运行工作函数.该函数使用select()来等待来自调度器的数据在其管道上可用.当数据可用时,它读取数据,执行一些工作(未在提供的代码中指定),将其在共享内存中的状态更改为"就绪",然后无限期地重复该过程.

主要功能

Main函数执行以下操作:

  1. 为辅助数据分配共享内存.
  2. 取消链接并打开信号量.
  3. 初始化dispatcher_args struct 并为其分配地址dispatcher_pipe.
  4. 使用循环和fork()创建工作进程.在每次迭代中,它设置一个管道,派生一个新进程,并分配子进程来运行Worker函数.
  5. 创建一个运行dispatcher函数的调度程序线程.
  6. 等待调度程序线程完成.
  7. 通过取消映射共享内存、关闭和取消共享内存和信号量的链接以及释放分配的内存来进行清理.

此代码的目的是为了让调度器威胁与工作进程进行通信,调度器将任务分配给工作进程,并等待它们准备好执行新任务.每个工作进程通过更改其在共享内存中的状态来指示其已为新任务做好准备.

可以使用共享内存、管道和信号量来执行任务.

当我用GCC的updatest.c-o updsts-lpline-lrt编译程序时,程序执行"第一个循环",这意味着它对工作进程(例如1)进行读写,并且它正确地更改了那个工作进程(1)的状态,但在"第二个循环"中,之前执行任务的工作进程(1)似乎已经准备好了,但在工作进程中它没有收到任何消息,只有当它转到下一个工作进程(与Worker 2不同)时,它才会收到消息.我不知道是与工作进程的读取有关,还是与工作进程状态的变化有关.

下面是我得到的输出(发送和获取消息的"循环"由一个\n分隔)

Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready

Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working

Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working

Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working

Worker 4 Status ready
Worker 4 Status working
message away
^C

注意,我按^C是因为正如我在循环" for (int i = 0; i < n_workers; i++) "中的问题中所说的那样,由于没有工人准备好(所有5个工人应该都在工作),这会产生一个无限循环.

以下是代码

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>

typedef struct worker_data{
    int id;
    char status[20];
} worker_data;

typedef struct dispatcher_args{
    int** dispatcher_pipe;
} dispatcher_args;

// variables for shared memory
worker_data* workers;
size_t workers_size;

// variables for semaphores
sem_t *sem_log;

int n_workers = 5;
pid_t *pid_workers;

void* dispatcher(void *args) {
    dispatcher_args* arg = args;
    int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;

    char msg[100];
    strcpy(msg,"Hello!");
    int j=0;

    printf("THREAD DISPATCHER CREATED\n");

    strcpy(msg,"Hello");

    while(1){

        for (int i = 0; i < n_workers; i++) {
            if (strcmp(workers[i].status, "ready") == 0) {
                printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);

                // update status of the worker in shared memory
                strcpy(workers[i].status, "working");

                printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
                close(dispatcher_pipe[i][0]); // close the read end of the pipe
                printf("message away\n");
                write(dispatcher_pipe[i][1], &msg, sizeof(msg));
                close(dispatcher_pipe[i][1]); // close the write end of the pipe
                break;
            }
            else {
                printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
            }
        }
        //j+=1;
        sleep(2);
    }
}

void Worker(int *worker_pipe, int worker_index) {
    char msg[100];
    sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);

    fd_set set;
    FD_ZERO(&set);
    FD_SET(worker_pipe[0], &set);

    while(1){


        int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
        if (ready < 0) {
            perror("select");
            exit(1);
        }
        printf("message get 1\n");
        if (FD_ISSET(worker_pipe[0], &set)) {
            printf("message get 2\n");
            // data is available, read from the pipe
            ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
            if (nbytes < 0) {
                perror("read");
                exit(1);
            }
            printf("Worker message received from Dispatcher: %s\n", msg);
        }
        close(worker_pipe[1]);

        // functions of worker
        // ...

        // change status of the worker in shared memory
        strcpy(workers[worker_index].status, "ready");
        printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);

        // sleep(3);
        // printf("I am Worker and Im alive\n");
    }
}

int main() {
    pthread_t dispatcher_thread;
    int dispatcher_pipe[n_workers][2];
    workers_size = n_workers * sizeof(worker_data);
    int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
    ftruncate(workers_fd, workers_size);
    workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);

    pid_workers = malloc(n_workers * sizeof(pid_t));

// create semaphores
    sem_unlink("sem_log");
    sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);

    dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
    disp_args->dispatcher_pipe= (int **) dispatcher_pipe;

// creating worker processes
    for (int i = 0; i < n_workers; i++) {
        worker_data worker;
        worker.id = i+1;
        strcpy(worker.status, "ready");
        workers[i] = worker;

        // create the unnamed pipe for the dispatcher and corresponding association
        if (pipe(dispatcher_pipe[i]) == -1) {
            printf("Error creating dispatcher pipe n1\n");
            exit(0);
        }

        pid_workers[i] = fork();

        if (pid_workers[i] == 0) {
            // worker process
            Worker(dispatcher_pipe[i], i);
            // printf("Hey\n");
        } else if (pid_workers[i] > 0) {
            // parent process
        } else {
            // error forking
            perror("fork");
            exit(EXIT_FAILURE);
        }
    }

// create dispatcher thread
    pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);

// wait for the threads to finish
    pthread_join(dispatcher_thread, NULL);

// clean up
    munmap(workers, workers_size);
    close(workers_fd);
    shm_unlink("workers");
    sem_close(sem_log);
    sem_unlink("sem_log");
    free(pid_workers);
    free(workers);
    free(disp_args);

    return 0;

}

推荐答案

Don't关闭管道!

  1. 在主进程/线程中,在发送第一条消息后,关闭管道的发送端.因此,您可以never发送另一条消息.

  2. 在工作进程中,在收到第一条消息后,关闭管道的接收端.所以,你可以never接收另一条消息.

  3. 我不会用string来代表工人州.我会用一张enum元的.串can't被原子地更新.

  4. 主设备not应该正在设置工人状态.只有工人才应该这样做.否则,您可能会出现争用情况.

  5. 当工作进程设置状态时,您希望使用stdatomic.h个基元.它们执行缓存更新/刷新/同步.

C++相关问答推荐

无效使用未定义类型'structsquare'?

什么C代码将确定打开的套接字正在使用的网络适配器?

ATmega328P EEPROM未写入

C是否用0填充多维数组的其余部分?

Char变量如何在不使用方括号或花括号的情况下存储字符串,以及它如何迭代到下一个字符?

为什么我会收到释放后堆使用错误?

X64:并发写入布尔数组

CGO:如何防止在使用CGO在包中包含C头文件时出现多个定义...&q;错误?

在句子中转换单词的问题

错误...的多个定义(&Q)首先在这里定义&

将多项式从文件.txt加载到终端时出现问题

这段代码用于在C中以相反的顺序打印数组,但它不起作用

从CentOS 7到Raspberry PI 2B的交叉编译-无法让LIBC和System Include标头一起工作

基于蝶数恰好有8个除数的事实的代码

用C++初始化局部数组变量

C程序printf在getchar while循环后不工作

在C中使用字符串时是否不需要内存分配?

访问未对齐联合的成员是否为未定义行为,即使被访问的成员已充分对齐?

为什么孤儿进程在 Linux 中没有被 PID 1 采用,就像我读过的一本书中声称的那样?

在 C23 之前如何对空指针使用nullptr?