学习日记——《MQTT-JX》例程讲解(完结版)

2023-10-27

头文件

#include "ets_sys.h"
#include "driver/uart.h"
#include "osapi.h"
#include "mqtt.h"
#include "wifi.h"
#include "config.h"
#include "debug.h"
#include "gpio.h"
#include "user_interface.h"
#include "mem.h"
#include "sntp.h"
  • 在写主程序之前需要把这些头文件都添加上,不然大家需要自己写一些结构体,定义什么的。相对麻烦。别人已经写好了,我们调用即可。

MQTT客户端结构体

全局变量

MQTT_Client mqttClient;			// MQTT客户端_结构体【此变量非常重要】
typedef struct
{
	struct espconn *pCon;				// TCP连接结构体指针
	uint8_t security;					// 安全类型
	uint8_t* host;						// 服务端域名/地址
	uint32_t port;						// 网络连接端口号
	ip_addr_t ip;						// 32位IP地址

	mqtt_state_t  mqtt_state;			// MQTT状态

	mqtt_connect_info_t connect_info;	// MQTT【CONNECT】报文的连接参数

	MqttCallback connectedCb;			// MQTT连接成功_回调
	MqttCallback disconnectedCb;		// MQTT断开连接_回调
	MqttCallback publishedCb;			// MQTT发布成功_回调
	MqttCallback timeoutCb;				// MQTT超时_回调
	MqttDataCallback dataCb;			// MQTT接收数据_回调

	ETSTimer mqttTimer;					// MQTT定时器

	uint32_t keepAliveTick;				// MQTT客户端(ESP8266)心跳计数
	uint32_t reconnectTick;				// 重连等待计时
	uint32_t sendTimeout;				// 报文发送超时时间

	tConnState connState;				// ESP8266运行状态

	QUEUE msgQueue;						// 消息队列

	void* user_data;					// 用户数据(预留给用户的指针)

} MQTT_Client;
  • 他是之后ESP8266需要使用的TCP连接,MQTT连接,MQTT收发报文等的所有参数的集合。MQTT结构体包含于mqtt.h文件中。

SNTP定时器结构体

静态变量

static ETSTimer sntp_timer;
typedef struct _ETSTIMER_ {
    struct _ETSTIMER_    *timer_next;
    uint32_t              timer_expire;
    uint32_t              timer_period;
    ETSTimerFunc         *timer_func;
    void                 *timer_arg;
} ETSTimer;
  • 这个是通过SNTP获取网络时间时使用的所有参数的集合。ETSTimer结构体包含于ets_sys.h文件中。

user_init函数

void user_init(void)
{
    uart_init(BIT_RATE_115200, BIT_RATE_115200);	// 串口波特率设为115200
    os_delay_us(60000);
    PIN_FUNC_SELECT(PERIPHS_IO_MUX_GPIO4_U,	FUNC_GPIO4);	// GPIO4输出高	#
	GPIO_OUTPUT_SET(GPIO_ID_PIN(4),1);						// LED初始化	#
    CFG_Load();	// 加载/更新系统参数【WIFI参数、MQTT参数】
    // 网络连接参数赋值:服务端域名【mqtt_test_jx.mqtt.iot.gz.baidubce.com】、网络连接端口【1883】、安全类型【0:NO_TLS】
	MQTT_InitConnection(&mqttClient, sysCfg.mqtt_host, sysCfg.mqtt_port, sysCfg.security);
	// MQTT连接参数赋值:客户端标识符【..】、MQTT用户名【..】、MQTT密钥【..】、保持连接时长【120s】、清除会话【1:clean_session】
	MQTT_InitClient(&mqttClient, sysCfg.device_id, sysCfg.mqtt_user, sysCfg.mqtt_pass, sysCfg.mqtt_keepalive, 1);

	// 设置遗嘱参数(如果云端没有对应的遗嘱主题,则MQTT连接会被拒绝)
//	MQTT_InitLWT(&mqttClient, "Will", "ESP8266_offline", 0, 0);
	// 设置MQTT相关函数
	MQTT_OnConnected(&mqttClient, mqttConnectedCb);			// 设置【MQTT成功连接】函数的另一种调用方式
	MQTT_OnDisconnected(&mqttClient, mqttDisconnectedCb);	// 设置【MQTT成功断开】函数的另一种调用方式
	MQTT_OnPublished(&mqttClient, mqttPublishedCb);			// 设置【MQTT成功发布】函数的另一种调用方式
	MQTT_OnData(&mqttClient, mqttDataCb);					// 设置【接收MQTT数据】函数的另一种调用方式
	// 连接WIFI:SSID[..]、PASSWORD[..]、WIFI连接成功函数[wifiConnectCb]
	WIFI_Connect(sysCfg.sta_ssid, sysCfg.sta_pwd, wifiConnectCb);
	INFO("\r\nSystem started ...\r\n");
}
  • 首先执行串口初始化函数,串口波特率设置为115200
uart_init(BIT_RATE_115200, BIT_RATE_115200);	// 串口波特率设为115200
os_delay_us(60000);
  • 然后调用CFG_Load();函数来加载或者是调用系统函数,根据持有人标志是否和之前一样来选择更新或者是不更新系统参数。此函数位于config.c文件中。

加载/更新系统参数

void ICACHE_FLASH_ATTR CFG_Load()
{
	INFO("\r\nload ...\r\n");
	// 读Flash【0x7C】扇区,存放到【saveFlag】(读出之前的持有人标识)
	spi_flash_read((CFG_LOCATION+3)*SPI_FLASH_SEC_SIZE,(uint32 *)&saveFlag, sizeof(SAVE_FLAG));
	//根据【参数扇区标志】,读取对应扇区的系统参数【0:系统参数在0x79扇区!0:系统参数在0x7A扇区】
	if (saveFlag.flag == 0)
	{
		spi_flash_read((CFG_LOCATION+0)*SPI_FLASH_SEC_SIZE,	(uint32 *)&sysCfg, sizeof(SYSCFG));		// 读出系统参数(1区:0x79)
	}
	else //saveFlag.flag != 0
	{
		spi_flash_read((CFG_LOCATION+1)*SPI_FLASH_SEC_SIZE,	(uint32 *)&sysCfg, sizeof(SYSCFG));		// 读出系统参数(2区:0x7A)
	}
	// 只有在【持有人标识和之前不同】的情况下,才会更新系统参数(修改系统参数时,一定要记得修改持有人标识的值)如果觉得每次都修改持有人标识的值比较麻烦,那么可以注释这条语句。那么ESP8266每次上电执行的时候都会将参数更新到flash当中。
	if(sysCfg.cfg_holder != CFG_HOLDER)		// 持有人标识不同
	{
		os_memset(&sysCfg, 0x00, sizeof sysCfg);	// 参数扇区=0

		sysCfg.cfg_holder = CFG_HOLDER;		// 更新持有人标识

		os_sprintf(sysCfg.device_id, MQTT_CLIENT_ID, system_get_chip_id());		// 【MQTT_CLIENT_ID】MQTT客户端标识符
		sysCfg.device_id[sizeof(sysCfg.device_id) - 1] = '\0';					// 最后添'\0'(防止字符串填满数组,指针溢出)
		os_strncpy(sysCfg.sta_ssid, STA_SSID, sizeof(sysCfg.sta_ssid)-1);		// 【STA_SSID】WIFI名称
		os_strncpy(sysCfg.sta_pwd, STA_PASS, sizeof(sysCfg.sta_pwd)-1);			// 【STA_PASS】WIFI密码
		sysCfg.sta_type = STA_TYPE;												// 【STA_TYPE】WIFI类型
		os_strncpy(sysCfg.mqtt_host, MQTT_HOST, sizeof(sysCfg.mqtt_host)-1);	// 【MQTT_HOST】MQTT服务端域名/IP地址
		sysCfg.mqtt_port = MQTT_PORT;											// 【MQTT_PORT】网络连接端口号
		os_strncpy(sysCfg.mqtt_user, MQTT_USER, sizeof(sysCfg.mqtt_user)-1);	// 【MQTT_USER】MQTT用户名
		os_strncpy(sysCfg.mqtt_pass, MQTT_PASS, sizeof(sysCfg.mqtt_pass)-1);	// 【MQTT_PASS】MQTT密码
		sysCfg.security = DEFAULT_SECURITY;		/* default non ssl */			// 【DEFAULT_SECURITY】默认安全等级(默认=0,不加密)
		sysCfg.mqtt_keepalive = MQTT_KEEPALIVE;		// 【MQTT_KEEPALIVE】保持连接时长(宏定义==120)
		INFO(" default configuration\r\n");
		CFG_Save();		// 将更新后的系统参数烧录到Flash中
	}
}
  • 然后调用MQTT_InitConnection函数来将网络连接参数赋值比如服务端域名、网络端口号、安全类型。

