首先,感谢您的阅读和帮助!
这是一个用C语言编写的多进程多线程代码,它涉及创建多个工作进程和一个通过未命名管道与工作进程通信的调度程序线程.
以下是代码的说明:
struct
该程序定义了两种 struct 类型:
-
worker_data
,包含两个字段,即整数id
和状态字符串. -
dispatcher_args
,其字段dispatcher_pipe
是指向2D整数数组的指针.
共享内存和信号量
该程序设置共享内存和信号量.共享存储器用于存储关于可由多个进程访问的工作进程的数据(worker_data
).信号量用于进程/线程之间的同步.
全局变量
声明了几个全局变量,包括共享内存变量、信号量变量、工作进程的计数和存储工作进程的ID的array.
调度器功能
调度程序是一个无限期循环的线程函数.在每次迭代中,它判断共享内存中每个工作进程的状态.如果工人的状态为"Ready",则调度程序将状态更改为"Working",向工人的管道中写入一条消息,然后关闭管道.
Worker函数
每个工作进程都运行工作函数.该函数使用select()
来等待来自调度器的数据在其管道上可用.当数据可用时,它读取数据,执行一些工作(未在提供的代码中指定),将其在共享内存中的状态更改为"就绪",然后无限期地重复该过程.
主要功能
Main函数执行以下操作:
- 为辅助数据分配共享内存.
- 取消链接并打开信号量.
- 初始化
dispatcher_args
struct 并为其分配地址dispatcher_pipe
. - 使用循环和fork()创建工作进程.在每次迭代中,它设置一个管道,派生一个新进程,并分配子进程来运行Worker函数.
- 创建一个运行
dispatcher
函数的调度程序线程. - 等待调度程序线程完成.
- 通过取消映射共享内存、关闭和取消共享内存和信号量的链接以及释放分配的内存来进行清理.
此代码的目的是为了让调度器威胁与工作进程进行通信,调度器将任务分配给工作进程,并等待它们准备好执行新任务.每个工作进程通过更改其在共享内存中的状态来指示其已为新任务做好准备.
可以使用共享内存、管道和信号量来执行任务.
当我用GCC的updatest.c-o updsts-lpline-lrt编译程序时,程序执行"第一个循环",这意味着它对工作进程(例如1)进行读写,并且它正确地更改了那个工作进程(1)的状态,但在"第二个循环"中,之前执行任务的工作进程(1)似乎已经准备好了,但在工作进程中它没有收到任何消息,只有当它转到下一个工作进程(与Worker 2不同)时,它才会收到消息.我不知道是与工作进程的读取有关,还是与工作进程状态的变化有关.
下面是我得到的输出(发送和获取消息的"循环"由一个\n分隔)
Worker 1 Status ready
Worker 1 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 1 Status ready
Worker 1 Status ready
Worker 1 Status working
message away
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 2 Status ready
not read Worker 1 Status working
Worker 2 Status ready
Worker 2 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 3 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
Worker 3 Status ready
Worker 3 Status working
message away
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
message get 1
message get 2
Worker message received from Dispatcher: Hello
JOB FINISHED Worker 4 Status ready
not read Worker 1 Status working
not read Worker 2 Status working
not read Worker 3 Status working
Worker 4 Status ready
Worker 4 Status working
message away
^C
注意,我按^C是因为正如我在循环" for (int i = 0; i < n_workers; i++) "中的问题中所说的那样,由于没有工人准备好(所有5个工人应该都在工作),这会产生一个无限循环.
以下是代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <semaphore.h>
#include <fcntl.h>
#include <sys/mman.h>
typedef struct worker_data{
int id;
char status[20];
} worker_data;
typedef struct dispatcher_args{
int** dispatcher_pipe;
} dispatcher_args;
// variables for shared memory
worker_data* workers;
size_t workers_size;
// variables for semaphores
sem_t *sem_log;
int n_workers = 5;
pid_t *pid_workers;
void* dispatcher(void *args) {
dispatcher_args* arg = args;
int (*dispatcher_pipe)[2] = (int (*)[2])arg->dispatcher_pipe;
char msg[100];
strcpy(msg,"Hello!");
int j=0;
printf("THREAD DISPATCHER CREATED\n");
strcpy(msg,"Hello");
while(1){
for (int i = 0; i < n_workers; i++) {
if (strcmp(workers[i].status, "ready") == 0) {
printf("\nWorker %d Status %s\n",workers[i].id,workers[i].status);
// update status of the worker in shared memory
strcpy(workers[i].status, "working");
printf("Worker %d Status %s\n",workers[i].id,workers[i].status);
close(dispatcher_pipe[i][0]); // close the read end of the pipe
printf("message away\n");
write(dispatcher_pipe[i][1], &msg, sizeof(msg));
close(dispatcher_pipe[i][1]); // close the write end of the pipe
break;
}
else {
printf("not read Worker %d Status %s\n",workers[i].id,workers[i].status);
}
}
//j+=1;
sleep(2);
}
}
void Worker(int *worker_pipe, int worker_index) {
char msg[100];
sprintf(msg, "WORKER %d READY\n", workers[worker_index].id);
fd_set set;
FD_ZERO(&set);
FD_SET(worker_pipe[0], &set);
while(1){
int ready = select(worker_pipe[0]+1, &set, NULL, NULL, NULL);
if (ready < 0) {
perror("select");
exit(1);
}
printf("message get 1\n");
if (FD_ISSET(worker_pipe[0], &set)) {
printf("message get 2\n");
// data is available, read from the pipe
ssize_t nbytes = read(worker_pipe[0], &msg, sizeof(msg));
if (nbytes < 0) {
perror("read");
exit(1);
}
printf("Worker message received from Dispatcher: %s\n", msg);
}
close(worker_pipe[1]);
// functions of worker
// ...
// change status of the worker in shared memory
strcpy(workers[worker_index].status, "ready");
printf("JOB FINISHED Worker %d Status %s\n", workers[worker_index].id, workers[worker_index].status);
// sleep(3);
// printf("I am Worker and Im alive\n");
}
}
int main() {
pthread_t dispatcher_thread;
int dispatcher_pipe[n_workers][2];
workers_size = n_workers * sizeof(worker_data);
int workers_fd = shm_open("workers", O_CREAT | O_RDWR, 0666);
ftruncate(workers_fd, workers_size);
workers = mmap(NULL, workers_size, PROT_READ | PROT_WRITE, MAP_SHARED, workers_fd, 0);
pid_workers = malloc(n_workers * sizeof(pid_t));
// create semaphores
sem_unlink("sem_log");
sem_log = sem_open("sem_log", O_CREAT | O_EXCL, 0700, 1);
dispatcher_args* disp_args = malloc(sizeof(dispatcher_args));
disp_args->dispatcher_pipe= (int **) dispatcher_pipe;
// creating worker processes
for (int i = 0; i < n_workers; i++) {
worker_data worker;
worker.id = i+1;
strcpy(worker.status, "ready");
workers[i] = worker;
// create the unnamed pipe for the dispatcher and corresponding association
if (pipe(dispatcher_pipe[i]) == -1) {
printf("Error creating dispatcher pipe n1\n");
exit(0);
}
pid_workers[i] = fork();
if (pid_workers[i] == 0) {
// worker process
Worker(dispatcher_pipe[i], i);
// printf("Hey\n");
} else if (pid_workers[i] > 0) {
// parent process
} else {
// error forking
perror("fork");
exit(EXIT_FAILURE);
}
}
// create dispatcher thread
pthread_create(&dispatcher_thread, NULL, dispatcher, disp_args);
// wait for the threads to finish
pthread_join(dispatcher_thread, NULL);
// clean up
munmap(workers, workers_size);
close(workers_fd);
shm_unlink("workers");
sem_close(sem_log);
sem_unlink("sem_log");
free(pid_workers);
free(workers);
free(disp_args);
return 0;
}