UNPV2 学习:Posix Message Queues

2023-05-16

文章目录

    • 特点
    • 消息队列的释放
    • mq_notify 函数
    • mq_notify 使用信号通知消息到达
      • 直接在信号处理函数中调用 mq_notify 与 mq_receive 函数来接收数据
      • 在信号处理函数中设置标志在程序主逻辑中调用 mq_notify 与 mq_receive 函数来接收数据
      • 在信号处理函数中设置标志在程序主逻辑中调用 sigwait 等待信号然后调用 Mq_notify 与 Mq_receive 接收数据
      • 使用 select 监听 Posix 消息队列
      • 创建一个线程的执行函数的方式

特点

  1. 向 Posix 消息队列中写入消息时并不需要有读者进程存在,与 pipes 与 FIFOs 的行为不同
  2. 消息队列具有内核持久性,一个消息只有在被读取后才会释放,并不会因为写入消息的进程死亡而释放
  3. 在一个 Posix 消息队列上读取永远返回优先级最高的消息类型中最老的消息,而 System V 消息队列则能够返回任何目标优先级的消息
  4. Posix 消息队列允许当有一个消息被放进了空的队列时产生信号、初始化一个线程来通知消费者,System V 消息队列没有此功能

消息队列的释放

Posix 消息队列内部维护了一个引用计数,当引用计数大于 0 的时候目标消息队列能够从系统中移除,但是队列的释放仅在最后一次 mq_close 发生时才会触发。

mq_notify 函数

mq_notify 函数为 Posix 消息队列提供了一种异步通知机制,当消息被放到队列中时通知消费者进程, System V 消息队列就不具备这样的能力。

在调用 msgrcv 函数从 System V 消息队列中接收消息时进程可以挂起等待消息,但是在挂起期间不能执行任何其他任务。如果指定 NONBLOCK 标志调用 msgrcv 函数,进程不再阻塞但是却要持续调用此函数以确定队列中是否有数据到来,会浪费 cpu 时间。

Posix 消息队列支持通过如下两种方式来异步通知一个空的队列中有新的消息到来:

  1. 发送一个信号
  2. 创建一个线程来执行指定的函数

这两种机制通过指定不同的参数调用 mq_notify 函数来选择,mq_notify 函数的原型如下:

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

mq_notify 使用规则如下:

  1. 如果 sevp 参数非空,那么当前进程希望在空队列中有新的消息到达时被通知。我们说“该进程被注册为接收该队列的通知”。
  2. 如果 sevp 参数为空指针且当前进程已经使用 mq_notify 注册接收队列通知,己存在的注册将被移除。
  3. 同一时刻只支持单个进程调用 mq_notify 注册为接收某个特定队列的通知时间。
  4. 当有一个消息到达某个先前为空的队列,而且己有一个进程被注册为接收该队列的通知时,只有在没有任何线程阻塞在该队列的 mq_receive 调用中的前提下,通知才会发出。这就是说,在mq_reveive 调用中的阻塞比任何注册的通知具有更高优先级。
  5. 当该通知被发送给它的注册进程时,其注册事件被移除。该进程必须再次调用 mq_notify 以重新注册。

UNPV2 中提供了这两种不同方案的示例代码,我分别描述下关键的流程。

mq_notify 使用信号通知消息到达

直接在信号处理函数中调用 mq_notify 与 mq_receive 函数来接收数据

核心代码如下:

int main(int argc, char *argv[])
{
....................................
				Signal(SIGUSR1, sig_usr1);
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
        Mq_notify(mqd, &sigev);
....................................
}

static void
sig_usr1(int signo)
{
        ssize_t n;

        Mq_notify(mqd, &sigev);                 /* reregister first */
        n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
        printf("SIGUSR1 received, read %ld bytes\n", (long) n);
        return;
}

上述代码实现了 SIGUSR1 的信号处理函数并配置 mq_notify 使用信号通知机制,通知信号为 SIGUSR。

