最近项目中用到 EMQX 发布/订阅数据,特此记录,便于日后查阅。
ThingsboardEmqxTransportApplication
/**
* Copyright © 2016-2023 The Thingsboard Authors
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.thingsboard.server.emqx;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.SpringBootConfiguration;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.integration.config.EnableIntegration;
import java.util.Arrays;
/**
 * Entry point of the EMQX transport application.
 *
 * <p>Starts a Spring application whose component scan covers
 * {@code org.thingsboard.server.transport.emqx}. Unless the caller already passed a
 * {@code --spring.config.name} argument, {@code --spring.config.name=tb-emqx-transport}
 * is appended to the command line before Spring is started.
 */
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableIntegration
@ComponentScan({"org.thingsboard.server.transport.emqx"})
public class ThingsboardEmqxTransportApplication {

    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-emqx-transport";

    /**
     * Launches the Spring application with the (possibly extended) argument list.
     *
     * @param args command-line arguments
     */
    public static void main(String[] args) {
        String[] effectiveArgs = updateArguments(args);
        SpringApplication.run(ThingsboardEmqxTransportApplication.class, effectiveArgs);
    }

    /**
     * Returns {@code args} unchanged when any element starts with the config-name key;
     * otherwise returns a copy with the default config-name parameter appended.
     */
    private static String[] updateArguments(String[] args) {
        for (String candidate : args) {
            if (candidate.startsWith(SPRING_CONFIG_NAME_KEY)) {
                // The caller chose a config name explicitly; do not override it.
                return args;
            }
        }
        String[] extended = Arrays.copyOf(args, args.length + 1);
        extended[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
        return extended;
    }
}
GMqttPahoMessageDrivenChannelAdapter
package org.thingsboard.server.transport.emqx.adpter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
/**
 * Inbound MQTT channel adapter that overrides {@link #connectionLost(Throwable)} to avoid a
 * start-up deadlock observed with {@link MqttPahoMessageDrivenChannelAdapter}.
 *
 * @author zhangzhixiang on 2023/4/12
 */
@Slf4j
public class GMqttPahoMessageDrivenChannelAdapter extends MqttPahoMessageDrivenChannelAdapter {

    /**
     * Creates the adapter; all arguments are passed straight to the superclass constructor.
     *
     * @param url           broker URL
     * @param clientId      MQTT client id
     * @param clientFactory factory used to create the Paho client
     * @param topic         initial topics
     */
    public GMqttPahoMessageDrivenChannelAdapter(String url, String clientId, MqttPahoClientFactory clientFactory,
                                                String... topic) {
        super(url, clientId, clientFactory, topic);
    }

