前提说明
客户端发送两个topic消息,第一个消息依赖第二个消息传入的值。两个topic分别为topic1、topic2
业务说明
topic1消息订阅之后,进行相关业务处理,查库插库等操作,然后轮训redis等待10s,redis中存的时topic2放入的数据。
现象说明
在轮训过程中,一直没有监听到topic2上报的数据,但是会在轮训10s结束之后,收到topic2发布的消息
原因说明
mqtt消息单线程,默认情况下是一条消息处理完才会处理下发消息。
解决方案
增加线程池,多线程处理
相关代码如下:
@Component()
@Slf4j
public class ReceiveMessageListener implements MessageHandler {
@Autowired
private ReceiveMessagehandler receiveMessagehandler;
/**
* 默认线程池
* 如果处理器无定制线程池,则使用此默认
*/
ExecutorService defaultExecutor = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors() + 1,
Runtime.getRuntime().availableProcessors() + 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(1000), new ThreadFactoryBuilder().setNameFormat("mqtt-pool-%d").build());
@Override
public void handleMessage(Message<?> message) {
defaultExecutor.submit(() -> receiveMessagehandler.execute(message));
}
}
@Component
@Slf4j
public class ReceiveMessagehandler {
@Autowired
private SpringBeanHandler springBeanHandler;
@Autowired
private MqttAuthClient mqttAuthClient;
public void execute(Message<?> message) {
//根据业务具体消息解析内容
}