MQTT_InitConnection函数

void ICACHE_FLASH_ATTR MQTT_InitConnection(MQTT_Client *mqttClient, uint8_t* host, uint32_t port, uint8_t security)
{
    uint32_t temp;
    INFO("MQTT_InitConnection\r\n");
    os_memset(mqttClient, 0, sizeof(MQTT_Client));		// 【MQTT客户端】结构体 = 0
    temp = os_strlen(host);								// 服务端域名/IP的字符串长度
    mqttClient->host = (uint8_t*)os_zalloc(temp+1);		// 申请空间,存放服务端域名/IP地址字符串
    os_strcpy(mqttClient->host, host);					// 字符串拷贝
    mqttClient->host[temp] = 0;							// 最后'\0'
    mqttClient->port = port;							// 网络端口号 = 1883
    mqttClient->security = security;					// 安全类型 = 0 = NO_TLS
}
  • 之后调用MQT在这里插入代码片T_InitClient函数来将MQTT连接的参数赋值等等,客户端标识符赋值、MQTT用户名赋值、MQTT密码赋值、保持连接时长赋值、清除会话标志位赋值。
  • 设置入栈报文以及出栈报文的缓存区。定义了队列QUEUE_Init,并且对队列进行了初始化
  • 接下来创建了一个任务,MQTT例程就是基于任务来处理不同的状态下所要进行的操作。

MQTT_InitClient函数

MQTT_InitClient(MQTT_Client *mqttClient, uint8_t* client_id, uint8_t* client_user, uint8_t* client_pass, uint32_t keepAliveTime, uint8_t cleanSession)
{
    uint32_t temp;
    INFO("MQTT_InitClient\r\n");
    // MQTT【CONNECT】报文的连接参数 赋值
    os_memset(&mqttClient->connect_info, 0, sizeof(mqtt_connect_info_t));		// MQTT【CONNECT】报文的连接参数 = 0
    temp = os_strlen(client_id);
    mqttClient->connect_info.client_id = (uint8_t*)os_zalloc(temp + 1);			// 申请【客户端标识符】的存放内存
    os_strcpy(mqttClient->connect_info.client_id, client_id);					// 赋值【客户端标识符】
    mqttClient->connect_info.client_id[temp] = 0;								// 最后'\0'
    if (client_user)	// 判断是否有【MQTT用户名】
    {
        temp = os_strlen(client_user);
        mqttClient->connect_info.username = (uint8_t*)os_zalloc(temp + 1);
        os_strcpy(mqttClient->connect_info.username, client_user);				// 赋值【MQTT用户名】
        mqttClient->connect_info.username[temp] = 0;
    }
    if (client_pass)	// 判断是否有【MQTT密码】
    {
        temp = os_strlen(client_pass);
        mqttClient->connect_info.password = (uint8_t*)os_zalloc(temp + 1);
        os_strcpy(mqttClient->connect_info.password, client_pass);				// 赋值【MQTT密码】
        mqttClient->connect_info.password[temp] = 0;
    }
    mqttClient->connect_info.keepalive = keepAliveTime;							// 保持连接 = 120s
    mqttClient->connect_info.clean_session = cleanSession;						// 清除会话 = 1 = clean_session
    // 设置mqtt_state部分参数
    mqttClient->mqtt_state.in_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);		// 申请in_buffer内存【入站报文缓存区】
    mqttClient->mqtt_state.in_buffer_length = MQTT_BUF_SIZE;					// 设置in_buffer大小
    mqttClient->mqtt_state.out_buffer = (uint8_t *)os_zalloc(MQTT_BUF_SIZE);	// 申请out_buffer内存【出站报文缓存区】
    mqttClient->mqtt_state.out_buffer_length = MQTT_BUF_SIZE;					// 设置out_buffer大小
    mqttClient->mqtt_state.connect_info = &(mqttClient->connect_info);			// MQTT【CONNECT】报文的连接参数(指针),赋值给mqttClient->mqtt_state.connect_info
    // 初始化MQTT出站报文缓存区
    mqtt_msg_init(&mqttClient->mqtt_state.mqtt_connection, mqttClient->mqtt_state.out_buffer, mqttClient->mqtt_state.out_buffer_length);
    QUEUE_Init(&mqttClient->msgQueue, QUEUE_BUFFER_SIZE);	// 消息队列初始化【队列可以存放一个/多个MQTT报文】
    // 创建任务:任务函数【MQTT_Task】、优先级【2】、任务指针【mqtt_procTaskQueue】、消息深度【1】
    system_os_task(MQTT_Task, MQTT_TASK_PRIO, mqtt_procTaskQueue, MQTT_TASK_QUEUE_SIZE);
    // 安排任务:参数1=任务等级 / 参数2=消息类型 / 参数3=消息参数
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)mqttClient);	// 参数3的类型必须为【os_param_t】型
}

设置遗嘱

  • 设置遗嘱也是在mqtt.c文件中进行的,具体操作有设置遗嘱主题、设置遗嘱消息、设置遗嘱是否保持、设置遗嘱质量等等。我们也可以不用遗嘱。
// 设置遗嘱:遗嘱主题【...】、遗嘱消息【...】、遗嘱质量【Will_Qos=0】、遗嘱保持【Will_Retain=0】
void ICACHE_FLASH_ATTR
MQTT_InitLWT(MQTT_Client *mqttClient, uint8_t* will_topic, uint8_t* will_msg, uint8_t will_qos, uint8_t will_retain)
{
    uint32_t temp;
    temp = os_strlen(will_topic);
    mqttClient->connect_info.will_topic = (uint8_t*)os_zalloc(temp + 1);		// 申请【遗嘱主题】的存放内存
    os_strcpy(mqttClient->connect_info.will_topic, will_topic);					// 赋值【遗嘱主题】
    mqttClient->connect_info.will_topic[temp] = 0;								// 最后'\0'
    temp = os_strlen(will_msg);
    mqttClient->connect_info.will_message = (uint8_t*)os_zalloc(temp + 1);
    os_strcpy(mqttClient->connect_info.will_message, will_msg);					// 赋值【遗嘱消息】
    mqttClient->connect_info.will_message[temp] = 0;
    mqttClient->connect_info.will_qos = will_qos;			// 遗嘱质量【Will_Qos=0】
    mqttClient->connect_info.will_retain = will_retain;		// 遗嘱保持【Will_Retain=0】
}
  • 之后还有设置MQTT的另一种调用方式。执行上面注释的语句就相当于执行user_init函数中的语句,例如:执行mqttClient->connectedCb(...)语句就相当于执行mqttConnectedCb(...)

函数调用重定义

// 执行 mqttClient->connectedCb(...) => mqttConnectedCb(...)
void ICACHE_FLASH_ATTR MQTT_OnConnected(MQTT_Client*mqttClient, MqttCallback connectedCb)
{
    mqttClient->connectedCb = connectedCb;	// 函数名【mqttConnectedCb】
}
// 执行 mqttClient->disconnectedCb(...) => mqttDisconnectedCb(...)
void ICACHE_FLASH_ATTR MQTT_OnDisconnected(MQTT_Client *mqttClient, MqttCallback disconnectedCb)
{
    mqttClient->disconnectedCb = disconnectedCb;	// 函数名【mqttDisconnectedCb】
}
// 执行 mqttClient->dataCb(...) => mqttDataCb(...)
void ICACHE_FLASH_ATTR MQTT_OnData(MQTT_Client *mqttClient, MqttDataCallback dataCb)
{
    mqttClient->dataCb = dataCb;	// 函数名【mqttDataCb】
}
// 执行 mqttClient->publishedCb(...) => mqttPublishedCb(...)
void ICACHE_FLASH_ATTR MQTT_OnPublished(MQTT_Client *mqttClient, MqttCallback publishedCb)
{
    mqttClient->publishedCb = publishedCb;	// 函数名【mqttPublishedCb】
}
// 执行 mqttClient->timeoutCb(...) => 【...】未定义函数
void ICACHE_FLASH_ATTR MQTT_OnTimeout(MQTT_Client *mqttClient, MqttCallback timeoutCb)
{
    mqttClient->timeoutCb = timeoutCb;
}
  • 之后调用WIFI_Connect函数来设置WIFI连接的参数,设置ESP8266设置为STA模式,设置STA参数、设置WIFI定时器,接入WIFI。

WIFI_Connect函数

