linux消息队列服务,Linux进程间通信-消息队列(mqueue)深入理解

2023-05-16

前面两篇文章分解介绍了匿名管道和命名管道方式的进程间通信,本文将介绍Linux消息队列(posix)的通信机制和特点。

1、消息队列

消息队列的实现分为两种,一种为System V的消息队列,一种是Posix消息队列;这篇文章将主要围绕Posix消息队列介绍;

消息队列可以认为是一个消息链表,某个进程往一个消息队列中写入消息之前,不需要另外某个进程在该队列上等待消息的达到,这一点与管道和FIFO相反。Posix消息队列与System V消息队列的区别如下:

(1) 对Posix消息队列的读总是返回最高优先级的最早消息,对System V消息队列的读则可以返回任意指定优先级的消息。

(2)当往一个空队列放置一个消息时,Posix消息队列允许产生一个信号或启动一个线程,System V消息队列则不提供类似的机制。

2、消息队列的基本操作

2.1 打开一个消息队列

#include   

typedef int mqd_t;

mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);

返回: 成功时为消息队列描述字,出错时为-1。

功能: 创建一个新的消息队列或打开一个已存在的消息的队列。

2.2 关闭一个消息队列

#include   

int mq_close(mqd_t mqdes);

返回: 成功时为0,出错时为-1。

功能: 关闭已打开的消息队列。

2.3 删除一个消息队列

#include   

int mq_unlink(const char *name)

返回: 成功时为0,出错时为-1

功能: 从系统中删除消息队列。

这三个函数操作的代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR | O_CREAT | O_EXCL;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test", flag, mode,NULL);

if (mqid == -1)

{

printf("mqueue create failed!\n");

return 1;

}

else

{

printf("mqueue create success!\n");

}

mq_close(mqid);  return 0;

}

#include

#include

int main(int argc, char* argv[])

{

mq_unlink("/mq_test");

return 0;

}

注意:编译posix mqueue时,要连接运行时库(runtime library),既-lrt选项,运行结果如下:

172793886406bbf5d2d4bd3aa0e4cdd9.png

关于mqueue更多详细内容可以使用:man mq_overview命令查看,里面有一条需要注意的是,Linux下的Posix消息队列是在vfs中创建的,可以用

mount -t mqueue none /dev/mqueue

将消息队列挂在在/dev/mqueue目录下,便于查看。

2.4 mq_close()和mq_unlink()

mq_close()的功能是关闭消息队列的文件描述符,但消息队列并不从系统中删除,要删除一个消息队列,必须调用mq_unlink();这与文件系统的unlink()机制是一样的。

3、消息队列的属性

#include   

int mq_getattr(mqd_t mqdes, struct mq_attr *attr);

int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *attr);

均返回:成功时为0, 出错时为-1

每个消息队列有四个属性:

struct mq_attr

{

long mq_flags;       /* message queue flag : 0, O_NONBLOCK */

long mq_maxmsg;  /* max number of messages allowed on queue*/

long mq_msgsize;  /* max size of a message (in bytes)*/

long mq_curmsgs;  /* number of messages currently on queue */

};

4、消息收发

#include   

int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio);

返回:成功时为0,出错为-1

ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *priop);

返回:成功时为消息中的字节数,出错为-1

mqsend代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);

if (mqid == -1)

{

printf("open mqueue failed!\n");

return 1;

}

char *buf = "hello, i am sender!";

mq_send(mqid,buf,strlen(buf),20);

mq_close(mqid);

return 0;

}

mqrecv代码如下:

#include

#include

#include

#include

#include

#include

int main(int argc, char* argv[])

{

int flag = O_RDWR;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqd_t mqid = mq_open("/mq_test",flag,mode,NULL);

if (mqid == -1)

{

printf("open mqueue failed!\n");

return 1;

}

struct mq_attr attr;

mq_getattr(mqid,&attr);

char buf[256] = {0};

int priority = 0;

mq_receive(mqid,buf,attr.mq_msgsize,&priority);

printf("%s\n",buf);

mq_close(mqid);

return 0;

}

运行结果如下:

70b0ae317b037d1de753dec0f15f104b.png

首先我们运行三次send,然后运行四次recv,可见recv的前三次是可以收到消息队列里的三个消息的,当运行第四次的时,系统消息队列里为空,recv就会阻塞;关于非阻塞式mqueue见下文。

5、mq_notify函数

如前文介绍,poxis消息队列运行异步通知,以告知何时有一个消息放置到某个空消息队列中,这种通知有两种方式可以选择:

(1)产生一个信号

(2)创建一个线程来执行一个指定的函数

这种通知通过mq_notify() 函数建立。该函数为指定的消息消息队列建立或删除异步事件通知,

#include

int mq_notify(mqd_t mqdes, const struct sigevent* notification);

(1)如果notification参数为非空,那么当前进程希望在有一个消息到达所指定的先前为空的对列时得到通知。

(2)如果notification参数为空,而且当前进程被注册为接收指定队列的通知,那么已存在的注册将被撤销。

(3)任意时刻只有一个进程可以被注册为接收某个给定队列的通知。

