Paho MQTT 嵌入式c客户端研究笔记 (二)

2023-05-16

  paho.mqtt.embedded-c-master\MQTTPacket\samples,这个目录里面封装了发布消息、订阅消息的示例。运行pub0sub1,这个示例里面会去订阅主题消息、发布主题消息。并且订阅和发布的消息是同一个主题,所以在运行过程中会看到循环打印同一份消息。代码如下:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "MQTTPacket.h"
#include "transport.h"

/* This is in order to get an asynchronous signal to stop the sample,
as the code loops waiting for msgs on the subscribed topic.
Your actual code will depend on your hw and approach*/
#include <signal.h>

int toStop = 0;

void cfinish(int sig)
{
    signal(SIGINT, NULL);
    toStop = 1;
}

void stop_init(void)
{
    signal(SIGINT, cfinish);
    signal(SIGTERM, cfinish);
}
/* */

int main(int argc, char *argv[])
{
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 0;
    int mysock = 0;
    unsigned char buf[200];
    int buflen = sizeof(buf);
    int msgid = 1;
    MQTTString topicString = MQTTString_initializer;
    int req_qos = 0;
    char* payload = "mypayload";
    int payloadlen = strlen(payload);
    int len = 0;
    char *host = "m2m.eclipse.org";
    int port = 1883;

    stop_init();
    if (argc > 1)
        host = argv[1];

    if (argc > 2)
        port = atoi(argv[2]);

    mysock = transport_open(host, port);
    if(mysock < 0)
        return mysock;

    printf("Sending to hostname %s port %d\n", host, port);

    data.clientID.cstring = "me";
    data.keepAliveInterval = 20;
    data.cleansession = 1;
    data.username.cstring = "testuser";
    data.password.cstring = "testpassword";
    //连接服务器
    len = MQTTSerialize_connect(buf, buflen, &data);
    rc = transport_sendPacketBuffer(mysock, buf, len);

    /* wait for connack */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
    {
        unsigned char sessionPresent, connack_rc;

        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
        {
            printf("Unable to connect, return code %d\n", connack_rc);
            goto exit;
        }
    }
    else
        goto exit;

    /* subscribe 订阅主题消息*/
    topicString.cstring = "substopic";
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

    rc = transport_sendPacketBuffer(mysock, buf, len);
    if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK)  /* wait for suback */
    {
        unsigned short submsgid;
        int subcount;
        int granted_qos;        
        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
        if (granted_qos != 0)
        {
            printf("granted qos != 0, %d\n", granted_qos);
            goto exit;
        }
    }
    else
        goto exit;

    /* loop getting msgs on subscribed topic循环读取消息 */
    topicString.cstring = "pubtopic";
    while (!toStop)
    {
        /* transport_getdata() has a built-in 1 second timeout,
        your mileage will vary */
        if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH)
        {
            unsigned char dup;
            int qos;
            unsigned char retained;
            unsigned short msgid;
            int payloadlen_in;
            unsigned char* payload_in;
            int rc;
            MQTTString receivedTopic;

            rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                    &payload_in, &payloadlen_in, buf, buflen);
            printf("message arrived %.*s\n", payloadlen_in, payload_in);
        }

        printf("publishing reading\n");
        //下面两行是用来发布消息。这里发布,上面订阅,就形成了一个循环。
        len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
        rc = transport_sendPacketBuffer(mysock, buf, len);
    }

    printf("disconnecting\n");
    len = MQTTSerialize_disconnect(buf, buflen);
    rc = transport_sendPacketBuffer(mysock, buf, len);

exit:
    transport_close(mysock);

    return 0;
}

在这个示例中,改造一下,把发布消息的代码给注释掉。我们发现,过了一会,循环读消息的地方就再也接收不到服务器发送过来的消息了。推测是此时客户端与服务器之间的长连接已经断了。回到代码,

//设置心跳包间隔时间
data.keepAliveInterval = 20;

Keep Alive timer
The Keep Alive timer is present in the variable header of a MQTT CONNECT message.

The Keep Alive timer, measured in seconds, defines the maximum time interval between >messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. The client has a responsibility to send a message within each Keep Alive time period. In the absence of a data-related message during the time period, the client sends a PINGREQ message, which the server acknowledges with a PINGRESP message.

If the server does not receive a message from the client within one and a half times the Keep Alive time period (the client is allowed “grace” of half a time period), it disconnects the client as if the client had sent a DISCONNECT message. This action does not impact any of the client’s subscriptions. See DISCONNECT for more details.