void ICACHE_FLASH_ATTR WIFI_Connect(uint8_t* ssid, uint8_t* pass, WifiCallback cb)
{
	struct station_config stationConf;
	INFO("WIFI_INIT\r\n");
	wifi_set_opmode_current(STATION_MODE);		// 设置ESP8266为STA模式
	//wifi_station_set_auto_connect(FALSE);		// 上电不自动连接已记录的AP(已注释,即:上电自动连接已记录的AP(默认))
	wifiCb = cb;	// 函数名赋值:wifiCb可以作为函数名使用,wifiCb(..) == wifiConnectCb(..)
	// 设置STA参数
	os_memset(&stationConf, 0, sizeof(struct station_config));	// STA信息 = 0
	os_sprintf(stationConf.ssid, "%s", ssid);					// SSID赋值
	os_sprintf(stationConf.password, "%s", pass);				// 密码赋值
	wifi_station_set_config_current(&stationConf);	// 设置STA参数
	// 设置WiFiLinker定时器
	os_timer_disarm(&WiFiLinker);	// 定时器:WIFI连接
	os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL);	// wifi_check_ip:检查IP获取情况
	os_timer_arm(&WiFiLinker, 1000, 0);		// 1秒定时(1次)
	//wifi_station_set_auto_connect(TRUE);	// 上电自动连接已记录AP
	wifi_station_connect();		// ESP8266接入WIFI
}
  • 定时检查IP地址的获取情况,如果没有成功获取IP地址的话,会再次启用定时器。如果WIFI状态改变的话,程序将进入WIFI连接状态函数。下面是定时回调函数,位于WIFI.C文件中

定时函数

// 定时函数:检查IP获取情况
static void ICACHE_FLASH_ATTR wifi_check_ip(void *arg)
{
	struct ip_info ipConfig;
	os_timer_disarm(&WiFiLinker);	// 关闭WiFiLinker定时器
	wifi_get_ip_info(STATION_IF, &ipConfig);		// 获取IP地址
	wifiStatus = wifi_station_get_connect_status();	// 获取接入状态
	// 获取到IP地址
	if (wifiStatus == STATION_GOT_IP && ipConfig.ip.addr != 0)
	{
		// 获取IP后,每2秒检查一次WIFI连接的正确性【防止WIFI掉线等情况】
		os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL);
		os_timer_arm(&WiFiLinker, 2000, 0);
	}
	// 未获取到IP地址
	else
	{
		if(wifi_station_get_connect_status() == STATION_WRONG_PASSWORD)
		{
			INFO("STATION_WRONG_PASSWORD\r\n");	// 密码错误
			wifi_station_connect();
		}
		else if(wifi_station_get_connect_status() == STATION_NO_AP_FOUND)
		{
			INFO("STATION_NO_AP_FOUND\r\n");	// 未发现对应AP
			wifi_station_connect();
		}
		else if(wifi_station_get_connect_status() == STATION_CONNECT_FAIL)
		{
			INFO("STATION_CONNECT_FAIL\r\n");	// 连接失败
			wifi_station_connect();
		}
		else
		{
			INFO("STATION_IDLE\r\n");
		}
		// 再次开启定时器
		os_timer_setfn(&WiFiLinker, (os_timer_func_t *)wifi_check_ip, NULL);
		os_timer_arm(&WiFiLinker, 500, 0);		// 500Ms定时
	}
	// 如果WIFI状态改变,则调用[wifiConnectCb]函数
	if(wifiStatus != lastWifiStatus)
	{
		lastWifiStatus = wifiStatus;	// WIFI状态更新
		if(wifiCb)				// 判断是否设置了[wifiConnectCb]函数
		wifiCb(wifiStatus);		// wifiCb(wifiStatus)=wifiConnectCb(wifiStatus)
	}
}
  • 在定时回调函数当中,定时器实时监测IP地址的获取情况,如果没有成功获取IP地址的话,会再次启动定时器,如果WIFI状态改变的话,它将进入void wifiConnectCb()函数

wifiConnectCb函数

void wifiConnectCb(uint8_t status)
{
	// 成功获取到IP地址
	//---------------------------------------------------------------------
    if(status == STATION_GOT_IP)
    {
    	ip_addr_t * addr = (ip_addr_t *)os_zalloc(sizeof(ip_addr_t));

    	// 在官方例程的基础上,增加2个备用服务器
    	//---------------------------------------------------------------
    	sntp_setservername(0, "us.pool.ntp.org");	// 服务器_0【域名】
    	sntp_setservername(1, "ntp.sjtu.edu.cn");	// 服务器_1【域名】

    	ipaddr_aton("210.72.145.44", addr);	// 点分十进制 => 32位二进制
    	sntp_setserver(2, addr);					// 服务器_2【IP地址】
    	os_free(addr);								// 释放addr

    	sntp_init();	// SNTP初始化


        // 设置SNTP定时器[sntp_timer]
        //-----------------------------------------------------------
        os_timer_disarm(&sntp_timer);
        os_timer_setfn(&sntp_timer, (os_timer_func_t *)sntpfn, NULL);
        os_timer_arm(&sntp_timer, 1000, 1);		// 1s定时
    }

    // IP地址获取失败
	//----------------------------------------------------------------
    else
    {
          MQTT_Disconnect(&mqttClient);	// WIFI连接出错,TCP断开连接
    }
}
  • 在这个函数中,判读是否获取IP地址,如果成功获取的话,进行SNTP初始化,并设置SNTP定时函数。

STNP定时函数

目的:获取当前网络时间

void sntpfn()
{
    u32_t ts = 0;

    ts = sntp_get_current_timestamp();		// 获取当前的偏移时间

    os_printf("current time : %s\n", sntp_get_real_time(ts));	// 获取真实时间

    if (ts == 0)		// 网络时间获取失败
    {
        os_printf("did not get a valid time from sntp server\n");
    }
    else //(ts != 0)	// 网络时间获取成功
    {
            os_timer_disarm(&sntp_timer);	// 关闭SNTP定时器

            MQTT_Connect(&mqttClient);		// 开始MQTT连接
    }
}
  • 代码解释: 首先在程序的开始就定义了此类型u32_t为无符号长整形。
typedef unsigned long 		u32_t;
  • 注意:unsigned long int在C语言中是无符号长整形变量,是整形变量的一种。 unsigned long int 与unsigned
    long是等价的,即定义的时候int可以不写。在定时函数中,首先ts获取当前偏移时间。sntp_get_current_timestamp在我之前写的学习日记——ESP8266STNP博客中有详细的介绍。

https://blog.csdn.net/quanqueen/article/details/108045372

  • sntp_get_current_timestamp:功能:查询当前距离基准时间( 1970.01.01 00: 00: 00 GMT + 8)的时间戳,单位:秒。函数定义:uint32 sntp_get_current_timestamp()此定义在sntp.h文件中已经定义过了,所以我们只需将添加头文件即可使用。返回的是距离基准时间的时间戳。
  • sntp_get_real_time功能:查询实际时间 (GMT + 8)
    函数定义:char* sntp_get_real_time(long t)同上在sntp.h文件中已经定义过。参数:long t:与基准时间相距的时间戳。返回值为实际时间。此出的long t就是我们程序中的ts,通过ts = sntp_get_current_timestamp();我们将基准时间相距的时间戳赋值给了ts,之后直接输出真实的时间即可。 os_printf("current time : %s\n", sntp_get_real_time(ts));
  • 判断我们的ts是否有值,若没有值,则获取网络时间失败,输出did not get a valid time from sntp server。若有值,则获取网络时间成功,即可关闭SNTP,打开MQTT连接。
    此函数用于判读是否成功获取网络时间,如果成功获取的话进入void ICACHE_FLASH_ATTR MQTT_Connect()这个函数,进行MQTT连接准备。

MQTT连接准备函数

