MQTT订阅和发送实战(JAVA)

2023-11-09

本列子使用MQTT+EMQX+Springboot
1.在pom.xml中引入依赖

        <!-- mqtt -->
        <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-integration</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-stream</artifactId>
  </dependency>
  <dependency>
   <groupId>org.springframework.integration</groupId>
   <artifactId>spring-integration-mqtt</artifactId>
  </dependency>

2.在application.yml中添加配置

mqtt:
  username: 用户名
  password: 密码
  # 推送信息的连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.1.61:61613
  url: tcp://127.0.0.1:1883
 

3.创建MqttProperties这个类,将配置文件中的值注入对象中

@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
  * 账号
  */
 private String username;

 /**
  * 密码
  */
 private String password;
   /**
  * url
  */
 private String[] url;
}

4.创建MqttConfig这个类,进行mqtt相关配置

@Data
@Configuration
public class MqttConfig {
    /**
     * 发布的通道名称
     */
    public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";
    
    /**
     * 订阅消息的通道名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInputChannel";
 
    @Autowired
    private MqttProperties mqttProperties;

/**
  * MQTT连接器选项
  */
 @Bean
 public MqttConnectOptions getMqttConnectOptions() {
  MqttConnectOptions options = new MqttConnectOptions();
  // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,
  // 设置为true表示每次连接到服务器都以新的身份连接
  options.setCleanSession(true);
  // 设置连接的用户名
  
  // 设置连接的密码
  options.setUserName(mqttProperties.getUserName());
  options.setPassword(mqttProperties.getpassword().toCharArray());
  options.setServerURIs(mqttProperties.getUrl().toArray(new String[] {}));
  // 设置超时时间 单位为秒
  options.setConnectionTimeout(10);
  options.setMaxInflight(1000);
  options.setAutomaticReconnect(true);
  // 设置会话心跳时间 单位为秒 服务器会每隔20秒的时间向客户端发送心跳判断客户端是否在线,但这个方法并没有重连的机制
  options.setKeepAliveInterval(20);
  return options;
 }

 /**
  * MQTT客户端工厂类,用户生成mqtt客户端
  *
  * @return
  */
 @Bean
 public MqttPahoClientFactory mqttClientFactory() {
  DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
  factory.setConnectionOptions(getMqttConnectOptions());
  return factory;
 }
 
 /**
  * MQTT出站通道
  * @return
  */
 @Bean(name =CHANNEL_NAME_OUT)
 public MessageChannel mqttOutboundChannel() {
  return new DirectChannel();
 }

 /**
  * MQTT出站通道适配器,用于消息推送
  * @return
  */
 @Bean
 @ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
 public MessageHandler mqttOutbound() {
  String clientId = mqttProperties.getClientId().concat("-sender");
  MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(clientId,mqttClientFactory());
  messageHandler.setAsync(true);
  messageHandler.setDefaultQos(1);
  return messageHandler;
 }
 
 
 /**
  * MQTT入站通道
  * @return
  */
    @Bean(name =CHANNEL_NAME_IN)
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

 /**
  * MQTT入站通道适配器, 消息订阅绑定(消费者)
  * @return
  */
 @Bean
 public MessageProducer inbound() {
  //在application.xml中加上监听的topic和clientId
   String[] topicAry = mqttProperties.getTopics().toArray(new String[] {});
   String clientId = mqttProperties.getClientId().concat("-receiver");
  MqttPahoMessageDrivenChannelAdapter adapter = new         MqttPahoMessageDrivenChannelAdapter(
    clientId, mqttClientFactory(),topicAry);
  //设置超时时间,默认30000毫秒
  adapter.setCompletionTimeout(30000L);
  adapter.setConverter(new DefaultPahoMessageConverter());
  //设置接收消息服务质量为1
  adapter.setQos(1);
  adapter.setOutputChannel(mqttInputChannel());
  return adapter;
 }

}

5.创建监听类MqttListener

@Bean
 @ServiceActivator(inputChannel = "mqttInputChannel")
 public MessageHandler handler() {
  return new MessageHandler() {
   
   @Override
   public void handleMessage(Message<?> message) throws MessagingException {
    MessageHeaders headers = message.getHeaders();
    System.err.println(headers);
    System.err.println("接受到消息");
   }
  };
 }