If a client does not receive a PINGRESP message within a Keep Alive time period after sending a PINGREQ, it should close the TCP/IP socket connection.

The Keep Alive timer is a 16-bit value that represents the number of seconds for the time period. The actual value is application-specific, but a typical value is a few minutes. The maximum value is approximately 18 hours. A value of zero (0) means the client is not disconnected.

The format of the Keep Alive timer is shown in the table below. The ordering of the 2 bytes of the Keep Alive Timer is MSB, then LSB (big-endian).

  pub0sub1示例代码中,删除发布消息代码块只保留订阅消息功能以后,客户端在20秒之内都可以接收到服务器推送过来的消息。如果20秒内客户端没有向服务器发送PINGREQ消息,那么服务器会关闭掉TCP/IP端口连接。
  因此,如果希望开启一个可以永远保持订阅消息的客户端,需要在设置的心跳间隔时间内向服务器发送PINGREQ消息。具体做法可以这样,代码如下:

while (!toStop)
    {

        /****循环订阅消息*************/
        topicString.cstring = "pubtopic";
        len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

        rc = transport_sendPacketBuffer(mysock, buf, len);
        printf("publishing reading000\n");
        if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK)  /* wait for suback */
        {
            unsigned short submsgid;
            int subcount;
            int granted_qos;

            rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
            if (granted_qos != 0)
            {
                printf("granted qos != 0, %d\n", granted_qos);
                goto exit;
            }
        }
        else
            goto exit;
        /*****************/
        /* transport_getdata() has a built-in 1 second timeout,
        your mileage will vary */
        int state = MQTTPacket_read(buf, buflen, transport_getdata);
        //printf("state is = %d\n", state);
        //printf("PUBLISH is = %d\n", PUBLISH);
        if (state != -1){
                    printf("state2 is = %d\n", state);
            }
        if (state == PUBLISH)
        {
            unsigned char dup;
            int qos;
            unsigned char retained;
            unsigned short msgid;
            int payloadlen_in;
            unsigned char* payload_in;
            int rc;
            MQTTString receivedTopic;

            rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                    &payload_in, &payloadlen_in, buf, buflen);
            printf("message arrived %.*s\n", payloadlen_in, payload_in);            
        }

        //printf("publishing reading\n");
        //len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
        //rc = transport_sendPacketBuffer(mysock, buf, len);
    }

  我们在循环读取消息的代码块中加入了订阅消息,这样可以保持客户端与服务器之间长连接不会断开。当然,这么做效果并不好,因为循环发送订阅消息会对服务器产生比较多的负载。可以对代码做个优化,比如每间隔10秒钟发送一次订阅消息的请求。百度云官方提供的客户端代码就是这么实现的,下一次我们拿出来做对比。文档代码点这里

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Paho MQTT 嵌入式c客户端研究笔记 (二) 的相关文章

