Springboot+Netty搭建MQTT协议的服务端(基础Demo)

2023-05-16

Netty是业界最流行的nio框架之一,结合springboot可以满足快速开发

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上的。MQTT协议的可以用在物联网、小型设备、还有移动应用上。

Netty也可以实现MQTT协议,他的内部封装了MQTT协议的相关对象。

Springboot+Netty搭建MQTT协议的服务端基础Demo代码案例

使用Netty+SpringBoot方式可以快速地开发一套基于MQTT协议(主要是MQTT3.1和MQTT3.1.1)的服务端程序

SpringBoot+Netty创建,pom.xml文件导入依赖包

<?xml version="1.0"?>
<project
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
	xmlns="http://maven.apache.org/POM/4.0.0"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.5.RELEASE</version>
		<relativePath />
	</parent>

	<groupId>boot.base.mqtt.server</groupId>
	<artifactId>boot-example-base-mqtt-server-2.0.5</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>boot-example-base-mqtt-server-2.0.5</name>
	<url>http://maven.apache.org</url>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<!-- 打包成一个可执行jar -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<executions>
					<execution>
						<goals>
							<goal>repackage</goal>
						</goals>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
</project>

 Springboot启动类,直接在main里面启动netty的MQTT服务(也包含web应用的)

package boot.example.mqtt.server;

import boot.example.mqtt.server.netty.BootNettyMqttServerThread;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import boot.example.mqtt.server.netty.BootNettyMqttServer;


/**
 * 蚂蚁舞
 */
@SpringBootApplication
public class BootNettyMqttApplication implements CommandLineRunner {
    public static void main( String[] args ) {
		SpringApplication app = new SpringApplication(BootNettyMqttApplication.class);
		app.run(args);
    }

    @Override
    public void run(String... args) throws Exception {
        // 启动  1883
        int port = 1883;
        BootNettyMqttServerThread bootNettyMqttServerThread = new BootNettyMqttServerThread(port);
        bootNettyMqttServerThread.start();
    }
}

Netty的MQTT启动类

package boot.example.mqtt.server.netty;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 蚂蚁舞
 */
public class BootNettyMqttServer {

	private NioEventLoopGroup bossGroup;

	private NioEventLoopGroup workGroup;

	/**
	 * 	启动服务
	 */
	public void startup(int port) {

		try {
			bossGroup = new NioEventLoopGroup(1);
			workGroup = new NioEventLoopGroup();

			ServerBootstrap bootstrap = new ServerBootstrap();
			bootstrap.group(bossGroup, workGroup);
			bootstrap.channel(NioServerSocketChannel.class);

			bootstrap.option(ChannelOption.SO_REUSEADDR, true)
					.option(ChannelOption.SO_BACKLOG, 1024)
					.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
					.option(ChannelOption.SO_RCVBUF, 10485760);

			bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
					.childOption(ChannelOption.SO_KEEPALIVE, true)
					.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
						protected void initChannel(SocketChannel ch) {
							ChannelPipeline channelPipeline = ch.pipeline();
							// 设置读写空闲超时时间
							channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
							channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
							channelPipeline.addLast("decoder", new MqttDecoder());
							channelPipeline.addLast(new BootNettyMqttChannelInboundHandler());
						}
					});
			ChannelFuture f = bootstrap.bind(port).sync();
			if(f.isSuccess()){
				System.out.println("startup success port = " + port);
				f.channel().closeFuture().sync();
			} else {
				System.out.println("startup fail port = " + port);
			}

			
//			//	绑定端口,监听服务是否启动
//			bootstrap.bind(port).addListener((ChannelFutureListener) channelFuture -> {
//				if (channelFuture.isSuccess()) {
//					System.out.println("startup success --- ");
//				} else {
//					System.out.println("startup fail ---");
//				}
//			});
		} catch (Exception e) {
			System.out.println("start exception"+e.toString());
		}

	}

	/**
	 * 	关闭服务
	 */
	public void shutdown() throws InterruptedException {
		if (workGroup != null && bossGroup != null) {
			bossGroup.shutdownGracefully().sync();
			workGroup.shutdownGracefully().sync();
			System.out.println("shutdown success");
		}
	}

}

