POSIX消息队列

2023-05-16

POSIX消息队列概述

消息队列是Linux IPC中很常用的一种通信方式,它通常用来在不同进程间发送特定格式的消息数据。

消息队列和管道和FIFO有很大的区别,主要有以下两点:

一个进程向消息队列写入消息之前,并不需要某个进程在该队列上等待该消息的到达,而管道和FIFO是相反的,进程向其中写消息时,管道和FIFO必需已经打开来读,否则写进程就会阻塞(默认情况下),那么内核会产生SIGPIPE信号。 IPC的持续性不同。管道和FIFO是随进程的持续性,当管道和FIFO最后一次关闭发生时,仍在管道和FIFO中的数据会被丢弃。消息队列是随内核的持续性,即一个进程向消息队列写入消息后,然后终止,另外一个进程可以在以后某个时刻打开该队列读取消息。只要内核没有重新自举,消息队列没有被删除。

消息队列中的每条消息通常具有以下属性:

一个表示优先级的整数 消息的数据部分的长度  消息数据本身

POSIX消息队列的一个可能的设计是一个如下图所示的消息链表,链表头部有消息队列的属性信息。

图1 消息链表

POSIX消息队列的创建和关闭

POSIX消息队列的创建,关闭和删除用到以下三个函数接口:

#include <mqueue.h>

mqd_t mq_open(const char *name, int oflag, /* mode_t mode, struct mq_attr *attr */);
//成功返回消息队列描述符,失败返回-1

mqd_t mq_close(mqd_t mqdes);

mqd_t mq_unlink(const char *name);
 //成功返回0,失败返回-1

mq_open 用于打开或创建一个消息队列。

name:表示消息队列的名字,它符合POSIX IPC的名字规则。

oflag:表示打开的方式,和open函数的类似。有必须的选项:O_RDONLY,O_WRONLY,O_RDWR,还有可选的选项:O_NONBLOCK,O_CREAT,O_EXCL。

mode:是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时,才需要提供该参数。表示默认访问权限。可以参考open。

attr:也是一个可选参数,在oflag中含有O_CREAT标志且消息队列不存在时才需要。该参数用于给新队列设定某些属性,如果是空指针,那么就采用默认属性。

mq_open返回值是mqd_t类型的值,被称为消息队列描述符。在Linux 2.6.18中该类型的定义为整型:

#include <bits/mqueue.h>
typedef int mqd_t;

mq_close 用于关闭一个消息队列,和文件的close类型,关闭后,消息队列并不从系统中删除。一个进程结束,会自动调用关闭打开着的消息队列。

mq_unlink 用于删除一个消息队列。消息队列创建后只有通过调用该函数或者是内核自举才能进行删除。每个消息队列都有一个保存当前打开着描述符数的引用计数器,和文件一样,因此本函数能够实现类似于unlink函数删除一个文件的机制。

POSIX消息队列的名字所创建的真正路径名和具体的系统实现有关,关于具体POSIX IPC的名字规则可以参考《UNIX 网络编程 卷2:进程间通信》的P14。

经过测试,在Linux 2.6.18中,所创建的POSIX消息队列不会在文件系统中创建真正的路径名。且POSIX的名字只能以一个’/’开头,名字中不能包含其他的’/’。

POSIX消息队列的属性

POSIX标准规定消息队列属性mq_attr必须要含有以下四个内容:

long mq_flags //消息队列的标志:0或O_NONBLOCK,用来表示是否阻塞 
long mq_maxmsg //消息队列的最大消息数
long mq_msgsize //消息队列中每个消息的最大字节数
long mq_curmsgs //消息队列中当前的消息数目

在Linux 2.6.18中mq_attr结构的定义如下:

#include <bits/mqueue.h>
struct mq_attr
{
  long int mq_flags;      /* Message queue flags.  */
  long int mq_maxmsg;   /* Maximum number of messages.  */
  long int mq_msgsize;   /* Maximum message size.  */
  long int mq_curmsgs;   /* Number of messages currently queued.  */
  long int __pad[4];
};

POSIX消息队列的属性设置和获取可以通过下面两个函数实现:

#include <mqueue.h>

mqd_t mq_getattr(mqd_t mqdes, struct mq_attr *attr);

mqd_t mq_setattr(mqd_t mqdes, struct mq_attr *newattr, struct mq_attr *oldattr);
                               //成功返回0,失败返回-1

mq_getattr用于获取当前消息队列的属性,mq_setattr用于设置当前消息队列的属性。其中mq_setattr中的oldattr用于保存修改前的消息队列的属性,可以为空。