随机推荐

  • CEF3 C++接口实现自定义浏览器[simple version]

    目录 目录 1 一 工具准备 2 二 编译C 43 43 接口 2 三 实现浏览器 5 浏览器定制 5 浏览器默认最大化 xff0c 标题从配置文件读取 5 设置浏览器中文环境 xff0c 开启npapi插件功能 xff0c 并注册身份证阅
  • Windows笔记本声音无法找到输出设备

    现象 xff1a 扬声器无法正常工作 xff0c 声音选项提示无法找到输出设备 xff0c 设备管理器的音频输入和输出的声卡文件旁边有黄色的感叹号 xff0c 属性详情里面设备状态提示 xff1a 该设备无法启动 xff08 代码10 xf
  • C++扑克牌发牌

    自动发扑克牌 52张牌无大小王随机发给4个玩家 poker cpp 自动发扑克牌 52张牌无大小王随机发给4个 loaction src poker cpp include lt assert h gt include lt iostrea
  • ArchLinux下i3wm简单配置和美化

    先show下自己配置的截图 简单弄了下 xff0c 凑合用 本文默认你已经安装了基本的archlinux系统 xff0c 只是没有配置桌面环境 所以跳过前面archlinux的基础安装过程 ArchLinux的具体安装请参见我的博客 htt
  • CentOS8下编译配置nginx+rtmp,搭建推流服务器

    一 环境 服务器操作系统 xff1a CentOS Linux release 8 2 2004 Core nginx 版本 https nginx org download nginx 1 18 0 tar gz RMTP模块 xff1a
  • CentOS7网络配置(ping不同的原因及解决方法)

    这是配置好的CentOS7 xff0c 刚开始在Vmware里装CentOS7后是没有ip的 xff0c 原因是CentOS7默认不启动网卡的 xff0c 网卡不启用还ping个毛 进入 etc sysconfig network scip
  • SpringBoot-JPA进行多表连接查询

    通过JPA进行简单的 内 连接查询 1 准备 1 1开发工具Intellij Idea 1 2数据库mysql 1 3新建Spring Initializr项目 xff0c 勾选web mysql rest jpa依赖 2 开始 2 1项目
  • 响应式导航栏-利用纯css实现

    思路 xff1a 当屏幕为移动设备时 xff0c 隐藏导航栏列表项目 xff0c 显示菜单按钮 给菜单按钮 xff08 电脑时隐藏 xff09 加入hover 或者checked选择器实现 xff0c 当hover或者checked的触发时
  • BTRFS文件系统安装ArchLinux

    layout post title BTRFS文件系统安装ArchLinux date 2017 10 02 categories Linux 主要为以下步骤 xff1a 1 下载ArchLinux安装镜像并 制作U盘启动工具 2 开机进入
  • JSP文件上传

    JSP文件上传 网上的方法几乎都是使用的org apache commons fileupload的jar包 xff0c 需要手动下载导入commons fileupload jar和commons io jar 其实tomcat自带的or
  • ORPALIS PDF Reducer Pro(免费pdf压缩器工具)官方正式版V4.0.1 | pdf免费压缩软下载 | 怎样将pdf压缩得很小?

    ORPALIS PDF Reducer Pro 是一款优秀实用的离线单机版pdf免费压缩软件 xff0c 也就是大家说的免费pdf压缩器工具 xff0c 内置多种超高压缩比的PDF压缩算法和创新的页面布局分析以及自动颜色检测机制 xff0c
  • ubuntu系统文件夹作用

    opt 文件夹 用户级的程序目录 xff0c 可以理解为D Software xff0c opt有可选的意思 xff0c 这里可以用于放置第三方大型软件 xff08 或游戏 xff09 xff0c 当你不需要时 xff0c 直接rm rf掉
  • rhel7安装GUI

    check the rank of starting system systenmctl get default not found startx 查看光盘是否挂载 df 挂载 yum mount dev sr0 mnt mount dev
  • org-mode Properties-and-Columns翻译

    https orgmode org manual Properties and Columns html Properties and Columns 文章目录 属性属性语法特殊属性属性查询属性继承 Column View 列视图定义列列定
  • python利用ffmpeg进行rtmp推流直播

    思路 xff1a opencv读取视频 gt 将视频分割为帧 gt 将每一帧进行需求加工后 gt 将此帧写入pipe管道 gt 利用ffmpeg进行推流直播 pipe管道 xff1a 啥是pipe管道 xff1f 粗略的理解就是一个放共享文
  • 电脑环境PCL配置及VS2019环境配置

    VS2019配置pcl 1 12 0 前言 对于 3D 点云处理来说 xff0c PCL 完全是一个的模块化的现代 C 43 43 模板库 其基于以下第三方库 xff1a Boost Eigen FLANN VTK CUDA OpenNI
  • java算法--兔子繁殖问题

    java算法 兔子繁殖问题 题目 xff1a 古典问题 xff1a 有一对兔子 xff0c 从出生后第 3 个月起每个月都生一对兔子 xff0c 小兔子长到第四 个月后每个月又生一对兔子 xff0c 假如兔子都不死 xff0c 问每个月的兔
  • Linux配置Wifi模块

    linux终端无线网卡连接wifi xff1a 扫描可用连接wifi nmcli dev wifi 添加一个wifi的连接 nmcli dev wifi con 无线网络名称 password 无线网络密码 name 任意连接名称 xff0
  • Paho MQTT 嵌入式c客户端研究笔记

    最近做物联网设备 xff0c 需求长连接推送功能 当前物联网有一个标准协议是MQTT xff0c 对应有很多开源服务端 xff0c 如何快速接入这个服务呢 有两种接入方案 xff1a 1 自己clone 代码修改维护 2 找第三方服务 xf
  • Paho MQTT 嵌入式c客户端研究笔记 (二)

    paho mqtt embedded c master MQTTPacket samples xff0c 这个目录里面封装了发布消息 订阅消息的示例 运行pub0sub1 xff0c 这个示例里面会去订阅主题消息 发布主题消息 并且订阅和发