SpringBoot 集成 Emqx 发布/订阅数据

2023-05-16

        最近项目中用到Emqx发布/订阅数据,特此记录便于日后查阅。

        ThingsboardEmqxTransportApplication

/**
 * Copyright © 2016-2023 The Thingsboard Authors
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.thingsboard.server.emqx;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.integration.config.EnableIntegration;

import java.util.Arrays;

@SpringBootConfiguration
@EnableConfigurationProperties
@EnableIntegration
@ComponentScan({"org.thingsboard.server.transport.emqx"})
public class ThingsboardEmqxTransportApplication {

    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-emqx-transport";

    public static void main(String[] args) {
        SpringApplication.run(ThingsboardEmqxTransportApplication.class, updateArguments(args));
    }

    private static String[] updateArguments(String[] args) {
        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
            String[] modifiedArgs = new String[args.length + 1];
            System.arraycopy(args, 0, modifiedArgs, 0, args.length);
            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
            return modifiedArgs;
        }
        return args;
    }
}

        GMqttPahoMessageDrivenChannelAdapter

package org.thingsboard.server.transport.emqx.adpter;

import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

/**
 * @author zhangzhixiang on 2023/4/12
 */
@Slf4j
public class GMqttPahoMessageDrivenChannelAdapter extends MqttPahoMessageDrivenChannelAdapter {

    public GMqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,
                                                String... topic) {
        super(url, clientId, clientFactory, topic);
    }

    /**
     * Fix Bug.
     *
     * <p> 死锁描述:
     * Found one Java-level deadlock:
     * =============================
     * "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy":
     * waiting for ownable synchronizer 0x00000000d73e9d70, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
     * which is held by "main"
     * "main":
     * waiting to lock monitor 0x00007f5840008bf8 (object 0x00000000d73d2480, a org.springframework.integration
     * .mqtt.inbound.MqttPahoMessageDrivenChannelAdapter),
     * which is held by "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy"
     *
     * <p> 原因分析:
     * main主线程
     * AbstractEndpoint.start()获取到了ReentrantLock锁
     * MqttPahoMessageDrivenChannelAdapter.scheduleReconnect()但是需要MqttPahoMessageDrivenChannelAdapter对象锁
     * MQTT Rec线程
     * 获取到了MqttPahoMessageDrivenChannelAdapter对象锁,但是需要ReentrantLock锁
     *
     * @param cause
     */
    @Override
    public void connectionLost(Throwable cause) {
        try {
            this.lifecycleLock.lock();
        } catch (Exception e) {
            log.error("Stack Trace: {}", e);
        } finally {
            this.lifecycleLock.unlock();
        }
        super.connectionLost(cause);
    }
}

          MqttConfig

package org.thingsboard.server.transport.emqx.config;

import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.thingsboard.server.transport.emqx.DefaultMqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.MqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.adpter.GMqttPahoMessageDrivenChannelAdapter;
import org.thingsboard.server.transport.emqx.constant.Qos;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLSocketFactory;

/**
 * @author zhangzhixiang on 2023/4/12
 */
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "emqx")
@IntegrationComponentScan
public class MqttConfig {

    private String url;
    private String username;
    private String password;
    private int timeout;
    private int keepalive;
    private String enabled;
    private String subClientId;
    private String pubClientId;
    private MqttSubMessageHandler messageHandler;
    private MqttPahoMessageDrivenChannelAdapter adapter;
    private boolean dataVerifyEnabled;
    private boolean sslVerifyEnabled;
    private String caCertFile;
    private String clientCertFile;
    private String clientKeyFile;
    private String[] defaultTopics;

    ///
    // mqtt subscribe
    ///
    @PostConstruct
    public void init() {
        inbound();
        addTopic(defaultTopics);
        setMessageHandler(new DefaultMqttSubMessageHandler());
        log.info("EMQX transport started!");
    }