6.创建发送类MqttGateway

@Component
@MessagingGateway(defaultRequestChannel = MqttConfig.CHANNEL_NAME_OUT)
public interface MqttGateway {

 /**
 * 发送信息到MQTT服务器
 *
 * @param topic 主题
 * @param payload 消息主体
 */
 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

 /**
 * 发送信息到MQTT服务器
 *
 * @param topic 主题
 * @param qos 对消息处理的几种机制。<br> 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。<br>
 * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。<br>
 * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
 * @param payload 消息主体
 */
 void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);

}

在需要发送的业务中调用发送接口,在监听类出进行解析并进行业务处理即可

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MQTT订阅和发送实战(JAVA) 的相关文章

  • 使用 Guice 注入类集合

    我正在尝试用 Google Guice 2 0 注入东西 我有以下结构 FooAction implements Action BarAction implements Action 然后我有一个带有以下构造函数的 ActionLibrar
  • 在Java中将*s打印为三角形?

    我在 Java 课程中的作业是制作 3 个三角形 一份左对齐 一份右对齐 一份居中 我必须为什么类型的三角形制作一个菜单 然后输入需要多少行 三角形必须看起来像这样 到目前为止 我能够完成左对齐的三角形 但我似乎无法获得其他两个 我尝试用谷
  • 单元测试组合服务方法

    我正在为一个类编写 junit 单元测试 该类使用以下方法实现公开的接口 public Set
  • 如何在log4j的配置文件中为文件附加器提供环境变量路径

    我有一个log4j xml配置文件 和一个RollingFileAppender我需要提供用于存储日志的文件路径 问题是我的代码将作为可运行的 jar 部署在 Unix 机器上 所以如果我传递这样的参数 value logs message
  • 如何提取文件 jre-9/lib/modules?

    In JRE 9 lib目录 至少在 Windows 上 有一个名为modules其大小约为107 MB 是否可以提取该文件或在其中列出 java 模块 我可以看到一个名为jmod可以在jdk 9 bin jmod exe 但那是为了阅读
  • javax.persistence.RollbackException:提交事务时出错],根本原因是 java.lang.StackOverflowError:null

    我有一个使用 Spring Data REST 框架的 Spring Boot API 从 spring boot starter parent 2 1 0 RELEASE 继承的依赖项 我正在尝试执行 PUT 或 PATCH 请求来更新实
  • 具有 CRUD 功能的基于 Spring Web 的管理工具

    在 PHP Symfony 世界里有一个工具叫 Sonata Adminhttps sonata project org https sonata project org 基于 AdminLTE 模板 这是一款一体化管理工具 具有登录 菜单
  • OpenNLP 与斯坦福 CoreNLP

    我一直在对这两个包进行一些比较 但不确定该往哪个方向走 我简单地寻找的是 命名实体识别 人 地点 组织等 性别识别 一个不错的训练 API 据我所知 OpenNLP 和斯坦福 CoreNLP 提供了非常相似的功能 然而 Stanford C
  • 使用 JAX-WS 的 WebLogic 中没有模式导入的单个 WSDL

    如何使用 JAX WS 配置由 WebLogic 10 3 6 生成的 Web 服务 以将对象架构包含在单个 WSDL 文件声明 而不是导入声明 中 示例代码 界面 import javax ejb Local Local public i
  • Android volley使用RequestFuture.get()时出现超时异常

    在我的片段中 我尝试使用 TMDB 的开放电影数据库来获取有关 正在播放 电影的详细信息 如果我使用 RequestFuture get time TimeUnit 方法来执行此齐射请求 我总是会收到超时错误 如果我在 Safari 中手动
  • LocalDate 减去 period 得到错误的结果

    LocalDate减去一个Period 如 28年1个月27天 得到错误的结果 但减去一个Period 只有天单位 如 10282 天 得到正确的结果 有什么需要注意的吗 public static void main String arg
  • 从 HttpClient 3 转换为 4

    我已经成功地对所有内容进行了更改 但以下内容除外 HttpClient client HttpPost method client new DefaultHttpClient method new HttpPost url InputStr
  • Android 解析 JSON 卡在 get 任务上

    我正在尝试解析一些 JSON 数据 我的代码工作了一段时间 我不确定我改变了什么突然破坏了代码 当我运行代码时 我没有收到任何运行时错误或警告 我创建一个新的 AsyncTask 并执行它 当我打电话时 get 在这个新任务中 调试器在此行
  • 合并两个地图的最佳实践是什么

    如何将新地图添加到现有地图 地图具有相同的类型Map
  • 多线程——更快的方法?

    我有一堂有吸气剂的课程getInt 和一个二传手setInt 在某个领域 比如说领域 Integer Int 一个类的一个对象 比如说SomeClass The setInt 这里是同步的 getInt isn t 我正在更新的值Int来自
  • 改变 Java 中凯撒移位的方向

    用户可以通过选择 1 向左或 2 向右移动字母来选择向左或向右移动 左边工作正常 右边不行 现在它显示了完全相同的循环 但我已经改变了所有 and 以不同的方式进行标记 最终我总是得到奇怪的字符 如何让程序将字符向相反方向移动 如果用户输入
  • java swing:向 JTree 项目添加自定义图形按钮

    我想在 JTree 中的项目右侧添加一个带有小图标的附加按钮 这可以做到吗 如果是这样 怎么办 thanks Clamp 你在这方面成功了吗 我想做同样的事情 但很难让 JButton 响应用户 设置渲染器以显示按钮的过程很顺利 但所有鼠标
  • Java:一个函数有多种返回类型...可以使用泛型吗?

    为了简单起见 我有一些程序 如下所示 public String fetchValueAsString String key public DateTime fetchValueAsDateTime String key 我想要类似的东西
  • 传递 Android DialogFragment 参数时,onCreateDialog 捆绑参数意外为 null

    我正在尝试使用 DialogFragment 在 Android 中显示一个基本对话框 并使用对话框消息的参数 如中所述StackOverflow线程 https stackoverflow com questions 15459209 p
  • 将数组值导出到 csv 文件 java

    我只需要帮助将数组元素导出到 csv 文件 我不知道我的代码有什么问题 任何帮助将不胜感激 谢谢 for int index 0 index lt cols length index FileWriter fw new FileWriter