MQTT服务端I/O数据读写处理类

package boot.example.mqtt.server.netty;


import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 蚂蚁舞
 */
@ChannelHandler.Sharable
public class BootNettyMqttChannelInboundHandler extends ChannelInboundHandlerAdapter {

	private final Logger log =  LoggerFactory.getLogger(this.getClass());
	


    /**
     * 	从客户端收到新的数据时,这个方法会在收到消息时被调用
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
    	if (null != msg) {
            MqttMessage mqttMessage = (MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());
            MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
            Channel channel = ctx.channel();

            if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
            	//	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
            	//	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
            	BootNettyMqttMsgBack.connack(channel, mqttMessage);
            }

            switch (mqttFixedHeader.messageType()){
                case PUBLISH:		//	客户端发布消息
                	//	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                	System.out.println("123");
                	BootNettyMqttMsgBack.puback(channel, mqttMessage);
                    break;
                case PUBREL:		//	发布释放
                	//	PUBREL报文是对PUBREC报文的响应
                	//	to do
                	BootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                    break;
                case SUBSCRIBE:		//	客户端订阅主题
                	//	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
                	//	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
                	//	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                	// 	to do
                	BootNettyMqttMsgBack.suback(channel, mqttMessage);
                    break;
                case UNSUBSCRIBE:	//	客户端取消订阅
                	//	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
                	//	to do
                	BootNettyMqttMsgBack.unsuback(channel, mqttMessage);
                    break;
                case PINGREQ:		//	客户端发起心跳
                	//	客户端发送PINGREQ报文给服务端的
                	//	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
                	//	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
                	BootNettyMqttMsgBack.pingresp(channel, mqttMessage);
                    break;
                case DISCONNECT:	//	客户端主动断开连接
                	//	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
                	//	to do
                    break;
                default:
                    break;
            }
    	}
    }

    /**
     * 	从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
    }

    /**
     * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * 	客户端与服务端 断连时执行 channelInactive方法之后执行
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        super.channelUnregistered(ctx);
    }

    /**
     * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);  
        ctx.close();
    }

    /**
     * 	客户端与服务端第一次建立连接时执行
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }

    /**
     * 	客户端与服务端 断连时执行
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
        super.channelInactive(ctx);
    }
    
    /**
     * 	服务端 当读超时时 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
        super.userEventTriggered(ctx, evt);
        ctx.close();
    }

    
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }

}

对MQTT客户端发送消息后,处理的返回消息,基于MQTT协议的,需要MQTT协议的主要内容

package boot.example.mqtt.server.netty;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttConnAckMessage;
import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;

/**
 * 蚂蚁舞
 */
public class BootNettyMqttMsgBack {

	private static final Logger log =  LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
	
	/**
	 * 	确认连接请求
	 * @param channel
	 * @param mqttMessage
	 */
	public static void connack (Channel channel, MqttMessage mqttMessage) {
		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
		
		//	构建返回报文, 可变报头
		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
		//	构建返回报文, 固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
		//	构建CONNACK消息体
		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
		log.info("back--"+connAck.toString());
		channel.writeAndFlush(connAck);
	}
	
	/**
	 * 	根据qos发布确认
	 * @param channel
	 * @param mqttMessage
	 */
	public static void puback (Channel channel, MqttMessage mqttMessage) {
		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
		MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
        byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(headBytes);
        String data = new String(headBytes);
        System.out.println("publish data--"+data);

        switch (qos) {
	        case AT_MOST_ONCE: 		//	至多一次
	            break;
	        case AT_LEAST_ONCE:		//	至少一次
	    		//	构建返回报文, 可变报头
	    		MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
	    		//	构建返回报文, 固定报头
	    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
	    		//	构建PUBACK消息体
	    		MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
	    		log.info("back--"+pubAck.toString());
	    		channel.writeAndFlush(pubAck);
	            break;
	        case EXACTLY_ONCE:		//	刚好一次
	            //	构建返回报文, 固定报头
	        	MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
	            //	构建返回报文, 可变报头
	            MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
	            MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
	    		log.info("back--"+mqttMessageBack.toString());
	    		channel.writeAndFlush(mqttMessageBack);
	            break;
			default:
				break;
        }
	}
	