    @PreDestroy
    public void shutdown() {
        log.info("Stopping EMQX transport!");
        removeTopic(defaultTopics);
        log.info("EMQX transport stopped!");
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        this.subClientId = createMqttClientId(false);
        adapter = new GMqttPahoMessageDrivenChannelAdapter(url, subClientId, mqttClientFactory());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(Qos.DEFAULT.getValue());
        adapter.setOutputChannel(mqttInputChannel());
        log.info("Success to initialize Mqtt channel adapter for subscribe, " +
                "url=[{}],subClientId=[{}],sslVerifyEnabled=[{}]", url, subClientId, sslVerifyEnabled);
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                messageHandler.receiveMessage(message);
            }
        };
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        options.setConnectionTimeout(timeout);
        options.setKeepAliveInterval(keepalive);
        // automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅
        options.setAutomaticReconnect(true);
        options.setServerURIs(new String[]{url});
        if (sslVerifyEnabled) {
            try {
                SSLSocketFactory sslSocketFactory = SSLFellow.createSSLSocketFactory(
                        caCertFile, clientCertFile, clientKeyFile);
                options.setSocketFactory(sslSocketFactory);
            } catch (Exception e) {
                log.error("Stack Trace: {}", e);
            }
        }
        factory.setConnectionOptions(options);
        return factory;
    }

    ///
    // mqtt publish
    ///
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        this.pubClientId = createMqttClientId(true);
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler(pubClientId, mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(Qos.DEFAULT.getValue());
        log.info("Success to initialize Mqtt channel adapter for publish, " +
                "url=[{}], pubClientId=[{}]", url, pubClientId);
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    ///
    // public functions
    ///
    public void addTopic(String... topics) {
        this.adapter.addTopic(topics);
    }

    public void removeTopic(String... topics) {
        this.adapter.removeTopic(topics);
    }

    ///
    // private functions
    ///
    private String createMqttClientId(boolean publish) {
        String moduleName = Module.getName();
        if (StringUtils.isBlank(moduleName)) {
            moduleName = "iot_mqtt";
        }
        moduleName += publish ? "_pub_" : "_sub_";
        return moduleName + RandomStringUtils.randomAlphanumeric(8).toLowerCase();
    }
}

        MqttSender

