【开源之美】nanomsg(2) :req/rep 模式

2023-11-08

req/rep 模式显然就是类似http的应答模式。在某些基于短连接的进程间通讯方式上可以很方便的使用。下面我们举个例子:

服务端:demo


#ifndef NANOMSGUTIL_H
#define NANOMSGUTIL_H

#include "messageDispatch.h"
#include "thread/nthread.h"

class NanomsgServer : public QThread
{
public:
    NanomsgServer(const QString url = "tcp://127.0.0.1:5555");

    int NanoServer();

    virtual void run() override final;

    int process();

    void stop();

private:
    QString m_url;
    bool m_stopFlag = false;
    MessageDispatch m_dispatcher; /// 消息分发处理
};

#endif

#include "nanomsgServer.h"
#include <NLog>
#include <QJsonDocument>
#include <QJsonObject>
#include <QJsonArray>

/*
    Copyright 2016 Garrett D'Amore <garrett@damore.org>

    Permission is hereby granted, free of charge, to any person obtaining a copy
    of this software and associated documentation files (the "Software"),
    to deal in the Software without restriction, including without limitation
    the rights to use, copy, modify, merge, publish, distribute, sublicense,
    and/or sell copies of the Software, and to permit persons to whom
    the Software is furnished to do so, subject to the following conditions:

    The above copyright notice and this permission notice shall be included
    in all copies or substantial portions of the Software.

    THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
    IN THE SOFTWARE.

    "nanomsg" is a trademark of Martin Sustrik
*/

/*  This program serves as an example for how to write an async RPC service,
    using the RAW request/reply pattern and nn_poll.  The server receives
    messages and keeps them on a list, replying to them.

    Our demonstration application layer protocol is simple.  The client sends
    a number of milliseconds to wait before responding.  The server just gives
    back an empty reply after waiting that long.

    To run this program, start the server as async_demo <url> -s
    Then connect to it with the client as async_client <url> <msec>.

    For example:

    % ./async_demo tcp://127.0.0.1:5555 -s &
    % ./async_demo tcp://127.0.0.1:5555 323
    Request took 324 milliseconds.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>

/*  MAXJOBS is a limit on the on the number of outstanding requests we
    can queue.  We will not accept new inbound jobs if we have more than
    this queued.  The reason for this limit is to prevent a bad client
    from consuming all server resources with new job requests. */

#define MAXJOBS 100
#define MAXLENS 10*1024

/*  The server keeps a list of work items, sorted by expiration time,
    so that we can use this to set the timeout to the correct value for
    use in poll.  */
struct work {
    struct work *next;
    struct nn_msghdr request;
    uint64_t expire;
    void *control;
};


#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tp->tv_sec = clock;
    tp->tv_usec = wtm.wMilliseconds * 1000;
    return (0);
}
#endif


/*  Return the UNIX time in milliseconds.  You'll need a working
    gettimeofday(), so this won't work on Windows.  */
uint64_t milliseconds (void)
{
    struct timeval tv;
    gettimeofday (&tv, NULL);
    return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}


NanomsgServer::NanomsgServer(const QString url)
{
    m_url = url;
}

/*  The server runs forever. */
void NanomsgServer::run()
{
    INFO_PRINT_LINE << "start service thread.";

    int fd;
    struct work *worklist = NULL;
    int npending = 0;

    /*  Create the socket. */
    fd = nn_socket(AF_SP, NN_REP);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return ;
    }

    /*  Bind to the URL.  This will bind to the address and listen
        synchronously; new clients will be accepted asynchronously
        without further action from the calling program. */

    if (nn_bind (fd, m_url.toStdString().data()) < 0) {
        fprintf (stderr, "nn_bind: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return ;
    }

    /*  Main processing loop. */
    while(!m_stopFlag){

        void *buf = NULL;
        int nbytes = nn_recv (fd, &buf, NN_MSG, 0);
        if (nbytes < 0) {

            fprintf (stderr, "nn_recv: %s\n",nn_strerror (nn_errno ()));
            nn_freemsg (buf);

            continue;
        }

        char* request = NULL;
        request = (char*)malloc(nbytes+1);
        //memcpy((void*)request,buf,nbytes);
        strncpy(request,(const char*)buf,nbytes);
        request[nbytes] = '\0';
        QByteArray ba = QByteArray(request).trimmed();

        //INFO_PRINT_LINE << (char*)buf << nbytes;
        INFO_PRINT_LINE << request << strlen(request);

        /// message dispatch
        QJsonDocument loadDoc(QJsonDocument::fromJson(ba));
        QJsonObject dataObj = loadDoc.object();

		/// deal message
        QString responce = m_dispatcher.deal(QString(request));

        // responce to client
        const char *d = responce.toUtf8().constData();

        int sz_d = strlen(d) + 1; // '\0' too
        nbytes = nn_send (fd, d, sz_d, 0);

        assert (bytes == sz_d);

        INFO_PRINT_LINE << "[responce]  " << d << nbytes;

        free(request);
        nn_freemsg (buf);
    }

    nn_close (fd);
    return;
}