void ICACHE_FLASH_ATTR MQTT_Connect(MQTT_Client *mqttClient)
{
    //espconn_secure_set_size(0x01,6*1024);	 // SSL双向认证时才需使用	// try to modify memory size 6*1024 if ssl/tls handshake failed

	// 开始MQTT连接前,判断是否存在MQTT的TCP连接。如果有,则清除之前的TCP连接
    if (mqttClient->pCon)
    {
        // Clean up the old connection forcefully - using MQTT_Disconnect
        // does not actually release the old connection until the
        // disconnection callback is invoked.

        mqtt_tcpclient_delete(mqttClient);	// 删除TCP连接、释放pCon内存、清除TCP连接指针
    }
    // TCP连接设置
    mqttClient->pCon = (struct espconn *)os_zalloc(sizeof(struct espconn));	// 申请pCon内存
    mqttClient->pCon->type = ESPCONN_TCP;										// 设为TCP连接
    mqttClient->pCon->state = ESPCONN_NONE;
    mqttClient->pCon->proto.tcp = (esp_tcp *)os_zalloc(sizeof(esp_tcp));		// 申请esp_tcp内存
    mqttClient->pCon->proto.tcp->local_port = espconn_port();					// 获取ESP8266可用端口
    mqttClient->pCon->proto.tcp->remote_port = mqttClient->port;				// 设置端口号
    mqttClient->pCon->reverse = mqttClient;										// mqttClient->pCon->reverse 缓存 mqttClient指针
    espconn_regist_connectcb(mqttClient->pCon, mqtt_tcpclient_connect_cb);		// 注册TCP连接成功的回调函数
    espconn_regist_reconcb(mqttClient->pCon, mqtt_tcpclient_recon_cb);			// 注册TCP异常中断的回调函数
    mqttClient->keepAliveTick = 0;	// MQTT客户端(ESP8266)心跳计数
    mqttClient->reconnectTick = 0;	// 重连等待计时:当进入重连请求状态后,需等待5秒,之后进行重新连接
    // 设置MQTT定时(1秒)【功能:心跳计时、重连计时、TCP发送计时】
    os_timer_disarm(&mqttClient->mqttTimer);
    os_timer_setfn(&mqttClient->mqttTimer, (os_timer_func_t *)mqtt_timer, mqttClient);	// mqtt_timer
    os_timer_arm(&mqttClient->mqttTimer, 1000, 1);										// 1秒定时(重复)
    // 打印SSL配置:安全类型[NO_TLS == 0]
    os_printf("your ESP SSL/TLS configuration is %d.[0:NO_TLS\t1:TLS_WITHOUT_AUTHENTICATION\t2ONE_WAY_ANTHENTICATION\t3TWO_WAY_ANTHENTICATION]\n",DEFAULT_SECURITY);
    // 解析点分十进制形式的IP地址
    if (UTILS_StrToIP(mqttClient->host, &mqttClient->pCon->proto.tcp->remote_ip))	// 解析IP地址(点分十进制字符串形式)
    {
        INFO("TCP: Connect to ip  %s:%d\r\n", mqttClient->host, mqttClient->port);	// 打印IP地址
        // 根据安全类型,调用不同的TCP连接方式
        if (mqttClient->security)	// 安全类型 != 0
        {
#ifdef MQTT_SSL_ENABLE
            if(DEFAULT_SECURITY >= ONE_WAY_ANTHENTICATION )		// 单向认证【ONE_WAY_ANTHENTICATION = 2】
            {                 
            espconn_secure_ca_enable(ESPCONN_CLIENT,CA_CERT_FLASH_ADDRESS);
            }
            if(DEFAULT_SECURITY >= TWO_WAY_ANTHENTICATION)		// 双向认证【TWO_WAY_ANTHENTICATION = 3】
            {
            espconn_secure_cert_req_enable(ESPCONN_CLIENT,CLIENT_CERT_FLASH_ADDRESS);
            }
            espconn_secure_connect(mqttClient->pCon);			// 不认证【TLS_WITHOUT_AUTHENTICATION = 1】
#else
            INFO("TCP: Do not support SSL\r\n");
#endif
        }

        else	// 安全类型 = 0 = NO_TLS
        {
            espconn_connect(mqttClient->pCon);	// TCP连接(作为Client连接Server)
        }
    }

    // 解析域名
    else
    {
        INFO("TCP: Connect to domain %s:%d\r\n", mqttClient->host, mqttClient->port);

        espconn_gethostbyname(mqttClient->pCon, mqttClient->host, &mqttClient->ip, mqtt_dns_found);
    }

    mqttClient->connState = TCP_CONNECTING;		// TCP正在连接
}
  • 在这个函数中进行TCP连接设置,并且定义了TCP连接成功得回调函数,定义了TCP异常中断得回调函数,定义了1秒钟得重复定时,在void ICACHE_FLASH_ATTR MQTT_Connect()之后调用uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP()这个函数。

ICACHE_FLASH_ATTR UTILS_StrToIP函数

uint8_t ICACHE_FLASH_ATTR UTILS_StrToIP(const int8_t* str, void *ip)
{
	    /* The count of the number of bytes processed. */
	    int i;
	    /* A pointer to the next digit to process. */
	    const char * start;
	    start = str;	// 字符串指针赋值
	    // IPV4
	    for (i = 0; i < 4; i++)		// 一共四个十进制字符串
	    {
	        /* The digit being processed. */
	        char c;
	        /* The value of this byte. */
	        int n = 0;
	        while (1)
	        {
	            c = * start;	// 字符串某字符赋值
	            start++;		// 从前往后(从左向右)
	            if (c >= '0' && c <= '9')	// 在“0~9”范围内
	            {
	                n *= 10;		// 权重
	                n += c - '0';	// 将'0'~'9'字符转换为对应的数字
	            }
	            /* We insist on stopping at "." if we are still parsing
	               the first, second, or third numbers. If we have reached
	               the end of the numbers, we will allow any character. */
	            else if ((i < 3 && c == '.') || i == 3)
	            {
	                break;		// 遇到'.'则解析下一十进制字符串
	            }
	            else
	            {
	                return 0;	// 解析失败
	            }
	        }
	        if (n >= 256)	// n过大,解析失败
	        {
	            return 0;
	        }
	        ((uint8_t*)ip)[i] = n;	// 赋值给IP数组
	    }
	    return 1;	// 解析成功,返回1
}
  • 这个函数来解析点分十进制形式得IP地址,如果解析失败得话,将进行域名解析

域名解析成功回调函数

LOCAL void ICACHE_FLASH_ATTR mqtt_dns_found(const char *name, ip_addr_t *ipaddr, void *arg)
{
    struct espconn *pConn = (struct espconn *)arg;		 // 获取TCP连接指针
    MQTT_Client* client = (MQTT_Client *)pConn->reverse; // 获取mqttClient指针
    if (ipaddr == NULL)		// 域名解析失败
    {
        INFO("DNS: Found, but got no ip, try to reconnect\r\n");
        client->connState = TCP_RECONNECT_REQ;	// TCP重连请求(等待5秒)
        return;
    }
    INFO("DNS: found ip %d.%d.%d.%d\n",		// 打印域名对应的IP地址
         *((uint8 *) &ipaddr->addr),
         *((uint8 *) &ipaddr->addr + 1),
         *((uint8 *) &ipaddr->addr + 2),
         *((uint8 *) &ipaddr->addr + 3));
    // 判断IP地址是否正确(?=0)
    if (client->ip.addr == 0 && ipaddr->addr != 0)	// 未保存IP地址:mqttClient->ip.addr == 0
    {
        os_memcpy(client->pCon->proto.tcp->remote_ip, &ipaddr->addr, 4);	// IP赋值
        // 根据安全类型,调用不同的TCP连接方式
        if (client->security)		// 安全类型 != 0
        {
#ifdef MQTT_SSL_ENABLE
            if(DEFAULT_SECURITY >= ONE_WAY_ANTHENTICATION )		// 单向认证【ONE_WAY_ANTHENTICATION = 2】
            {
            espconn_secure_ca_enable(ESPCONN_CLIENT,CA_CERT_FLASH_ADDRESS);
            }
            if(DEFAULT_SECURITY >= TWO_WAY_ANTHENTICATION)		// 双向认证【TWO_WAY_ANTHENTICATION = 3】
            {                espconn_secure_cert_req_enable(ESPCONN_CLIENT,CLIENT_CERT_FLASH_ADDRESS);
            }
            espconn_secure_connect(client->pCon);				// 不认证【TLS_WITHOUT_AUTHENTICATION = 1】
#else
            INFO("TCP: Do not support SSL\r\n");
#endif
        }
        else	// 安全类型 = 0 = NO_TLS
        {
            espconn_connect(client->pCon);		// TCP连接(作为Client连接Server)
        }
        client->connState = TCP_CONNECTING;		// TCP正在连接
        INFO("TCP: connecting...\r\n");
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);		// 安排任务MQTT_Task
}
  • 在域名解析成功回调函数中,它根据安全类型得不同来选择不同得TCP连接方式,我们暂时不加密。所以它将调用
    espconn_connect函数来将ESP8266作为TCPclient连接到TCPServer。

TCP连接成功回调函数

