使用 RabbitMQ(消息 TTL + 死信队列)模拟订单超时自动关闭订单及释放库存
- 依赖
- RabbitMQ 消息可靠投递配置、yml配置
- springBean自动创建交换机、队列(订单超时-死信队列)、绑定
- 监听队列处理示例
- 1、模拟创建订单
- 2、监听到订单超时了,开始业务处理,并通知释放库存队列
- 3、监听释放库存队列,自动释放库存
依赖
<!-- Spring Boot AMQP starter, pinned here to version 2.7.1. -->
<!-- NOTE(review): if the build inherits Spring Boot dependency management, the explicit version can presumably be omitted; confirm against the parent POM. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.7.1</version>
</dependency>
RabbitMQ 消息可靠投递配置、yml配置
package cn.jf.system.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Slf4j
@Configuration
public class MyRabbitMQConfig {
@Autowired
private RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
@PostConstruct
public void initRabbitTemplate() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm." + correlationData + "==>ack:[" + ack + "]==>errorMsg:[" + cause + "]");
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("==>errorMsg[" + returned.getMessage() + "] ==>code[" + returned.getReplyCode() + "]" +
"==>text[" + returned.getReplyText() + "] ==>exchange[" + returned.getExchange() + "] ==>routingKey[" + returned.getRoutingKey() + "].\r\n");
}
});
}
}
yml 程序配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    # "correlated" is the confirm mode intended for RabbitTemplate.ConfirmCallback with CorrelationData.
    publisher-confirm-type: correlated
    # Enable publisher returns so that the template's ReturnsCallback can be invoked.
    publisher-returns: true
    template:
      # Ask the broker to return unroutable messages instead of silently dropping them.
      mandatory: true
    listener:
      simple:
        # Listeners acknowledge or reject each delivery explicitly through the Channel.
        acknowledge-mode: manual
通过 Spring Bean 自动创建交换机、队列(订单超时-死信队列)及绑定关系
package cn.jf.system.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
/**
 * Declares the topic exchange, the three queues and their bindings as Spring beans.
 *
 * <p>The delay queue is declared with a message TTL and dead-letter arguments that point back to
 * the same exchange with the routing key {@code demo.release.order}.
 */
@Configuration
public class MyAutoBeanMQConfig {

    private static final String EXCHANGE_NAME = "demo-topic-exchange";
    private static final String DELAY_QUEUE_NAME = "demo.delay.queue";
    private static final String ORDER_RELEASE_QUEUE_NAME = "demo.release.order.queue";
    private static final String STOCK_RELEASE_QUEUE_NAME = "demo.release.stock.queue";

    /** @return the durable, non-auto-delete topic exchange shared by all bindings below */
    @Bean
    public Exchange demoEventTopicExchange() {
        return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /** @return the delay queue, carrying the TTL (60000 ms) and dead-letter arguments */
    @Bean
    public Queue demoDelayQueue() {
        HashMap<String, Object> deadLetterArgs = new HashMap<>();
        deadLetterArgs.put("x-message-ttl", 60000);
        deadLetterArgs.put("x-dead-letter-exchange", EXCHANGE_NAME);
        deadLetterArgs.put("x-dead-letter-routing-key", "demo.release.order");
        return new Queue(DELAY_QUEUE_NAME, true, false, false, deadLetterArgs);
    }

    /** @return the queue bound to {@code demo.release.order.#} */
    @Bean
    public Queue demoReleaseQueue() {
        return durableQueue(ORDER_RELEASE_QUEUE_NAME);
    }

    /** @return the queue bound to {@code demo.release.stock.#} */
    @Bean
    public Queue stockReleaseQueue() {
        return durableQueue(STOCK_RELEASE_QUEUE_NAME);
    }

    /** @return binding of the delay queue to routing key {@code demo.create.order} */
    @Bean
    public Binding demoCreateBinding() {
        return queueBinding(DELAY_QUEUE_NAME, "demo.create.order");
    }

    /** @return binding of the order-release queue to routing pattern {@code demo.release.order.#} */
    @Bean
    public Binding demoReleaseBinding() {
        return queueBinding(ORDER_RELEASE_QUEUE_NAME, "demo.release.order.#");
    }

    /** @return binding of the stock-release queue to routing pattern {@code demo.release.stock.#} */
    @Bean
    public Binding demoReleaseOtherBinding() {
        return queueBinding(STOCK_RELEASE_QUEUE_NAME, "demo.release.stock.#");
    }

    /** Builds a durable, non-exclusive, non-auto-delete queue without extra arguments. */
    private static Queue durableQueue(String queueName) {
        return new Queue(queueName, true, false, false);
    }

    /** Binds the named queue to the shared exchange under the given routing key, with no arguments. */
    private static Binding queueBinding(String queueName, String routingKey) {
        return new Binding(queueName, Binding.DestinationType.QUEUE, EXCHANGE_NAME, routingKey, null);
    }
}
监听队列处理示例
1、模拟创建订单
// Template used by sendOrderMessage() to publish messages; injected via @Autowired.
@Autowired
private RabbitTemplate rabbitTemplate;
/**
 * Publishes three simulated order messages (ids 1 to 3) to {@code demo-topic-exchange} with the
 * routing key {@code demo.create.order}, each with a random UUID as its correlation id.
 *
 * @return a success result once all messages have been handed to the template
 */
@GetMapping("/sendOrderMessage")
public R sendOrderMessage() {
    final String routingKey = "demo.create.order";
    int orderNo = 1;
    while (orderNo <= 3) {
        MqEntity payload = new MqEntity();
        payload.setExchange("demo-topic-exchange");
        payload.setMessage("订单id--" + orderNo);

        String exchange = payload.getExchange();
        CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchange, routingKey, payload, correlation);
        orderNo++;
    }
    return R.success("创建订单消息发送完成");
}
2、监听到订单超时了,开始业务处理,并通知释放库存队列
package cn.jf.system.listener;
import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@RabbitListener(queues = "demo.release.order.queue")
@Component
public class OrderListener {
@Autowired
private RabbitTemplate rabbitTemplate;
@RabbitHandler
public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
log.info("准备关闭订单:{}", mqEntity.getMessage());
try {
log.info("关闭订单{}--OK", mqEntity.getMessage());
mqEntity.setMessage("释放库存");
rabbitTemplate.convertAndSend(mqEntity.getExchange(), "demo.release.stock", mqEntity);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("关闭订单{}--error", mqEntity.getMessage());
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
3、监听释放库存队列,自动释放库存
package cn.jf.system.listener;
import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@RabbitListener(queues = "demo.release.stock.queue")
@Component
public class StockListener {
@RabbitHandler
public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
log.info("准备释放库存:{}", mqEntity.getMessage());
try {
log.info("释放库存--OK");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("释放库存--error");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
查看交换机
查看队列
模拟创建订单:http://127.0.0.1:9210/jf-system-dev/mq/sendOrderMessage
消息投递到消息服务端情况
查看结果
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)