分布式 Redis & RabbitMQ 终极秒杀

2023-05-16

♨️本篇文章记录的为RabbitMQ知识中企业级项目中秒杀相关内容,适合在学Java的小白,帮助新手快速上手,也适合复习中,面试中的大佬🙉🙉🙉。
♨️如果文章有什么需要改进的地方还请大佬不吝赐教❤️🧡💛

💖个人主页 : 阿千弟
💖点击这里👉👉👉: RabbitMQ专栏学习

上期我们使用阻塞队列分布式锁Redission分布式锁对业务功能进行优化,解决了在分布式环境下的秒杀安全问题,但是呢阻塞队列的确不够优雅, 而且还存在很多问题, 比如说这时候突然停电了,我们队列里的消息没来得及被消费就消失了.这就要出大问题.

现在我们要用一种更优雅的方式实现异步下单, 今天我们的主角就是RabbitMQ.

文章目录

      • 🐇思路分析
      • 🐇代码操作
      • 🐇展示结果
      • 🐇补充

🐇思路分析

  1. 我们先把MYSQL数据库中的订单数量在redis中保存一份.
  2. 在秒杀的业务中我们还是先执行Lua脚本, 判断我们是否具有购买资格, 如果没有购买资格我们直接返回结果;
  3. 如果有有购买资格,Lua脚本会直接进行redis中的订单扣减操作,进行秒杀
  4. 同时在redis的扣减操作完成后, 把下单信息保存到阻塞队列进行异步执行, 实现MYSQL数据库中订单扣减的同步操作,完美.

在这里插入图片描述

🐇代码操作

思路听起来很简单,下面我们来进行具体操作.

先引入RabbitMQ的核心依赖

		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

在yml文件中进行相关配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: /
    username: guest
    password: guest
    publisher-confirm-type: correlated
    publisher-returns: true
    template:
      mandatory: true
      retry:
        #发布重试,默认false
        enabled: true
        #重试时间 默认1000ms
        initial-interval: 1000
        #重试最大次数 最大3
        max-attempts: 3
        #重试最大间隔时间
        max-interval: 10000
        #重试的时间隔乘数,比如配20 第一次等于10s,第二次等于20s,第三次等于40s
        multiplier: 1
    listener:
      # 默认配置是simple
      type: simple
      simple:
        # 手动ack Acknowledge mode of container. auto none
        acknowledge-mode: manual
        #消费者调用程序线程的最小数量
        concurrency: 10
        #消费者最大数量
        max-concurrency: 10
        #限制消费者每次只处理一条信息,处理完在继续下一条
        prefetch: 1
        #启动时是否默认启动容器
        auto-startup: true
        #被拒绝时重新进入队列
        default-requeue-rejected: true

配置RabbitMQConfig文件

@Slf4j
@Configuration
public class RabbitMQConfig {

    @Bean("RabbitTemplate")
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        //设置Json转换器
        rabbitTemplate.setMessageConverter(jsonMessageConverter());

        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                log.info("ConfirmCallback:     "+"相关数据:"+correlationData);
                log.info("ConfirmCallback:     "+"确认情况:"+ack);
                log.info("ConfirmCallback:     "+"原因:"+cause);
            }
        });

        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {

            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.info("return 执行了....");
                log.info("ReturnCallback:     "+"消息:"+returnedMessage.getMessage().toString());
                log.info("ReturnCallback:     "+"回应码:"+returnedMessage.getReplyCode());
                log.info("ReturnCallback:     "+"回应信息:"+returnedMessage.getReplyText());
                log.info("ReturnCallback:     "+"交换机:"+returnedMessage.getExchange());
                log.info("ReturnCallback:     "+"路由键:"+returnedMessage.getRoutingKey());
            }
        });

        return rabbitTemplate;
    }

    /**
     * Json转换器
     */
    @Bean
    public Jackson2JsonMessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    //交换机名称
    public static final String SECKILL_EXCHANGE ="seckill_exchange";

    //队列名称
    public static final String ORDER_QUEUE ="order_queue";

    @Bean("ORDER_QUEUE")
    Queue confirmQueue(){
        return QueueBuilder.durable(ORDER_QUEUE).build();
    }

    /**
     * 创建一个交换机
     * @return
     */
    @Bean("SECKILL_EXCHANGE")
    Exchange confirmExchange(){
        return ExchangeBuilder.topicExchange(SECKILL_EXCHANGE).durable(true).build();
    }

    @Bean
    Binding confirmExchange(@Qualifier("SECKILL_EXCHANGE") Exchange exchange,
                            @Qualifier("ORDER_QUEUE") Queue queue){
        //bind(queue) 绑定队列 to(exchange) 绑定交换机 with("") routingKey这里绑定的是空白可以按照自己的要求绑定  noargs() 没有参数
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
    }
}