(4)当有一个消息到达先前为空的消息队列,而且已有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的mq_receive调用中的前提下,通知才会发出。即说明,在mq_receive调用中的阻塞比任何通知的注册都优先。

(5)当前通知被发送给它的注册进程时,其注册即被撤销。该进程必须再次调用mq_notify以重新注册。

sigevent结构如下:

union sigval{

int    sival_int;          /*integer value*/

void    *sival_ptr;        /*pointer value*/

};

struct sigevent{

int    sigev_notify;      /*SIGEV_{NONE, SIGNAL, THREAD}*/

int    sigev_signo;        /*signal number if SIGEV_SIGNAL*/

union sigval    sigev_value;

void    (*sigev_notify_function)(union sigval);

pthread_attr_t  *sigev_notify_attributes;

};

5.1 mq_notify() 使用信号处理程序

一个正确的使用非阻塞mq_receive的信号通知的例子:

#include

#include

#include

#include

#include

#include

#include

#include

void sig_usr1(int );

volatile sig_atomic_t mqflag;

int main(int argc, char* argv[])

{

mqd_t mqid = 0;

void *buff;

struct mq_attr attr;

struct sigevent sigev;

sigset_t zeromask,newmask,oldmask;

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);  // 非阻塞式打开mqueue

mq_getattr(mqid,&attr);

buff = malloc(attr.mq_msgsize);

sigemptyset(&zeromask);

sigemptyset(&newmask);

sigemptyset(&oldmask);

sigaddset(&newmask,SIGUSR1);    // 初始化信号集

signal(SIGUSR1,sig_usr1);      // 信号处理程序

sigev.sigev_notify = SIGEV_SIGNAL;

sigev.sigev_signo = SIGUSR1;

int n = mq_notify(mqid,&sigev);  // 启用通知

for (;;)

{

sigprocmask(SIG_BLOCK,&newmask,&oldmask);

while(mqflag == 0)

sigsuspend(&zeromask);

mqflag = 0;

ssize_t n;

mq_notify(mqid, &sigev);    // 重新注册

while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)

printf("SIGUSR1 received, read %ld bytes.\n",(long)n);  //读取消息

if(errno != EAGAIN)

printf("mq_receive error\n");

sigprocmask(SIG_UNBLOCK,&newmask,NULL);

}

return 0;

}

void sig_usr1(int signo)

{

mqflag = 1;

}

��里为什么使用的是非阻塞式mq_receive,为什么不在信号处理程序中打印接收到的字符请参阅《unp 第二卷》

5.2 mq_notify() 使用线程处理程序

异步事件通知的另一种方式是把sigev_notify设置成SIGEV_THREAD,这会创建一个新线程,该线程调用由sigev_notify_function指定的函数,所用的参数由sigev_value指定,新线程的属性由sigev_notify_attributes指定,要指定线程的默认属性的话,传空指针。新线程是作为脱离线程创建的。

#include

#include

#include

#include

#include

#include

#include

#include

#include

mqd_t mqid = 0;

struct mq_attr attr;

struct sigevent sigev;

static void notify_thread(union sigval);

int main(int argc, char* argv[])

{

int mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH;

mqid = mq_open("/mq_test",O_RDONLY | O_NONBLOCK,mode,NULL);

mq_getattr(mqid,&attr);

sigev.sigev_notify = SIGEV_THREAD;

sigev.sigev_notify_function = notify_thread;

sigev.sigev_value.sival_ptr = NULL;

sigev.sigev_notify_attributes = NULL;

int n = mq_notify(mqid,&sigev);

for (;;)

pause();

return 0;

}

static void notify_thread(union sigval arg)

{

ssize_t n;

char* buff;

printf("notify_thread_started!\n");

buff = malloc(attr.mq_msgsize);

mq_notify(mqid, &sigev);

while( (n = mq_receive(mqid,buff,attr.mq_msgsize,NULL)) >=0)

printf("SIGUSR1 received, read %ld bytes.\n",(long)n);

if(errno != EAGAIN)

printf("mq_receive error\n");

free(buff);

pthread_exit(0);

}

0b1331709591d260c1c78e86d0c51c18.png

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