package org.thingsboard.server.transport.emqx.config;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * @author zhangzhixiang on 2023/4/12
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {

    void sendToMqtt(String data);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

        到此SpringBoot 集成 Emqx 发布/订阅数据介绍完成。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SpringBoot 集成 Emqx 发布/订阅数据 的相关文章

随机推荐

  • Spring框架介绍及使用(一)

    文章目录 概念为什么要用 xff1f Spring的体系结构Spring框架之控制反转 xff08 IOC xff09 概念Spring文件包解释入门程序入门程序需要的jar包配置文件入门程序的建立ApplicationContext与Be
  • SpringMVC 相关配置

    SpringMVC 相关配置 打印请求与响应日志 打印 64 RequestBody 64 Response日志 https blog csdn net ww 1997 article details 116006445 https www
  • 普通表到分区表转换

    A 通过 Export import 方法 B 通过 Insert with a subquery 方法 C 通过 Partition Exchange 方法 D 通过 DBMS REDEFINITION 方法 比如把test用户下的普通表
  • Ubuntu 20.04 上安装 Node.js 和 npm 的三种方法

    主要介绍三种在 Ubuntu 20 04 上安装 Node js 和 npm 的方法 xff1a 通过Ubuntu标准软件库 这是最简单的安装方法 xff0c 并且适用大多数场景 但是标准软件库中最高版本只有 v10 19 0 root 6
  • android databinding 数据绑定错误 错误:任务':app:compileDebugJavaWithJavac' 的执行失败

    今天到公司照常打开项目 xff0c 突然运行不了显示databinding错误 Error Execution failed for task 39 app compileDebugJavaWithJavac 39 gt android d
  • 解决idea新建Module的奇怪路径问题

    问题由来 xff1a 在部署SpringCloud的时候想新建一个module来快速创建 xff0c 结果被创建出来的目录结构搞得一脸懵逼 xff0c 新建的module的根目录跑到了 xff0c 项目的src目录下 xff0c 整个看起来
  • ThingsBoard源码解析-数据订阅与规则链数据处理

    前言 结合本篇对规则链的执行过程进行探讨 根据之前对MQTT源码的学习 xff0c 我们由消息的处理入手 org thingsboard server transport mqtt MqttTransportHandler void pro
  • Thingsboard使用gateway网关

    简介 xff1a 本次是想测试一下thingsboard网关的使用 xff0c 实现通过网关 43 mqtt 43 thingsboard 43 emqx 实现间接设备创建和数据传输 前期准备 xff1a thingsboard平台 thi
  • Thingsboard(2.4 postgresql版)数据库表结构说明

    本文描述的表结构是根据thingsboard2 4 xff08 postgresql版 xff09 数据库中整理出来的 xff0c 不一定完整 xff0c 后续有新的发现再补充文档 一 数据库E R关系 Thingsboard2 4社区版共
  • ThingsBoard—自定义规则节点

    一般的功能 xff0c 可以使用现有的节点来完成 但如果有比较复杂 xff0c 或有自己特殊业务需求的 xff0c 可能就需要自定义了 按官方教程来基本就可以入门 xff0c 如果需要深入 xff0c 可以参考ThingsBoard自有节点
  • Thingsboard 报错 Cannot resolve symbol ‘TransportProtos‘

    本人idea 版本为 2021 1 xff0c 顺利编译 thingsboard 打开进行源码阅读时 xff0c 发现报 Cannot resolve symbol 39 TransportProtos 39 xff0c 如下图 xff1a
  • ThingsBoard 规则引擎-邮件通知

    之前我们已经学习了Thingsboard安装 设备接入 简单的数据可视化内容 xff0c 今天来继续学习下thingsboard其他特性 规则引擎 应用场景 ThingsBoard规则引擎是一个支持高度可定制复杂事件处理的框架 xff0c
  • ThingsBoard编译报错:Failure to find org.gradle:gradle-tooling-api:jar:6.3

    删除本地仓库未下载完成的缓存文件 xff08 删除像图片显示这样以 lastUpdated结尾的文件 xff09 执行mvn v确保maven命令可以正常执行执行下面命令 xff0c 将下载的jar安装到本地仓库 注意 xff1a 将 Df
  • Thingsboard3.4-OTA升级

    背景 在做设备端对接thingsboard平台得时候 xff0c 去研究设备端对接平台的过程中 xff0c 花了不少时间 xff0c 在此之前也没有找到相关的文档 xff0c 于是出于减少大家去研究的时间 xff0c 写了这篇博客 xff0
  • PyCharm更换pip源为国内源、模块安装、PyCharm依赖包导入导出教程

    一 更换pip为国内源 1 使用PyCharm创建一个工程 2 通过File gt Setting 选择解释器为本工程下的Python解释器 3 单击下图中添加 43 xff0c 4 单击下图中的 Manage Repositories 按
  • Pycharm没有找到manage repositories按钮解决方案

    问题描述 xff1a 不知道是因为版本原因还是其他 xff0c pycharm没有找到manage repositories按钮 xff0c 无法更改下载源 xff0c 导致安装库的速度会很慢 解决办法 xff1a 1 点击左下角的pyth
  • 通过改变JVM参数配置降低内存消耗

    有个项目 xff0c 其服务器端原本内存占用很大 xff0c 16G内存几乎都用光了 原先的JVM参数配置是这样的 xff1a Xms16384m Xmx16384m XX PermSize 61 64m XX MaxPermSize 61
  • NodeJS yarn 或 npm如何切换淘宝或国外镜像源

    一 查看当前的镜像源 npm config get registry 或 yarn config get registry 二 设置为淘宝镜像源 xff08 全局设置 xff09 npm config set registry https
  • Centos7 部署InfluxDB

    因为目前网络上关于InfluxDB的资料并不多 xff0c 所以这里建议多参考官网 官网 xff1a Home InfluxData 点击此处的Docs xff1a 这里选择 InfluxDB OSS xff1a 使用文档时根据需求选择查看
  • SpringBoot 集成 Emqx 发布/订阅数据

    最近项目中用到Emqx发布 订阅数据 xff0c 特此记录便于日后查阅 ThingsboardEmqxTransportApplication Copyright 2016 2023 The Thingsboard Authors lt p