ZeroMQ发布订阅模式之多进程实现

2023-05-16

ZeroMQ的发布订阅模式是单向的数据发布,服务器(即消息发布方)将更新的消息/事件推送到一组客户端(即订阅方)。
图1 发布-订阅模式
消息发布者创建ZMQ_PUB类型的socket并将消息发送到消息队列中。
消息订阅者创建ZMQ_SUB类型的socket,并使用zmq_setsockopt接口和ZMQ_SUBSCRIBE订阅事件。如果socket没有设置订阅,那么将收不到任何消息。订阅者可以设置多个订阅,多个订阅将会被累加起来,如果某个事件匹配任何订阅,那么订阅者都会接收到该事件。
订阅者可以通过ZMQ_UNSUBSCRIBE取消订阅某事件。
SUB套接字可以在循环中执行zmq_msg_recv来接收消息,也可以结合zmq_poll来接收消息,但不能向PUB套接字发送消息。
同样,PUB套接字可以多次执行zmq_msg_send,但不能执行zmq_msg_recv。
示例代码改编自教材,有一些函数冗余。
zmq_help.h:封装公共接口

#ifndef _ZMQ_HELPER_H_
#define _ZMQ_HELPER_H_

#include <vector>
#include "zmq.h"

class CZmqMsg
{
public:
    CZmqMsg()
    {
        (void)zmq_msg_init(&m_stMsg);
    }

    virtual ~CZmqMsg()
    {
        (void)zmq_msg_close(&m_stMsg);
    }

    void *GetMsgData()
    {
        return zmq_msg_data(&m_stMsg);
    }

    zmq_msg_t m_stMsg;
};

long ZQM_RecvMore(void * hSocket, std::vector<zmq_msg_t>& oRecvMsgList)
{
    int iMore = 0;
    size_t iMoreSize = sizeof(iMore);

    do
    {
        zmq_msg_t stMsg;
        (void)zmq_msg_init(&stMsg);

        long lRet = zmq_msg_recv(&stMsg, hSocket, 0);
        if (-1 == lRet)
        {
            (void)zmq_msg_close(&stMsg);
            return lRet;
        }

        oRecvMsgList.push_back(stMsg);

        lRet = zmq_getsockopt(hSocket, ZMQ_RCVMORE, &iMore, &iMoreSize);
        if (-1 == lRet)
        {
            /* ´Ë´¦Ê§°Ü²»ÐèÒª¹Ø±Õ msg_data£¬Í³Ò»Óɵ÷ÓÃÕß´¦Àí */
            return lRet;
        }
    } while (iMore);

    return 0;
}

void ZMQ_Recv_Data(void *requester, int bufLen, char *pcRspBuf)
{
    zmq_msg_t reply;
    zmq_msg_init(&reply);
    zmq_msg_recv(&reply, requester, 0);

    memcpy(pcRspBuf, zmq_msg_data(&reply), bufLen);
    zmq_msg_close(&reply);

    return;
}

#endif

client.cpp:订阅者,结合zmq_poll和zmq_msg_recv来接收消息

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/prctl.h>
#include <map>

#include "zmq_helper.h"
#include "zmq.h"

typedef void *(*CLIENT_THR_FUNC_CB)(void *);

void client_req_rsp_mode(void *)
{
    prctl(PR_SET_NAME, (unsigned long)"request");

    void *context = zmq_ctx_new();

    // Socket to talk to server
    printf("Connecting to hello world server.\n");
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++)
    {
        struct timeval start_time;
        char szSendBuf[10] = {0};
        snprintf(szSendBuf, sizeof(szSendBuf), "Hello_%03d", request_nbr);

        zmq_msg_t request;
        zmq_msg_init_data(&request, szSendBuf, strlen(szSendBuf), NULL, NULL);

        zmq_msg_send(&request, requester, 0);
        zmq_msg_close(&request);
        gettimeofday(&start_time, NULL);
        printf("Sending Hello Times:%d tv_sec:%ld tv_usec:%ld.\n", request_nbr, start_time.tv_sec, start_time.tv_usec);

        /* recv response data */
        char szRecvBuf[10] = {0};
        ZMQ_Recv_Data(requester, sizeof(szRecvBuf), szRecvBuf);
        struct timeval end_time;
        gettimeofday(&end_time, NULL);
        printf("Times:%d Received replay:%s tv_sec:%ld tv_usec:%ld\n", request_nbr, szRecvBuf, end_time.tv_sec, end_time.tv_usec);

        sleep(1);
    }

    sleep(2);
    zmq_close(requester);
    zmq_term(context);

    return;
}

