ZeroMQ发布-订阅模式的应用(C++)

2023-05-16

我使用的ZeroMQ版本是4.2.0,应用的是其发布-订阅模式

应该知道的细节:
PUB-SUB套接字是慢连接,你无法得知SUB是何时开始接收消息的。就算你先打开了SUB套接字,后打开PUB发送消息,这时SUB还是会丢失一些消息的,因为建立连接是需要一些时间的。很少,但并不是零。参考文章:传送门1.传送门2.

先看代码,参考该文章:ZeroMQ的订阅发布(publish-subscribe)模式

//server--publish
#include <iostream>
#include <string>
#include <sstream>
#include <cstring>
#include <iomanip>
#include <cstdlib>
#include <ctime>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main ()
{
    void *context = zmq_ctx_new ();
    void *publisher = zmq_socket (context, ZMQ_PUB);
    int rc = zmq_bind (publisher, "tcp://127.0.0.1:5556");
    assert (rc == 0);

    //  Initialize random number generator
    srand(time(0));
    while (1) {
        int zipcode, temperature, relhumidity;
        zipcode     = rand() % 100000;
        temperature = rand() % 215 - 80;
        relhumidity = rand() % 50 + 10;

        ostringstream os;
        os << setw(5) << setfill('0')<< zipcode <<" "
           << temperature <<" "<< relhumidity << "\0";

		int len = strlen(os.str().c_str());
		zmq_msg_t msg;
		zmq_msg_init_size(&msg, len);
		
		char *message = static_cast<char *>(zmq_msg_data(&msg));
		memcpy(message, os.str().c_str(), strlen(os.str().c_str()));
		message[len] = '\0';
		int size = zmq_msg_send(&msg, publisher, 0);
		assert(len == size);
    }
    zmq_close (publisher);
    zmq_ctx_destroy (context);
    return 0;
}
//client--subscribe
#include <iostream>
#include <string>
#include <cstring>
#include <assert.h>
#include <zmq.h>

using namespace std;

int main (int argc, char *argv [])
{
    //  Socket to talk to server
    printf ("Collecting updates from weather server...\n");
    void *context = zmq_ctx_new ();
    void *subscriber = zmq_socket (context, ZMQ_SUB);
    int rc = zmq_connect (subscriber, "tcp://localhost:5556");
    assert (rc == 0);

    char filter1[] = "10001 ";
    char filter2[] = "20002 ";
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter1, strlen (filter1));
    assert (rc == 0);
    rc = zmq_setsockopt (subscriber, ZMQ_SUBSCRIBE,filter2, strlen (filter2));
    assert (rc == 0);

    //  Process 100 updates
    int size;
    char buffer [256];
    zmq_msg_t msg;
    rc = zmq_msg_init(&msg);
    assert(0 == rc);
    
    for (int update_nbr = 0; update_nbr < 100; update_nbr++) {

        int size = zmq_msg_recv(&msg, subscriber, 0);
         if (size == -1){
            cout<< "receiver error , skip this message"<<endl;
            continue;
        }
		static_cast<char*>(zmq_msg_data(&msg))[size] = '\0';
		
        cout <<static_cast<char*>(zmq_msg_data(&msg)) <<endl;
    }

    zmq_close (subscriber);
    zmq_ctx_destroy (context);
    return 0;
}

ZeroMQ官方API:http://api.zeromq.org/4-2:_start

几个要点

  1. 没有什么特别的要求我觉得还是使用zmq_send()和zmq_recv()这两个函数来进行数据的收发比较好,简单直接。
  2. 初始化消息推荐使用zmq_msg_init_size(zmq_msg_t*, size_t len),我在使用的时候,发现服务器这边如果使用zmq_init(zmq_msg_t)来初始化消息,zmq_msg_send()返回值是0,这意味着发出去的字节数是0,客户端也收不到收据,我搞不懂这是什么原因,换成zmq_msg_init_size()来初始化消息就能返回发送消息的字节数了。客户端这边我用zmq_msg_init()初始化消息能正常接收到来自服务器的消息。
  3. 使用消息,推荐将消息的创建放在循环体内,看别人的博客提到消息发送成功后就会被释放,再次使用的时候就会出现问题。所以每次循环的时候都创建一个消息,然后初始化后使用。

