使用Zeromq和protobuf实现的socket通信

2023-05-16

本文介绍使用ZeroMQ(下文简称ZMQ),结合protobuf序列化实现客户端和服务端的通信。在之前的一篇文章中(http://blog.csdn.net/cjf_wei/article/details/52894560)介绍了Google的protobuf序列化的使用,以及结合unix环境的socket编程实现简单的客户端到服务端的通信。在接触了zmq之后,尝试使用这个“极速消息通信库”来重构之前的实现。
ZMQ是iMatix开发的以消息为导向的开源中间件库,它类似于Berkeley套接字,它支持多种传输协议,它小巧、简单,但速度足够快,可以用作一个并发框架。它支持多种模式的传输,不管是客户端到服务端的1:1关系,还是M:N关系,亦或是订阅/发布它都能轻松应对。本文使用客户端到服务端的1:1的应答模式。

Zmq应答模式的基本使用
使用ZMQ进行通信,首先要创建一个上下文环境,然后使用它创建套接字。

void *context = zmq_ctx_new();//创建上下文

客户端和服务端使用的socket类型并不一样。

void *requester = zmq_socket(context, ZMQ_REQ); //for client
void *responder = zmq_socket(context, ZMQ_REP); //for server

随后服务端将socket绑定到一个周知的地址和端口

zmq_bind(responder,"tcp://*:5555");

而客户端则要尝试连接到服务端提供的地址

zmq_connect(requester,"tcp://localhost:5555");

要把数据写入消息需要使用zmq_msg_init_size()来初始化消息,而读取消息由于未知消息的长度只能使用zmq_msg_init()来创建一个空的消息。
消息初始化后,发送消息使用zmq_send_send(),接收消息则使用zmq_msg_recv();
访问消息可以使用zmq_msg_data(),要想知道消息的大小可以使用zmq_msg_size();

最后需要关闭套接字,并销毁上下文。

zmq_close(&requester);    //关闭套接字
zmq_ctx_destroy(context); //销毁上下文

protobuf的使用请参考(http://blog.csdn.net/cjf_wei/article/details/52894560),在此不再赘述。

代码实现

  • 客户端
//for client
#include <iostream>
#include <string>
//for protobuf
#include "Test.pb.h" 
//for zmq
#include <zmq.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;

int main()
{
    //socket通信所需的上下文环境
    void *context = zmq_ctx_new();
    //根据context建立的socket的链接,客户端使用ZMQ_REQ套接字
    void *requester = zmq_socket(context, ZMQ_REQ);
    if( -1 == zmq_connect(requester,"tcp://localhost:5555"))
    {
        cout<<"Connect to server failed..."<<endl;
        zmq_ctx_destroy(context);
        return -1;
    }
    cout<<"Connect to server success..."<<endl;

    HeartInfo myprotobuf;
    while(1)
    {
        myprotobuf.set_type("client");
        myprotobuf.set_ip("192.168.1.100");
        myprotobuf.set_port(5555);

        char buff[BUFFSIZE];
        myprotobuf.SerializeToArray(buff,BUFFSIZE);

        //客户端发送请求
        int len = strlen(buff);
        zmq_msg_t req;
        if(0 != zmq_msg_init_size(&req,len))
        {
            cout<<"zmq_msg_init failed..."<<endl;
            break;
        }
        memcpy(zmq_msg_data(&req),buff,len);
        if(len != zmq_msg_send(&req,requester,0))
        {
            zmq_msg_close(&req);
            cout<<"send faliled..."<<endl;
            break;
        }
        //成功发送后,在控制台打印发送消息的内容
        cout<<"Type:"<<myprotobuf.type()<<"\t"
            <<"IP:"<<myprotobuf.ip()<<"\t"
            <<"Port:"<<myprotobuf.port()<<"\n";
        zmq_msg_close(&req);

        //清空发送缓存
        memset(buff,0,BUFFSIZE*sizeof(char));

        //客户端接收来自服务端的相应
        zmq_msg_t reply;
        zmq_msg_init(&reply);
        int size = zmq_msg_recv(&reply,requester,0);
        memcpy(buff,zmq_msg_data(&reply),size);
        HeartInfo receive;
        receive.ParseFromArray(buff,BUFFSIZE);
        cout<<"Type:"<<receive.type()<<"\t"
            <<"IP:"<<receive.ip()<<"\t"
            <<"Port:"<<receive.port()<<"\n";
        zmq_msg_close(&reply);
    }
    zmq_close(&requester);
    zmq_ctx_destroy(context);
    return 0;
}
  • 服务端
#include <iostream>
#include <string>
//for protobuf
#include "Test.pb.h" 
//for zmq
#include <zmq.h>

using namespace std;
using namespace Test::protobuf ;

const int BUFFSIZE = 128;

int main()
{
    //socket通信所需的上下文环境
    void *context = zmq_ctx_new();
    //根据context建立的socket的链接,服务端使用ZMQ_REP套接字
    void *responder = zmq_socket(context, ZMQ_REP);
    if( -1 == zmq_bind(responder,"tcp://*:5555"))
    {
        cout<<"bind socket to server failed..."<<endl;
        return -1;
    }

    HeartInfo myprotobuf;
    while(1)
    {
        char buff[BUFFSIZE];
        //接收客户端请求
        zmq_msg_t request;
        zmq_msg_init(&request);
        int size = zmq_msg_recv(&request,responder,0);
        memcpy(buff,zmq_msg_data(&request),size);
        HeartInfo receive;
        receive.ParseFromArray(buff,BUFFSIZE);
        cout<<"Type:"<<receive.type()<<"\t"
            <<"IP:"<<receive.ip()<<"\t"
            <<"Port:"<<receive.port()<<"\n";
        zmq_msg_close(&request);

        //清空接收缓存
        memset(buff,0,BUFFSIZE*sizeof(char));
        sleep(2);

        myprotobuf.set_type("server");
        myprotobuf.set_ip("192.168.1.100");
        myprotobuf.set_port(5555);
        myprotobuf.SerializeToArray(buff,BUFFSIZE);

        //服务端发送响应
        int len = strlen(buff);
        zmq_msg_t reply;
        if(0 != zmq_msg_init_size(&reply,len))
        {
            cout<<"zmq_msg_init failed..."<<endl;
            break;
        }
        memcpy(zmq_msg_data(&reply),buff,len);
        if(len != zmq_msg_send(&reply,responder,0))
        {
            zmq_msg_close(&reply);
            cout<<"send faliled..."<<endl;
            break;
        }
        //成功发送后,在控制台打印发送消息的内容
        cout<<"Type:"<<myprotobuf.type()<<"\t"
            <<"IP:"<<myprotobuf.ip()<<"\t"
            <<"Port:"<<myprotobuf.port()<<"\n";
        zmq_msg_close(&reply);
    }
    zmq_close(&responder);
    zmq_ctx_destroy(context);
    return 0;
}

在本文中使用的是protobuf来序列化要传输的内容,当然直接传输字符串也是可以的,但是
需要注意的是“除了字节大小外,zmq对你发送的数据一无所知”。这意味着在C/C++中,传输的字符串是否以’\0’结尾,你要自己决定并负责安全的处理。


1.《ZeroMQ云时代极速消息通信库》.电子工业出版社,2015.
2. 使用protobuf和socket实现服务器间消息的传递.http://blog.csdn.net/cjf_wei/article/details/52894560

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用Zeromq和protobuf实现的socket通信 的相关文章

  • socket连接超时问题

    一部分 把CSDN与中文yahoo翻了底朝天 也没找到如何设置socket的连接超时的满意方法 问此问题的兄弟已有一大堆 这里偶就讲一下win下如何设置socket的connect超时 设置connect的超时很简单 CSDN上也有人提到过
  • Protobuf安装步骤

    今天看Brpc开源代码的时候 看到了里面提到了google开源的protobuf的数据序列化和反序列工具 所以特地下了源码 试着看下一个简单的使用过程 1 protobuf的介绍 google protobuf是一个灵活的 高效的用于序列化
  • 无法安装 PyZMP for Python -- 依赖项

    我在安装 iPython 的 PyZMP 依赖项时遇到问题 我尝试了很多方法 例如使用 pip brew 但最终还是使用手动安装包这个答案 Now pip list packages产生以下结果 pyzmq 14 2 0 dev pyzmq
  • NanoMsg (NNG) 和 FlatBuffers 是否适合该项目?

    大声喊出我们是否应该考虑更好的事情 我正在寻找一种非常快速且简单的方法来获取多个程序 例如 5 个 每个程序都在私有 OpenStack 云上的单独节点上运行以相互通信 数据包将是短 C 结构 小于 100 字节 交通流量将会较少 可能低于
  • 每个线程或每个调用一个 ZeroMQ 套接字?

    众所周知 ZeroMQ套接字不得共享应用程序线程之间 context t但实例可以 我有一个多线程应用程序 我想让每个线程不时与一个线程交换消息REQ REP socket 对方 事件 异常等 取决于他们正在做什么 他们正在做非 ZeroM
  • 在 Xamarin 中使用 ZeroMQ

    我有一个由服务器和客户端组成的应用程序 服务器是 C C 应用程序 客户端是面向 Windows Android 和 iOS 的跨平台 Xamarin 应用程序 服务器部分和客户端部分使用 ZeroMQ 消息进行通信 我尝试了当前的 C 实
  • PHP ZMQ 扩展:无法加载动态库

    我正在 Windows 10 上开发 Wampserver 我正在使用名为 ZMQ 的扩展 我从以下位置下载了 DLLS https pecl php net package zmq 1 1 3 windows https pecl php
  • 适用于 Windows 的 Zeromq PHP 扩展

    我正在使用配置了 IIS 7 5 的 Zend 服务器 我搜索了 edit Zeromq php 扩展 我找到了这些http valokuva org builds http valokuva org builds and http sna
  • React/ZMQ/Ratchet - Websocket 服务器响应

    我目前已经有一个正在运行并使用 Ratchet PHP 的 Web 套接字服务器 我还没有处于希望外部脚本与我的服务器进行通信的阶段 我可以使用 ZMQ 成功地将数据推送到它 push php json name gt Joe Bloggs
  • Pyinstaller 运行具有 pyzmq 依赖项的脚本时出错

    这是我的第一篇 StackOverflow 帖子 我在创建具有 pyzmq v22 0 2 依赖项的 pyinstaller v4 2 可执行文件时遇到问题 我通过运行 pyinstaller main py 创建了一个可执行文件 dist
  • Python 多处理问题?

    我有一个包含 500 个输入文件的文件夹 所有文件的总大小约为 500 MB 我想写一个python执行以下操作的脚本 1 将所有输入文件加载到内存中 2 初始化一个空的python稍后将使用的列表 参见项目符号 4 3 启动 15 个不同
  • jeromq 生产准备好了吗?

    我过去曾通过 JVM 应用程序使用 ZeroMQjzmq图书馆 我计划在一个新项目中使用 Zeromq 其中一些服务是在 JVM 上实现的 我刚刚发现jeromq https github com zeromq jeromq 一个 Zero
  • NODE_MODULE_VERSION 46。此版本的 Node.js 需要 NODE_MODULE_VERSION 64。请尝试重新编译或重新安装

    我正在尝试执行提供给我的节点应用程序 它应该可以正常工作 我已尝试运行它 但无法修复此错误 seba vps92941 services drivetech node awto js home seba services drivetech
  • Python ZeroMQ PUSH/PULL——丢失消息?

    我正在尝试使用python with zeroMQ in PUSH PULL模式 发送大小的消息4 MB 每隔几秒钟 由于某种原因 虽然看起来所有消息都已发送 但服务器似乎只收到了其中一些消息 我在这里缺少什么 这是客户端的代码 clien
  • 如何将扩展 PUB-SUB 模式中的发布者和订阅者与 C++ 中 ZeroMQ 中的中介同步?

    Extended PUB SUB topology https i stack imgur com GEgpx png 我在一个有 1 个中介的用例中有多个发布者和多个订阅者 在 ZeroMQ 指南中 我了解了如何使用额外的方法来同步 1
  • 将 ZeroMQ 与 C# 和 inproc 传输一起使用

    我正在尝试 ZeroMQ 并试图得到某物在职的 我的第一个想法是使用 inproc 传输设置 REP REQ 看看是否可以在两个线程之间发送消息 下面的大部分代码取自 clzmq 示例 但它似乎不起作用 服务器和客户端都绑定到传输 但是当客
  • 创建的线程数超出预期

    你可以找到该程序here https pastebin com H5fq732a 我正在消息传递框架 0MQ 中构建一个程序 我尝试执行我发布的内容here https stackoverflow com questions 4409620
  • REQ/REP 模式中的 ZeroMQ FiniteStateMachineException

    我有两个简单的组件 它们应该使用 REQ REP ZeroMQ 模式相互通信 服务器 REP Socket 是使用 pyzmq 在 Python 中实现的 import zmq def launch server print Launchi
  • PHP - 外部类/库可以从 apache 访问,但不能从 phpunit 访问

    我在我的 Web 应用程序中使用 ZeroMQ 套接字库 我已经配置了 php ini 以便 Apache 可以使用 ZMQ 但我不知道 phpunit 如何使用它 phpunit 不使用与 apache 使用相同的 php ini 吗 在
  • ZeroMQ在多线程应用程序中处理中断

    多线程环境下ZeroMQ的优雅退出 规格 带有 c 11 的 ubuntu 16 04 libzmq 4 2 3 示例代码 static int s interrupted 0 static void s signal handler in

随机推荐