void *client_create_subscriber_socket(void *context, const char *filter)
{
    void *subscriber = zmq_socket(context, ZMQ_SUB);
    
    /* 高水位选项的设置要在bind/connect之前, 否则不生效 */
    int iSocketOpt = 0;
    (void)zmq_setsockopt(subscriber, ZMQ_RCVHWM, &iSocketOpt, sizeof(iSocketOpt));
    (void)zmq_setsockopt(subscriber, ZMQ_SNDHWM, &iSocketOpt, sizeof(iSocketOpt));
    
    zmq_connect(subscriber, "tcp://localhost:5556");

    (void)zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
    
    return subscriber;
}

std::map<void *, void *> goSubscriberSocket;

void client_subscriber(void *)
{
    printf("Collecting updates from weather server....\n");
    prctl(PR_SET_NAME, (unsigned long)"subscriber");

    void *context = zmq_ctx_new();

    void *subscriber1 = client_create_subscriber_socket(context, "10001");
    goSubscriberSocket.insert(std::pair<void *, void *>(subscriber1, subscriber1));

    void *subscriber2 = client_create_subscriber_socket(context, "10002");
    goSubscriberSocket.insert(std::pair<void *, void *>(subscriber2, subscriber2));

    void *subscriber3 = client_create_subscriber_socket(context, "10003");
    goSubscriberSocket.insert(std::pair<void *, void *>(subscriber3, subscriber3));

    int update_num = 0;
    int total_temp = 0;
    unsigned int i = 0;
    zmq_pollitem_t astPollItems[3];

    while (true)
    {
        (void)memset(astPollItems, 0, sizeof(astPollItems));

        std::map<void *, void *>::iterator it = goSubscriberSocket.begin();
        for (i = 0; it != goSubscriberSocket.end(); ++it, i++)
        {
            astPollItems[i].socket = it->first;
            astPollItems[i].events = ZMQ_POLLIN;
        }

        long lRet = zmq_poll(astPollItems, goSubscriberSocket.size(), (long)10);
        if (lRet <= 0)
        {
            continue;
        }

        for (i = 0; i < goSubscriberSocket.size(); i++)
        {
            if (astPollItems[i].revents & ZMQ_POLLIN)
            {
                char update[20] = {0};
                ZMQ_Recv_Data(astPollItems[i].socket, sizeof(update), update);

                int zipcode = 0, temp = 0, relhumidity = 0;
                sscanf(update, "%d %d %d", &zipcode, &temp, &relhumidity);
                total_temp += temp;
                printf("subscriber Times:%d zipcode:%d temp:%d rel:%d\n", update_num, zipcode, temp, relhumidity);

                update_num++;
            }
        }

        if (update_num >= 100)
        {
            break;
        }
    }

    printf("Average temperature for zipcode:%s was %dF\n", "10001", (int)total_temp / update_num);

    zmq_close(subscriber1);
    zmq_close(subscriber2);
    zmq_close(subscriber3);
    zmq_ctx_destroy(context);

    return;
}

int main (void)
{
    pthread_t thridReqMode;
    //pthread_create(&thridReqMode, NULL, (CLIENT_THR_FUNC_CB)client_req_rsp_mode, NULL);

    pthread_t thridSublist;
    pthread_create(&thridSublist, NULL, (CLIENT_THR_FUNC_CB)client_subscriber, NULL);

    for ( ; ; )
    {
        sleep(1);
    }

    pthread_join(thridReqMode,NULL);
    pthread_join(thridSublist,NULL);

    return 0;
}

server.cpp:发布者

#include <stdio.h>
#include <string.h>
#include <iostream>
#include <unistd.h>
#include <sys/time.h>
#include <pthread.h>
#include <stdlib.h>
#include <sys/prctl.h>

#include "zmq_helper.h"
#include "zmq.h"

typedef void *(*BP_THR_FUNC_CB)(void *);