void ICACHE_FLASH_ATTR mqtt_tcpclient_connect_cb(void *arg)
{
    struct espconn *pCon = (struct espconn *)arg;		 // 获取TCP连接指针
    MQTT_Client* client = (MQTT_Client *)(pCon->reverse);// 获取mqttClient指针
    // 注册回调函数
    espconn_regist_disconcb(client->pCon, mqtt_tcpclient_discon_cb);	// TCP断开成功_回调
    espconn_regist_recvcb(client->pCon, mqtt_tcpclient_recv);			// TCP接收成功_回调
    espconn_regist_sentcb(client->pCon, mqtt_tcpclient_sent_cb);		// TCP发送成功_回调
    INFO("MQTT: Connected to broker %s:%d\r\n", client->host, client->port);
    // 【CONNECT】报文发送准备
    // 初始化MQTT报文缓存区
	mqtt_msg_init(&client->mqtt_state.mqtt_connection, client->mqtt_state.out_buffer, client->mqtt_state.out_buffer_length);
	// 配置【CONNECT】控制报文,并获取【CONNECT】报文[指针]、[长度]
    client->mqtt_state.outbound_message = mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info);
    // 获取待发送的报文类型(此处是【CONNECT】报文)
    client->mqtt_state.pending_msg_type = mqtt_get_type(client->mqtt_state.outbound_message->data);
    // 获取待发送报文中的【报文标识符】(【CONNECT】报文中没有)
    client->mqtt_state.pending_msg_id = mqtt_get_id(client->mqtt_state.outbound_message->data,client->mqtt_state.outbound_message->length);
    // TCP发送成功/报文发送5秒计时结束 => 报文发送结束(sendTimeout=0)
    client->sendTimeout = MQTT_SEND_TIMOUT;	// 发送MQTT报文时,sendTimeout=5
    INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
    // TCP:发送【CONNECT】报文
    if (client->security)	// 安全类型 != 0
    {
#ifdef MQTT_SSL_ENABLE
        espconn_secure_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
#else
        INFO("TCP: Do not support SSL\r\n");
#endif
    }
    else	// 安全类型 = 0 = NO_TLS
    {
    	// TCP发送:数据=[client->mqtt_state.outbound_message->data]、长度=[client->mqtt_state.outbound_message->length]
        espconn_send(client->pCon, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length);
    }
    client->mqtt_state.outbound_message = NULL;		// 报文发送完后,清除出站报文指针
    client->connState = MQTT_CONNECT_SENDING;		// 状态设为:MQTT【CONNECT】报文发送中【MQTT_CONNECT_SENDING】
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task
}
  • 当TCP连接成功建立之后ESP822将作为MQTT客户端与MQTT服务端进行连接,那么ESP8266应该向mqtt服务端发送CONNECT控制报文,那么ESP8266首先需要配置CONNECT控制报文。

配置【CONNECT】控制报文

// mqtt_msg_connect(&client->mqtt_state.mqtt_connection, client->mqtt_state.connect_info)
mqtt_message_t* ICACHE_FLASH_ATTR mqtt_msg_connect(mqtt_connection_t* connection, mqtt_connect_info_t* info)
{
  struct mqtt_connect_variable_header* variable_header;		// 【CONNECT】报文的【可变报头】指针
  init_message(connection);		// 设置报文长度 = 3(暂时设为【固定报头】长度(3),之后添加【可变报头】、【有效载荷】)
  // 判断消息长度是否超过缓存区长度						// 【注:[message.length]是指TCP传输的整个MQTT报文长度】
  if(connection->message.length + sizeof(*variable_header) > connection->buffer_length)		// 判断MQTT报文长度
    return fail_message(connection);
  // 跳过了对【固定报头】的赋值,只为【固定报头】保留了3个字节的空间。	注:剩余长度最多占两字节。
  // 获取【可变报头】指针,并更新报文长度
  variable_header = (void*)(connection->buffer + connection->message.length);	// 【可变报头】指针 = 报文缓存区指针+3(固定报头)
  connection->message.length += sizeof(*variable_header);						// 报文长度 == 固定报头 + 可变报头
  // 协议名、协议级别赋值
  variable_header->lengthMsb = 0;	// lengthMsb
#if defined(PROTOCOL_NAMEv31)
  variable_header->lengthLsb = 6;	// lengthLsb
  memcpy(variable_header->magic, "MQIsdp", 6);
  variable_header->version = 3;		// v31版本 = 3
#elif defined(PROTOCOL_NAMEv311)
  variable_header->lengthLsb = 4;	// lengthLsb
  memcpy(variable_header->magic, "MQTT", 4);
  variable_header->version = 4;		// v311版本 = 4
#else
#error "Please define protocol name"
#endif
  variable_header->flags = 0;	// 连接标志字节 = 0(暂时清0,待会赋值)
  // 保持连接时长赋值
  variable_header->keepaliveMsb = info->keepalive >> 8;		// 赋值高字节
  variable_header->keepaliveLsb = info->keepalive & 0xff;	// 赋值低字节
  // clean_session = 1:客户端和服务端必须丢弃之前的任何会话并开始一个新的会话
  if(info->clean_session)
    variable_header->flags |= MQTT_CONNECT_FLAG_CLEAN_SESSION; //clean_session=1
  // 判断是否存在[client_id],存在则设置[client_id]字段
  if(info->client_id != NULL && info->client_id[0] != '\0')
  {
	// 将[client_id]字段添加到报文缓存区,报文长度+=[client_id]所占长度
	if(append_string(connection, info->client_id, strlen(info->client_id)) < 0)
	return fail_message(connection);
  }
  else
    return fail_message(connection);	// 报文出错
  // 判断是否存在[will_topic]
  if(info->will_topic != NULL && info->will_topic[0] != '\0')
  {
	// 将[will_topic]字段添加到报文缓存区,报文长度+=[will_topic]所占长度
	if(append_string(connection, info->will_topic,strlen(info->will_topic))<0)
    return fail_message(connection);
	// 将[will_message]字段添加到报文缓存区,报文长度+=[will_message]所占长度
	if(append_string(connection,info->will_message,strlen(info->will_message))<0)
    return fail_message(connection);
	// 设置【CONNECT】报文中的Will标志位:[Will Flag]、[Will QoS]、[Will Retain]
	variable_header->flags |= MQTT_CONNECT_FLAG_WILL;		// 遗嘱标志位 = 1
	if(info->will_retain)
    variable_header->flags |= MQTT_CONNECT_FLAG_WILL_RETAIN;// WILL_RETAIN = 1
	variable_header->flags |= (info->will_qos & 3) << 3;	// will质量赋值
  }
  • 这函数将CONNECT得固定报头部分,可变报头部分,有效载荷部分,依次写入报头缓存区。首先为CONNECT控制报文得固定报头预留了3个字节长度,因为剩余长度最多占用两个字节,虽然MQTT协议规定剩余长度最多是4个字节但是因为TCP发送数据包长度得限制,剩余长度不会大于两个字节。接下来是赋值可变报头部分。依次赋值协议名、协议级别。接下来是连接标志字节,暂时设为0,之后再按位赋值。接下啦是设置保持连接时长,接下来是设置清除会话标志位。之后是设置有效载荷部分得设备ID字段,这个设备ID是一个字符串,那么字符串前面就需要添加两个字节得前缀,我们调用append_string这个函数来将参数字符串添加到出栈报文缓存区。在字符串前添加两个字符串得前缀来表示这个字符串得长度,之后再返回参数字符串在MQTT控制报文中所占得长度。

append_string函数

static int ICACHE_FLASH_ATTR append_string(mqtt_connection_t* connection, const char* string, int len)
{
  if(connection->message.length + len + 2 > connection->buffer_length)	// 判断报文是否过长
  return -1;
  // 设置字符串前的两字节前缀,表示此字符串的长度
  connection->buffer[connection->message.length++] = len >> 8;		// 高八位
  connection->buffer[connection->message.length++] = len & 0xff;	// 低八位
  memcpy(connection->buffer+connection->message.length, string, len);	// 将[参数字符串]添加到报文缓存区
  connection->message.length += len;	// 报文长度 += [参数字符串]所占长度
  return len + 2;	// 返回[参数字符串]在MQTT控制报文中所占长度
}
  • 之后再判断遗嘱主题是否存在,如果遗嘱主题存在得话,依次将遗嘱主题,遗嘱消息字符串添加到出栈报文缓存区,并且将连接标志当中得主题标志主题质量,主题保留,这些标志位依次赋值。之后判断是否有用户名字段,如果有,将用户名字段添加前缀,写入出栈报文缓存区并且将连接标志字节中得用户名标志位置1
    。接下来是判断用户名密码字段是否存

在。最后,调用fini_message函数,这是CONNECT控制报的固定报头。

设置【MQTT控制报文】的固定报头函数

static mqtt_message_t* ICACHE_FLASH_ATTR fini_message(mqtt_connection_t* connection, int type, int dup, int qos, int retain)
{
  int remaining_length = connection->message.length - MQTT_MAX_FIXED_HEADER_SIZE;	// 获取【可变报头】+【有效载荷】长度
  // 设置固定报头(固定头中的剩余长度使用变长度编码方案,详情请参考MQTT协议手册)
  if(remaining_length > 127)	// 剩余长度占2字节
  {
    connection->buffer[0] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1);	// 固定头的首字节赋值
    connection->buffer[1] = 0x80 | (remaining_length % 128);	// 剩余长度的第一个字节
    connection->buffer[2] = remaining_length / 128;				// 剩余长度的第二个字节
    connection->message.length = remaining_length + 3;			// 报文的整个长度
    connection->message.data = connection->buffer;				// MQTT报文指针 -> 出站报文缓存区首地址
  }
  else	//if(remaining_length<= 127) // 剩余长度占1字节
  {
	// 			buffer[0] = 无
    connection->buffer[1] = ((type&0x0f)<<4)|((dup&1)<<3)|((qos&3)<<1)|(retain&1);	// 固定头的首字节赋值
    connection->buffer[2] = remaining_length;			// 固定头中的[剩余长度](可变报头+负载数据)
    connection->message.length = remaining_length + 2;	// 报文的整个长度
    connection->message.data = connection->buffer + 1;	// MQTT报文指针 -> 出站报文缓存区首地址+1
  }
  return &connection->message;		// 返回报文首地址【报文数据、报文整体长度】
}
  • 此函数中int type参数是报文类型int dup, int qos, int retain这三个参数是报文类型标志位。也就是固定报文的第一个字节的第四位,除了PUBLISH报文的报文类型标志位是由重复分发标志[dup][Bit3]、服务质量[qos][Bit2~1]、报文保留标志[retain][Bit1=0]组成。其余类型报文的报文类型标志位是固定的。
  • 在此之前,我们已经将可变报头部分,有效载荷部分写入了出栈报文缓存区。如果可变报头加有效载荷的长度大于127,那么固定报头就是三个字节。第一个字节就是固定报头的首字节,也就是报文类型+报文类型标志位,后面的两个字节是剩余长度字段。
  • 如果可变报头+有效载荷的长度小雨大于127,那么剩余长度是占一个字节,固定报头是两个字节。所以说,我们预留给固定报头中的第一个字节是没有用的,固定报头的首字节写入三个字节当中的第二个字节,剩余长度部分写入第三个字节。之后我们将出栈报文缓存区的首地址或者是首地址+1,这个是由剩余长度或者是固定报头的长度决定的。将我们的出栈报文赋值给我们的MQTT报文指针。
  • 我们已经配置了控制报文并且已经获取了控制报文的指针和长度,接下来我们使用TCP发送API来将我们CONNECT控制报文发送给MQTT服务端。报文发送完成后,清除MQTT报文指针。MQTT服务端发送CONNECT控制报文之后,MQTT服务端会像我们返回确认连接请求报文。放ESP8266成功接入网络数据后,会进入mqtt_tcpclient_recv网络回调函数,在回调函数中会判断当前接收到的MQTT报文是否是CONNACK请求报文。并且判断ESP8266是否是请求MATT连接状态。如果是,那么将ESP8266设置为MQTT_DATA状态,并且执行MQTT连接成功的函数。