mq_setattr可以设置的属性只有mq_flags,用来设置或清除消息队列的非阻塞标志。newattr结构的其他属性被忽略。mq_maxmsg和mq_msgsize属性只能在创建消息队列时通过mq_open来设置。mq_open只会设置该两个属性,忽略另外两个属性。mq_curmsgs属性只能被获取而不能被设置。

下面是测试代码:

#include <iostream>
#include <cstring>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
 
using namespace std;
 
int main()
{
    mqd_t mqID;
    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
 
    if (mqID < 0)
    {
        cout<<"open message queue error..."<<strerror(errno)<<endl;
        return -1;
    }
 
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        cout<<"get the message queue attribute error"<<endl;
        return -1;
    }
 
    cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
    cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
    cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
    cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl;
}

在Linux 2.6.18中执行结果是:

mq_flags:0
mq_maxmsg:10
mq_msgsize:8192
mq_curmsgs:0

POSIX消息队列的使用

POSIX消息队列可以通过以下两个函数来进行发送和接收消息:

#include <mqueue.h>

mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);
//成功返回0,出错返回-1
 
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);
//成功返回接收到消息的字节数,出错返回-1

#ifdef __USE_XOPEN2K
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);

mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);
#endif

mq_send 向消息队列中写入一条消息,mq_receive从消息队列中读取一条消息。

mqdes:消息队列描述符;

msg_ptr:指向消息体缓冲区的指针;

msg_len:消息体的长度,其中mq_receive的该参数不能小于能写入队列中消息的最大大小,即一定要大于等于该队列的mq_attr结构中mq_msgsize的大小。如果mq_receive中的msg_len小于该值,就会返回EMSGSIZE错误。POXIS消息队列发送的消息长度可以为0。

msg_prio:消息的优先级;它是一个小于MQ_PRIO_MAX的数,数值越大,优先级越高。POSIX消息队列在调用mq_receive时总是返回队列中最高优先级的最早消息。如果消息不需要设定优先级,那么可以在mq_send是置msg_prio为0,mq_receive的msg_prio置为NULL。

还有两个XSI定义的扩展接口限时发送和接收消息的函数:mq_timedsend和mq_timedreceive函数。默认情况下mq_send和mq_receive是阻塞进行调用,可以通过mq_setattr来设置为O_NONBLOCK。

下面是消息队列使用的测试代码:

#include <iostream>
#include <cstring>
#include <errno.h>
 
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
 
using namespace std;
 
int main()
{
    mqd_t mqID; //创建消息队列描述符
    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, NULL);
 
    if (mqID < 0)
    {
        if (errno == EEXIST)
        {
            mq_unlink("/anonymQueue");
            mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, NULL);
        }
        else
        {
            cout<<"open message queue error..."<<strerror(errno)<<endl;
            return -1;
        }
    }
 
    if (fork() == 0)    //子进程
    {
        mq_attr mqAttr;
        mq_getattr(mqID, &mqAttr);
 
        char *buf = new char[mqAttr.mq_msgsize];    //获得消息队列的大小
 
        for (int i = 1; i <= 5; ++i)
        {
            if (mq_receive(mqID, buf, mqAttr.mq_msgsize, NULL) < 0) //接受消息
            {
                cout<<"receive message  failed. ";
                cout<<"error info:"<<strerror(errno)<<endl;
                continue;
            }
            cout<<"receive message "<<i<<": "<<buf<<endl;   
        }
        exit(0);
    }
    //父进程
    char msg[] = "yuki";
    for (int i = 1; i <= 5; ++i)
    {
        if (mq_send(mqID, msg, sizeof(msg), i) < 0) //发送消息
        {
            cout<<"send message "<<i<<" failed. ";
            cout<<"error info:"<<strerror(errno)<<endl;
        }
        cout<<"send message "<<i<<" success. "<<endl;   
        sleep(1);
    }
}                     

在Linux 2.6.18下的执行结构如下:

send message 1 success. 
receive message 1: yuki
send message 2 success. 
receive message 2: yuki
send message 3 success. 
receive message 3: yuki
send message 4 success. 
receive message 4: yuki
send message 5 success. 
receive message 5: yuki

POSIX消息队列的限制

POSIX消息队列本身的限制就是mq_attr中的mq_maxmsg和mq_msgsize,分别用于限定消息队列中的最大消息数和每个消息的最大字节数。在前面已经说过了,这两个参数可以在调用mq_open创建一个消息队列的时候设定。当这个设定是受到系统内核限制的。

下面是在Linux 2.6.18下shell对启动进程的POSIX消息队列大小的限制:

# ulimit -a |grep message
POSIX message queues     (bytes, -q) 819200

限制大小为800KB,该大小是整个消息队列的大小,不仅仅是最大消息数*消息的最大大小;还包括消息队列的额外开销。前面我们知道Linux 2.6.18下POSIX消息队列默认的最大消息数和消息的最大大小分别为:

mq_maxmsg = 10
mq_msgsize = 8192

为了说明上面的限制大小包括消息队列的额外开销,下面是测试代码:

#include <iostream>
#include <cstring>
#include <errno.h>
 
#include <unistd.h>
#include <fcntl.h>
#include <mqueue.h>
 
using namespace std;
 
int main(int argc, char **argv)
{
    mqd_t mqID; //消息队列描述符
    mq_attr attr;
    attr.mq_maxmsg = atoi(argv[1]);
    attr.mq_msgsize = atoi(argv[2]);
 
    mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT | O_EXCL, 0666, &attr);
 
    if (mqID < 0)
    {
        if (errno == EEXIST)
        {
            mq_unlink("/anonymQueue");
            mqID = mq_open("/anonymQueue", O_RDWR | O_CREAT, 0666, &attr);
 
            if(mqID < 0)
            {
                cout<<"open message queue error..."<<strerror(errno)<<endl;
                return -1;
            }
        }
        else
        {
            cout<<"open message queue error..."<<strerror(errno)<<endl;
            return -1;
        }
    }
 
    mq_attr mqAttr;
    if (mq_getattr(mqID, &mqAttr) < 0)
    {
        cout<<"get the message queue attribute error"<<endl;
        return -1;
    }
 
    cout<<"mq_flags:"<<mqAttr.mq_flags<<endl;
    cout<<"mq_maxmsg:"<<mqAttr.mq_maxmsg<<endl;
    cout<<"mq_msgsize:"<<mqAttr.mq_msgsize<<endl;
    cout<<"mq_curmsgs:"<<mqAttr.mq_curmsgs<<endl; 
}

下面进行创建消息队列时设置最大消息数和消息的最大大小进行测试:

[root@idcserver program]# g++ -g test.cpp -lrt
[root@idcserver program]# ./a.out 10 81920
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 80000
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 70000
open message queue error...Cannot allocate memory
[root@idcserver program]# ./a.out 10 60000
mq_flags:0
mq_maxmsg:10
mq_msgsize:60000
mq_curmsgs:0

从上面可以看出消息队列真正存放消息数据的大小是没有819200B的。可以通过修改该限制参数,来改变消息队列的所能容纳消息的数量。可以通过下面方式来修改限制,但这会在shell启动进程结束后失效,可以将设置写入开机启动的脚本中执行,例如.bashrc,rc.local。

[root@idcserver ~]# ulimit -q 1024000000
[root@idcserver ~]# ulimit -a |grep message
POSIX message queues     (bytes, -q) 1024000000

下面再次测试可以设置的消息队列的属性。

[root@idcserver program]# ./a.out 10 81920
mq_flags:0
mq_maxmsg:10
mq_msgsize:81920
mq_curmsgs:0
[root@idcserver program]# ./a.out 10 819200
mq_flags:0
mq_maxmsg:10
mq_msgsize:819200
mq_curmsgs:0
[root@idcserver program]# ./a.out 1000 8192  
mq_flags:0
mq_maxmsg:1000
mq_msgsize:8192
mq_curmsgs:0

POSIX消息队列在实现上还有另外两个限制:

MQ_OPEN_MAX:一个进程能同时打开的消息队列的最大数目,POSIX要求至少为8
MQ_PRIO_MAX:消息的最大优先级,POSIX要求至少为32

转载于:https://www.cnblogs.com/WindSun/p/11438501.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