main 函数中注册了 SIGUSR1 信号的处理函数 sig_usr1,此函数的逻辑如下:

  1. 调用 Mq_notify 重新注册通知事件
  2. 调用 Mq_receive 接收消息然后打印接收到的字节数

此实现存在的问题为不应该在信号处理函数中调用 mq_notify、mq_receive、printf 函数,这些函数并不是异步信号安全的函数。

在信号处理函数中设置标志在程序主逻辑中调用 mq_notify 与 mq_receive 函数来接收数据

核心代码如下:

for ( ; ; ) {
                Sigprocmask(SIG_BLOCK, &newmask, &oldmask);     /* block SIGUSR1 */
                while (mqflag == 0)
                        sigsuspend(&zeromask);
                mqflag = 0;             /* reset flag */

                Mq_notify(mqd, &sigev);                 /* reregister first */
                n = Mq_receive(mqd, buff, attr.mq_msgsize, NULL);
                printf("read %ld bytes\n", (long) n);
                Sigprocmask(SIG_UNBLOCK, &newmask, NULL);       /* unblock SIGUSR1 */
        }

static void
sig_usr1(int signo)
{
        mqflag = 1;
        return;
}

SIGUSR1 信号处理程序中仅仅设置一个全局变量 mqflag 的值,在程序主逻辑中调用 mq_notify 与 mq_receive 来接收消息。

上述代码首先修改当前线程的信号掩码,临时关闭 SIGUSR1,然后执行 sigsuspend 等待 SIGUSR1 信好到来。

sigsuspend 函数会使用 zeromask 表示的 signal mask 修改当前线程的 signal mask,然后挂起当前线程,直到有一个会执行 signal handler、终止进程的目标信号产生。当收到信号并终止进程时,sigsuspend 将不会返回。如果成功捕获到信号,sigsuspend 将会在信号处理函数执行后返回,signal 将会被恢复为调用 sigsuspend 函数之前的状态。

在 sigsuspend 返回后,程序重置 mqflag 标志并调用 Mq_notify 与 Mq_receive 接收消息并打印接收的字节数,最后调用 Sigprocmask unblock SIGUSR1 信号。
此实现存在如下问题:

由于通知消息仅在有一条新的消息被放到空的队列时产生,如果在我们能够读取第一个消息前有两个消息达到,那么只有一个通知事件产生,于是我们读取第一个消息并调用 sigsuspend 等待另一个消息,而后续可能没有新的消息产生,这样我们就会漏掉第二个消息。

为了解决这个问题,我们可以在 Mq_receive 的时候多次读取队列,这样就不会漏掉消息。示例代码如下:

for ( ; ; ) {
                Sigprocmask(SIG_BLOCK, &newmask, &oldmask);     /* block SIGUSR1 */
                while (mqflag == 0)
                        sigsuspend(&zeromask);
                mqflag = 0;             /* reset flag */

                Mq_notify(mqd, &sigev);                 /* reregister first */
                while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                        printf("read %ld bytes\n", (long) n);
                }
                if (errno != EAGAIN)
                        err_sys("mq_receive error");
                Sigprocmask(SIG_UNBLOCK, &newmask, NULL);       /* unblock SIGUSR1 */
        }

在信号处理函数中设置标志在程序主逻辑中调用 sigwait 等待信号然后调用 Mq_notify 与 Mq_receive 接收数据

上文描述了在信号处理函数中设置标志的方式,一个更简单的方式是在一个函数中阻塞等待内核发送目标信号,可以通过 sigwait 函数来实现。

新的代码如下:

for ( ; ; ) {
                Sigwait(&newmask, &signo);
                if (signo == SIGUSR1) {
                        Mq_notify(mqd, &sigev);                 /* reregister first */
                        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                                printf("read %ld bytes\n", (long) n);
                        }
                        if (errno != EAGAIN)
                                err_sys("mq_receive error");
                }
        }

