我正在研究IPC,并且有一个关于System V消息队列使用的基本示例.代码非常简单:主进程创建消息队列,然后使用fork()并开始无限循环地发送消息,但会有一些延迟.子进程在一个循环中接收消息,其逻辑是:Hibernate -接收所有累积-Hibernate .

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/types.h>
#include <unistd.h>


typedef struct 
{
    int value;
} messageText;

typedef struct 
{
    long mtype;
    messageText mtext;
} queueMessage;


int main()
{ 
    key_t queueKey = ftok(__FILE__, 'M');
    int msgQueue = msgget(queueKey, IPC_CREAT | 0660);
    
    pid_t pid = fork();
    if (0 == pid)
    {        
        // never returns
        child_main(msgQueue);
        abort();
    }

    queueMessage msg = 
    {
        .mtype = 1,
        .mtext = { 0 }
    };

    for (int i = 0; /* void */ ; ++i)
    {
        msg.mtext.value = i;

        int rc = msgsnd(msgQueue, &msg, sizeof(msg.mtext), 0);
        if (rc == -1)
            perror("msgsnd");

        usleep(250000);  // 0.25 sec

        struct msqid_ds queueInfo;
        msgctl(msgQueue, IPC_STAT, &queueInfo);
        printf("Messages in queue = %ld\n", queueInfo.msg_qnum);
    }

    return 0;
}
void child_main(int msgQueue)
{
    queueMessage msg = { 0 };
    int msgTyp = 0;  // 1st message in queue
    int msgRcvFlg = IPC_NOWAIT | MSG_NOERROR;

    while (1)
    {
        while (1)
        {
            int rc = msgrcv(msgQueue, &msg, sizeof(msg.mtext), msgTyp, msgRcvFlg);
            if (-1 == rc)
            {
                if (errno == ENOMSG)
                {
                    goto sleep;
                }
                else
                {
                    perror("msgrcv");
                    exit(EXIT_FAILURE);
                }
            }

            printf("read: %d\n", msg.mtext.value);
        }

    sleep:

        usleep(5000000);  // 5 sec.
    }
}

因此,预期的行为是父消息在队列中累积,直到子消息被唤醒、读取并再次Hibernate ().为了判断它的逻辑是否正确,每次家长发送新消息时,我都会打印消息编号.但实际行为是,由于某种原因,队列内容似乎被重置,因此子元素收到的消息很少.父母睡的时间越长越少 当子元素try 接收消息时,消息仍在队列中.

这是父母睡眠时间的输出示例 0.25秒.

Messages in queue = 1
Messages in queue = 2
Messages in queue = 3
Messages in queue = 4
Messages in queue = 5
Messages in queue = 0
Messages in queue = 1
Messages in queue = 2
Messages in queue = 3
read: 8
read: 9
read: 10
read: 11
Messages in queue = 0
Messages in queue = 1
Messages in queue = 0
...

ipcs -l个yields

------ Messages Limits --------
max queues system wide = 32000
max size of message (bytes) = 8192
default max size of queue (bytes) = 16384

------ Shared Memory Limits --------
max number of segments = 4096
max seg size (kbytes) = 18014398509465599
max total shared memory (kbytes) = 18446744073709551612
min seg size (bytes) = 1

------ Semaphore Limits --------
max number of arrays = 32000
max semaphores per array = 32000
max semaphores system wide = 1024000000
max ops per semop call = 500
semaphore max value = 32767

我在手册页中没有找到任何关于这种行为的信息.有人能解释一下吗?

推荐答案

如果有证据表明该问题在系统重新启动后无法重现,我会大胆猜测,该问题是由orphaned processes条来自同一陈旧消息队列的消费消息促成的.

如果没有与给定键相关联的消息队列,则msgget(queueKey, IPC_CREAT | 0660);只创建new消息队列.否则,它将悄悄返回先前创建的消息队列.

当已经存在与给定键相关联的消息队列时,必须在标志中指定IPC_CREAT | IPC_EXCL以使msgget失败(将errno设置为EEXIST).

该示例中没有清理消息队列的任何内容.这是通过以下方式完成的:

int rc = msgctl(key, IPC_RMID, NULL);

如果不这样做,System V消息队列将在系统正常运行期间持续存在(除非在ipcrm之前手动删除).

此外,ftok的确定性及其相对较小的地址空间意味着,有轻微(但并非不可能)的可能性,即不同的进程正在通过冲突获取相同的key_t,并使用消息队列中的消息.

通过使用IPC_PRIVATE作为msgget的键,这两个可能的问题都可以部分解决(具有正确权限的进程仍然可以在使用msgsnd/msgrcv时手动指定与之交互的msqid).

一般的建议是确保您在使用完资源时正在清理它们.这意味着reaping个子进程可以正确处理,在不再需要消息队列(共享内存和信号量!)时将其删除,当然还会判断所有系统调用.

C++相关问答推荐

Bison解析器转移/减少冲突

Zig将std.os.argv转换为C类型argv

球体—立方体重叠:无、部分或全部?

使用SWI—Prolog的qsave_program生成二进制文件有什么好处?'

返回一个包含数组的 struct

如何调试LD_PRELOAD库中的构造函数?

当我运行/调试C程序时,Malloc()似乎正在将&q;r\r...&q;赋值给一个指针,我不确定为什么?

如何将长字符串转换为较小的缩写,该缩写由第一个字符、最后一个字符和中间的字符数组成?

非常大的数组的大小

_泛型控制表达式涉及数组碰撞警告的L值转换错误?

使用scanf在C中读取和存储文件中的值

调用mProtection将堆栈上的内存设置为只读,直接导致程序SIGSEGV

如何有效地编写代码来判断两个元素数量相同的数组即使在不同的位置也具有相同的元素?

如何按顺序将所有CSV文件数据读入 struct 数组?

获取前2个连续1比特的索引的有效方法

Printf()在C中打印终止字符之后的字符,我该如何解决这个问题?

从文件到链表读取日期

为什么写入关闭管道会返回成功

Clang 是否为内联汇编生成了错误的代码?

如何在C中以0x格式打印十六进制值