使用RabbitMQ模拟订单超时自动关闭订单及释放库存

2023-05-16

使用MQ模拟订单超时自动关闭订单及释放库存

    • 依赖
    • RabbitMQ 消息可靠投递配置、yml配置
    • springBean自动创建交换机、队列(订单超时-死信队列)、绑定
    • 监听队列处理示例
      • 1、模拟创建订单
      • 2、监听到订单超时了,开始业务处理,并通知释放库存队列
      • 3、监听释放库存队列,自动释放库存

依赖

        <!-- rabbitMQ 管理-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.7.1</version>
        </dependency>
        <!--dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.14.2</version>
        </dependency>-->

RabbitMQ 消息可靠投递配置、yml配置

package cn.jf.system.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

/**
 * RabbitMQ配置类
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 15:47
 */
@Slf4j
@Configuration
public class MyRabbitMQConfig {

    @Autowired
//    @Qualifier("redisTemplate")
    private RabbitTemplate rabbitTemplate;

//    @Primary
//    @Bean(name = "rabbitTemplate")
//    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
//        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//        rabbitTemplate.setMessageConverter(messageConverter());
//        initRabbitTemplate();
//        return rabbitTemplate;
//    }

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 定制RabbitTemplate<p>
     * -@PostConstruct: MyRabbitMQConfig 对象创建完成以后,执行这个方法<p>
     * 1、服务收到消息就会回调<p>
     * ---- spring.rabbitmq.publisher-confirms: true<p>
     * ---- 设置确认回调 setConfirmCallback<p>
     * 2、消息正确抵达队列就会进行回调<p>
     * ---- spring.rabbitmq.publisher-returns: true<p>
     * ---- spring.rabbitmq.template.mandatory: true<p>
     * ---- 设置确认回调ReturnCallback<p>
     * <p>
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)<p>
     * ---- 默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息<p>
     * -------- 问题: 在处理信息时宕机了,会把所有的消息确认了,<p>
     * -------- 解决:需要手动确认信息,手动ack消息,不使用默认的消费端确认 spring.rabbitmq.listener.simple.acknowledge-mode=manual<p>
     * ---- 签收了货物 channel.basicAck(deliveryTag, false);拒绝签收货物channel.basicNack(deliveryTag, false, false);<p>
     */
    @PostConstruct
    public void initRabbitTemplate() {
        /*
         * 设置确认回调
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         */
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("confirm." + correlationData + "==>ack:[" + ack + "]==>errorMsg:[" + cause + "]");
            }
        });
        /*
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路由键
         */
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("==>errorMsg[" + returned.getMessage() + "] ==>code[" + returned.getReplyCode() + "]" +
                        "==>text[" + returned.getReplyText() + "] ==>exchange[" + returned.getExchange() + "] ==>routingKey[" + returned.getRoutingKey() + "].\r\n");
            }
        });
    }
}


yml 程序配置

spring: 
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 虚拟主机配置
    virtual-host: /
    # 开启发送端消息抵达Broker确认类型【simple、correlated、none】
    publisher-confirm-type: simple
    # 开启发送端消息抵达Queue确认
    publisher-returns: true
    # 只要消息抵达Queue,就会异步发送优先回调 returnfirm
    template:
      mandatory: true
    # 手动ack消息,不使用默认的消费端确认
    listener:
      simple:
        acknowledge-mode: manual

springBean自动创建交换机、队列(订单超时-死信队列)、绑定

package cn.jf.system.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;

/**
 * 自动创建 Exchange类型:点对点(direct:精准匹配(完全匹配、单播模式)、header)、订阅(fanout:广播模式、topic:路由键模式)、队列、绑定<br/>
 * <p>
 * -- new Exchange (字符串名称、布尔持久型、布尔型自动删除、参数);<br/><p>
 * ---- String name, boolean durable, boolean autoDelete, Map< String, Object> arguments<br/><p>
 * -- new Queue(队列名字, 是否持久化, 是否独占, 是否自动删除, 参数);<br/><p>
 * ---- name - 队列的名称 - 不能为空;设置为 "" 让代理生成名称。<br/>
 * ---- durable - 持久 - 如果我们声明一个持久队列,则为真(该队列将在服务器重新启动后继续存在)<br/>
 * ---- exclusive - 独占 - 如果我们声明一个独占队列,则为真(该队列将仅由声明者的连接使用)<br/>
 * ---- autoDelete – 如果服务器在不再使用队列时应该删除队列,则为 true<br/>
 * ---- arguments - 用于声明队列的参数<br/>
 * <p>
 * 容器中的Queue、Exchange、Binding 会自动创建(在RabbitMQ)不存在的情况下<br/>
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 15:47
 */
@Configuration
public class MyAutoBeanMQConfig {
//交换机///