	/**
	 * 	发布完成 qos2 
	 * @param channel
	 * @param mqttMessage
	 */
	public static void pubcomp (Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();       
        //	构建返回报文, 固定报头
    	MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
        //	构建返回报文, 可变报头
        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
		log.info("back--"+mqttMessageBack.toString());
		channel.writeAndFlush(mqttMessageBack);
	}
	
	/**
	 * 	订阅确认
	 * @param channel
	 * @param mqttMessage
	 */
	public static void suback(Channel channel, MqttMessage mqttMessage) {
		MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
		MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); 
		//	构建返回报文, 可变报头
		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
		Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
		//log.info(topics.toString());
		List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
		for (int i = 0; i < topics.size(); i++) {
			grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
		}
		//	构建返回报文	有效负载
		MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
		//	构建返回报文	固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
		//	构建返回报文	订阅确认
		MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
		log.info("back--"+subAck.toString());
		channel.writeAndFlush(subAck);
	}
	
	/**
	 * 	取消订阅确认
	 * @param channel
	 * @param mqttMessage
	 */
	public static void unsuback(Channel channel, MqttMessage mqttMessage) {
		MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); 
		//	构建返回报文	可变报头
		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
		//	构建返回报文	固定报头
		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
		//	构建返回报文	取消订阅确认
		MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
		log.info("back--"+unSubAck.toString());
		channel.writeAndFlush(unSubAck);
	}
	
	/**
	 * 	心跳响应
	 * @param channel
	 * @param mqttMessage
	 */
	public static void pingresp (Channel channel, MqttMessage mqttMessage) {
		//	心跳响应报文	11010000 00000000  固定报文
		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
		MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
		log.info("back--"+mqttMessageBack.toString());
		channel.writeAndFlush(mqttMessageBack);
	}
	
	
}

启动MQTT的线程类

package boot.example.mqtt.server.netty;


/**
 *  蚂蚁舞
 */
public class BootNettyMqttServerThread extends Thread {

    private final int port;

    public BootNettyMqttServerThread(int port){
        this.port = port;
    }

    public void run() {
        BootNettyMqttServer bootNettyMqttServer = new BootNettyMqttServer();
        bootNettyMqttServer.startup(this.port);
    }


}

使用springboot+netty很容易在极短的时间内搭建基于MQTT协议的服务端,简单地DEMO应用仅仅只需要几个类就能满足要求,这个Demo应用是基于Netty4.x的。

使用Netty来搭建MQTT协议的服务端DEMO搭建好了,需要一个MQTT客户端来做测试,我使用的是eclipse的paho工具来测试的,支持window客户端版本,我使用的是(org.eclipse.paho.ui.app-1.1.1-win32.win32.x86_64.zip)  UI的意思是可以桌面用的,当然需要java的jdk支持的,此工具官网有的。

测试效果截图

DEMO仅仅只是DEMO,他只是单方面的支持客户端到服务端创建连接,订阅,取消订阅,发布消息等,最主要地还是要看到MQTT创建过程和数据

在类中,使用了log日志,使用springboot默认的log日志配置,便可以得到MQTT创建过程中的数据

MQTT协议主要由三部分组成

  • 固定头(MqttFixedHeader):所有的 MQTT 数据包都有,用于表示数据包类型及对应标识,还有数据包的大小
  • 可变头(variableHeader):部分的 MQTT 数据包中有,需要根据协议中具体类型来决定
  • 消息体(payload):部分的 MQTT 数据包中有,具体数据信息(关键真正业务用到的数据哦)

