Paho MQTT 嵌入式 C 客户端研究笔记（二）



#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "MQTTPacket.h"
#include "transport.h"

/* This is in order to get an asynchronous signal to stop the sample,
as the code loops waiting for msgs on the subscribed topic.
Your actual code will depend on your hw and approach*/
#include <signal.h>

/* Stop flag: written by the signal handler below and polled by the
   receive loop in main(). volatile sig_atomic_t is the object type a
   signal handler may portably assign to. */
volatile sig_atomic_t toStop = 0;

/**
 * Signal handler that requests shutdown of the sample.
 *
 * Restores the default disposition for SIGINT, so a further SIGINT is no
 * longer routed to this handler, and then raises the stop flag.
 *
 * @param sig  number of the delivered signal (not used: the same action is
 *             taken for every signal this handler is installed for)
 */
void cfinish(int sig)
{
    (void)sig;
    signal(SIGINT, SIG_DFL);
    toStop = 1;
}

void stop_init(void)
    signal(SIGINT, cfinish);
    signal(SIGTERM, cfinish);
/* */

int main(int argc, char *argv[])
    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    int rc = 0;
    int mysock = 0;
    unsigned char buf[200];
    int buflen = sizeof(buf);
    int msgid = 1;
    MQTTString topicString = MQTTString_initializer;
    int req_qos = 0;
    char* payload = "mypayload";
    int payloadlen = strlen(payload);
    int len = 0;
    char *host = "";
    int port = 1883;

    if (argc > 1)
        host = argv[1];

    if (argc > 2)
        port = atoi(argv[2]);

    mysock = transport_open(host, port);
    if(mysock < 0)
        return mysock;

    printf("Sending to hostname %s port %d\n", host, port);

    data.clientID.cstring = "me";
    data.keepAliveInterval = 20;
    data.cleansession = 1;
    data.username.cstring = "testuser";
    data.password.cstring = "testpassword";
    len = MQTTSerialize_connect(buf, buflen, &data);
    rc = transport_sendPacketBuffer(mysock, buf, len);

    /* wait for connack */
    if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
        unsigned char sessionPresent, connack_rc;

        if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
            printf("Unable to connect, return code %d\n", connack_rc);
            goto exit;
        goto exit;

    /* subscribe 订阅主题消息*/
    topicString.cstring = "substopic";
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

    rc = transport_sendPacketBuffer(mysock, buf, len);
    if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK)  /* wait for suback */
        unsigned short submsgid;
        int subcount;
        int granted_qos;        
        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
        if (granted_qos != 0)
            printf("granted qos != 0, %d\n", granted_qos);
            goto exit;
        goto exit;

    /* loop getting msgs on subscribed topic循环读取消息 */
    topicString.cstring = "pubtopic";
    while (!toStop)
        /* transport_getdata() has a built-in 1 second timeout,
        your mileage will vary */
        if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH)
            unsigned char dup;
            int qos;
            unsigned char retained;
            unsigned short msgid;
            int payloadlen_in;
            unsigned char* payload_in;
            int rc;
            MQTTString receivedTopic;

            rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
                    &payload_in, &payloadlen_in, buf, buflen);
            printf("message arrived %.*s\n", payloadlen_in, payload_in);

        printf("publishing reading\n");
        len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)payload, payloadlen);
        rc = transport_sendPacketBuffer(mysock, buf, len);

    len = MQTTSerialize_disconnect(buf, buflen);
    rc = transport_sendPacketBuffer(mysock, buf, len);


    return 0;


Keep Alive timer
The Keep Alive timer is present in the variable header of a MQTT CONNECT message.

The Keep Alive timer, measured in seconds, defines the maximum time interval between messages received from a client. It enables the server to detect that the network connection to a client has dropped, without having to wait for the long TCP/IP timeout. The client has a responsibility to send a message within each Keep Alive time period. In the absence of a data-related message during the time period, the client sends a PINGREQ message, which the server acknowledges with a PINGRESP message.

If the server does not receive a message from the client within one and a half times the Keep Alive time period (the client is allowed “grace” of half a time period), it disconnects the client as if the client had sent a DISCONNECT message. This action does not impact any of the client’s subscriptions. See DISCONNECT for more details.

If a client does not receive a PINGRESP message within a Keep Alive time period after sending a PINGREQ, it should close the TCP/IP socket connection.

The Keep Alive timer is a 16-bit value that represents the number of seconds for the time period. The actual value is application-specific, but a typical value is a few minutes. The maximum value is approximately 18 hours. A value of zero (0) means the client is not disconnected.

The format of the Keep Alive timer is shown in the table below. The ordering of the 2 bytes of the Keep Alive Timer is MSB, then LSB (big-endian).


/* Variant of the receive loop in main(): every pass re-sends a SUBSCRIBE
   for "pubtopic", waits for its SUBACK, then reads one more packet and
   prints it if it is a PUBLISH. Unlike the loop in main(), this variant
   does not publish anything.
   This is a fragment: it uses main()'s locals (buf, buflen, msgid, req_qos,
   topicString, mysock, len, rc) and needs an `exit` label in the enclosing
   function. */
while (!toStop)
{
    topicString.cstring = "pubtopic";
    len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

    rc = transport_sendPacketBuffer(mysock, buf, len);
    printf("publishing reading000\n");
    if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK)  /* wait for suback */
    {
        unsigned short submsgid = 0;
        int subcount = 0;
        int granted_qos = -1;   /* stays -1 if the SUBACK cannot be parsed */

        rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
        if (rc != 1 || granted_qos != 0)
        {
            printf("granted qos != 0, %d\n", granted_qos);
            goto exit;
        }
    }
    else
        goto exit;

    /* transport_getdata() has a built-in 1 second timeout,
    your mileage will vary */
    int state = MQTTPacket_read(buf, buflen, transport_getdata);
    if (state != -1)
    {
        /* debug trace of every packet type actually read */
        printf("state2 is = %d\n", state);
    }
    if (state == PUBLISH)
    {
        unsigned char dup;
        int qos;
        unsigned char retained;
        unsigned short in_msgid;    /* distinct name: does not shadow msgid */
        int payloadlen_in = 0;
        unsigned char* payload_in = NULL;
        MQTTString receivedTopic;

        rc = MQTTDeserialize_publish(&dup, &qos, &retained, &in_msgid, &receivedTopic,
                &payload_in, &payloadlen_in, buf, buflen);
        /* Only touch the payload when the packet was parsed successfully. */
        if (rc == 1)
            printf("message arrived %.*s\n", payloadlen_in, payload_in);
    }
}