    /**
     * 创建 Topic 类型的Exchange
     *
     * @return
     */
    @Bean
    public Exchange demoEventTopicExchange() {
        return new TopicExchange("demo-topic-exchange", true, false);
    }

//消息队列///

    /*
     * 死信队列
     * <p>
     * 对队列设置过期,而不是对消息设置过期
     * <p>
     * 设计建议规范:(基于事件模型的交换机设计)<p>
     * 1、交换机命名:业务+ exchange; 交换机为Topic<p>
     * 2、路由键:事件+需要感知的业务(可以不写)<p>
     * 3、队列命名:事件+想要监听服务名+ queue<p>
     * 4、绑定关系:事件+感知的业务(#)<p>
     */

    /**
     * 订单延迟队列
     *
     * @return
     */
    @Bean
    public Queue demoDelayQueue() {
        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange", "demo-topic-exchange");
        arguments.put("x-dead-letter-routing-key", "demo.release.order");
        // 消息过期时间 单位(毫秒)测试以1分钟为例
        arguments.put("x-message-ttl", 60000);
        //消息过期后丢到demo.delay.queue队列,而不是删除
        return new Queue("demo.delay.queue", true, false, false, arguments);
    }

    /**
     * 释放订单队列
     *
     * @return
     */
    @Bean
    public Queue demoReleaseQueue() {
        return new Queue("demo.release.order.queue", true, false, false);
    }

    /**
     * 释放库存队列
     *
     * @return
     */
    @Bean
    public Queue stockReleaseQueue() {
        return new Queue("demo.release.stock.queue", true, false, false);
    }

/绑定

    /**
     * 绑定订单延迟队列
     * <p>
     * 消息队列:demo.delay.queue --> 交换机:demo-topic-exchange --路由键:demo.create.order
     *
     * @return
     */
    @Bean
    public Binding demoCreateBinding() {
        return new Binding("demo.delay.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.create.order",
                null);
    }

    /**
     * 绑定释放订单队列
     * <p>
     * 消息队列:demo.release.order.queue --> 交换机:demo-topic-exchange --路由键:demo.release.order
     */
    @Bean
    public Binding demoReleaseBinding() {
        return new Binding("demo.release.order.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.release.order.#",
                null);
    }

    /**
     * 订单释放直接和库存释放进行绑定
     */
    @Bean
    public Binding demoReleaseOtherBinding() {
        return new Binding("demo.release.stock.queue",
                Binding.DestinationType.QUEUE,
                "demo-topic-exchange",
                "demo.release.stock.#",
                null);
    }

}


监听队列处理示例

1、模拟创建订单

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 创建订单消息
     */
    @GetMapping("/sendOrderMessage")
    public R sendOrderMessage() {
        for (int i = 1; i < 4; i++) {
            MqEntity mqEntity = new MqEntity();
            mqEntity.setExchange("demo-topic-exchange");
            mqEntity.setMessage("订单id--" + i);
            rabbitTemplate.convertAndSend(mqEntity.getExchange(), "demo.create.order",
                    mqEntity, new CorrelationData(UUID.randomUUID().toString()));
        }
        return R.success("创建订单消息发送完成");
    }

2、监听到订单超时了,开始业务处理,并通知释放库存队列

package cn.jf.system.listener;

import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听关闭订单队列,定时关闭订单
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/05 19:32
 */
@Slf4j
@RabbitListener(queues = "demo.release.order.queue")
@Component
public class OrderListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
        log.info("准备关闭订单:{}", mqEntity.getMessage());
        try {
            log.info("关闭订单{}--OK", mqEntity.getMessage());
            //并通知库存队列
            mqEntity.setMessage("释放库存");
            rabbitTemplate.convertAndSend(mqEntity.getExchange(), "demo.release.stock", mqEntity);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("关闭订单{}--error", mqEntity.getMessage());
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }

}

3、监听释放库存队列,自动释放库存

package cn.jf.system.listener;

import cn.jf.system.entity.MqEntity;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * 监听释放库存队列,自动释放库存
 *
 * @author jf
 * @version 1.0
 * @Description 描述
 * @date 2022/07/12 18:19
 */
@Slf4j
@RabbitListener(queues = "demo.release.stock.queue")
@Component
public class StockListener {

    @RabbitHandler
    public void listener(MqEntity mqEntity, Channel channel, Message message) throws IOException {
        log.info("准备释放库存:{}", mqEntity.getMessage());
        try {
            //TODO Service层处理关闭
            log.info("释放库存--OK");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            log.error("释放库存--error");
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }
}


查看交换机
在这里插入图片描述查看队列
在这里插入图片描述模拟创建订单:http://127.0.0.1:9210/jf-system-dev/mq/sendOrderMessage

在这里插入图片描述消息投递到消息服务端情况
在这里插入图片描述
查看结果
在这里插入图片描述
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用RabbitMQ模拟订单超时自动关闭订单及释放库存 的相关文章

随机推荐