linux消息队列服务,Linux进程间通信-消息队列(mqueue)深入理解 的相关文章

  • 从 Python 调用 PARI/GP

    我想打电话PARI GP http pari math u bordeaux fr dochtml gpman html仅从Python计算函数nextprime n 对于不同的n是我定义的 不幸的是我无法得到帕里蟒蛇 http code
  • 在哪里可以找到并安装 pygame 的依赖项?

    我对 Linux 比较陌生 正在尝试安装 python 的 pygame 开发环境 当我运行 setup py 时 它说我需要安装以下依赖项 我找到并安装了其中之一 SDL 然而 其他人则更加难以捉摸 Hunting dependencie
  • 使用 sed 更新 xml 属性(Windows + cygwin 和 Linux)?

    我需要使用 sed 命令对 xml 文件进行更新 但我在这方面遇到了麻烦 它需要在 Windows 使用 cygwin 和 Linux 上运行 XML 具有以下元素
  • fopen 不返回

    我在 C 程序中使用 fopen 以只读模式 r 打开文件 但就我而言 我观察到 fopen 调用没有返回 它不返回 NULL 或有效指针 执行在 fopen 调用时被阻止 文件补丁绝对正确 我已经验证过 并且不存在与权限相关的问题 任何人
  • Discord.net 无法在 Linux 上运行

    我正在尝试让在 Linux VPS 上运行的 Discord net 中编码的不和谐机器人 我通过单声道运行 但我不断收到此错误 Unhandled Exception System Exception Connection lost at
  • Pyaudio 安装错误 - “命令‘gcc’失败,退出状态 1”

    我正在运行 Ubuntu 11 04 Python 2 7 1 并想安装 Pyaudio 于是我跑了 sudo easy install pyaudio 在终端中 进程退出并显示以下错误消息 Searching for pyaudio Re
  • 如何将目录及其子目录中的所有 PDF 文件复制到一个位置?

    如何全部复制PDF文件从目录及其子目录到单个目录 实际上还有更多的文件 并且深度有些任意 假设四个目录的最大深度是公平的 我想这些文件需要重命名 如果a pdf例如 位于多个目录中 因为我会adding https ebooks stack
  • 安装J语言的JQt IDE,出现错误

    我一直按照这里的说明进行操作 http code jsoftware com wiki System Installation Linux http code jsoftware com wiki System Installation L
  • 如何在shell中输出返回码?

    我正在尝试通过调用自定义 shell 脚本sh bin sh c myscript sh gt log txt 2 gt 1 echo 该命令的输出是创建的后台进程的 PID 我想指导 bin sh保存返回码myscript sh到某个文件
  • 有谁知道在哪里定义硬件、版本和序列号。 /proc/cpuinfo 的字段?

    我想确保我的 proc cpuinfo 是准确的 目前它输出 Hardware am335xevm Revision 0000 Serial 0000000000000000 我可以在代码中的哪里更改它以给出实际值 这取决于 Linux 的
  • 如何通过保持目录结构完整来同步路径中匹配模式的文件?

    我想将所有文件从服务器 A 复制到服务器 B 这些文件在不同级别的文件系统层次结构中具有相同的父目录名称 例如 var lib data sub1 sub2 commonname filetobecopied foo var lib dat
  • 为什么我可以直接从 bash 执行 JAR?

    我是一个长期从事 Java 工作的人 并且知道运行带有主类的 JAR 的方法MANIFEST MFJar 中的文件很简单 java jar theJar jar 我用它来启动 Fabric3 服务器 包含在bin server jar在其标
  • 如何通过ssh检查ubuntu服务器上是否存在php和apache

    如何通过ssh检查Ubuntu服务器上apache是 否安装了php和mysql 另外如果安装的话在哪个目录 如果安装了其他软件包 例如 lighttpd 那么它在哪里 确定程序是否已安装的另一种方法是使用which命令 它将显示您正在搜索
  • Gtk-ERROR **:检测到 GTK+ 2.x 符号

    我正在使用 gcc 编译我的 c 应用程序 并使用以下标志 gcc evis c pkg config cflags libs gtk 2 0 libs clutter gtk 1 0 libs gthread 2 0 Wall o evi
  • 是否可以创建一个脚本来保存和恢复权限?

    我正在使用 Linux 系统 需要对一组嵌套文件和目录进行一些权限实验 我想知道是否没有某种方法可以保存文件和目录的权限 而不保存文件本身 换句话说 我想保存权限 编辑一些文件 调整一些权限 然后将权限恢复到目录结构中 将更改的文件保留在适
  • 按进程名称过滤并记录 CPU 使用情况

    Linux 下有选项吗顶部命令 https www man7 org linux man pages man1 top 1 html我可以在哪里按名称过滤进程并将每秒该进程的 CPU 使用情况写入日志文件 top pgrep 过滤输出top
  • Linux - 从第二个选项卡获取文本

    假设我们有这样的文件 一些文本11 一些文本12 一些文本13 一些文本21 一些文本22 一些文本23 文本由制表符分隔 我们知道第 1 列中的一些文本 但希望从第 2 列中获取文本 我知道我可以通过以下方式获取线路 grep somet
  • waitpid() 的作用是什么?

    有什么用waitpid 它通常用于等待特定进程完成 或者如果您使用特殊标志则更改状态 基于其进程 ID 也称为pid 它还可用于等待一组子进程中的任何一个 无论是来自特定进程组的子进程还是当前进程的任何子进程 See here http l
  • 进程退出后 POSIX 名称信号量不会释放

    我正在尝试使用 POSIX 命名信号量进行跨进程同步 我注意到进程死亡或退出后 信号量仍然被系统打开 在进程 打开它 死亡或退出后是否有办法使其关闭 释放 早期的讨论在这里 当将信号量递减至零的进程崩溃时 如何恢复信号量 https sta
  • 子目录中的头文件(例如 gtk/gtk.h 与 gtk-2.0/gtk/gtk.h)

    我正在尝试使用 GTK 构建一个 hello world 其中包括以下行 include

随机推荐