随机推荐

  • 【EMC基础篇①】噪声是什么?EMC是什么?噪声损害是电子社会的现代病

    EMC基础篇 噪声是什么 EMC是什么 噪声损害是电子社会的现代病 电脑的通信错误 手机通话突然断开 您有过类似的经验吗 我们周围充斥着噪声 它们会通过各种线路侵入电子设备 引发故障 那么 这些看不见的噪声的真身是什么 本周为您带来3篇关于
  • js设计模式基础篇(四)之高阶函数

    高阶函数 高阶函数是指至少满足下列条件之一的函数 函数可以作为参数被传递 函数可以作为返回值输出 函数作为参数传递 把函数当作参数传递 这代表我们可以抽离出一部分容易变化的业务逻辑 把这部分业务逻辑放在函数参数中 这样一来可以分离业务代码中
  • 俞敏洪:如果创业者缺乏这8种能力,失败可能性很大

    俞敏洪是一个高明的创业家 他很早就知道 做什么生意都需要用 信仰 做前提来说服目标用户和客户 在他的 愿你的青春不负梦想 中 俞敏洪记录了很多创业的感悟 以下内容是俞敏洪结合自己的创业实践 总结了一个创业者要成功 应该具备哪八种能力 1 目
  • Unity基础

    Unity3D 游戏开发 第一章 基础知识 Unity是一个用于创建游戏和三维互动内容的开发工具 是一个专业游戏引擎 2D游戏 视角锁定 二维坐标 3D游戏 任意视角 三维坐标 虚拟现实 VR 三维空间虚拟世界 通过虚拟设备与虚拟环境事物进
  • adfs服务器获取信息失败,授权给adfs读取ad 在ad服务器上运行

    Because the application pool identity for the AD FS 2 0 AppPool is running as a domain user service account you must con
  • 华为ensp静态路由配置,ssh远程登录配置

    通ping与ssh 实验准备 建立拓扑图 三台路由器 两台pc 配置pc1与pc2 对pc1进行网络配置 设置ip为192 168 226 100 网关为192 168 226 1 ipconfig命令查看pc1配置信息 确认配置成功 配置
  • 刷脸支付带来支付链条变化和交易场景改变

    没赶上POS代理 没赶上扫码支付代理 这些推广人员每年被动收入上百万 现在刷脸支付的风口已来 你是把握还是错过 此外 技术科技进步 带来的支付链条的变化以及交易场景的改变 从而引发第三方支付革命 谁落后谁就会被淘汰 一些新概念有助于刺激消费
  • GNU风格 汇编语法总结

    汇编源程序一般用于系统最基本的初始化 初始化堆栈指针 设置页表 操作 ARM的协处理器等 这些初始化工作完成后就可以跳转到C代码main函数中执行 1 GNU汇编语言语句格式 任何Linux汇编行都是如下结构
  • Datax-web操作指南

    Datax web操作指南 1 登录datax可视化管理系统datax web 网址 http ip 9527 index html dashboard 用户名密码 admin 1 2 使用操作 同步数据的主要操作步骤创建项目 配置数据源
  • mixSeq: A Simple Data Augmentation Method for Neural Machine Translation阅读笔记

    Abstract 数据增强是指通过操纵输入 如添加随机噪声 屏蔽特定部分 来扩大数据集 大多数数据增强技术都是在单一的输入上操作的 这限制了训练语料库的多样性 在本文中 我们提出了一种简单而有效的神经机器翻译数据增强技术 mixSeq 它操
  • BadUSB制作教程_BadUSB简单示例(初学)_程序编写工具

    说明 本文章仅供学习交流 请勿用于非法用途 我用的是CJMCU Beetle arduino Leonardo USB ATMEGA32U4 Mini Size Development Board 程序编写工具 https download
  • 石头剪刀布游戏的MATLAB GUI实现

    石头剪刀布游戏的MATLAB GUI实现 在这篇文章中 我们将详细介绍如何使用MATLAB GUI编写一个简单的石头剪刀布游戏 我们将使用MATLAB的图形用户界面 GUI 工具箱来创建一个交互式界面 让用户可以与游戏进行互动 我们将逐步介
  • 收入最高的十大IT职位

    过去的一年对科技行业来说非常艰难 一些公司宣布裁员 面临着迫在眉睫的经济衰退威胁 尽管经历了坎坷的一年 但市场对科技人才的需求仍然强劲 截至1月 美国的科技人才失业率下降到1 5 对于拥有适当技能和专业知识的技术人员来说 市场对人才的需求仍
  • hive解析json数据

    hive 处理json数据总体来说有两个方向的路走 1 将json以字符串的方式整个入Hive表 然后通过使用UDF函数解析已经导入到hive中的数据 比如使用LATERAL VIEW json tuple的方法 获取所需要的列名 2 在导
  • 【Android之SmartImageView图片控件】

    源码地址是https github com loopj android smart image view 没有sample 本文最后会提供一个sample smartimageview提供的主要功能有 1 继承ImageView控件 在原生
  • 2014年1月2日星期四(流水线-UVN系统的实现)

    方位角A是绕 Y轴旋转的角度 与 Z轴重叠时为0度 仰角B是方向向量与地平面 X Z平面 之间的夹角 假定为右手坐标系 则 r psinB y pcosb z rcosA x rsinA 初始化UVN相机 先赋值给各个朝向 void ddr
  • 服务器异常 虚拟登陆,服务器登陆异常

    服务器登陆异常 内容精选 换一换 通过Web浏览器登录资源 会话连接断开 提示网络连接异常 连接已断开 请重试 Code T 1006 云堡垒机系统与资源服务器之间网络连接不稳定 导致连接断开 云堡垒机或资源服务器的带宽超限 导致连接断开
  • C#知识系列:GCHandleType的作用

    总结 Normal 对象被标记为完全不被GC管理的状态 但会被GC移动内存位置 需要我们手动调用Free来释放对应的GC对象 Pinned 对象被标记为完全不被GC管理的状态 不被GC回收也不被GC移动内存地址 需要我们手动调用Free来释
  • 探究Cache缓存功能---【pytest】

    前言 pytest运行完用例之后会生成一个 pytest cache的缓存文件夹 用于记录用例的ids和上一次失败的用例 1 跑自动化时经常会出现这样一个情况 一轮自动化跑完后零星出现了几个失败测试用例 无法断定失败的原因 所以可能需要重新
  • MQTT订阅和发送实战(JAVA)

    本列子使用MQTT EMQX Springboot 1 在pom xml中引入依赖