MQTT-java使用说明
本文的资料下载:
链接:https://pan.baidu.com/s/1OCfsQ_NqcehKy86kYkA-wg?pwd=1234
提取码:1234
MQTT基本介绍
MQTT是一个客户端服务端架构的发布/订阅模式的消息传输协议。
特点
- 易于实现
- 数据传输的服务质量可控
- 占用带宽小
- 传输数据不可预知
- 设备连接状态可知
主要版本
发布/订阅特性
- 相互独立,不需要知道有几个设备订阅了这个主题。
- 空间可分离,只要有互联网或者局域网就可以。
- 时间可异步
连接MQTT服务端
客户端发送CONNECT(报文)做请求连接
服务端回复CONNACK确认连接
CONNECT(报文)
名称 |
内容 |
clientId(distinct) |
“client-1” |
cleanSession(判定是不是一个重要客户端) |
true |
username(可选) |
“hans” |
password(可选) |
“letmein” |
lastWillTopic(可选) |
“/hans/will” |
lastWillQos(可选) |
2 |
lastMessage(可选) |
“unexpected exit” |
lastWillRetain(可选) |
false |
keepAlive |
60 |
clientId
客户端ID,每个客户端的ID是唯一的(distinct)。
cleanSession
用来标志是不是一个重要的客户端,如果是的话,标志为false。
clean清除,Session回话,就是不保存会话,设置为false,就是保存会话,如果客户端没有确认收到信息,那么就用保存的会话信息继续发送,直到接收成功。
设置为false 的时候,要把Qos>0。
keepAlive
心跳机制
要求客户端定时发送信息给服务端,告诉服务端客户端活着。
单位s,指定时多长时间发送一次“活着的”信息给服务端。
Qos
Qos |
含义 |
0 |
At most once,发送一次消息后,就不再关心它有没有发送到对方,也不设置任何重发机制 |
1 |
At least once,包含了简单的重发机制,Sender发送信息之后等待接收者的ACK,如果没收到ACK则重新发送信息。这种模式能保证信息至少能到达一次,但无法保证消息重复。 |
2 |
Exactly once,设计了略为复杂的重发和重复消息发现机制,保证消息到达对方并且严格只到达一次。 |
与计算机网络中的可靠传输和不可靠传输相似。
CONNACK
名称 |
内容 |
sessionPersent(当前会话) |
true |
returnCode(连接返回码) |
0 |
returnCode连接返回码返回值含义
返回码 |
返回码描述 |
0 |
成功连接 |
1 |
连接被服务端拒绝,原因是不支持客户端的MQTT协议版本 |
2 |
连接被服务端拒绝,原因是不支持客户端标识符的编码。 可能造成此原因的是客户端标识符编码是UTF-8,但是服务端不允许使用此编码。 |
3 |
连接被服务端拒绝,原因是服务端不可用。 即,网络连接已经建立,但MQTT服务不可用。 |
4 |
连接被服务端拒绝,原因是用户名或密码无效。 |
5 |
连接被服务端拒绝,原因是客户端未被授权连接到此服务端。 |
sessionPersent(当前会话)
上次服务器想发给客户端的信息,如果发送给客户端,客户端没有确认或者客户端没有收到,那么这个标志为true。
申请MQTT服务器-EMQX
EMQX免费申请MQTT服务器网址:
https://www.emqx.com/zh/cloud
点免费试用
注册一下
选择基础版,高级版的用数据集成里面的东西需要做VPC链接,比较麻烦。
这些默认就行,自己想换个就换个选项,然后下一步
立即部署
第一次创建的时候,只有左边红框里面的东西,显示正在创建实例,这个过程比较费劲
这个时候可以先来安装一下MQTTX这个软件,这个软件是由EMQX公司开发的
安装的时候,就一直下一步就行,可能会出现电脑警告认为这个是有病毒的软件,忽略就行,没毒。
同时我们还需要安装一下mosquitto。
打开以后是这样的:
创建实例结束后就是这样:
点击它
这里呢,我们看一下这几个的含义
连接地址:这个就是服务器的地址
连接端口:11066(mqtt), 12095(mqtts), 13788(ws), 14120(wss)
mqtt协议,用这个11066(mqtt)
mqtt+TLS,用这个12095(mqtts)
网页端的不加密传输,用这个13788(ws)
网页端的加密传输,用这个14120(wss)
类似于http和https。这几个端口不一定,每个账号自己创建完后的端口号可能不一样,要看自己创建完的是多少。
连接测试MQTT服务器
首先要添加一个认证,就是创建一个用户和密码。这个虽然在协议里面不是必须的,但是在这个服务器里面是必须的。
选左边的认证,然后输入用户名和密码,这里为了测试方便,我设置的用户名是test,密码是123456,然后点添加。
添加完以后:
然后可以做在线调试,选左边的在线调试就可以
连接地址就是创建的服务器地址,不需要手动输入,端口也是选好的mqtt的那个。
然后输入用户名,密码和连接名称,连接名称随便起一个就行,点连接。
这里显示的还有客户端ID,这个需要唯一标识,之前CONNECT(报文)那里提到过。Keepalive是心跳机制里面的每个多少秒发送一次信号,确认客户端还在连接着。
连接后显示如下:
我们将主题设为testtopic/1,Qos为0然后订阅。
下面发布的主题也设为testtopic/1,Qos为0,然后点发布。
右边的是我们发送的,左边的是收到的消息。
这样就测试成功了。
这一段也可以跟着这个官方的B站视频来做,视频链接:
https://www.bilibili.com/video/BV1hi4y1Z7Lr?spm_id_from=333.999.0.0
MQQTX的使用也差不多,输入服务器地址和端口号,用户和密码就能使用。
使用 Java SDK 连接到部署
下载java连接示例代码
文件里面也是有的,在这个压缩包里面,解压以后有个mqtt-client-Java的文件夹,这个就是我们要用的代码。
或者可以从网站上下载https://github.com/emqx/MQTT-Client-Examples
部署的教程,EMQX也提供了一个文档说明,网址如下:
https://docs.emqx.com/zh/cloud/latest/connect_to_deployments/java_sdk.html#%E5%89%8D%E6%8F%90%E6%9D%A1%E4%BB%B6
我们就直接在intellij IDEA中打开这个项目文件,然后进行配置修改就好。他是用Maven来做的配置,从头开始建,没有Maven的知识,不太容易。
改错
下面这一部分,是根据这个博客
https://blog.csdn.net/u014677702/article/details/86013565
打开以后,会发现有很多的红色报错,点击右上角的项目结构
把java11换成java1.8,然后确定就好了。
然后在SampleCallback这个里面
这里根据提示,改一下就行。
然后,安装一下Maven,下面这些按照这个博客安装的
https://blog.csdn.net/weixin_42893085/article/details/105539472
下载安装maven:
http://maven.apache.org/download.cgi
下载这个,或者我在文件夹里面也放了这个包
把它解压到C盘、D盘等盘里面就行。
这里我就放到了D盘
然后配置环境变量,右击此电脑,点属性
然后划到底,打开高级系统设置
打开环境变量
在这里新建一个
变量名为
MAVEN_HOME
变量值为
自己解压的文件夹目录(我的是D:\apache-maven-3.8.5)
目录下面需要有bin文件夹
输入完成,确认。
然后在Path这个里面选择编辑
新加入一条记录
%MAVEN_HOME%\bin
全部确定退出环境变量设置后,进行测试,WIN+R,输入cmd,打开终端。
echo %JAVA_HOME%
mvn -version
这样Maven已经在计算机上安装好了
希望更改Maven在本地存放依赖地址的,看下面这个视频更改,不改的话,默认存放在C盘,不怎么使用的话,放C盘也行。
https://www.bilibili.com/video/BV1Fz4y167p5?p=5
然后配置intellij,让intellij知道Maven这个东西。下面操作是根据这个博客来的:
https://blog.csdn.net/lyg9966/article/details/105904175
在解压后的文件里面,打开conf,里面有个settings.xml,打开这个文件。
在里面,加入下面这些
<!-- 更换阿里镜像,加快依赖下载 -->
<mirror>
<id>aliyunmaven</id>
<mirrorOf>central</mirrorOf>
<name>central</name>
<url>https://maven.aliyun.com/repository/central</url>
</mirror>
然后在Intellij IDEA设置
在这个地方填入
-Dmaven.wagon.http.ssl.insecure=true -Dmaven.wagon.http.ssl.allowall=true -Dmaven.wagon.http.ssl.ignore.validity.dates=true
这个博客里面还说要删除plugin,我这里没有报这个错误,有需要的看原博客。
构建,然后运行就可以。当然要是想要mqtt通信的效果,还需要在代码里面将之前的服务器信息进行配置。
java代码配置mqtt服务
需要配置这几个地方
String topic = "testtopic/1";//这个是刚才发布的那个话题名
String content = "Hello World";//这个是要发送的信息
int qos = 2;
String broker = "tcp://de1f21c0.cn-shenzhen.emqx.cloud:11066";//服务器地址+端口号
String clientId = MqttClient.generateClientId();
MemoryPersistence persistence = new MemoryPersistence();
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("test");//用户名
connOpts.setPassword("123456".toCharArray());//密码
运行以后
在网站里面查看或者在MQTTX中订阅了这个话题的话,就能看到
MQTT-java模拟单车扫码开锁
在文件夹里面有源代码mqtt-java-bic-phone
直接用Intellij打开就行。这是代码结构:
这里面实现了模拟手机和单车两个终端,手机发送SendJson的请求,然后服务器使用EMQX里面数据集成里面消息重新发布功能,
在这里面设置规则
SELECT
payload.bicid as bicid, payload.bicstate as bicstate, payload.lockstate as lockstate, payload.request as request, payload.userid as userid
FROM
"testtopic/3"
WHERE
bicstate = true
and lockstate = false
and request = 1
当bicstate的状态为true(车是好的),lockstate为false(车锁关着),request为1(请求开锁)的时候,就对testtopic/4发送RecJson格式的信息。
发送车子ID,用户ID和开锁请求lock为true,来请求车子开锁。
{"bicid":"${bicid}","userid":"${userid}","lock":"true"}
这些怎么配置,详情见帮助里面的数据集成里面的规则管理
https://docs.emqx.com/zh/cloud/latest/rule_engine/rules.html#%E5%88%9B%E5%BB%BA%E8%A7%84%E5%88%99
注意:我们这里用的不是专业版,而是基础版,所以不需要进行VPC设置。
通过规则设置,简单的实现了服务器处理数据,并根据数据处理结果进行数据转发,发送不同数据的操作。
当车子收到开锁请求,并且开锁以后,对testtopic/3发送”锁已打开“,这时手机就会收到这条信息,知道锁已打开。
先运行MqttRec(模拟单车)再运行MqttSample(模拟手机),就会看到下面的结果。
有时候运行的时候,会显示上次已经发送接收到了的信息,这个可能是因为qos的设置问题,具体原因不清楚。
所有代码都有注释,具体代码实现请看文件夹里面的代码。