补充 : RabbitMQ的Json转换器尤为重要, 因为rabbitMQ中发送和接收的都是字符串/字节数组类型的消息, 所以我们要把java对象转成json串进行发送.

Lua脚本

-- 1.参数列表
-- 1.1.优惠券id
local voucherId = ARGV[1]
-- 1.2.用户id
local userId = ARGV[2]
-- 2.数据key
-- 2.1.库存key
local stockKey = 'seckill:stock:' .. voucherId
-- 2.2.订单key
local orderKey = 'seckill:order:' .. voucherId

-- 3.脚本业务
-- 3.1.判断库存是否充足 get stockKey
if(tonumber(redis.call('get', stockKey)) <= 0) then
    -- 3.2.库存不足,返回1
    return 1
end
-- 3.2.判断用户是否下单 SISMEMBER orderKey userId
if(redis.call('sismember', orderKey, userId) == 1) then
    -- 3.3.存在,说明是重复下单,返回2
    return 2
end
-- 3.4.扣库存 incrby stockKey -1
redis.call('incrby', stockKey, -1)
-- 3.5.下单(保存用户)sadd orderKey userId
redis.call('sadd', orderKey, userId)
return 0

下面是秒杀业务代码

@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService seckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Resource
    private RedissonClient redissonClient;

    @Resource(description = "RabbitTemplate")
    private RabbitTemplate rabbitTemplate;

    private static final DefaultRedisScript<Long> SECKILL_SCRIPT;

    static {
        SECKILL_SCRIPT = new DefaultRedisScript<>();
        SECKILL_SCRIPT.setLocation(new ClassPathResource("seckill.lua"));
        SECKILL_SCRIPT.setResultType(Long.class);
    }

    private IVoucherOrderService proxy;


    public void handleVoucherOrder(VoucherOrder voucherOrder) {
        //1.获取用户
        Long userId = voucherOrder.getUserId();
        // 2.创建锁对象
        RLock redisLock = redissonClient.getLock("lock:order:" + userId);
        // 3.尝试获取锁
        boolean isLock = redisLock.tryLock();
        // 4.判断是否获得锁成功
        if (!isLock) {
            // 获取锁失败,直接返回失败或者重试
            log.error("不允许重复下单!");
            return;
        }
        try {
            //注意:由于是spring的事务是放在threadLocal中,此时的是多线程,事务会失效
            createVoucherOrder(voucherOrder);
        } finally {
            // 释放锁
            redisLock.unlock();
        }
    }


    @Override
    public Result seckillVoucher(Long voucherId) {
        //获取用户
        Long userId = UserHolder.getUser().getId();

        // 1.执行lua脚本
        Long result = stringRedisTemplate.execute(
                SECKILL_SCRIPT,
                Collections.emptyList(),
                voucherId.toString(), userId.toString()
        );
        int r = result.intValue();
        // 2.判断结果是否为0
        if (r != 0) {
            // 2.1.不为0 ,代表没有购买资格
            return Result.fail(r == 1 ? "库存不足" : "不能重复下单");
        }
        // 2.2.为0 ,有购买资格,把下单信息保存到阻塞队列
        VoucherOrder voucherOrder = new VoucherOrder();
        // 2.3.订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        // 2.4.用户id
        voucherOrder.setUserId(userId);
        // 2.5.代金券id
        voucherOrder.setVoucherId(voucherId);
        // 2.6.放入mq
        rabbitTemplate.convertAndSend("seckill_exchange", "order.voucher", voucherOrder);
        // 3.获取代理对象
        proxy = (IVoucherOrderService) AopContext.currentProxy();
        // 4.返回订单id
        return Result.ok(orderId);
    }


    @Transactional
    @Override
    public void createVoucherOrder(VoucherOrder voucherOrder) {
        Long userId = voucherOrder.getUserId();
        // 5.1.查询订单
        int count = Math.toIntExact(query().eq("user_id", userId).eq("voucher_id", voucherOrder.getVoucherId()).count());
        // 5.2.判断是否存在
        if (count > 0) {
            // 用户已经购买过了
            log.error("不允许重复下单!");
            return;
        }

        // 6.扣减库存
        boolean success = seckillVoucherService.update()
                .setSql("stock = stock - 1") // set stock = stock - 1
                .eq("voucher_id", voucherOrder.getVoucherId())
                .gt("stock", 0) // where id = ? and stock > 0
                .update();
        if (!success) {
            // 扣减失败
            log.error("库存不足!");
            return;
        }

        // 7.创建订单
        save(voucherOrder);

    }

}