void server_req_rsp_mode(void *)
{
    prctl(PR_SET_NAME, (unsigned long)"response");

    // Prepare our context and socket
    void *context = zmq_ctx_new();
    void *responder = zmq_socket(context, ZMQ_REP);
    zmq_bind(responder, "tcp://*:5555");

    while (true)
    {
        char szRecvBuf[10] = {0};
        ZMQ_Recv_Data(responder, sizeof(szRecvBuf), szRecvBuf);

        struct timeval start_time;
        gettimeofday(&start_time, NULL);
        printf("Server Received data:%s tv_sec:%ld tv_usec:%ld\n", szRecvBuf, start_time.tv_sec, start_time.tv_usec);

        sleep(1);

        // Send reply back to client
        zmq_msg_t reply;
        zmq_msg_init_data(&reply, "World", 6, NULL, NULL);

        zmq_msg_send(&reply, responder, 0);

        zmq_msg_close(&reply);
    }

    zmq_close(responder);
    zmq_ctx_destroy(context);

    return;
}

void server_publisher(void *)
{
    prctl(PR_SET_NAME, (unsigned long)"publisher");

    void *context = zmq_ctx_new();
    void *publisher = zmq_socket(context, ZMQ_PUB);

	/* 先设置高水位再bind */
    int iSocketOpt = 0;
    (void)zmq_setsockopt(publisher, ZMQ_RCVHWM, &iSocketOpt, sizeof(iSocketOpt));
    (void)zmq_setsockopt(publisher, ZMQ_SNDHWM, &iSocketOpt, sizeof(iSocketOpt));

    zmq_bind(publisher, "tcp://*:5556");
    zmq_bind(publisher, "ipc://weather.ipc");

    while (true)
    {
        int zipcode = 0, temperature = 0, relhumidity = 0;
        zipcode = rand() %1000 + 10000;
        temperature = rand() % 215 - 80;
        relhumidity = rand() % 50 + 10;

        char update[20] = {0};
        snprintf(update, sizeof(update), "%05d %04d %03d", zipcode, temperature, relhumidity);

        zmq_msg_t request;
        zmq_msg_init_data(&request, update, strlen(update), NULL, NULL);

        zmq_msg_send(&request, publisher, 0);
        zmq_msg_close(&request);
        usleep(1*1000);
    }

    zmq_close(publisher);
    zmq_ctx_destroy(context);

    return;
}

int main ()
{
    pthread_t thridReqMode;
    pthread_create(&thridReqMode, NULL, (BP_THR_FUNC_CB)server_req_rsp_mode, NULL);

    pthread_t thridPublist;
    pthread_create(&thridPublist, NULL, (BP_THR_FUNC_CB)server_publisher, NULL);

    for ( ; ; )
    {
        sleep(1);
    }

    pthread_join(thridReqMode,NULL);
    pthread_join(thridPublist,NULL);

    return 0;
}

编译命令:
g++ -g -Wall -fPIC -I/home/zhangjian/mq -L/home/zhangjian/mq -lzmq -lpthread client.cpp -o client
g++ -g -Wall -fPIC -fpermissive -I/home/zhangjian/mq -L/home/zhangjian/mq -lpthread -lzmq server.cpp -o server

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZeroMQ发布订阅模式之多进程实现 的相关文章