具体的MQTT协议需要参考文档

我们使用的是netty封装的mqtt类,在(io.netty.handler.codec.mqtt)里toString()方法得到报文信息参考如下

1.连接服务器(CONNECT)和确认连接请求(CONNACK)

客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文

MqttConnectMessage[
fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=35], 
variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=20],
payload=MqttConnectPayload[clientIdentifier=paho1614153936872000000, willTopic=null, willMessage=null, userName=null, password=null]
]

从CONNECT报文中,我们可以看到很多的信息,协议标识,协议级别,会话,遗嘱,用户,密码等,我这里抓取的报文只是一个基础参考

服务端发送CONNACK报文响应从客户端收到的CONNECT报文,服务端发送给客户端的第一个报文必须是CONNACK,这里也只是一个参考,具体需要根据CONNECT来返回报文

MqttConnAckMessage[
fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=true], 
payload=]

2.订阅主题(SUBSCRIBE)和确认订阅(SUBACK)

客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题,为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端,SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端,具体的应用,DEMO里只是体现了订阅主题的过程,实际业务并不是如此简单,订阅主题的报文参考如下

MqttSubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=56], 
variableHeader=MqttMessageIdVariableHeader[messageId=1], 
payload=MqttSubscribePayload[
MqttTopicSubscription[topicFilter=test/netty/post, qualityOfService=AT_MOST_ONCE], 
MqttTopicSubscription[topicFilter=test/netty/get, qualityOfService=AT_LEAST_ONCE], 
MqttTopicSubscription[topicFilter=test/netty/event, qualityOfService=EXACTLY_ONCE]
]]

服务端发送SUBACK报文给客户端,用于确认它已收到并且正在处理SUBSCRIBE报文,SUBACK报文包含一个返回码清单,它们指定了SUBSCRIBE请求的每个订阅被授予的最大QoS等级,确认订阅的报文参考如下

MqttSubAckMessage[
fixedHeader=MqttFixedHeader[messageType=SUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=5], 
variableHeader=MqttMessageIdVariableHeader[messageId=1], 
payload=MqttSubAckPayload[grantedQoSLevels=[0, 1, 2]]
]

3.取消订阅(UNSUBSCRIBE)和取消订阅确认(UNSUBACK)

客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题,参考报文如下

MqttUnsubscribeMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=53], 
variableHeader=MqttMessageIdVariableHeader[messageId=2], 
payload=MqttUnsubscribePayload[topicName = test/netty/post, topicName = test/netty/get, topicName = test/netty/event]
]

服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文,参考报文如下

MqttUnsubAckMessage[
fixedHeader=MqttFixedHeader[messageType=UNSUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttMessageIdVariableHeader[messageId=2], 
payload=
]

4.心跳请求(PINGREQ)和心跳响应(PINGRESP)

客户端发送PINGREQ报文给服务端的,在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着,请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开,参考报文如下

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
variableHeader=, 
payload=
]

服务端发送PINGRESP报文响应客户端的PINGREQ报文,表示服务端还活着

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PINGRESP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
variableHeader=, 
payload=
]

5.断开连接(DISCONNECT)客户端主动断开连接

DISCONNECT报文是客户端发给服务端的最后一个控制报文,表示客户端正常断开连接,而服务端不需要返回消息了,处理业务逻辑便可。

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=DISCONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
variableHeader=, 
payload=
]

6.发布和订阅,根据(QoS等级)

  • 发布消息(PUBLISH): PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息 
  • 发布确认(PUBACK): PUBACK报文是对QoS 1等级的PUBLISH报文的响应
  • 发布收到(PUBREC): PUBREC报文是对QoS等级2的PUBLISH报文的响应,它是QoS 2等级协议交换的第二个报文 
  • 发布释放(PUBREL): PUBREL报文是对PUBREC报文的响应,它是QoS 2等级协议交换的第三个报文
  • 发布完成(PUBCOMP): PUBCOMP报文是对PUBREL报文的响应,它是QoS 2等级协议交换的第四个也是最后一个报文