总得有个消费者来监听我们的队列

消费者代码

@Slf4j
@Component
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;
    
    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

        try {
            //1.接收转换消息
            ObjectMapper mapper = new ObjectMapper();
            VoucherOrder voucherOrder = mapper.readValue(message.getBody(), VoucherOrder.class);
            //2. 处理业务逻辑
            try {
                // 2.创建订单

                voucherOrderService.handleVoucherOrder(voucherOrder);
                System.out.println("订单已执行");
            } catch (Exception e) {
                log.error("处理订单异常", e);
            }
            //3. 手动签收
            channel.basicAck(deliveryTag,true);
        } catch (Exception e) {

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);

        }
    }
}

代码部分就这么些, 思路并不算复杂

在这里插入图片描述

🐇展示结果

在这里插入图片描述

这里显示抢购成功了

我们再看看redis和mysql中情况如何

在这里插入图片描述

在这里插入图片描述
显而易见, 表中对应的订单数量也都扣减了, 完美.

🐇补充

RabbitMQ中JSON格式转换为实体对象

//将JSON格式数据转换为实体对象
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), VoucherOrder.class);

RabbitMQ中JSON格式转换为Map对象

 @Test
    public void sender() throws AmqpException
    {
        //创建用户信息Map
        Map<String, Object> userMap = new HashMap<>();
        userMap.put("userId", "1");
        userMap.put("userName", "阿千弟的博客");
        userMap.put("blogUrl", "https://blog.csdn.net/qq_51033936");
        userMap.put("userRemark", "您好,欢迎访问 阿千弟的博客");
 
        /**
         * 发送消息,参数说明:
         * String exchange:交换器名称。
         * String routingKey:路由键。
         * Object object:发送内容。
         */
        rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE_NAME, RabbitMqConfig.DIRECT_ROUTING_KEY, userMap);
//将JSON格式数据转换为Map对象
@RabbitListener(queues = "order_queue")
public class VoucherOrderListener implements ChannelAwareMessageListener {

    @Autowired
    VoucherOrderServiceImpl voucherOrderService;

    @RabbitHandler(isDefault=true)
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {

        long deliveryTag = message.getMessageProperties().getDeliveryTag();

       //将JSON格式数据转换为Map对象
            ObjectMapper mapper = new ObjectMapper();
            JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
            Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);
 
            System.out.println("接收者收到Map格式消息:");
            System.out.println("用户编号:" + resultMap.get("userId"));
            System.out.println("用户名称:" + resultMap.get("userName"));
            System.out.println("博客地址:" + resultMap.get("blogUrl"));
            System.out.println("博客信息:" + resultMap.get("userRemark"));
 
            //确认消息
            channel.basicAck(deliveryTag, true)
        } catch (Exception e) {

            //4.拒绝签收
            /*
            第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
             */
            channel.basicNack(deliveryTag,true,true);
        }
    }
}

注意!!!如果@RabbitListener加在类上面,需要有一个默认的处理方法@RabbitHandler(isDefault=true),默认是false。不设置一个true,消费mq消息的时候会出现“Listener method ‘no match’ threw exception”异常。
在这里插入图片描述

如果这篇【文章】有帮助到你💖,希望可以给我点个赞👍,创作不易,如果有对Java后端或者对redis感兴趣的朋友,请多多关注💖💖💖
💖个人主页:阿千弟
如果大家对redis相关知识感兴趣请点击这里👉👉👉redis专栏学习

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分布式 Redis & RabbitMQ 终极秒杀 的相关文章