MQTT连接成功函数

void mqttConnectedCb(uint32_t *args)
{
    MQTT_Client* client = (MQTT_Client*)args;	// 获取mqttClient指针
    INFO("MQTT: Connected\r\n");
    // 【参数2:主题过滤器 / 参数3:订阅Qos】
	MQTT_Subscribe(client, "SW_LED", 0);	// 订阅主题"SW_LED",QoS=0
//	MQTT_Subscribe(client, "SW_LED", 1);
//	MQTT_Subscribe(client, "SW_LED", 2);
	// 【参数2:主题名 / 参数3:发布消息的有效载荷 / 参数4:有效载荷长度 / 参数5:发布Qos / 参数6:Retain】
	MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 0, 0);	// 向主题"SW_LED"发布"ESP8266_Online",Qos=0、retain=0
//	MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 1, 0);
//	MQTT_Publish(client, "SW_LED", "ESP8266_Online", strlen("ESP8266_Online"), 2, 0);
}
  • 当ESP8266客户端成功连接到MQTT服务端后,就可以向服务端订阅主题,发布消息。
  • 注意:放我们订阅完主题之后,我们不是将订阅报文直接发送到MQTT服务端,而是将订阅报文接入到队列中。

队列

  • 另外开辟的长度为2048字节的一块内存,作用:缓存一个或多个ESP8266将要发送给MQTT服务端的报文,之后在任务函数中,再将这些报文依次发送给MQTT服务端。
  • 在报文头之前添加一个起始码0X7E,在报文尾添加结束码0X7F。0X7E与0X7F之间就是完整的MQTT报文。MQTT报文中本身包含有0X7E、0X7D、0X7F。就将这个数值与0X20异或,并且在它之前添加一个前缀码0X7D。
  • 一个数与同一个数异或两次就与之前的数一样。
  • 报文的解析,从起始码开始,一直解析,指导遇到结束码。然后这个报文就结束了。

在这里插入图片描述
在这里插入图片描述

  • 示例
I16 ICACHE_FLASH_ATTR PROTO_AddRb(RINGBUF *rb, const U8 *packet, I16 len)
{
    U16 i = 2;
    if(RINGBUF_Put(rb,0x7E)==-1) return -1;	// 向当前队列写指针指向处写入【起始码:0x7E】

    while (len--)	// 循环[len]次(报文所有字节)
    {
       switch (*packet)	// 获取当前数据包的一个字节
       {
       case 0x7D:		// 判断数据 ?= 【0x7D】/【0x7E】/【0x7F】
       case 0x7E:
       case 0x7F:
        	// 如果数据==[0x7D]||[0x7E]||[0x7F],都在此数据前写入[0x7D]【因为[0x7E]==起始码、[0x7F]==结束码】
        	if(RINGBUF_Put(rb, 0x7D) == -1) return -1;				// 在此数据前写入[0x7D]
        	if(RINGBUF_Put(rb, *packet++^0x20) == -1) return -1;	// 【0x7D/0x7E/0x7F】^=0x20,写入队列(注:a^b^b == a)
            i += 2;		// 写入队列的字节数+2
            break;
        // 数据包当前数据不是特殊码,则正常写入
       default:
    	   if(RINGBUF_Put(rb, *packet++) == -1) return -1;		// 写入数据包指针对应值
    	   i++;		// 写入队列的字节数+1
    	   break;
       }
	}
    if(RINGBUF_Put(rb, 0x7F) == -1) return -1;	// 向当前队列写指针指向处写入[结束码:0x7F]
    return i;	// 返回写入数量(包括起始码、结束码)
}
  • 首先向队列中写入0X7E起始码,之后有一个循环while,喜欢次数就是报文的中长度,意思是报文的每个字节一字不落得写入队列中,写入时需要判断当前字节是否是特殊码,然后是特殊码,需要在前面添加0X7D前缀码,将特殊码异或上0X20,如果不是特殊码就直接将数值写入队列。当报文都写入队列完成之后,最后要添加一个0X7F得结束码,并且返回一个写入数量。这个函数就执行完毕啦

订阅主题函数

BOOL ICACHE_FLASH_ATTR MQTT_Subscribe(MQTT_Client *client, char* topic, uint8_t qos)
{
    uint8_t dataBuffer[MQTT_BUF_SIZE];		// 解析后报文缓存(1204字节)
    uint16_t dataLen;						// 解析后报文长度
    // 配置【SUBSCRIBE】报文,并获取【SUBSCRIBE】报文[指针]、[长度]
    client->mqtt_state.outbound_message=mqtt_msg_subscribe(&client->mqtt_state.mqtt_connection,topic, qos,&client->mqtt_state.pending_msg_id);
    INFO("MQTT: queue subscribe, topic\"%s\", id: %d\r\n", topic, client->mqtt_state.pending_msg_id);
    // 将报文写入队列,并返回写入字节数(包括特殊码)
    while(QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
    {
        INFO("MQTT: Queue full\r\n");
        // 解析队列中的报文
        if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1)	// 解析失败 = -1
        {
            INFO("MQTT: Serious buffer error\r\n");
            return FALSE;
        }
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务MQTT_Task

    return TRUE;
}
  • 通过QUEUE_Puts函数将订阅主题主题报文写入队列当中,之后安排系统任务,订阅主题函数得参数2是主题过滤器,参数3是订阅质量。

发布消息函数

BOOL ICACHE_FLASH_ATTR MQTT_Publish(MQTT_Client *client, const char* topic, const char* data, int data_length, int qos, int retain)
{
    uint8_t dataBuffer[MQTT_BUF_SIZE];	// 解析后报文缓存(1204字节)
    uint16_t dataLen;					// 解析后报文长度

    // 配置【PUBLISH】报文,并获取【PUBLISH】报文[指针]、[长度]
    client->mqtt_state.outbound_message = mqtt_msg_publish(&client->mqtt_state.mqtt_connection,
                                          topic, data, data_length,
                                          qos, retain,
                                          &client->mqtt_state.pending_msg_id);

    if (client->mqtt_state.outbound_message->length == 0)	// 判断报文是否正确
    {
        INFO("MQTT: Queuing publish failed\r\n");
        return FALSE;
    }
    // 串口打印:【PUBLISH】报文长度,(队列装填数量/队列大小)
    INFO("MQTT: queuing publish, length: %d, queue size(%d/%d)\r\n", client->mqtt_state.outbound_message->length, client->msgQueue.rb.fill_cnt, client->msgQueue.rb.size);
    // 将报文写入队列,并返回写入字节数(包括特殊码)

    while (QUEUE_Puts(&client->msgQueue, client->mqtt_state.outbound_message->data, client->mqtt_state.outbound_message->length) == -1)
    {
        INFO("MQTT: Queue full\r\n");	// 队列已满
        // 解析队列中的数据包

        if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == -1)	// 解析失败 = -1
        {
            INFO("MQTT: Serious buffer error\r\n");
            return FALSE;
        }
    }
    system_os_post(MQTT_TASK_PRIO, 0, (os_param_t)client);	// 安排任务

    return TRUE;
}
  • 参数2是主题名指针,参数3 是发布消息得有效载荷指针,参数4是有效载荷得长度,参数5是发布消息得质量,参数6是是否保留消息,首先,我们需要配置发送消息报文,之后将报文写入到队列当中,向系统安排任务,这样我们就向队列当中成功得写入了订阅主题报文和发布消息报文。