sigwait 函数将会挂起当前线程直到 signal set 中指定的信号到来,此函数会接收这个信号(将信号从信号 pending list 中移除),然后通过第二个参数返回信号值。

上面的代码进一步简化,只调用 Sigwait,然后调用 Mq_notify、mq_receive,比使用 sigsuspend 更简单。

使用 select 监听 Posix 消息队列

Posix 消息队列描述符并不是一个普通的描述符不能使用 select、epoll 函数监控此描述符。可以使用 mq_notify + pipe 的方式,mq_notify 注册监控消息队列事件,通知方式为信号,在程序初始化时创建一个 pipe,在信号处理函数中调用 write 向这个管道的 fd 中写入数据,在主程序循环中 select pipe 的 fd 来间接的监听 Posix 消息队列。 write 系统调用是异步信号安全的函数,在信号处理函数中调用不会产生问题。

示例代码如下:

				Pipe(pipefd);

                /* 4establish signal handler, enable notification */
        Signal(SIGUSR1, sig_usr1);
        sigev.sigev_notify = SIGEV_SIGNAL;
        sigev.sigev_signo = SIGUSR1;
        Mq_notify(mqd, &sigev);

        FD_ZERO(&rset);
        for ( ; ; ) {
                FD_SET(pipefd[0], &rset);
                nfds = Select(pipefd[0] + 1, &rset, NULL, NULL, NULL);

                if (FD_ISSET(pipefd[0], &rset)) {
                        Read(pipefd[0], &c, 1);
                        Mq_notify(mqd, &sigev);                 /* reregister first */
                        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                                printf("read %ld bytes\n", (long) n);
                        }
                        if (errno != EAGAIN)
                                err_sys("mq_receive error");
                }
        }

static void
sig_usr1(int signo)
{
        Write(pipefd[1], "", 1);        /* one byte of 0 */
        return;
}

向 pipe 中写入的数据内容并不重要,重要的是写入这个动作触发 select 系统调用捕获事件,间接的绑定到 Posix 消息队列的通知事件。

创建一个线程的执行函数的方式

示例代码如下:

int
main(int argc, char **argv)
{
        if (argc != 2)
                err_quit("usage: mqnotifythread1 <name>");

        mqd = Mq_open(argv[1], O_RDONLY | O_NONBLOCK);
        Mq_getattr(mqd, &attr);

        sigev.sigev_notify = SIGEV_THREAD;
        sigev.sigev_value.sival_ptr = NULL;
        sigev.sigev_notify_function = notify_thread;
        sigev.sigev_notify_attributes = NULL;
        Mq_notify(mqd, &sigev);

        for ( ; ; )
                pause();                /* each new thread does everything */

        exit(0);
}

static void
notify_thread(union sigval arg)
{
        ssize_t n;
        void    *buff;

        printf("notify_thread started\n");
        buff = Malloc(attr.mq_msgsize);
        Mq_notify(mqd, &sigev);                 /* reregister */

        while ( (n = mq_receive(mqd, buff, attr.mq_msgsize, NULL)) >= 0) {
                printf("read %ld bytes\n", (long) n);
        }
        if (errno != EAGAIN)
                err_sys("mq_receive error");

         free(buff);
         pthread_exit(NULL);
}

sigev 中的 sigev_notify 设置为 SIGEV_THREAD 表示通过创建一个线程执行函数方式监听处理消息队列事件,sigev_notify_function 中设置了需要执行的函数指针为 notify_thread,此函数的主要逻辑如下:

  1. 申请一块 buff 用以接收消息
  2. 重新执行 Mq_notify 函数重新监听事件
  3. 调用 mq_receive 函数从队列中接收消息

在这种实现中,主线程可以做其它的任务,在示例程序中主线程啥也不干。这种创建线程执行函数的机制表面上看上去挺简单,可 mq_notify 注册的 notify_thread 是一个用户态虚拟内存空间的代码地址,它不能在内核态执行,意味着线程的创建与回调的执行都在用户态完成,那内核又是如何将事件投递到新创建的线程,让此线程执行回调来处理消息呢?