void NanomsgServer::stop()
{
    INFO_PRINT_LINE << "stop";

    if (QThread::isRunning())
    {
        INFO_PRINT_LINE << "stop";

        m_stopFlag = true;
        QThread::quit();
        QThread::wait();
    }
}

客户端:demo

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>

#ifdef WIN32
#include <windows.h>
#include <winsock.h>
#else
#include <sys/time.h>
#endif

#include <nanomsg/nn.h>
#include <nanomsg/reqrep.h>
#define DEFAULT_URL             "tcp://127.0.0.1:5555"
#define DEFAULT_BUFFER_SIZE     (10*1024)

char npi_appId[32] = {0};

/*************************  Log Module *******************************/
#if _MSC_VER > 1000
#pragma once
#endif // _MSC_VER > 1000

enum {
    LL_NOTICE 	= 1, 	//一般输出
    LL_WARNING 	= 2, 	//告警输出
    LL_TRACE 	= 3,	//追踪调试
    LL_DEBUG 	= 4,	//软件bug
    LL_FATAL 	= 5     //致命错误
};

#define Print_NOTICE(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_NOTICE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_WARN(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_WARNING, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_TRACE(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_TRACE,__FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_DEBUG(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n", LL_DEBUG, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)

#define Print_FATAL(log_fmt,...) \
    do{ \
    printf("L(%d)[%s:%d][%s]:  "log_fmt"\n",LL_FATAL, __FILE__, __LINE__, __FUNCTION__, ##__VA_ARGS__); \
    }while (0)


int NanoClientRequest(const char *url , const char* request, long len,char* result);

/*************************  nanomsg client  *******************************/
#define MAXJOBS 100
#define MAXLENS 10*1024

struct work {
    struct work *next;
    struct nn_msghdr request;
    uint64_t expire;
    void *control;
};


#ifdef WIN32
int gettimeofday(struct timeval *tp, void *tzp)
{
    time_t clock;
    struct tm tm;
    SYSTEMTIME wtm;
    GetLocalTime(&wtm);
    tm.tm_year   = wtm.wYear - 1900;
    tm.tm_mon   = wtm.wMonth - 1;
    tm.tm_mday   = wtm.wDay;
    tm.tm_hour   = wtm.wHour;
    tm.tm_min   = wtm.wMinute;
    tm.tm_sec   = wtm.wSecond;
    tm. tm_isdst  = -1;
    clock = mktime(&tm);
    tp->tv_sec = clock;
    tp->tv_usec = wtm.wMilliseconds * 1000;
    return (0);
}
#endif

uint64_t milliseconds (void)
{
    struct timeval tv;
    gettimeofday (&tv, NULL);
    return (((uint64_t)tv.tv_sec * 1000) + ((uint64_t)tv.tv_usec / 1000));
}


/*  The client runs just once, and then returns. */
int NanoClientRequest (const char *url, const char* request, long len, char *result)
{
    int fd;
    int rc;

    fd = nn_socket (AF_SP, NN_REQ);
    if (fd < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        return (-1);
    }

    if (nn_connect (fd, url) < 0) {
        fprintf (stderr, "nn_socket: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    if (nn_send (fd, request, len , 0) < 0) {
        fprintf (stderr, "nn_send: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }

    void* buf = NULL;
    rc = nn_recv (fd, &buf, NN_MSG , 0);
    if (rc < 0) {
        fprintf (stderr, "nn_recv: %s\n", nn_strerror (nn_errno ()));
        nn_close (fd);
        return (-1);
    }
    
    Print_TRACE("[recv rep]: %d  %s",rc,buf);

    memcpy((void*)result,buf,rc);

    nn_freemsg (buf);

    nn_shutdown (fd, 0);

    return 0;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【开源之美】nanomsg(2) :req/rep 模式 的相关文章

随机推荐

  • 奇安信远程技术支持实习面试总结

    1 自我介绍 2 问项目 3 ssl vpn 和ipsec vpn 区别 4 Vxlan的用途和作用 5 跨专业就业的原因 6 Linux常用命令 查询内存的命令 7 交换机两种接口模式 8 Vrrp协议 9 Ospf五类数据包 10 对于
  • erp系统服务器电脑配置,erp软件服务器电脑配置

    erp软件服务器电脑配置 内容精选 换一换 Atlas 200 DK开发者板支持通过USB端口或者网线与Ubuntu服务器进行连接 连接示例图如图1所示 Atlas 200 DK连接Ubuntu服务器有以下场景 使用USB连接线通过USB端
  • 仿百度页面制作html+css+js动态页面

    仿百度页面制作html css js动态页面
  • java stream 两个List<Map>合并

    new三条源数据 value值均为一个字 加入list Map
  • C++笔记(随时更新)

    一 string key str sort key begin key end 二 emplace back优于push back 能就地通过参数构造对象 不需要拷贝和移动内存 提升容器插入性能 三 swap nums i nums i 1
  • Python表白代码合集:用完这几种表白代码,找不到对象你来找我,这也太秀了吧❤️

    明天就七夕了 谁说程序员不懂浪漫 今天给大家分享几种有意思的表白代码 带你用Python 码 上七夕 话不多说 我们直接上代码 第一种 表白弹窗 先看效果 文字背景啥的 大家可以自定义一下 代码展示 20行代码实现弹窗 import tki
  • 特征工程(1)--特征工程是什么?

    机器学习领域的大神Andrew Ng 吴恩达 老师曾说 Coming up with features is difficult time consuming requires expert knowledge Applied machin
  • 乐高编程机器人编程有什么区别

    乐高编程机器人编程有什么区别 一直以来家长们对于孩子的学习重视程度可谓是相当的大 很多的家长会给孩子选择一些能够让孩子适应社会发展的课程 就拿现在很多的家长想要孩子去学习机器人编程的课程来说 有的家长对于乐高编程机器人编程有什么区别并不清楚
  • 【Xgplayer】xgplayer基本使用

    文章目录 xgplayer简介 xgplayer官网 Xgplayer VS VideoJs xgplayer下载 播放器组件 使用播放器 效果图 推荐 xgplayer简介 开发团队 字节跳动 字节跳动出品 必属精品 xgplayer是一
  • VS2022编译GDAL库报错: LINK : error LNK2001: 无法解析的外部符号 _OSRValidate _OGR_G_GetPointCount _OGRRegisterAll

    目录 场景复现 解决方案 场景复现 使用VS2022的Native Tools command prompt for 2022工具编译GDAL库时 报 LINK error LNK2001 无法解析的外部符号 OSRValidate OGR
  • Pytorch入门实战(5):基于nn.Transformer实现机器翻译(英译汉)

    使用Google Colab运行 open In Colab 源码地址 文章目录 本文涉及知识点 本文内容 环境配置 数据预处理 文本分词与构造词典 Dataset and Dataloader 模型构建 模型训练 模型推理 本文涉及知识点
  • 数据库并发控制 事务调度 可串行调度

    所谓并发操作 是指在多用户共享的系统中 许多用户可能同时对同一数据进行操作 所带来的问题是数据的不一致性 具体表现为 丢失更新 不可重复读 读脏数据 1 事务调度 1 1 串行调度 Serial Schedule 是指多个事务依序串行执行
  • 华为手机上的网上邻居怎么用_HUAWEI Mate 8 网络邻居 使用教程

    本帖最后由 爱奔跑的蜗牛 于 2016 1 19 23 54 编辑 有根数据线 手机连接电脑传输管理文件算不上什么秘密 但总有那么一两天 忘记带数据线 又急需拷贝电脑文件到手机上 除了问别人借数据线 难道就不能 自力更生 了吗 当然不是 拥
  • 【华为OD机试真题 python】通信误码【2022 Q4

    题目描述 通信误码 信号传播过程中会出现一些误码 不同的数字表示不同的误码ID 取值范围为1 65535 用一个数组记录误码出现的情况 每个误码出现的次数代表误码频度 请找出记录中包含频度最高误码的最小子数组长度 输入描述 误码总数目 取值
  • Python编程的注意事项

    目录 一 异常处理 1 精细化地捕获异常 2 finally 块中的资源清理 3 抛出自定义异常 二 类的继承 1 不要过度使用继承 2 了解多重继承的问题 三 垃圾回收与内存管理 1 对象引用计数的概念 2 循环引用的问题 Python
  • Kubernetes中的PV和PVC

    K8s引入了一组叫作Persistent Volume Claim PVC 和Persistent Volume PV 的API对象 大大降低了用户声明和使用持久化Volume的门槛 在Pod的Volumes中 只要声明类型是persist
  • [1080]idea import引用报错

    从GIT上拉下代码后 出现这种情况 类正常 但是import是浅灰色 引用类有红色警告 代码中所有的引用都报错 重启idea 无效 删除引用的类与被引用的类中的代码 无效 重新加载maven 无效 最后 清理缓存后 恢复正常 File gt
  • 硬件系统工程师宝典(29)-----应用DC/DC要注意什么?

    各位同学大家好 欢迎继续做客电子工程学习圈 今天我们继续来讲这本书 硬件系统工程师宝典 上篇我们说到使用LDO时 除了要考虑输入 输出电压外 还要注意压差 最大输出电流等 今天我们来讲讲DC DC的应用分析 DC DC分类 将一个不受控的输
  • 台式电脑重装系统失败怎么办

    当大家使用一键重装系统软件给自己电脑重装系统的时候 都可能会遇到一些故障问题造成台式电脑重装系统失败的情况发生 那么大家遇到台式电脑重装系统失败怎么办呢 现在小编就教下大家相关的方法教程 大家一起来看看吧 工具 原料 系统版本 window
  • 【开源之美】nanomsg(2) :req/rep 模式

    req rep 模式显然就是类似http的应答模式 在某些基于短连接的进程间通讯方式上可以很方便的使用 下面我们举个例子 服务端 demo ifndef NANOMSGUTIL H define NANOMSGUTIL H include