    /**
     * Bug fix: waits for any in-progress lifecycle operation before delegating to the
     * superclass implementation.
     *
     * <p>Deadlock description (from a thread dump):
     * <pre>
     * Found one Java-level deadlock:
     * =============================
     * "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy":
     *   waiting for ownable synchronizer 0x00000000d73e9d70,
     *   (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
     *   which is held by "main"
     * "main":
     *   waiting to lock monitor 0x00007f5840008bf8 (object 0x00000000d73d2480,
     *   a org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter),
     *   which is held by "MQTT Rec: iot-shadow-restapi_sub_hqxzgpcy"
     * </pre>
     *
     * <p>Root cause analysis:
     * <ul>
     *   <li>main thread: {@code AbstractEndpoint.start()} has acquired the ReentrantLock, then
     *       {@code MqttPahoMessageDrivenChannelAdapter.scheduleReconnect()} needs the adapter's
     *       object monitor;</li>
     *   <li>MQTT Rec thread: holds the adapter's object monitor but needs the ReentrantLock.</li>
     * </ul>
     *
     * <p>Acquiring and immediately releasing {@code lifecycleLock} here, before the superclass
     * method is entered, makes this thread wait for the lock holder without holding anything
     * the lock holder needs.
     *
     * @param cause the reason the connection was lost
     */
    @Override
    public void connectionLost(Throwable cause) {
        // Acquire outside of any try/finally: if lock() failed, a finally-block unlock() would
        // throw IllegalMonitorStateException and hide the real failure.
        this.lifecycleLock.lock();
        // Nothing has to run while the lock is held; it only serves as a barrier.
        this.lifecycleLock.unlock();
        super.connectionLost(cause);
    }
}
MqttConfig
package org.thingsboard.server.transport.emqx.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.thingsboard.server.transport.emqx.DefaultMqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.MqttSubMessageHandler;
import org.thingsboard.server.transport.emqx.adpter.GMqttPahoMessageDrivenChannelAdapter;
import org.thingsboard.server.transport.emqx.constant.Qos;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.net.ssl.SSLSocketFactory;
/**
* @author zhangzhixiang on 2023/4/12
*/
@Data
@Slf4j
@Configuration
@ConfigurationProperties(prefix = "emqx")
@IntegrationComponentScan
public class MqttConfig {
private String url;
private String username;
private String password;
private int timeout;
private int keepalive;
private String enabled;
private String subClientId;
private String pubClientId;
private MqttSubMessageHandler messageHandler;
private MqttPahoMessageDrivenChannelAdapter adapter;
private boolean dataVerifyEnabled;
private boolean sslVerifyEnabled;
private String caCertFile;
private String clientCertFile;
private String clientKeyFile;
private String[] defaultTopics;
///
// mqtt subscribe
///
@PostConstruct
public void init() {
inbound();
addTopic(defaultTopics);
setMessageHandler(new DefaultMqttSubMessageHandler());
log.info("EMQX transport started!");
}
@PreDestroy
public void shutdown() {
log.info("Stopping EMQX transport!");
removeTopic(defaultTopics);
log.info("EMQX transport stopped!");
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
this.subClientId = createMqttClientId(false);
adapter = new GMqttPahoMessageDrivenChannelAdapter(url, subClientId, mqttClientFactory());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(Qos.DEFAULT.getValue());
adapter.setOutputChannel(mqttInputChannel());
log.info("Success to initialize Mqtt channel adapter for subscribe, " +
"url=[{}],subClientId=[{}],sslVerifyEnabled=[{}]", url, subClientId, sslVerifyEnabled);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
messageHandler.receiveMessage(message);
}
};
}
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
// automaticReconnect 为 true 表示断线自动重连,但仅仅只是重新连接,并不订阅主题;在 connectComplete 回调函数重新订阅
options.setAutomaticReconnect(true);
options.setServerURIs(new String[]{url});
if (sslVerifyEnabled) {
try {
SSLSocketFactory sslSocketFactory = SSLFellow.createSSLSocketFactory(
caCertFile, clientCertFile, clientKeyFile);
options.setSocketFactory(sslSocketFactory);
} catch (Exception e) {
log.error("Stack Trace: {}", e);
}
}
factory.setConnectionOptions(options);
return factory;
}
///
// mqtt publish
///
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
this.pubClientId = createMqttClientId(true);
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler(pubClientId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultQos(Qos.DEFAULT.getValue());
log.info("Success to initialize Mqtt channel adapter for publish, " +
"url=[{}], pubClientId=[{}]", url, pubClientId);
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
///
// public functions
///
public void addTopic(String... topics) {
this.adapter.addTopic(topics);
}
public void removeTopic(String... topics) {
this.adapter.removeTopic(topics);
}
///
// private functions
///
private String createMqttClientId(boolean publish) {
String moduleName = Module.getName();
if (StringUtils.isBlank(moduleName)) {
moduleName = "iot_mqtt";
}
moduleName += publish ? "_pub_" : "_sub_";
return moduleName + RandomStringUtils.randomAlphanumeric(8).toLowerCase();
}
}
MqttSender
package org.thingsboard.server.transport.emqx.config;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
 * Messaging gateway for publishing to MQTT; every call is sent to the
 * {@code mqttOutboundChannel} request channel.
 *
 * @author zhangzhixiang on 2023/4/12
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttSender {

    /**
     * Sends a payload without a topic header.
     *
     * <p>NOTE(review): this presumably relies on a default topic being set on the outbound
     * handler — confirm one is configured before using this overload.
     *
     * @param data message payload
     */
    void sendToMqtt(String data);

    /**
     * Sends a payload with the {@link MqttHeaders#TOPIC} header set.
     *
     * @param topic   value of the topic header
     * @param payload message payload
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            String payload);

    /**
     * Sends a payload with the {@link MqttHeaders#TOPIC} and {@link MqttHeaders#QOS}
     * headers set.
     *
     * @param topic   value of the topic header
     * @param qos     value of the QoS header
     * @param payload message payload
     */
    void sendToMqtt(
            @Header(MqttHeaders.TOPIC) String topic,
            @Header(MqttHeaders.QOS) int qos,
            String payload);
}
至此,Spring Boot 集成 EMQX 发布/订阅数据的介绍完成。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)