在进一步探讨前,先在我的本地 linux 环境上运行下示例程序,运行 log 如下:

[longyu@debian] pxmsg $ ./mqcreate /test1
[longyu@debian] pxmsg $ ./mqnotifythread1 /test1
notify_thread started
read 50 bytes
notify_thread started
read 50 bytes
notify_thread started
read 1024 bytes

mqnotifythread demo 能够正常接收消息。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

UNPV2 学习:Posix Message Queues 的相关文章

  • Linux进程间共享内存

    我有使用多个进程的服务器 fork 有大量数据可以由一个进程创建 并且应该在其他进程之间共享 因此 我使用 shm open mmap 创建共享内存并将其映射到虚拟内存 struct SharedData const char name i
  • 批处理cmd中弹出消息

    echo msgbox Hey Here is a message gt tmp tmp vbs cscript nologo tmp tmp vbs del tmp tmp vbs or echo msgbox Hey Here is a
  • Posix 线程问题

    我试图通过示例来理解 pthreads 我编写了以下代码 每次运行时都会给出不同的答案 有人可以解释一下这个错误吗 TIA 斯维亚 代码在这里 include
  • 使用 dup2 时的竞争条件

    这个联机帮助页 http linux die net man 2 dup2为了dup2系统调用说 EBUSY 仅限 Linux 这可能会在执行期间由 dup2 或 dup3 返回 open 2 和 dup 的竞争条件 它谈论什么竞争条件以及
  • 为什么系统调用返回 EFAULT 而不是发送段错误?

    需要明确的是 这是一个设计问题 而不是一个实现问题 我想知道 POSIX 这样做的背后的基本原理 当给定无效内存位置时 POSIX 系统调用返回 EFAULT 而不是使用户空间程序崩溃 通过发送 sigsegv 这使得它们的行为与用户空间函
  • ASP.net MasterPage.master 不存在

    我在 IIS 服务器上发布我的网站时遇到问题 我无法直接访问它 因此我必须依靠其他人在 IIS 上配置我的网站 但是 当我上传我的网站时 出现此错误 Line 1 Line 2
  • getchar() 返回错误的特殊情况是什么?

    所以我知道getchar 当输入结束或发生错误时返回 EOF 我也知道我可以通过以下方式检查发生了哪些情况ferror stdin and feof stdin 我想知道什么情况下会特别发生错误 我检查了这两个函数的手册页 但那里没有任何内
  • 有没有一种异步方式知道文件已更改?

    我想异步监视文件的任何更改 也就是说 当文件被修改 删除时 我希望在我的程序中进行回调 可能来自内核 该文件只是一个纯文本文件 我知道可以使用轮询机制来做到这一点 但我正在寻找一种基于事件的解决方案 我读到了有关 inotify 的内容 但
  • POSIX srandom(...) 和 random() 函数的 Windows 等效项?

    我正在尝试将一些代码从 UNIX 移植到 Windows 并且我需要 POSIX 的实现srandom x and random 对于给定的种子函数x 生成与符合 POSIX 1 2001 的编号规则相同的编号规则 Windows 上有哪些
  • 每个进程的最大线程数 - sysconf(_SC_THREAD_THREADS_MAX) 失败

    我试图找到 UNIX 机器上每个进程的最大线程数 并编写下面的代码来使用 sysconf include
  • POSIX 风格的操作系统中的命令行选项应该是下划线风格吗? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 POSIX 风格操作系统中程序的命令行选项名称是否应该是下划线风格 例如 cure world hunger 或者也许是其他风格 curewor
  • Python:“导入 posix”问题

    如果我导入os模块 我可以运行以下命令来推断 os py 的位置 gt gt gt import os gt gt gt print os file usr lib python2 6 os pyc 但是 当我导入时posix 它不具有 f
  • Linux 中 POSIX 可靠信号和 POSIX 实时信号有什么区别?

    我读了一个手册页signal using 男人7信号 http man7 org linux man pages man7 signal 7 html我看到两种类型的信号 所以 我有一个问题 有什么区别POSIX 可靠信号 and POSI
  • Unix域SOCK_DGRAM和SOCK_SEQPACKET之间的区别?

    根据 Unix 套接字的 Linux 手册页 UNIX 域中的有效套接字类型是 SOCK DGRAM 对于保留消息边界的面向数据报的套接字 与大多数 UNIX 实现一样 UNIX 域数据报套接字始终可靠且不可靠 不重新排序数据报 以及 自
  • 如何通过id获取消息discord.py

    我想知道如何通过消息 ID 获取消息 我努力了discord fetch message id and discord get message id 但两者都会提出 Command raised an exception Attribute
  • 如何从 POSIXct 对象获取原点

    我有一个像这样的函数 foo function time in code here that changes POSIXct to numeric time out as POSIXct time in origin 1970 01 01
  • osx 上的 aio:它是在内核中实现还是通过用户线程实现?其他选择?

    我正在开发我的小型 C 框架 并且有一个文件类 它也应该支持异步读写 除了在我发现的一些工作线程中使用同步文件 I O 之外 唯一的解决方案是 aio 无论如何 我环顾四周并在某处读到 在 Linux 中 aio 甚至不是在内核中实现的 而
  • getline() 与 fgets():控制内存分配

    要从文件中读取行 有getline and fgets POSIX 函数 忽略可怕的gets 这是常识getline 优先于fgets 因为它根据需要分配行缓冲区 我的问题是 这不危险吗 如果有人意外或恶意地创建了一个 100GB 的文件
  • 从命名管道读取

    我必须实现一个 打印服务器 我有 1 个客户端文件和 1 个服务器文件 include
  • Xenomai 中的周期性线程实时失败

    我正在创建一个周期性线程 它在模拟输出上输出方波信号 我正在使用 Xenomai API 中的 Posix Skin 和 Analogy 我使用示波器测试了代码的实时性能 并查看了方波信号 频率为 1kHz 的延迟 我应该实现 250us