POSIX消息队列 的相关文章

  • 从共享对象调用主可执行文件中的函数

    我必须从加载了 LD PRELOAD 的共享库调用主可执行文件中的函数 可执行文件导出所有符号并包含调试信息 不幸的是我无法访问它的源代码 目前 我在尝试加载该共享库时遇到未定义的符号错误 有没有办法做到这一点 附 目标平台是FreeBSD
  • 如何在 Linux 中使用 POSIX 方法从文件中读取 Unicode-16 字符串?

    我有一个包含 UNICODE 16 字符串的文件 我想将其读入 Linux 程序 这些字符串是根据 Windows 内部 WCHAR 格式编写的 Windows 总是使用 UTF 16 吗 例如在日语版本中 我相信我可以使用原始读取来读取它
  • 是否可以(并且安全)使接受套接字成为非阻塞?

    我正在寻找一种方法来打断accept http pubs opengroup org onlinepubs 009695399 functions accept html调用阻塞套接字 使用信号不是一种选择 因为这意味着在库中 我不想弄乱用
  • 使用 POSIX API 读取文件

    考虑以下代码段 用于将文件内容读入缓冲区 include
  • 人类可读、递归、排序的最大文件列表

    在 POSIX shell 中打印前 10 个最大文件列表的最佳实践是什么 必须有比我当前的解决方案更优雅的东西 DIR N 10 LIMIT 512000 find DIR type f size LIMIT k exec du sort
  • 为什么系统调用返回 EFAULT 而不是发送段错误?

    需要明确的是 这是一个设计问题 而不是一个实现问题 我想知道 POSIX 这样做的背后的基本原理 当给定无效内存位置时 POSIX 系统调用返回 EFAULT 而不是使用户空间程序崩溃 通过发送 sigsegv 这使得它们的行为与用户空间函
  • getchar() 返回错误的特殊情况是什么?

    所以我知道getchar 当输入结束或发生错误时返回 EOF 我也知道我可以通过以下方式检查发生了哪些情况ferror stdin and feof stdin 我想知道什么情况下会特别发生错误 我检查了这两个函数的手册页 但那里没有任何内
  • 如何从 POSIX 文件描述符构造 C++ fstream?

    我基本上是在寻找 fdopen 的 C 版本 我对此做了一些研究 这是看起来应该很容易但事实证明非常复杂的事情之一 我是否在这个信念中遗漏了一些东西 即它真的很容易 如果没有 是否有一个好的图书馆可以处理这个问题 编辑 将我的示例解决方案移
  • 我可以获得`FILE*`的访问模式吗?

    我必须复制一个FILE Mac OS X 上的 C 语言 使用 POSIXint不幸的是 文件描述符一直是不可能的 所以我想出了以下函数 static FILE fdup FILE fp const char mode int fd fil
  • 有没有通过 fstat() 的 POSIX 方法来检查文件是否是符号链接?

    有没有 POSIX 方式通过fstat 2 检查文件是否是符号链接 有旗帜O NOFOLLOW in open 2 可以检查它 但是 它不是 POSIX 有S ISLNK in fstat 2 其中说man fstat The S ISLN
  • OSX 上的单调时钟

    CLOCK MONOTONIC似乎不可用 所以clock gettime已经出局了 我在一些地方读到 mach absolute time 可能是正确的方法 但在读到它是一个 cpu 相关值 后 它立即让我想知道它是否在下面使用 rtdsc
  • Python:“导入 posix”问题

    如果我导入os模块 我可以运行以下命令来推断 os py 的位置 gt gt gt import os gt gt gt print os file usr lib python2 6 os pyc 但是 当我导入时posix 它不具有 f
  • Linux 中 POSIX 可靠信号和 POSIX 实时信号有什么区别?

    我读了一个手册页signal using 男人7信号 http man7 org linux man pages man7 signal 7 html我看到两种类型的信号 所以 我有一个问题 有什么区别POSIX 可靠信号 and POSI
  • 在自己的处理程序中捕获信号

    include
  • Unix域SOCK_DGRAM和SOCK_SEQPACKET之间的区别?

    根据 Unix 套接字的 Linux 手册页 UNIX 域中的有效套接字类型是 SOCK DGRAM 对于保留消息边界的面向数据报的套接字 与大多数 UNIX 实现一样 UNIX 域数据报套接字始终可靠且不可靠 不重新排序数据报 以及 自
  • osx 上的 aio:它是在内核中实现还是通过用户线程实现?其他选择?

    我正在开发我的小型 C 框架 并且有一个文件类 它也应该支持异步读写 除了在我发现的一些工作线程中使用同步文件 I O 之外 唯一的解决方案是 aio 无论如何 我环顾四周并在某处读到 在 Linux 中 aio 甚至不是在内核中实现的 而
  • getline() 与 fgets():控制内存分配

    要从文件中读取行 有getline and fgets POSIX 函数 忽略可怕的gets 这是常识getline 优先于fgets 因为它根据需要分配行缓冲区 我的问题是 这不危险吗 如果有人意外或恶意地创建了一个 100GB 的文件
  • 什么是 POSIX 合规性以及它对我有何影响?

    我不断看到这个问题出现 每次我查找它时 我都无法很好地解释它是什么或它对我意味着什么 什么是 POSIX 合规性 假设我的程序仅在兼容 POSIX 的机器上运行 这对我作为程序员来说有何简化 甚至吗 POSIX http pubs open
  • 如何从标准输入读取一行,阻塞直到找到换行符?

    我试图从命令行的标准输入一次读取任意长度的一行 我不确定是否能够包含 GNU readline 并且更喜欢使用库函数 我读过的文档表明getline应该可以工作 但在我的实验中它不会阻塞 我的示例程序 include
  • 同步 I/O 是否会使线程繁忙?

    假设我正在同步 I O 套接字上执行 I O 该套接字已准备好read or write手术 这意味着调用线程不会在操作上被阻塞 无论非阻塞 SOCK NONBLOCK 套接字的阻塞性质 但以下事情我不清楚 实际转移何时发生 当套接字标记为

随机推荐