1).QoS0-至多一次,最多一次

客户端->服务端  PUBLISH  服务端无需向客户端发送确认消息,这就是最多一次消息,参考报文

MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=25], 
variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=-1], 
payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 496))
]

其中playload的数据可以用下面代码获取

		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
		MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
        byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(headBytes);
        String data = new String(headBytes);

2).QoS1-至少一次,服务器下发确认消息

客户端->服务端 PUBLISH 参考报文

MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=26], 
variableHeader=MqttPublishVariableHeader[topicName=test/netty/get, packetId=4], 
payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 480))]

服务端->客户端 PUBACK 参考报文

MqttPubAckMessage[
fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttMessageIdVariableHeader[messageId=4], 
payload=]

3).QoS2-刚好一次(共四个报文)

客户端->服务端 PUBLISH 第一个报文

MqttPublishMessage[
fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=28], 
variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=5], 
payload=PooledSlicedByteBuf(ridx: 0, widx: 9, cap: 9/9, unwrapped: PooledUnsafeDirectByteBuf(ridx: 30, widx: 30, cap: 496))]

服务端->客户端 PUBREC 第二个报文

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREC, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttMessageIdVariableHeader[messageId=5], 
payload=]

客户端->服务端 PUBREL 第三个报文

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBREL, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttMessageIdVariableHeader[messageId=5], 
payload=]

服务端->客户端 PUBCOMP 第四个报文