随机推荐

  • c语言编程“水仙花数”

    文章目录 打印所有的水仙花数 所谓的 水仙花数 是指一个三位数 xff0c 其各位数字的立方和等于该数本身 例如 xff0c 153是水仙花数 xff0c 因为153 61 1 3 43 5 3 43 3 3 打印所有的水仙花数 所谓的 水
  • inux查看日志的几种方法

    linux 日志查看 tail head cat tac sed less echo 1 命令格式 tail 必要参数 选择参数 文件 f 循环读取 q 不显示处理信息 v 显示详细的处理信息 c lt 数目 gt 显示的字节数 n lt
  • asp不能正常用的原因

    前几天做网站时 xff0c 机子出现了这种症状 xff0c 重装过IE和IIS一样也无法解决 xff0c 在百度里找了一下 xff0c 下面的方法真的很适用 症状 xff1a 运行asp程序 包括其他动态网页程序 出现500内部错误信息 x
  • 用DLL实现把数据库的记录导出到EXCEL中(VB)

    39 新建一个ActiveX DLL工程工程名为DbToExcel 39 工程 gt 引用 引用Microsoft ActiveX Data Objects 2 6 Library 39 Microsoft Excel 9 0 Object
  • MySQL转换为SqlServer数据库

    如何将MySQL数据导入到SqlServer中 xff0c 请看以下步骤 xff1a 1 安装mysql数据库的ODBC驱动 xff0c mysql connector odbc 3 51 19 win32 msi 2 打开控制面板管理工具
  • DataTimePicker数据绑定遇到Null时异常的原因

    DateTimePicker1 DataBindings Add 34 Value 34 bindingSource1 34 assessortime 34 如果字段 assessortime的值 为 null 时 就会出现异常 后来发现
  • c#中DataTable与实体集合相互转换

    以下是将集合类转换成DataTable lt summary gt 将集合类转换成DataTable lt summary gt lt param name 61 34 list 34 gt 集合 lt param gt lt return
  • 用Linux命令行生成随机密码的十种方法

    转载自 极客范 xff0c 不得不夸夸强大的Bash啊 xff01 xff01 xff01 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61
  • C++20 Ranges

    VS2019 C 43 43 20的Ranges 01 引入范围的动机02 范围 ranges 03 range v3库04 C 43 43 20 range demo 01 引入范围的动机 C 43 43 17以前的标准库中大多数通用算法
  • 面向对象分析设计步骤

    一 创建用例 初步确定用例 xff1a 1 确定参与者 2 确定用例 xff08 系统操作 xff09 3 确定参与者与用例之间的关系 用例细节描述 xff1a 1 用例名称 2 操作详细描述 3 前置条件描述 4 部署约束 5 正常事件流
  • collect2: ld terminated with signal 9 错误解决办法

    编译android是出现如下错误 xff1a target Java CameraEffectsTests out target common obj APPS CameraEffectsTests intermediates classe
  • 浅谈Stein算法求最大公约数(GCD)的原理及简单应用

    一 Stein算法过程及其简单证明 1 一般步骤 xff1a s1 当两数均为偶数时将其同时除以2至至少一数为奇数为止 xff0c 记录除掉的所有公因数2的乘积k xff1b s2 如果仍有一数为偶数 xff0c 连续除以2直至该数为奇数为
  • 【已解决】@Configration爆红

    64 Configration爆红 问题原因 xff1a 一 xff1a 没有添加依赖 二 xff1a 添加依赖了 xff0c 但是依赖版本过低 解决方法 xff1a 把依赖的版本改的高一点 span class token generic
  • 关于冒泡排序的程序( 第三次作业)

    此前想过把两种排序方式都一起写在一个工程文件里 xff0c 但做了下 xff0c 能力有限 xff0c 没法写完整 xff0c 所以就只能分别写 xff0c 这个是冒泡排序 xff0c 代码已尽量做到准确的注释 xff0c 希望提醒自己往后
  • BSS段

    深入理解计算机系统 bss段 xff0c data段 text段 堆 heap 和栈 stack 1 关于BSS段的大小 2 1 BSS段中的内容 2 2 BSS段在加载运行前的处理 3 3 BSS段的作用 3 4 代码优化对BSS段的影响
  • Java 比较两个List对象差集(根据某一值)

    很多都是比较List lt String gt 的 xff0c 和自身业务不符 xff0c jdk1 8 新特性强大的Stream API xff0c 具体是什么方法 xff0c 什么作用自行百度 xff0c 复制粘贴可以解决问题就OK 4
  • Windows10 安装Redis(图文教程)

    Redis xff08 Remote Dictionary Server xff0c 即远程字典服务 xff0c 是一个开源的使用ANSI C语言编写 支持网络 可基于内存亦可持久化的日志型 Key Value数据库 一 下载redis客户
  • e17 enlightenment 介绍及配置

    为什么要有一个窗口管理器 为什么一定要有一个桌面背景 xff0c 甚至是标题栏 或是如果把一个应用程序如firefox当成桌面背景行不行 桌面能不能再快一点 我不想把资源浪费在那些用不到的地方 Linux那么多虚拟桌面 xff0c 为什么我
  • Vim: Warning: input is not from a terminal 后退出 vim 终端异常

    Vim Warning input is not from a terminal 后退出 vim 终端异常 今天执行了如下命令调用 vi 来打开 find 搜索到的文件 xff1a longyu 64 longyu pc span clas
  • UNPV2 学习:Posix Message Queues

    文章目录 特点消息队列的释放mq notify 函数mq notify 使用信号通知消息到达直接在信号处理函数中调用 mq notify 与 mq receive 函数来接收数据在信号处理函数中设置标志在程序主逻辑中调用 mq notify