参考文章:https://www.cnblogs.com/chenny7/p/6245236.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZeroMQ发布-订阅模式的应用(C++) 的相关文章

  • NanoMsg (NNG) 和 FlatBuffers 是否适合该项目?

    大声喊出我们是否应该考虑更好的事情 我正在寻找一种非常快速且简单的方法来获取多个程序 例如 5 个 每个程序都在私有 OpenStack 云上的单独节点上运行以相互通信 数据包将是短 C 结构 小于 100 字节 交通流量将会较少 可能低于
  • ZMQ ROUTER中的客户端维护

    ZeroMQ ROUTER 套接字如何在内部维护其客户端连接 该指南称每个客户都会获得一个唯一的 ID 但并不清楚 什么算作客户端 每台机器不同的客户端或每个连接的应用程序不同 从客户端收到的请求数量是否有限制 原因是 我正在对这段代码进行
  • 如何使用 ZMQ 发送/接收通过 Protocol Buffers 序列化的二进制数据

    我需要在 ZMQ 套接字上发送一个对象 用 GPB 序列化 目前该代码有一个额外的副本 如何直接将序列化数组写入message ts data ABT CommunicationProtocol introPacket Fill the p
  • zmq::message_t发送后可以重复使用吗?

    我正在使用 ZeroMQ 来实现一个玩具通信协议 这是我第一次使用这个框架 库 现在 在我的协议中 某一方发送多个连续消息 所有消息都具有相同的大小 所以 我想 我会避免重新分配它们 而只是尝试用不同的内容重新填充消息数据缓冲区 例如 zm
  • ZeroMQ 中的 N 到 N 异步模式?

    尽管我阅读了该指南 但我找不到执行以下操作的方法 我们有n个出版商 我们有 m 个订户 每个订阅者订阅某种类型的消息 一个发布者可以发送多种消息 多个发布者可以发出相同类型的消息 如何在 0MQ 中创建 N 到 N 或 N 到 1 到 N
  • 为什么 Mono 上的 NetMQ DealerSocket 在 Debian Wheezy 上不向服务器发送消息,但在 Windows 上却发送消息?

    我在 Debian Wheezy 上的 Mono 4 8 上使用 NetMQ 4 0 0 1 时遇到一些问题 经销商套接字不会发送任何消息 直到我不会停止调用它来发送新消息 当我将Thread Sleep 1000 在创建任务之间比一切都好
  • 如何使用 Zeromq 的 inproc 和 ipc 传输?

    我是 ZERMQ 的新手 ZeroMQ 具有 TCP INPROC 和 IPC 传输 我正在寻找在 Winx64 和 python 2 7 中使用 python 和 inproc 的示例 这些示例也可以用于 Linux 另外 我一直在寻找
  • 为没有 ZeroMQ 绑定的语言创建 IPython 的语言内核

    有some http andrew gibiansky com blog ipython ipython kernels 有趣的描述 https stackoverflow com questions 22782028 ipython la
  • 具有自定义负载平衡的 ZMQ 套接字

    我研究了 ZMQ PUSH PULL 套接字 尽管我非常喜欢这种简单性 特别是与我现在在 UDP 套接字上的系统中实现的自定义碎片 ack 相比 但我希望使用自定义负载平衡 而不是简单的循环 robin 我相信 ZMQ PUSH PULL
  • 为分布式系统构建数据收集和监控的中间件[关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我目前正在寻找一个好的中间件来构建监控和维护系统的解决方案 我们面临的挑战是监控 收集数据并维护由多达
  • ZeroMQ:重新绑定套接字时地址使用错误

    将 ZeroMQ 套接字绑定到端点并关闭套接字后 将另一个套接字绑定到同一端点需要多次尝试 之前的调用zmq bind直到成功失败并出现错误 地址正在使用 EADDRINUSE 下面的代码演示了这个问题 include
  • ZeroMQ性能测试。准确的延迟是多少?

    我正在使用 zmq 跨进程传输消息 并且我想做一些性能测试来获取延迟和吞吐量 官方网站给出了指南讲述如何运行性能测试 http zeromq org results perf howto 例如 我尝试过 local lat tcp 1521
  • Python ZeroMQ PUSH/PULL——丢失消息?

    我正在尝试使用python with zeroMQ in PUSH PULL模式 发送大小的消息4 MB 每隔几秒钟 由于某种原因 虽然看起来所有消息都已发送 但服务器似乎只收到了其中一些消息 我在这里缺少什么 这是客户端的代码 clien
  • 连接到远程 IPython 实例

    我想在一台机器上运行 IPython 实例 并从不同的进程连接到它 通过 LAN 以运行一些 python 命令 我知道 zmq 是可能的 http ipython org ipython doc dev development ipyth
  • 负载测试 ZeroMQ (ZMQ_STREAM) 以查找它可以处理的最大并发用户数

    有没有人有任何实际场景对 ZMQ 套接字进行负载测试以获得最大数量 他们可以处理的 并发用户 不是吞吐量 看起来 ZeroMQ 在 FD 限制方面存在一些严重问题 场景是 有许多 Web 服务器框架吹嘘它们可以处理数百万个并发用户 现在 如
  • 应用程序在 iOS 6 上崩溃:找不到符号:___sync_fetch_and_add_4

    我有一个与 iOS4 和 iOS5 完美配合的应用程序 它使用针对 ARM 的 Zeromq 库的静态编译版本 Apple 拒绝了我的应用程序 因为他们声称它在 iOS 6 下崩溃 尚未发布 wth 在使用 iOS 6 GM 尝试之后 我可
  • 无法在 ubuntu 19.04 上安装 libzmq3-dev

    我正在尝试安装libzmq3 dev on 乌班图19 04 使用命令 sudo apt install build essential libsocketcan dev libzmq3 dev 我收到消息 gt Some packages
  • ZeroMQ在多线程应用程序中处理中断

    多线程环境下ZeroMQ的优雅退出 规格 带有 c 11 的 ubuntu 16 04 libzmq 4 2 3 示例代码 static int s interrupted 0 static void s signal handler in
  • ZeroMQ可以用来接受传统的套接字请求吗?

    我正在尝试使用 ZeroMQ 重写我们的旧服务器之一 现在我有以下服务器设置 适用于 Zmq 请求 using var context ZmqContext Create using var server context CreateSoc
  • Python 3.6 ZeroMQ (PyZMQ) asyncio pub sub Hello World

    我刚刚开始使用 ZeroMQ 我正在尝试让 Hello World 在 Python 3 6 中与 PyZMQ 和 asyncio 一起使用 我试图将模块的功能与发布 订阅代码分离 因此有以下类设置 Edit 1 最小化示例 Edit 2

随机推荐

  • 深度学习中查看显卡使用情况

    命令 xff1a nvidia smi 功能 xff1a 显示服务器上的GPU的情况 命令 xff1a nvidia smi l 功能 xff1a 定时更新显示服务器上的GPU的情况 命令 xff1a watch n 3 nvidia sm
  • Shell脚本入门(一)--- 变量赋值、调取、echo$计算

    Shell脚本入门 xff08 一 xff09 文章目录 Shell脚本入门 xff08 一 xff09 64 toc shell 脚本变量赋值echo 计算 作业 xff1a 获取主机基本信息及分区使用率 shell 脚本 shell 变
  • C和C++中 struct的区别

    1 xff1a C 43 43 中不需要加struct就可以定义变量 xff0c 而c需要加struct 2 xff1a C 43 43 结构体内部可以使用函数
  • C和C++的const

    1 C语言的const修饰的变量都有空间 xff0c 全局的在常量区 xff0c 局部的在栈区 xff1b 2 C语言的const修饰的全局变量具有外部链接属性 xff0c extern const int a xff1b 即可使用 xff
  • 连接SSH失败的原因以及方法

    一 检查用户名密码 连接失败时可以先检查验证信息 xff0c 步骤如下 xff1a 1 运行软件 xff0c 在会话管理中找到连接失败的会话 右键单击会话名 xff0c 点击属性 图1 xff1a 查看属性 2 在弹出的对话框点击用户身份验
  • 虚地址空间

  • tensorflow2.0 学习笔记-利用tf.data.Dataset API读取numpy array文件

    tensorflow2 0 学习笔记 数据读取1 利用tf data Dataset API读取numpy array文件读取numpy array数据 利用tf data Dataset API读取numpy array文件 读取nump
  • matplotlib绘图中文乱码问题--解决方案(Windows)

    最近使用python绘图时 xff0c 出现中文乱码问题 xff0c 结合在网上搜索理解后 xff0c 按照如下步骤 xff0c 成功解决 解决方案 xff1a 步骤一 xff1a 找到Mircosoft YaHei UI字体文件 一般 在
  • Python数据分析之--运动员数据揭秘(一)

    在网易云课堂看了城市数据团的课程 xff0c 对理解利用pytthon进行数据分析的基本流程很有帮助 xff0c 因此进行复盘总结 xff0c 加深自己的理解 xff0c 巩固相关操作 分析资料及工具 xff1a Spyder Python
  • 缓慢变化维

    一 什么是缓慢变化维 xff1f 缓慢变化维 xff08 Slowly Changing Dimensions SCD xff09 它的提出是因为在现实世界中 xff0c 维度的属性并不是静态的 xff0c 它会随着时间的流失发生缓慢的变化
  • 一个小例子带你入门-Tableau

    声明 xff1a 本文是学习W3Cschool教程整理所得 xff0c 非原创 xff0c 原文链接 xff1a W3Cschool 创建任何Tableau数据分析报告涉及三个基本步骤 连接到数据源 它涉及定位数据并使用适当类型的连接来读取
  • SQL 必知必会--函数篇

    对SQL的基础函数做复习回顾 xff0c 本篇涉及的函数知识如下 xff1a 好了 xff0c 下面开始复习 xff1a SQL Aggregate 函数计算从列中取得的值 xff0c 返回一个单一的 值 Max 函数 作用 xff1a 返
  • WPS的Linux Mint版(Ubuntu)提示“系统缺失字体”的解决方法

    wps的Linux Mint版 Ubuntu 版安装成功后 xff0c 可能每次启动的时候都会提示 xff1a 系统缺失字体 xff0c 如图 xff1a 解决方法 1 首先下载字体包并解压 链接 https pan baidu com s
  • SQL必知必会--中级篇(二)

    接上一篇SQL必知必会 中级篇 xff08 一 xff09 xff0c 继续对sql知识进行整理复习 本篇包含知识点如图 xff1a 一 SQL 约束 用于规定表中的数据规 则 xff1b 如果存在违反约束的数据行为 xff0c 行为会被约
  • 静态网页个人简历

    程序员的简历是不用随身带的 首先 xff0c 作为程序员自己的简历是比别人特别的 xff1b 程序员应该是有思想 xff0c 有高情商的手工艺人 作为程序员简历是随身带的代码 xff0c 用代码书写的简历就像是一份随身携带着的简历 xff0
  • CMake之CMakeLists.txt编写入门

    自定义变量 主要有隐式定义和显式定义两种 隐式定义的一个例子是PROJECT指令 xff0c 它会隐式的定义 lt projectname gt BINARY DIR和 lt projectname gt SOURCE DIR两个变量 xf
  • 照相机成像原理 数码相机的成像原理

    照相机成像原理 数码相机的成像原理 1 1 数码相机 的成像原理 当打开相机的电源开关后 xff0c 主控程序芯片开始检查整个相机 xff0c 确定各个部件是否处于可工作状态 如果一切正常 xff0c 相机将处于待命状态 xff1b 若某一
  • MySQL 单表查询

    创建数据库并插入数据 创建表 xff0c 数据类型请自行查询 CREATE TABLE fruits id INT NOT NULL sid INT NOT NULL NAME CHAR 255 NOT NULL price DECIMAL
  • react ant Design pro Umi 项目左上角icon不显示

    今天本地运行项目没有问题 xff0c 打包发到远端发现logo icon不显示了 然后找了找资料说 LayoutSettings 哪里logo用链接图片可以本地图片有时候会加载异常 解决方法 xff1a 找到 app tsx 加个logo
  • ZeroMQ发布-订阅模式的应用(C++)

    我使用的ZeroMQ版本是4 2 0 xff0c 应用的是其发布 订阅模式 应该知道的细节 xff1a PUB SUB套接字是慢连接 xff0c 你无法得知SUB是何时开始接收消息的 就算你先打开了SUB套接字 xff0c 后打开PUB发送