MqttMessage[
fixedHeader=MqttFixedHeader[messageType=PUBCOMP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
variableHeader=MqttMessageIdVariableHeader[messageId=5], 
payload=]

这仅仅只是一个DEMO,不涉及任何业务,主要参考了(第一章 - MQTT介绍 · MQTT协议中文版)MQTT中文版协议

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Springboot+Netty搭建MQTT协议的服务端(基础Demo) 的相关文章

随机推荐

  • stm32 基于TIM1定时器的PWM输出

    void PWM TIM1 uint16 t arr uint16 t psc RCC APB2PeriphClockCmd RCC APB2Periph TIM1 ENABLE 定时器TIM1时钟使能 TIM DeInit TIM1 设置
  • stm32 can总线参考例程

    CAN初始化 tsjw 重新同步跳跃时间单元 范围 1 3 CAN SJW 1tq CAN SJW 2tq CAN SJW 3tq CAN SJW 4tq tbs2 时间段2的时间单元 范围 1 8 tbs1 时间段1的时间单元 范围 1
  • 物联网IOT-mqtt协议

    MQTT是一种客户机服务器发布 订阅消息传递传输协议 它重量轻 开放 简单 设计简单 易于实现 这些特性使其非常适合在许多情况下使用 xff0c 包括受限的环境 xff0c 如机器间通信 M2M 和物联网 IoT 环境 xff0c 在这些环
  • 联合索引的最左匹配原则的成因

    联合索引的最左匹配原则的成因 上面我们只看的是单一的索引 xff0c 接下来咱们来看看联合索引 xff0c 也就是回答第二个问题 联合索引的最左匹配原则的成因 什么是联合索引呢 xff0c 就是由多列组成的索引了 那亦要了解其成因 xff0
  • 腾讯云轻量服务器的Ubuntu如何使用root(根)用户登陆ssh/Shell/terminal/终端/WindTerm/FinalShell

    Ubuntu 系统的默认用户名是 ubuntu xff0c 并在安装过程中默认不设置 root 帐户和密码 您如有需要 xff0c 可在设置中开启允许 root 用户登录 具体操作步骤如下 xff1a 使用 ubuntu 帐户登录轻量应用服
  • Ubuntu安装sshd服务

    ubuntu安装ssh服务 一 安装shhd SSH分客户端openssh client和openssh server 如果你只是想登陆别的机器的SSH只需要安装openssh client xff08 ubuntu有默认安装 xff0c
  • Linux环境(六)--资源与限制

    资源与限制 运行在Linux系统上的程序是有资源限制的 这些也许是硬件引起的限制 例如内存 xff0c 也许由系统策略引起的限制 例如 xff0c 允许 的CPU时间 xff0c 或者是实现的限制 例如 xff0c 整数的尺寸或是文件名允许
  • 遇到了C/C++控制台程序无法输入中文的情况

    其实C C 43 43 控制台程序无法cin中文的情况并不是你使用了string xff0c string是能输入并保存中文的 xff1b 经过一番探究 xff0c 我发现主要的问题是文件的编码和控制台所处的代码页 xff08 控制台的编码
  • Jpg2Dcm中文乱码问题

    Jpg2Dcm中文乱码问题 最近老板提出了一个新的功能要求 xff0c 希望可以把图片转成dcm 在实现功能的问题中遇见了很多问题和掉过许多坑 于是在此记录下来 问题 xff1a 第一次在进行Jpg2Dcm时 xff0c 可以进行图片转dc
  • 神经网络的数学表达式,神经网络的数学理论

    什么是神经网络 神经网络可以指向两种 xff0c 一个是生物神经网络 xff0c 一个是人工神经网络 生物神经网络 xff1a 一般指生物的大脑神经元 xff0c 细胞 xff0c 触点等组成的网络 xff0c 用于产生生物的意识 xff0
  • python装饰器详解(四)---把参数传递给装饰器

    因为装饰器必须接收一个函数当做参数 所以 不可以直接把被装饰函数的参数传递给装饰器 装饰器就是一个普通的函数 xff0c 回顾 def my decorator func print 34 I am an ordinary function
  • Motion Deblurring图像运动去模糊代码

    http www di ens fr whyte Efficient Deblurring for Shaken and Partially Saturated Images http www di ens fr willow resear
  • maven执行install时报错 The packaging for this project did not assign a file to the build artifact

    问题描述 maven中执行plugins下面的install install时会报如下错误 span class token class name Failed span span class token keyword to span s
  • realsense相机两种获取相机内外参的方式

    https www it610 com article 1296417297711308800 htm 命令 xff1a rs sensor control 这个命令是一个exe文件 xff0c 可以去 C Program Files x8
  • wget设置代理

    1 在bash shell中设定代理 basrhc export http proxy 61 34 166 111 53A 167 3128 34 export ftp proxy 61 34 166 111 53A 167 3128 34
  • chown,chgrp,chmod,u+s,g+s,o+t

    chown user file directory change owner 将后面的目标文件或者目录的所有者替换成 user chgrp group file directory change group 将目标文件或者目录的所有组替换成
  • Segment Routing笔记(一)

    SR 理论 一 MPLS TE缺点 RSVP TE大部分都是为了FRR的目的不支持ECMP所有流量都需要在隧道里诞生了 战术型 TE xff0c 只在需要的时候使用 术语 TI LFA 与拓扑无关的无环路备份 xff0c 能保证备份路径的最
  • Springboot+Netty搭建UDP服务端

    UDP是一个无连接协议 xff0c 应用范围很大 xff0c 对于一些低功耗的设备可以使用UDP方式向云端推送消息信息 xff0c 也可以在推送消息时收到从云端原路返回的消息 xff0c 使用Netty 43 SpringBoot方式可以快
  • Springboot+Netty搭建UDP客户端

    使用Netty 43 SpringBoot方式可以快速地开发一套基于UDP协议的服务端程序 xff0c 同样的也可以开发客户端 xff0c 一般使用UDP都是使用原生的方式 xff0c 发送消息后就不管不问 xff0c 也就是不需要确定消息
  • Springboot+Netty搭建MQTT协议的服务端(基础Demo)

    Netty是业界最流行的nio框架之一 xff0c 结合springboot可以满足快速开发 MQTT xff08 Message Queuing Telemetry Transport xff0c 消息队列遥测传输协议 xff09 xff