随机推荐

  • VS2013取消预编译头

    创建C 43 43 Win32控制台时忘了取消预编译头 xff0c 怎么取消 xff1f 右键项目 gt 属性 gt 配置属性 gt C C 43 43 gt 预编译头 gt 不使用预编译头 改天再写预编译头是干嘛的
  • Xmanager 6 图形化界面连接Centos7 配置步骤

    1 前提 Centos 7已经安装就绪 2 安装Xmanager 6 本博客以Xmanager6 0 0108为例 a 首先双击可执行文件 出现如下安装界面 点击 下一步 nbsp nbsp nbsp nbsp nbsp nbsp nbsp
  • VS中VC++目录中的$是什么意思

    VC ExecutablePath x64 项目是x64平台 WindowsSDK ExecutablePath VS ExecutablePath MSBuild ExecutablePath VC IncludePath VCInsta
  • 关于单应性矩阵的理解:Homography matrix for dummies

    尽量写的通俗一点 xff0c 因为从某种程度上讲 xff0c 本人也是dummy 1 先说homogeneous coordinate xff0c 齐次坐标 一幅2D图像上的非齐次坐标为 x y xff0c 而齐次坐标为 x y 1 xff
  • 关于RANSAC的理解

    先说最小二乘 ok xff0c 你手头有一堆数据 xff0c 比如这些蓝点 xff1a 那么我们假设它符合一个直线模型 xff1a y 61 ax 43 b xff0c 用最小二乘就可以很容易求解出未知参数a和b 最小二乘大法确实好哇 xf
  • Visual Studio中监视数组

    比如有一个double h 9 xff0c 如果选择监视 xff0c 那么就只会监视h 0 xff0c 如果想监视其他元素 xff0c 难道只能h 1 h 2 一个个的添加吗 xff1f 当然不需要 xff0c 在监视中输入h 9就可以了
  • 在编译PX4之前,你需要知道的几件事

    1 在git上clone代码 xff0c 必须是clone xff0c 因为编译时需要有 git文件夹 如果你看Makefile就会发现有这么一行 xff1a Enforce the presence of the GIT reposito
  • 马氏距离与卡方分布

    最近在看 Fundamentals of object tracking xff0c 看到最近邻滤波时 xff0c 碰到了题中的两个概念 以下内容基本来自wiki xff0c 读者有不懂的地方看wiki更清晰明了 1 马氏距离 Mahala
  • PX4中的mavlink

    简介 px4与地面站的通信协议是mavlink xff0c 对于其消息格式的介绍看这里和这里 需要注意几点 xff1a 不光是px4与qgroundcontrol通信通过mavlink xff0c 有一些sensor也支持mavlink m
  • STM32F1Debug,定时器时基初始化参数

    STM32F1 xff0c 定时器时基初始化参数 错误代码 xff1a 在初始化时基时 xff0c 没有给TIM ClockDivision和TIM RepetitionCounter赋值 错误代码 xff1a span class tok
  • dockerfile详解

    前言 各位想必应该记得 xff0c 我们此前如果安装一个nginx的话 xff0c 安装完以后 xff0c 我们说过很多次了 xff0c 通常不会运行在默认配置下 xff0c 那因此 xff0c 我们通常需要去改一改它的配置文件或者定义模块
  • docker hub + github action x持续集成CI/CD

    docker 43 github 持续集成CI CD docker 持续集成 参考官网 xff1a https docs docker com ci cd best practices 的大部分内容 2020 Jetbrains devel
  • 利用爬虫获取免费IP代理

    项目目标 通过爬虫获取 西拉代理 xff08 http www xiladaili com xff09 上的高匿代理 xff0c 并储存至一个列表 项目分析 首先对网页进行观察 xff0c 主体内容如下图所示 不但指明了代理IP 协议类型
  • DOCKER windows 7 详细安装教程

    Edit DOCKER windows安装 编者 xff1a xiaym 日期 xff1a 2015年1月20日 排版工具 xff1a 马克飞象 QQ 252536711 DOCKER windows安装 1 下载程序包2 设置环境变量3
  • 职场里不能与之结为团队的十种人

    俗话说 xff1a 女怕嫁错郎 xff0c 男怕入错行 同样 xff0c 一个人进入职场最怕的就是遇上了自己无法与其默契的某些团队成员 xff0c 这会影响到自己的事业进取 xff0c 影响到自己努力奋斗的成果收获 xff0c 影响到自己做
  • python读取大疆P1相机POS

    大疆P1相机读取POS xff0c 算法不是很好 xff0c 但是可以用 未来有好的算法再贡献 import os import os path import exifread workspace 61 r 39 G 20210727 39
  • Java数据结构之Lambda表达式

    目录 1 背景1 1 Lambda表达式的语法1 2 函数式接口 2 Lambda表达式的基本使用3 变量捕获3 1 匿名内部类的变量捕获3 2 Lambda的变量捕获 4 Lambda在集合当中的使用4 1 Collection接口4 2
  • docker学习笔记

    一 docker简介 xff1a 1 是什么 xff1a xff08 1 xff09 为什么会有docker出现 xff0c 将解决什么样的问题 xff1a 当我们在开发一个项目的时候 xff0c 假如您自己的电脑有您自己的开发环境 xff
  • iserver配置https加密通信

    1 升级iserver为https访问 xff1a iserver是部署在tomcat中 xff0c 所以只要配置tomcat的相关配置就可以 xff1a xff08 1 xff09 https访问需要用到证书 xff0c 因此需要准备相关
  • ZeroMQ发布订阅模式之多进程实现

    ZeroMQ的发布订阅模式是单向的数据发布 xff0c 服务器 xff08 即消息发布方 xff09 将更新的消息 事件推送到一组客户端 xff08 即订阅方 xff09 消息发布者创建ZMQ PUB类型的socket并将消息发送到消息队列