随机推荐

  • nginx的配置和详解

    nginx简介 nginx xff08 发音同engine x xff09 是一款轻量级的Web服务器 反向代理服务器及电子邮件 xff08 IMAP POP3 xff09 代理服务器 xff0c 并在一个BSD like协议下发行 ngi
  • The following packages have unmet dependencies问题解决

    当出现这些问题时一直追加安装即可
  • Python下载网易云音乐(云音乐飙升榜)

    最近突然想用python写一个自动下载的工具 xff0c 于是就先拿网易云来练练手 xff0c 并把过程中的心得写下来便于后面有想玩这个的童鞋们参考 首先我们分析网页源码 xff0c 找到我们想要的获取数据位置 xff1a 每一个标签对应着
  • ubuntu20.04 桌面图标显示异常及解决方法

    前言 更新至ubuntu20 04后 xff0c 出现了一些以前没有的问题 桌面上有些图标不显示 文章目录 前言一 具体表现二 原因三 解决方法总结 一 具体表现 例如有一次我在做备忘录时 我习惯地打开终端 span class token
  • Java类名的命名规则

    1 类名必须使用有意义的名字 xff1b 2 类名的每个单词的首字母必须大写 帕斯卡命名法 xff1b 3 类名不能使用数字 除了 和 之外的任何符号 xff0c 中间不能添加空格 xff0c 不能使用java关键字 xff1b 如 xff
  • firewalld高级配置

    1 IP地址伪装 masquerade xff1a 伪装 通过地址伪装 xff0c NAT设备将经过设备的包转发到指定接收方 xff0c 同时将通过的数据包的原地址更改为NAT的接口地址转发到不同步目的地 当是返回数据包是 xff0c 会将
  • Java中关于JSON格式数据的操作

    对于java格式数据的处理 xff1a 1 xff1a 先创建java实体类 xff0c 例如 xff1a public class Brand private String id private String brandName publ
  • 线程常用调度方法

    目录 一 线程等待 二 线程通知 三 线程休眠 四 请求让出CPU执行权 五 线程中断 一 线程等待 1 wait xff08 xff09 xff1a 当一个线程调用了wait xff08 xff09 方法后 xff0c 这个线程会被阻塞挂
  • centos7 安装jdk详细教程

    一 前言 本文主要介绍的是Centos7 Linux环境下安装jdk 8u333的详细图文教程 xff0c 用过linux服务器的开发人员都知道 xff0c JDK是作为日常开发常用的基础环境 xff0c 所以安装jdk是必要的 xff0c
  • KDE 美化(Manjaro)-记录

    KDE 美化 Manjaro 要想在不同的工具包之间获得相似的外观 xff0c 你很可能需要修改以下内容 xff1a 主题 包含一套风格 图标主题和颜色主题 风格 图形布置 xff0c 观感 图标主题 一套整体的图标 颜色主题 一套连接风格
  • spring容器对Bean组件的管理

    spring容器对Bean组件的管理 1 Bean对象创建时机 默认是随着容器创建 xff0c 可以使用lazy init 61 true xff08 在调用getBean创建 xff09 延迟创建 xff0c 也可以使用 lt beans
  • nginx平滑升级(添加echo功能)配置和状态监控

    添加echo模块 配置 1 先去github或者gitee中找到nginx module echo master zip包 2 将原来的ngin 1 20 1删除 重新编译安装 span class token punctuation sp
  • 字节对齐的原理和方法

    Pragma是什么 小发猫的博客 CSDN博客 pragma是什么 Pragma是什么 Pragma是什么 翻译 SkyJacker后附英文原文 译者注 一句话 xff0c pragma就是为了让编译器编译出的C或C 43 43 程序与机器
  • 【Android】Banner2.1的使用

    com youth banner Banner 2 1的使用 与前版本不同的是 xff0c 2 1版本是用的适配器 设置适配器和点击事件 banner span class token punctuation span span class
  • linux系统中安装Java环境

    Ubuntu安装Java环境 步骤1 xff1a 下载jdk 我选择的jdk版本文件 xff1a jdk 8u131 linux x64 tar gz 步骤2 xff1a 创建单独的目录 sudo mkdir usr local java
  • SpringMVC --01.2023Idea搭建全注解式开发的SpringMVC

    1 创建项目 打开Idea xff0c 并点击新建项目 注 xff1a 使用的是2022 2的商业版 xff0c 该版本跟2021 2的商业版创建Maven项目不一样 点击右侧的新建项目 gt 取名 gt 创建 这样我们就创建了一个空依赖的
  • Java中的异常及异常处理

    目录 1 什么是异常 2 为什么要处理异常 3 异常分类 4 如何进行异常处理 4 1 捕获异常 4 2 手动抛出异常 4 3 自定义异常 4 4 debug调试模式 5 其他异常 1 什么是异常 程序中 在代码编译或运行过程中 xff0c
  • Spring之配置文件

    目录 什么是配置文件 配置文件作用 配置文件的格式 properties 配置文件说明 properties 基本语法 三种读取properties的方法 yml 配置文件 yml 基本语法 总结 什么是配置文件 首先我们知道我们的程序是
  • 已解决:前、后端打包部署至服务器后,背景图片不显示并且一些图标都变成了方块

    将打包好的jar包部署至服务器后 xff0c 输入项目网址后 xff0c 发现背景图片没有显示出来并且一些图标变成了方块 解决办法 xff1a 在前端找到bulid文件目录下的utils js文件 xff0c 添加以下语句 xff1a pu
  • 分布式 Redis & RabbitMQ 终极秒杀

    本篇文章记录的为RabbitMQ知识中企业级项目中秒杀相关内容 xff0c 适合在学Java的小白 帮助新手快速上手 也适合复习中 xff0c 面试中的大佬 x1f649 x1f649 x1f649 如果文章有什么需要改进的地方还请大佬不吝