MQTT任务函数

// MQTT任务函数【任务:根据ESP8266运行状态,执行相应操作】
// TCP_RECONNECT_REQ			TCP重连请求(等待5秒)	退出Tsak(5秒后,进入TCP_RECONNECT状态)
// TCP_RECONNECT				TCP重新连接				执行MQTT连接准备,并设置ESP8266状态
// MQTT_DELETING				MQTT正在删除			TCP断开连接
// TCP_DISCONNECTING			TCP正在断开
// TCP_RECONNECT_DISCONNECTING	TCP暂时断开(断开后会重连)
// TCP_DISCONNECTED				TCP成功断开				删除TCP连接,并释放pCon内存
// MQTT_DELETED					MQTT已删除				删除MQTT客户端,并释放相关内存
// MQTT_KEEPALIVE_SEND			MQTT心跳				向服务器发送心跳报文
// MQTT_DATA					MQTT数据传输			TCP发送队列中的报文
void ICACHE_FLASH_ATTR MQTT_Task(os_event_t *e)	// 不判断消息类型
{
	INFO("\r\n------------- MQTT_Task -------------\r\n");
    MQTT_Client* client = (MQTT_Client*)e->par;		// 【e->par】 == 【mqttClient指针的值】,所以需类型转换
    uint8_t dataBuffer[MQTT_BUF_SIZE];	// 数据缓存区(1204字节)
    uint16_t dataLen;					// 数据长度
    if (e->par == 0)		// 没有mqttClient指针,错误
    return;
    // 根据ESP8266运行状态,执行相应操作
    switch (client->connState)
    {
    	// TCP重连请求(等待5秒),退出Tsak
    	case TCP_RECONNECT_REQ:		break;
    	// TCP重新连接:执行MQTT连接准备,并设置ESP8266状态
    	case TCP_RECONNECT:
    		mqtt_tcpclient_delete(client);	// 删除TCP连接、释放pCon内存、清除TCP连接指针
    		MQTT_Connect(client);			// MQTT连接准备:TCP连接、域名解析等
    		INFO("TCP: Reconnect to: %s:%d\r\n", client->host, client->port);
    		client->connState = TCP_CONNECTING;		// TCP正在连接
    		break;
    	// MQTT正在删除、TCP正在断开、【心跳请求】报文发送失败:TCP断开连接
    	case MQTT_DELETING:
    	case TCP_DISCONNECTING:
    	case TCP_RECONNECT_DISCONNECTING:
    		if (client->security)	// 安全类型 != 0
    		{
#ifdef MQTT_SSL_ENABLE
    			espconn_secure_disconnect(client->pCon);
#else
    			INFO("TCP: Do not support SSL\r\n");
#endif
    		}
    		else 	// 安全类型 = 0 = NO_TLS
    		{
    			espconn_disconnect(client->pCon);	// TCP断开连接
    		}
    		break;
    	// TCP成功断开
    	case TCP_DISCONNECTED:
    		INFO("MQTT: Disconnected\r\n");
    		mqtt_tcpclient_delete(client);	// 删除TCP连接、释放pCon内存、清除TCP连接指针
    		break;
    	// MQTT已删除:ESP8266的状态为[MQTT已删除]后,将MQTT相关内存释放
    	case MQTT_DELETED:
    		INFO("MQTT: Deleted client\r\n");
    		mqtt_client_delete(client);		// 删除MQTT客户端,并释放相关内存
    		break;
    	// MQTT客户端存活报告
    	case MQTT_KEEPALIVE_SEND:
    		mqtt_send_keepalive(client);	// 向MQTT服务器发送【心跳】报文
    		break;
    	// MQTT传输数据状态
    	case MQTT_DATA:
    		if (QUEUE_IsEmpty(&client->msgQueue) || client->sendTimeout != 0)
    		{
    			break;	// 【队列为空 || 发送未结束】,不执行操作
    		}
    		// 【队列非空 && 发送结束】:解析并发送 队列中的报文
    		if (QUEUE_Gets(&client->msgQueue, dataBuffer, &dataLen, MQTT_BUF_SIZE) == 0)	// 解析成功 = 0
    		{
				client->mqtt_state.pending_msg_type = mqtt_get_type(dataBuffer);		// 获取报文中的【报文类型】
				client->mqtt_state.pending_msg_id = mqtt_get_id(dataBuffer, dataLen);	// 获取报文中的【报文标识符】
				client->sendTimeout = MQTT_SEND_TIMOUT;	// 发送MQTT报文时,sendTimeout=5
				INFO("MQTT: Sending, type: %d, id: %04X\r\n", client->mqtt_state.pending_msg_type, client->mqtt_state.pending_msg_id);
				// 发送报文
				if (client->security)	// 安全类型 != 0
				{
#ifdef MQTT_SSL_ENABLE
					espconn_secure_send(client->pCon, dataBuffer, dataLen);
#else
					INFO("TCP: Do not support SSL\r\n");
#endif
				}
				else	// 安全类型 = 0 = NO_TLS
				{
					espconn_send(client->pCon, dataBuffer, dataLen);	// TCP发送数据包
				}
				client->mqtt_state.outbound_message = NULL;		// 报文发送完后,清除出站报文指针
				break;
    		}
        break;
    }
}	// 函数【MQTT_Task】结束
  • 在任务函数中会根据ESP8266得运行状态来执行相应得操作,我们得ESP8266已经和MQTT服务端建立了MQTT连接,所以说它是MQTT传输数据状态,首先调用QUEUE_IsEmpty函数来判断队列是否为空,如果队列不为空,则调用QUEUE_Gets来解析一个完整得报文,
  • 循环解析,直到遇到0X7F,结束解析(即使有许多未解析的报文,但是它依然解析一个)
  • 之后调用TCP发送接口来将解析后的报文发送给MQTT服务端。
  • 当MQTT服务端接收到ESP8266向他发送的MQTT控制报文时,他会根据控制报文做出相应的应答。

保持连接时长

  • ESP8266需要在保持连接时长内向MQTT发起通信,否则MQTT服务端会断开与ESP8266的网络连接。
  • 在定时回调函数中,ESP8266一直在保持着心跳计数,每秒钟加一,如果心跳计数大于我们设置的保持连接时长的二分之一,我们就发送心跳报文。
    这个理论的知识非常繁琐复杂,还需要理论结合实践。
    以下是源代码,大家可以研究,互相学习。
    链接:https://pan.baidu.com/s/17tyYa_pqV6kFlCOH2kcjww
    提取码:g5kk

实验现象

在这里插入图片描述

  • 打开串口,复位ESP8266,串口调试助手就获得到了MQTT服务器的域名,成功订阅主题并发布上线消息。同时,在MQTT-FX也成功的接收到了SW_LED主题的载荷消息。
  • 注意在打开ESP8266之前,复位之前,要先打开MQTT-FX,连接成功之后在复位ESP8266。
    在这里插入图片描述
  • 通过MQTT-FX服务端,向SW_LED主题发布LED_ON消息,LED被点亮。
    在这里插入图片描述
    由此可以说明,ESP8266与百度云连接成功。

参考连接

https://www.bilibili.com/video/BV1dJ411S723?p=53
http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

学习日记——《MQTT-JX》例程讲解(完结版) 的相关文章

  • WebView的一些问题分析

    1 性能问题 打开速度比原生慢 对于一个普通用户来讲 打开一个WebView通常会经理一下几个阶段 发出请求 gt 到达新的页面 页面白屏 gt 页面基本框架出现 但是没有数据 gt 页面处于loading状态 gt 出现数据 如果从程序上
  • ElasticSearch6.x +logstash6.x +MySQL8 MySQL8 数据同步,字母大小写问题

    ElasticSearch6 x logstash6 x同步MySQL8数据的时候 sql里面含有的大写字母 到了ElasticSearch6 x的时候就会变成小写 这是因为在jdbc conf里面没有添加 lowercase column
  • 黑客是这样的炼成的

    黑客的态度 黑客们解决问题 建设事物 信仰自由和双向的帮助 人人为我 我为人人 要想被认为是一名黑客 你的行为必须显示出你已经具备了这种态度 要想做的好象你具备这种态度 你就不得不真的具备这种态度 但是如果你想靠培养黑客态度在黑客文化中得到
  • Qt安卓工程报错:No rule to make target

    Qt编译工程报错 No rule to make target 网上查到的解决方案是这样的 第一种情况 Qt编译工程时候 所有用到的源文件包括头文件和库文件的 总路径长度不能超过190个左右字符 一旦超过 就会提示找不到那个文件 这个可能是
  • 新一代的网络请求库 Httpx

    点击上方 Python学习开发 选择 加为星标 第一时间关注Python技术干货 简介 HTTPX 是最近 GitHub看的到一个比较火的一个项目 根据官网的描述 总结有如下特点 和使用 requests 一样方便 requests 有的它
  • 排序算法(一)冒泡排序,简单选择排序,直接插入排序,希尔排序

    冒泡排序 简单选择排序 直接插入排序是三种复杂度为O n2 的算法 希尔排序在特殊增量序列的时候可以获得复杂度为O n3 2 冒泡排序 1 最简单的排序实现 这里把每个数和这个数之后的每个数比较 大于就交换位置 缺点 多出了很多次没有用的交
  • ros2 bag play

    optional arguments h help show this help message and exit s sqlite3 my read only test plugin my test plugin storage sqli
  • Idea intellij 如何创建多个Maven 模块进行协作?

    第一 根工程 先选择新建一个maven工程 不打勾Create from archetype 直接选择next 填写总的工程名字 这样就可以得到如下的项目 删除src下面的文件 比如现在的项目结构要做成如下形式 LS web admin c
  • 2020年12月 C/C++(二级)真题解析#中国电子学会#全国青少年软件编程等级考试

    C C 编程 1 8级 全部真题 点这里 第1题 数组指定部分逆序重放 将一个数组中的前k项按逆序重新存放 例如 将数组8 6 5 4 1前3项逆序重放得到5 6 8 4 1 时间限制 1000 内存限制 65536 输入 输入为两行 第一
  • 国产免费虚拟化OVM与 OpenStack对比

    OpenStack作为一款全球化的开源软件 需要丰富而强大的技术团队进行深度开发与维护 OVM作为国产免费的虚拟化软件 有开箱即用的优势 不需要二次投入太多成本 下面对两个产品的深度对比 OVM是开箱即用的一站式解决方案 OpenStack

随机推荐

  • Gitee码云如何邀请合作者加入

    问题 在创建了项目之后 想邀请别人加入 始终找不到邀请的入口 解决方案 1 选中项目 2 点击管理 3 项目成员管理 开发者 添加项目成员 邀请用户 4 会有三种不同的邀请方式 随意邀请了
  • 蓝桥杯乘积尾0(分析)

    1 问题描述 如下的10行数据 每行有10个整数 请你求出它们的乘积的末尾有多少个零 5650 4542 3554 473 946 4114 3871 9073 90 4329 2758 7949 6113 5659 5245 7432 3
  • Error:java: 错误: 不支持发行版本 5 解决方法(详细)

    使用配置 编译器 idea JDK jdk 13 注意 文章篇幅有点长 若省时间可直接看方法二或者方法三 Error java 错误 不支持发行版本 5 出现原因 本地配置jdk和idea默认的jdk不匹配 方法一 File gt Proj
  • vue脚手架vue-cli的卸载和安装

    若电脑之前已经安装过vue cli了 但是版本过低 比方说当前vue cli的版本为2 9 6 然后我想升级到vue cli的最新版本4 0 5 则需要将旧版本卸载 然后再重新安装 vue cli vue cli vue3 0之前版本使用此
  • PHP对接口执行效率慢的优化

    PHP对接口执行效率慢的优化 造成执行效率低的原因可以由很多方面找原因 从代码层面 代码质量低 执行效率也会有很大影响的 从硬件方面 服务器配置低 服务器配置是基础 这个跑不动肯定慢 从数据量方面 查询数据量过多 sql语句过于繁杂 执行缓
  • 安装yarn

    Install via npm It is recommended to install Yarn through the npm package manager which comes bundled with Node js when
  • c字符串函数sprintf()和snprintf()详解

    sprintf 是个变参函数 定义格式如下 int sprintf char buffer const char format argument 精华显然在于第二个字符串 格式化字符串 1 格式化数字字符串 sprintf最常见的应用之一莫
  • React中如何使用refs

    ref是React中的一种属性 当render函数返回某个组件的实例时 可以给render中的某个虚拟DOM节点添加一个ref属性 如下面的代码所示 html view plain copy print
  • Node学习1

    Node 加载模块 加载内置模块和第三方模块直接require 名字 自定义模块需要加路径 require 加载模块时候会自 动调用被加载模块代码 require永远以module export所指向的对象为准 模块作用域 和函数作用域类似
  • 解决VScode使用git报错:Git: Bad status code: 500

    VS CODE GIT 500 问题处理 pudn com相关错误的处理链接博客 作为记录
  • C++ STL各标准容器使用手册

    原文 http blog csdn net nohackcc article details 8900017 1 vector 内部实现 数组 就是没有固定大小的数组 vector直接翻译是向量的意思 支持操作 begin 取首个元素 返回
  • 第10课 微信小程序数据存储(同步缓存、异步缓存,本地读取缓存):

    第10课 微信小程序数据存储 同 异步缓存 本地读取缓存 同步缓存 wx setStorageSync key value 异步缓存 wx setStorage Object object 同步删除缓存 wx removeStorageSy
  • java开发:java多线程(三):lock方式实现线程同步机制

    java多线程系列文章 java多线程 一 synchronized 对象锁和类锁的区别 java多线程 二 wait sleep join 和yield 区别 这章博客讲解lock如何实现同步机制 比较和synchronized 的区别
  • unity项目过程中本菜新遇到的问题和解决方案

    试出了奇怪的效果 还挺好看 canvas background text 在unity中打开的vs编辑器中没有代码提示 流星曳尾 博客园 这里我一开始不知道为啥text显示不出来 调成screen size才发现 是canvas方向反了 调
  • C语言-01

    以下内容为个人笔记 无实际参考意义 取地址符 int a 9 int pa 9 定义了一个今天类型的变量a 给他的值为9 定义了一个int类型的指针pa 指向变量a的地址 指定a输出的结果为9 指定 a输出的结果为存放a变量的地址 指定pa
  • linux如何做到不丢日志,rsyslogd日志丢失的解决

    最近发现跑keepalived的几台机器的日志总是打印不完 还好给抛了一个报错 信息如下 root yw lvs2 backup etc tail n 1000000 var log messages 20130526 grep rate
  • c语言中如何实现生成随机数

    文章目录 一 rand 函数 二 rand srand 三 rand sranf time 一 rand 函数 c语言中自带的生成随机数的函数rand 只要引用头文件 include
  • Apache Kafka Connect JNDI注入漏洞 (CVE-2023-25194) 安全风险通告

    https mp weixin qq com s biz MzU5NDgxODU1MQ mid 2247497666 idx 1 sn b58717baf54fe52ec517b89fe370f589 chksm fe79d35ac90e5
  • 自动化测试用例要怎么写,据说这是最全的......

    前言 自动化测试是使用专门的软件工具来验证软件解决方案 这通常涉及自动化功能作为测试过程的一部分 测试自动化最常见的对象是 测试管理和缺陷管理 单元和单元集成测试 功能测试 回归测试 非功能测试 如性能和可扩展性 自动化测试用例的编写是实现
  • 学习日记——《MQTT-JX》例程讲解(完结版)

    头文件 include ets sys h include driver uart h include osapi h include mqtt h include wifi h include config h include debug