Reids实战—黑马点评(三)秒杀篇

2023-05-16

Reids实战—黑马点评(三)秒杀篇

来自黑马的redis课程的笔记

【黑马程序员Redis入门到实战教程,深度透析redis底层原理+redis分布式锁+企业解决方案+黑马点评实战项目】

在这里插入图片描述

目录

  • Reids实战—黑马点评(三)秒杀篇
    • 一、全局唯一ID
      • 小结
    • 二、实现优惠券秒杀下单
    • 三、超卖问题
      • 3.1 问题描述
      • 3.2 乐观锁和悲观锁
      • 3.3 乐观锁实现
        • 3.3.1 版本号法
        • 3.3.2 CAS(Compare And Swap)
      • 3.4 小结
    • 四、一人一单
      • 4.1 添加功能
      • 4.2 并发问题
      • 4.3 字符串问题
      • 4.4 Spring事务问题
        • 4.4.1 锁在事务内
        • 4.4.2 事务不生效
      • 4.5 集群模式下的一人一单问题
    • 五、分布式锁
      • 5.1 分布式锁概述
      • 5.2 基于redis实现分布式锁
        • 5.2.1 锁误删
        • 5.2.2 Lua脚本保证命令原子性
      • 5.3 小结
    • 六、Redisson
      • 6.1 使用Redisson分布式锁
      • 6.2 Redisson分布式锁原理
        • 6.2.1 可重入原理
        • 6.2.2 可重试原理
        • 6.2.3 解决超时释放
        • 6.2.4 保证主从一致
        • 6.2.5 小结
    • 七、redis优化秒杀
    • 八、Redis消息队列实现异步秒杀
      • 8.1 基于List结构模拟消息队列
      • 8.2 PubSub发布订阅模式
      • 8.3 基于Stream结构的消息队列
        • 8.3.1 基本使用
        • 8.3.2 消费者组
        • 8.3.3 改造秒杀业务
        • 8.3.4 小结

一、全局唯一ID

每一个订单都需要不同的ID,如何做到ID唯一?

如果似乎用自增:

  • id规律明显(安全问题)
  • 受单表数据量限制

为了解决这些问题,我们有了全局ID生成器,这是一种生成全局唯一ID(或者叫分布式唯一ID)的工具,所生成的ID满足以下特征:

  • 唯一性
  • 高可用
  • 高性能
  • 递增性
  • 安全性(复杂递增)

唯一性和递增性我们可以用redis自增来实现,恰好,redis本身就满足高可用、高性能,安全性如何解决?

方案:

在这里插入图片描述

用Long(数值效率比字符串高)类型,一个Long类型占64位(Java中),我们可以在这些bit位上做文章。

首先:从左数第一位,是符号位,我们的id永远为正数,所以让它永远为0。接下来的31位,用来存时间戳,保证安全性的同时,让id基本不可能重复。最后的32位,我们存普通的递增值。这个方案,理论上可以保证68年都不会有id重复。

为了避免最后的自增值超出redis自增限制,我们可以每天新建一个key用于自增(例如,yyyy:MM:dd),不仅解决了问题,还方便统计。

实现:

@Component
public class RedisIdWorker {
    private StringRedisTemplate stringRedisTemplate;
    // 起始的时间戳值
    private static final Long BEGIN_TIMESTAMP = 1640995200L;

    public RedisIdWorker (StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }
    /**
    * @param “业务key前缀“
    * */
    public long nextId(String keyPrefix) {
        LocalDateTime now = LocalDateTime.now();
        long  nowSecond = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowSecond - BEGIN_TIMESTAMP;
        String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        long count = stringRedisTemplate.opsForValue().increment("incr:" + keyPrefix + ":" + data);
        // 位运算提高效率
        return timestamp << 32 | count;
    }
}

起始的时间戳的值可以这样计算:

@Test
void timeTest() {
    LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
    long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
    System.out.println(epochSecond);
}

测试:

ExecutorService executorService = Executors.newFixedThreadPool(500);
@Test
void idTest() throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(300);
    Runnable task = () -> {
        for (int i = 0; i < 100; i++) {
            redisIdWorker.nextId("order");
        }
        latch.countDown();
    };
    long begin = System.currentTimeMillis();
    for (int i = 0; i < 300; i++) {
        executorService.submit(task);
    }
    // 必须阻塞 如果不阻塞,则程序结束时,我们的任务还没有结束
    latch.await();
    long end = System.currentTimeMillis();
    System.out.println(end - begin);
}

收获的小技巧:

  1. LocalDateTime的使用

    // 使用of可以快速获取指定时间的LocalDateTime对象
    LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0, 0);
    // 使用toEpochSecond可以快速获取相应时间的时间戳
    long epochSecond = time.toEpochSecond(ZoneOffset.UTC);
    // 使用format方法快速格式化时间
    LocalDateTime now = LocalDateTime.now();
    String data = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
    
  2. 并发包的小技巧

    // 快速创建线程池,但是阿里的Java开发规范不建议这样做
    ExecutorService executorService = Executors.newFixedThreadPool(500);
    // 快速创建一个任务,使用lambda表达式
    Runnable task = () -> {
        ……
    };
    // 提交任务
    executorService.submit(task);
    
  3. 位运算

    更好的利用bit位来处理信息

小结

在这里插入图片描述

uuid效率较低,且不满足自增性。

Redis自增,就是刚刚的方案,很常用。

雪花算法,类似于刚刚的方案,区别在于雪花算法的最后32位是以机器时间为基准,Redis是自增。

数据库自增,也就是数据库版本的Redis方案,效率不如Redis。

二、实现优惠券秒杀下单

按照流程图,实现非常简单,但会出现非常多的问题(详见后文)。

小收获:

使用lambdaUpdate

// 减库存操作
lambdaUpdate().setSql("stock = stock -1")
              .eq(SeckillVoucher::getVoucherId, voucherId)
              .update();

三、超卖问题

3.1 问题描述

在刚刚的流程中,是有很多问题隐患的,比如,经典的超卖问题

在并发量较高的情况下,假如剩余最后一张票,在购买最后一张票的线程还未扣减库存时,其他线程进入,查询库存,都会查到还有余票的结果,此时再进行库存判断,并扣减库存,就会发生超卖。

在这里插入图片描述

如何解决超卖问题?

最简单的方法就是上锁,同一时间只让一个线程去执行查库存->扣减库存的操作。

3.2 乐观锁和悲观锁

加锁的话有两种锁可以加:

悲观锁:比较悲观,认为线程安全问题一定会发生,因此操作数据前先获取锁,确保线程串行执行。(如synchronized、Lock)

乐观锁:比较乐观,认为线程安全问题不一定发生,所以不加锁,而是在更新数据时去判断数据有没有被其他线程修改。若没有则更新,反之重试或异常。

3.3 乐观锁实现

悲观锁的解决方案较为简单,即在可能出问题的代码上加synchronized或Lock锁住。我们介绍一下乐观锁的解决方案:

常见两种方法:

3.3.1 版本号法

每次更新数据时,更新版本号,并判断版本号是否被修改。

在这里插入图片描述

3.3.2 CAS(Compare And Swap)

以数据本身为版本号

3.4 小结

秒杀场景下,悲观锁性能较差,乐观锁成功率较低。

小优化:更新库存时,不用判断库存是否和查询到的库存相同,只需要库存大于零即可,这样成功率大大提高。

在这里插入图片描述

四、一人一单

4.1 添加功能

某些业务我们需要用户只能购买一单,于是我们多加一个用户是否已经购买的判断:

在这里插入图片描述

从ThreadLocal中取出当前用户的id,根据用户和商品id查询用户是否已经有订单存在,若存在则失败。

在这里插入图片描述

4.2 并发问题

按照刚刚的业务流程,正常用户单线程操作的情况下是没有问题的,但是若是恶意用户,同时开多个线程访问我们的接口,则可能出现这样一种情况:

若多个线程同时进入该方法,在任一线程未保存订单前,都进行查询用户是否下单,得到的count都是0,拿到这个0后,再去判断是否一人一单则都会成功,都会下单,造成了一人多单的问题。

解决:给该方法加锁,因为涉及两个表的写操作,我们添加事务。

在这里插入图片描述

写完过后以看,这不成了悲观锁嘛!里面的乐观锁还拿来干嘛呢?

经过思考:我们只需要限制单个用户的并发操作,只需要锁住userId即可,于是我们不再锁整个方法。

该方法改进为:

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
    Long userid = UserHolder.getUser().getId();
    synchronized(userid.toString()) {
       	......
        return Result.ok(voucherOrder.getId());   
    }
}

4.3 字符串问题

接着我们会发现锁不住,因为我们toString方法会创建一个新的字符串,不同的线程的userid.toString()出来的字符串不是同一个对象。

解决:使用intern方法

@Override
@Transactional
public Result createVoucherOrder(Long voucherId) {
    Long userid = UserHolder.getUser().getId();
    synchronized(userid.toString().intern()) {
       	......
        return Result.ok(voucherOrder.getId());   
    }
}

使用intern方法,该方法会从JVM常量池中找equals为true的字符串,意味着不同的线程都是同一个字符串。

4.4 Spring事务问题

spring的声明式事务是基于AOP实现的,意味着方法结束后才会提交事务。

4.4.1 锁在事务内

当synchronized在该方法内时,会出现这么一种情况:锁释放了,但事务未提交,事务未提交时,下一个线程进来,读到的是数据库未修改的快照,仍然会发生一人多单的问题,所以我们要在调用该方法处添加锁,或是使用编程式事务。

synchronized (userid.toString().intern()) {
	createVoucherOrder(voucherId);
}

4.4.2 事务不生效

正是因为spring事务是通过aop实现,而aop是通过动态代理实现。当我们直接调用该方法时,默认是this.createVoucherOrder(voucherId),使用的是this调用,而不是使用代理对象来调用,只有通过代理对象来调用,事务才会生效。

  1. 引入依赖

    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjweaver</artifactId>
    </dependency>
    
  2. 暴露代理对象

    启动类上添加@EnableAspectJAutoProxy(exposeProxy = true)注解

    @MapperScan("com.hmdp.mapper")
    @EnableAspectJAutoProxy(exposeProxy = true)
    @SpringBootApplication
    public class HmDianPingApplication {
        public static void main(String[] args) {
            SpringApplication.run(HmDianPingApplication.class, args);
        }
    }
    
  3. 获取代理对象,并使用代理对象调用方法

    synchronized (userid.toString().intern()) {
    	// 确保事务生效
    	IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
    	proxy.createVoucherOrder(voucherId);
    }
    

4.5 集群模式下的一人一单问题

通过刚刚的几波操作,可以说是基本解决了单机部署情况下的一人一单问题,若是集群部署,仍然会有问题。

可以开两个服务debug一下:

(小坑:Intellij Idea 2022.3.1版本,断点调试打断点后,需要等待断点上的“√”出现才能启动服务,不然断点无效,多个服务建议多选并同时启动,不然要等上一个服务启动到“√”出现才能启动下一个服务。)

在这里插入图片描述

我们的 锁 锁的是userid,锁的是jvm常量池中的userid字符串。若是多个jvm,不是同一个常量池,自然就锁不住了。

五、分布式锁

解决集群情况下的并发问题,则需要一个多个进程能都使用的锁。

5.1 分布式锁概述

分布式锁:满足分布式系统或集群模式下多进程可见并互斥的锁。

在这里插入图片描述

分布式锁可以借助第三方中间件简单的实现:

在这里插入图片描述

5.2 基于redis实现分布式锁

通过redis的setnx命令可以轻松实现简单的互斥锁,因为redis是单线程的,不会存在并发问题。

在这里插入图片描述

通过set key value nx ex 10 来保证原子性,防止在设置ttl值前宕机,发生死锁。这是一种非阻塞的锁,拿不到锁立即返回false,没有重试机制。

5.2.1 锁误删

若只是简单的使用setnx和del来获取和释放锁,则会出现一些问题,例如,拿到锁的线程业务阻塞了,它的锁超时释放了,此时另一个线程拿到锁,在另一个线程还未释放锁时,它的业务完成,并删掉了另一个线程的锁。这样一来,第三个线程又能拿到锁。

在这里插入图片描述

所以我们需要有一种机制,防止锁的误删:

在这里插入图片描述

在释放锁时,再判断一下锁是否是自己的。

具体方案:在设置该锁的value时,我们使用threadid,因为在多个JVM中,threadid有可能相同,于是我们在threadid前拼接一个UUID。这样就保证了误删锁的情况不会再发生。

5.2.2 Lua脚本保证命令原子性

避免锁的误删后,我们的分布式锁就比较健壮了。但在某些极端场景下,仍会出现误删问题:

在释放锁时,会先判断是否是当前的锁(在redis中获取该锁的value),此时,因为一些原因(如GC)阻塞了,下一个线程拿到锁,此时阻塞结束,由于已经做过判断,上一个线程会把下一个线程拿到的锁误删,此时第三个线程又能拿到锁了。

在这里插入图片描述

解决方案:将判断和释放锁封装成一个原子操作。

使用Lua脚本可以完美解决这个问题。

在lua脚本中,可以直接使用redis.call()方法来编写redis命令,在redis中使用EVAL调用。有如下好处:

  1. 减少网络开销。可以将多个请求通过脚本的形式一次发送,减少网络时延。
  2. 原子操作。Redis会将整个脚本作为一个整体执行,中间不会被其他请求插入。因此在脚本运行过程中无需担心会出现竞态条件,无需使用事务。
  3. 复用。客户端发送的脚本会永久存在redis中,这样其他客户端可以复用这一脚本,而不需要使用代码完成相同的逻辑。

在这里插入图片描述

注意:lua语言中,数组下标是从1开始。

了解lua脚本后,我们就可以开始编写lua脚本了。

在这里插入图片描述

根据需求:

在这里插入图片描述

简写:

在这里插入图片描述

接下来就是使用Java的redis客户端嗲用Lua脚本:

使用execute方法:

在这里插入图片描述

于是,我们写出了简单且健壮的分布式锁:

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
public class SimpleRedisLock implements ILock {
    private StringRedisTemplate stringRedisTemplate;
	// Lua脚本
    private static final DefaultRedisScript<Long> UNLOCK_SCRIPT;
    // 锁的名字 key一般为业务名
    private String key;
	// 锁前缀
    private static final String KEY_PREFIX = "lock:";
    // 该区分不同的JVM
    private static final String ID_PREFIX = UUID.randomUUID().toString(true) + "";
    // 初始化Lua脚本
    static {
        UNLOCK_SCRIPT = new DefaultRedisScript<>();
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        UNLOCK_SCRIPT.setResultType(Long.class);
    }
	
    // 初始化key
    public SimpleRedisLock(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.key = KEY_PREFIX + name;
    }

    /**
     * @param timeoutSec 锁持有的时长 过期自动释放 
     * @return
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        String threadId = ID_PREFIX + Thread.currentThread().getId();
        Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(key, threadId + "", timeoutSec, TimeUnit.SECONDS);
        return BooleanUtil.isTrue(success);
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        stringRedisTemplate.execute(UNLOCK_SCRIPT,
                Collections.singletonList(key),
                ID_PREFIX + Thread.currentThread().getId()
        );
    }
}

5.3 小结

在这里插入图片描述

编码收获:

  1. 读取文件,若该文件是不变的,则在static代码块中初始化一次就好了。
  2. ClassPathResource()类快速获取Resource目录下的资源。

六、Redisson

现在我们基于String类型的setnx实现的分布式锁已经足够健壮,但仍有不足:

在这里插入图片描述

要解决这些问题,我们的基础版redis分布式锁就不好发力了。

但Redisson轻松解决:

在这里插入图片描述

Redisson中不仅有分布式锁,分布式锁只是它的一个子集 。

6.1 使用Redisson分布式锁

  1. 引依赖

    <dependency>
        <groupId>org.redisson</groupId>
        <artifactId>redisson</artifactId>
        <version>3.19.3</version>
    </dependency>
    
  2. 配置

    如果使用yaml来配置,则会将redis的配置覆盖。我们使用redisson只是作为分布式锁使用,所以单独配置:

    @Configuration
    public class RedissonConfig {
        @Bean
        public RedissonClient redissonClient() {
            Config config = new Config();
            // 单点配置,也可以用useClusterServers添加集群地址
            config.useSingleServer().setAddress("redis://192.168.0.122:6379").setPassword("123456");
            return Redisson.create(config);
        }
    }
    
  3. 使用

    private void handleVoucherOrder(VoucherOrder voucherOrder) {
        Long userid = voucherOrder.getUserId();
        // 创建锁对象
        RLock lock = redissonClient.getLock("order:" + userid);
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        // 是否拿到锁
        if (!isLock) {
            log.error("不能重复下单!");
        }
        try {
            proxy.createVoucherOrder(voucherOrder);
        } finally {
            lock.unlock();
        }
    }
    

    非常简单。

    也可以携带参数

在这里插入图片描述

6.2 Redisson分布式锁原理

redisson的分布式锁是如何解决这四个问题?

6.2.1 可重入原理

我们的基础版锁是不可重入的

redisson使用Hash结构轻松解决了这个问题

在这里插入图片描述

每重入一次,value+1,每次释放锁,value-1,value为0,则释放锁。非常巧妙

其底层都是lua脚本,保证命令原子性:

获取锁:

在这里插入图片描述

释放锁:

在这里插入图片描述

源码中lua脚本是写死了放在代码中的,可以去查看,和这些大差不差,释放锁的时候会发布一个释放信号(后文)。

6.2.2 可重试原理

我们基础版的redis是一个非阻塞式的锁,没有任何重试机制。redisson用PubSub模式解决了这个问题。

我们从redis源码中探究:

/**
 * 参数分别是等待时间,锁ttl,时间单位。我们不指定ttl时,默认为-1。  
 **/
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    long threadId = Thread.currentThread().getId();
    // 执行获取锁的方法,会返回锁的ttl,获取锁失败会返回null
    Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
    // 成功获取锁
    if (ttl == null) {
        return true;
    }
	// 获取失败,准备重试,先检查重试等待时间是否还有剩余
    time -= System.currentTimeMillis() - current;
    // 过期,获取锁失败,返回false
    if (time <= 0) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
	// 还可以重试,准备重试
    current = System.currentTimeMillis();
    // 订阅锁的释放信号
    CompletableFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    try {
        // 等待锁释放,获取信号
        subscribeFuture.get(time, TimeUnit.MILLISECONDS);
        // 超时 失败
    } catch (TimeoutException e) {
        if (!subscribeFuture.completeExceptionally(new RedisTimeoutException(
            "Unable to acquire subscription lock after " + time + "ms. " +
            "Try to increase 'subscriptionsPerConnection' and/or 'subscriptionConnectionPoolSize' parameters."))) {
            subscribeFuture.whenComplete((res, ex) -> {
                if (ex == null) {
                    unsubscribe(res, threadId);
                }
            });
        }
        acquireFailed(waitTime, unit, threadId);
        return false;
        // 异常 失败
    } catch (ExecutionException e) {
        acquireFailed(waitTime, unit, threadId);
        return false;
    }
	// 获取锁释放信号成功
    try {
        // 再次判断等待时间是否过期
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(waitTime, unit, threadId);
            return false;
        }
		// 没有过期 尝试获取锁
        while (true) {
            long currentTime = System.currentTimeMillis();
            ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
            // 获取锁成功
            if (ttl == null) {
                return true;
            }
			// 失败 检查等待时间是否过期
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }

            // 未过期 准备重试 
            currentTime = System.currentTimeMillis();
            // 等待释放锁的信号
            if (ttl >= 0 && ttl < time) {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                commandExecutor.getNow(subscribeFuture).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
			// 获得锁释放信号 再次检查等待时间
            time -= System.currentTimeMillis() - currentTime;
            if (time <= 0) {
                acquireFailed(waitTime, unit, threadId);
                return false;
            }
        }
    } finally {
        // 无论获取锁成功或失败,都要解除订阅
        unsubscribe(commandExecutor.getNow(subscribeFuture), threadId);
    }
    //        return get(tryLockAsync(waitTime, leaseTime, unit));
}

相比我们基础版redis分布式锁,有很多优点,例如:

  • 重试并不是无休止的重试,而是使用发布订阅模式,等待一个锁的释放信号再去重试,节约内存,性能。
  • 非常严谨,每次获取锁前,都要先判断重试等待时间是否过期。

6.2.3 解决超时释放

虽然锁超时释放可以解决业务异常带来的死锁问题,但是业务超时引发的自动释放也会产生线程安全问题。redisson使用看门狗机制解决了这个问题。

从redis源码中探究:

首先看获取锁的源码

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    // 参数和之前一致
    RFuture<Long> ttlRemainingFuture;
    if (leaseTime > 0) {
        ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    } else {
        // 默认情况 leaseTime为 -1 ,这个时候,redisson使用了一个变量internalLockLeaseTime,实际上是30 * 1000毫秒,也就是30秒
        // 该方法会执行lua脚本,尝试获取锁,若获取成功,返回null,获取失败,返回ttl
        ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    }
    CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
        // 判断是否获取成功
        if (ttlRemaining == null) {
            // 获取成功 刚才提到 默认情况下,leaseTime为 -1 所以这里走else分支
            if (leaseTime > 0) {
                internalLockLeaseTime = unit.toMillis(leaseTime);
            } else {
                // 续约 该方法就是解决超时释放的关键
                scheduleExpirationRenewal(threadId);
            }
        }
        // 返回lua脚本返回的ttl或是nil
        return ttlRemaining;
    });
    return new CompletableFutureWrapper<>(f);
}

再来看最关键的续约的方法

protected void scheduleExpirationRenewal(long threadId) {
    // 用来存线程对应的锁
    ExpirationEntry entry = new ExpirationEntry();
    // 如果是重入的锁,则无法放入这个map,放入这个map的锁才有资格续约,会获取一个旧的entry,意思就是每一把锁在这个map中有且仅有一条记录
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    // 判断是否是重入的锁
    if (oldEntry != null) {
        // 是重入的锁,只做记录
        oldEntry.addThreadId(threadId);
    } else {
        // 不是重入的锁,记录并开始续约任务
        entry.addThreadId(threadId);
        try {
            // 续约 核心方法
            renewExpiration();
        } finally {
            // 线程终止,取消续约任务
            if (Thread.currentThread().isInterrupted()) {
                cancelExpirationRenewal(threadId);
            }
        }
    }
}

看看redisson是如何巧妙地续约的:

private void renewExpiration() {
    // 获取当前锁的信息
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    // 没有记录 不用续约
    if (ee == null) {
        return;
    }
    
   	// 定时任务续约
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 检查是否需要续约
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 执行lua脚本续约,每次执行,都会重置有效期为30秒,续约成功返回ture 续约失败返回false
            CompletionStage<Boolean> future = renewExpirationAsync(threadId);
            future.whenComplete((res, e) -> {
                // 异常 移除续约map
                if (e != null) {
                    log.error("Can't update lock {} expiration", getRawName(), e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }
                // 续约成功 继续续约
                if (res) {
                    renewExpiration();
                } else {
                    // 失败 取消定时任务 可以通过EXPIRATION_RENEWAL_MAP.get(getEntryName())快速获取定时任务并取消掉
                    cancelExpirationRenewal(null);
                }
            });
        }
        // 每隔10秒重置一次
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}

redisson相比我们的基础版redis,利用看门狗机制,解决了因为业务阻塞而导致锁超时释放的安全问题。有以下优点:

  • 维护一个可以存锁信息和定时任务的EXPIRATION_RENEWAL_MAP,可以快速的找到对应的锁,进行续约或是取消续约操作。
  • 每10秒钟重置一次,若业务没有完成就无限重置,不会因为业务阻塞而超时释放,若是业务异常或宕机,则会马上取消掉定时任务,让锁超时释放,避免导致死锁。

6.2.4 保证主从一致

普通redis分布式锁:

在这里插入图片描述

假如在未同步时,主节点故障,从节点成为主节点后,数据不一致,锁丢失了,有线程安全隐患。

相比普通的redis分布式锁,redisson是如何保证主从一致性的呢?

redisson让每一个redis都是主节点 ,并在这些节点后面跟上从节点

在这里插入图片描述

在这里插入图片描述

即使某个节点宕机,造成了数据不一致,锁也不会失效,因为redisson的连锁,需要在每一个节点上都能获取锁,才算成功。

6.2.5 小结

单体redisson原理:

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

七、redis优化秒杀

之前的业务流程,我们是按顺序同步执行的,其中包含了很多数据库的读写操作,并且还花了很多时间来保证线程安全,现在我们使用lua脚本保证一部分的线程安全(例如超卖问题就不用考虑了),再用java的阻塞队列来让耗时操作异步执行。

在这里插入图片描述

redis执行lua脚本是单线程的,所以线程安全。

我们需要做几件事:

  • 新增秒杀优惠券的同时,将优惠券库存存入redis。
  • 利用lua脚本判断库存,一人一单。
  • 如果有购买资格,则生成订单id返回,并将用户id等订单信息传入阻塞队列。
  • 开启线程任务,不断从阻塞队列中获取信息,操作数据库,保存订单。

做完后,会有一些问题:

在这里插入图片描述

由于阻塞队列是javautil包下的,占用的是jvm的内存,会受jvm内存的限制。

当阻塞队列中还有未处理订单,或是处理订单异常等订单丢失的情况时,会导致数据不一致。

八、Redis消息队列实现异步秒杀

为了解决以上问题,我们引入第三方的消息队列。好处就是不会受JVM内存限制。当然redis消息队列不好玩,一般都是用专门的mq中间件(rabbitmq,rocketmq,kafka等)。

redis提供了三种不同的实现消息队列的方式:

  1. list:基于list结构模拟消息队列。
  2. PubSub:发布订阅模式,基本的点对点消息模型。
  3. stream:比较完善的消息队列模型。(功能强大但复杂的一批)

在这里插入图片描述

8.1 基于List结构模拟消息队列

就是利用LPUSH、RPOP这些命令,只要出入口不一致即可。但要模拟阻塞队列,就需要LPUSH、BRPOP这些阻塞的命令了。

8.2 PubSub发布订阅模式

在这里插入图片描述

在这里插入图片描述

8.3 基于Stream结构的消息队列

8.3.1 基本使用

发消息:

在这里插入图片描述

读消息:

阻塞地读消息:

在这里插入图片描述

这就是stream结构地基本使用。

问题:

我们是使用ID来指定读取地消息,当ID为0时,代表从第一个消息开始读,当ID为“$”时,代表读取最新消息,若在处理某条消息时,有n(n>1)条消息进入队列,当那条消息处理完,下次读取只会读取最新地一条,会漏掉中间地消息,造成消息漏读。

8.3.2 消费者组

在这里插入图片描述

消费者组不仅可以加快消息地处理速度,还有一定的数据保护措施。但不得不吐槽,真的麻烦,想要保证数据安全,有时候不得不手动ACK。

创建消费者组:

在这里插入图片描述

读消息:

在这里插入图片描述

这里比较特别的是ID可以取值为“>”。

8.3.3 改造秒杀业务

// 在类初始化完毕后,就开始监听队列
@PostConstruct
public void init() {
    Runnable task = () -> {
        String queueName = "stream.orders";
        while (true) {
            try {
                // 1. 获取消息队列中的订单消息
                List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1).block(Duration.ofSeconds(2)),
                    StreamOffset.create(queueName, ReadOffset.lastConsumed())
                );
                // 2. 判断消息是否获取成功
                if (list == null || list.isEmpty()) {
                    // 2.1 获取失败 下一轮循环
                    continue;
                }
                // 获取消息
                MapRecord<String, Object, Object> record = list.get(0);
                Map<Object, Object> value = record.getValue();
                VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
                // 3. 获取成功 下单
                handleVoucherOrder(voucherOrder);
                // 4. ACK确认
                stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
            } catch (Exception e) {
                log.error("处理消息异常", e);
                // 异常,从pending-list中取出消息重试
                handlePendingList();
            }

        }
    };
    SECKILL_ORDER_EXECUTOR.submit(task);
}

异常消息重试:

private void handlePendingList() {
    String queueName = "stream.orders";
    while (true) {
        try {
            // 1. 获取pending-list中的订单消息
            List<MapRecord<String, Object, Object>> list = stringRedisTemplate.opsForStream().read(
                    Consumer.from("g1", "c1"),
                    StreamReadOptions.empty().count(1),
                    StreamOffset.create(queueName, ReadOffset.from("0"))
            );
            // 2. 判断消息是否获取成功
            if (list == null || list.isEmpty()) {
                // 2.1 获取失败 下一轮循环
                break;
            }
            // 获取消息
            MapRecord<String, Object, Object> record = list.get(0);
            Map<Object, Object> value = record.getValue();
            VoucherOrder voucherOrder = BeanUtil.fillBeanWithMap(value, new VoucherOrder(), false);
            // 3. 获取成功 下单
            handleVoucherOrder(voucherOrder);
            // 4. ACK确认
            stringRedisTemplate.opsForStream().acknowledge(queueName, "g1", record.getId());
        } catch (Exception e) {
            log.error("处理pending-list消息异常", e);
        }
    }
}

8.3.4 小结

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Reids实战—黑马点评(三)秒杀篇 的相关文章

  • STM32 CAN 设置多个过滤器接收多ID方法

    1 标识符列表模式 xff0c 32位模式下 void MX CAN Init void 这里是实现了两个地址的接收 一个是用来接收广播信息 一个用来接收私有地址 如果想实现多个地址可以添加多个过滤器组 stm32103 有0 13 共14
  • linux下运行动态库问题 cannot open shared object file: No such file or directory

    如果动态库不在同一级目录下 xff0c 则需要将以上文件的目录加载到动态库搜索路径中 xff0c 设置的方式有以下几种 一 将动态库路径加入到LD LIBRARY PATH环境变量 1 在终端输入 xff1a export LD LIBRA
  • 几个串口通信协议的整理

    一 UART UART是一个大家族 xff0c 其包括了RS232 RS499 RS423 RS422和RS485等接口标准规范和总线标准规范 它们的主要区别在于其各自的电平范围不相同 嵌入式设备中常常使用到的是TTL TTL转RS232的
  • 单片机中断的过程

    1 根据响应的中断源的中断优先级 使相应的优先级状态触发器置1 xff1b 2 把当前程序计数器PC的内容压入堆栈 xff0c 保护断点 xff0c 寻找中断源 xff1b 3 执行硬件中断服务子程序调用 xff1b 4 清除相应的中断请求
  • Ruby学习札记(3)- Ruby中gem的安装与卸载

    Ruby 学习札记 3 Ruby 中 gem 的安装与卸载 在 Ruby 中有 gem 包这种概念 xff0c 类似 PHP 中的 pear xff0c 相当于一种插件 具体可以 Google 一下 xff08 1 xff09 查看已经安装
  • 【linux】ubuntu20.04 运行软件 提示找不到过时的库 libQtCore.so.4、libQtGui.so.4、libpng12.so.0

    先上结果 1 nxView运行起来 环境 硬件 xff1a Jetson Xavier NX 套件 系统 xff1a Ubuntu 20 04 软件 xff1a nxView 43 libQtCore so 4 解决 0 现象 运行软件提示
  • rtt相关问题总结

    1 总结RT Thread的启动流程 xff08 启动文件部分跳过 xff09 关中断 rt hw interrupt disable 板级初始化 xff1a 需在该函数内部进行系统堆的初始化 rt hw board init 打印 RT
  • FTP 客户端C实现

    使用 Socket 通信实现 FTP 客户端程序 FTP 概述 文件传输协议 xff08 FTP xff09 作为网络共享文件的传输协议 xff0c 在网络应用软件中具有广泛的应用 FTP的目标是提高文件的共享性和可靠高效地传送数据 在传输
  • Qt编写串口通信程序全程图文讲解

    说明 我们的编程环境是windows xp下 xff0c 在Qt Creator中进行 xff0c 如果在Linux下或直接用源码编写 xff0c 程序稍有不同 xff0c 请自己改动 在Qt中并没有特定的串口控制类 xff0c 现在大部分
  • VLC播放器调试经验总结

    一 前言 在使用VS学习VLC源码时 xff0c 可以打断点分析变量数据 xff0c 跟踪代码流程 xff0c 方便我们理解源码 但是在定位音视频卡顿 延时等疑难问题时 xff0c 这一招就不管用了 xff0c 因为打上断点就会导致实时计算
  • http协议如何解决粘包问题

    在讲粘包问题之前 xff0c 首先得明白这个包是应用层的数据包 当数据在传输层时 xff0c 由于TCP是面向字节流的 xff0c 所以它看到的数据是按照顺序一个个放在缓冲区中的 xff0c 而对于应用层而言 xff0c 看到的只是一连串的
  • ROS- 解决 sudo rosdep init和update 出现的错误

    大家在使用ROS时都需要执行sudo rosdep init 方法和rosdep update方法 但是在执行rosdep init时会提示如下错误 ERROR cannot download default sources list fr
  • 如何用MQTT网关快速接入阿里云IOT

    深圳市钡铼技术有限公司推出的BL102 xff0c 是采集西门子 xff0c 欧姆龙 xff0c 三菱 xff0c 台达 xff0c AB xff0c 施耐德等主流PLC及Modbus xff0c DT L645协议设备数据 xff0c 简
  • 闫刚 qgc模块mavlinklog实现过程

    mavlink log qml部分 这样logController就和LogDownloadController进行了绑定 AnalyzeView qml Rectangle span class token punctuation spa
  • 初识TVM--TVM的编译与安装

    TVM是什么 xff1f Apache incubating TVM is an open deep learning compiler stack for CPUs GPUs and specialized accelerators It
  • iOS上简单推送通知(Push Notification)的实现

    iOS上简单推送通知 xff08 Push Notification xff09 的实现 根据这篇很好的教程 xff08 http www raywenderlich com 3443 apple push notification ser
  • Android学习记录(十三) http之digest鉴权之填坑6.0。

    背景 xff1a android 6 0 1 的手机发现使用webdav下载文件实效 xff0c httpclient execute get的时候出现 xff1a CrashHandler java lang ArrayIndexOutO
  • 开源视频播放器IjkPlayer使用记录之(三)--播放视频从上次播放的时间点播放。

    方法 xff1a 1 在关闭视频的时候 xff0c 使用getCurrentPosition 获取当前的时间点 2 使用SharedPreferences记录当前的时间点 3 重新播放时 xff0c 获取该时间点 xff0c 使用seekt
  • 开源视频播放器IjkPlayer使用记录之(四)--多音轨的探路之旅

    前言 xff1a 在视频播放中 xff0c 我们经常会遇到多音轨的资源文件 xff0c 比如某个mkv文件同时支持英语 国语 xff0c 那么最好是能够进行音轨的切换 在IjkPlayer中并没有支持多音轨的代码 xff0c 所以在移植的过
  • KERAS-YOLOV3的代码走读

    KERAS YOLOV3的代码走读 GITHUB地址 xff1a https github com qqwweee keras yolo3 YOLOV3的论文中文翻译 xff1a https zhuanlan zhihu com p 349

随机推荐

  • KERAS-YOLOV3的数据增强

    前言 上篇KERAS YOLOV3的代码走读 https blog csdn net yangchengtest article details 80664415 有数据增强的内容没有看明白 这篇来介绍一下 简介 数据增强的方法主要有 xf
  • 基于KITTI数据集的KERAS-YOLOV3实践

    数据整理 KERAS YOLOV3的GITHUB地址 xff1a https github com yangchengtest keras yolo3 该项目支持的数据结构 xff1a One row for one image Row f
  • 图像语义分割 DEEPLAB V3+的代码走读

    概述 GITHUB路径 xff1a https github com tensorflow models tree master research deeplab 论文 xff1a https arxiv org abs 1802 0261
  • android dlib 交叉编译

    继续趟NDK的坑 DLIB使用C 43 43 11的标准 但是使用gnustl static的时候 xff0c 有些c 43 43 11的特性是无法使用的 由于NCNN的库使用的是静态库 xff0c OPENCV xff0c OPENBLA
  • ANDROID CMAKE DEBUG的记录

    android 如果使用DEBUG模式 xff0c CMAKE编译的SO是DEBUG版本的 xff0c 会造成性能下降 但是使用RELEASE编译的SO xff0c 使用DEBUG模式 xff0c JNI的速度不会变化 终于知道为什么 xf
  • window下使用vnc远程登录linux图形界面和运行应用程序 和odroid Xu4开发板的使用和视频接口VGA、DVI、HDMI的联系

    注 xff1a 自己曾经尝试过很多次使用VNC远程登录odroid XU4的开发板 xff0c 但是连接后均显示未解码的连接 xff0c 刚开始烧写的是odroid官方的ubuntu系统 xff0c 我靠 xff0c 就是因为烧写了这个坑爹
  • Putty的ppk文件转成Xshell使用的key文件

    Putty的ppk文件转成Xshell使用的key文件 今天同学给我一个Putty远程登录使用的ppk文件 xff08 即后缀名为ppk xff09 让我远程登录主机 xff0c 但是我用的是Xshell xff0c 导入这个ppk文件时
  • GD32串口读取GPS模块数据并解析经纬度教程-附完整代码和资料文件

    前言 xff1a 最近入手了个GPS模块 xff0c 手上只有GD32的开发板 网上有很多使用STM32库函数的GPS驱动程序 xff0c 但是基于GD32库函数读取GPS驱动的教程居然一篇都没有 所以为了学习GD32库的同学 xff0c
  • opencv 所有lib文件

    今天在vs上写一段代码 xff0c 编译后总是显示有无法解析的函数 xff0c 又不知道该函数在哪个lib文件中 xff0c 在百度上找了半天 xff0c 也没找到 已是就将所有lib库都添加到vs链接中 如下 xff1a opencv c
  • Java多线程(含生产者消费者模式详解)

    多线程 导航 多线程1 线程 进程 多线程概述2 创建线程 xff08 重点 xff09 2 1 继承Thread类 xff08 Thread类也实现了Runnable接口 xff09 2 2 实现Runnable接口 xff08 无消息返
  • Java网络编程(两种聊天室:TCP和UDP)

    网络编程 您的导航 网络编程网络编程基础知识一 网络编程三要素IP地址端口协议 二 IP地址与InetAddress类IP地址分类InetAddress类三 端口 xff08 Port xff09 与 InetSocketAddressIn
  • 免费发布一个网站(保姆级图文教程)

    利用GitHub Pages发布一个网页 第一步 xff1a 注册一个github账户 访问官网 点这两个都可以注册 根据提示输入一些信息 xff0c 然后创建账户 xff1a 然后你会收到一封邮件 xff0c 输入验证码或是打开邮件的验证
  • 修改键盘映射、交换按键

    修改键盘映射 交换按键 导航 修改键盘映射 交换按键写在前面一 创建配置文件二 修改键盘映射三 重启四 键位表 写在前面 这两天买了个黑爵的小键盘 xff0c del和ins键是同一个键 xff0c 通过fn来区分 xff08 我的笔记本电
  • Spring Cloud Gateway(黑马springcloud笔记)

    Gateway 目录 Gateway一 为什么需要网关二 gateway入门三 断言工厂四 过滤器工厂五 全局过滤1 实现2 过滤器执行顺序 六 跨域问题 一 为什么需要网关 不能让外部能够直接访问微服务 xff0c 而是需要通过网关访问
  • Docker(黑马spring cloud笔记)

    Docker 目录 Docker一 介绍和安装1 安装2 启动3 镜像加速 二 Docker基本操作1 镜像操作2 容器操作3 数据卷操作 三 Dockerfile1 镜像结构2 Dockerfile 四 Docker Compose1 安
  • RabbitMQ(黑马spring cloud笔记)

    MQ 目录 MQ一 同步通讯和异步通讯1 同步通讯2 异步通讯 二 RabbitMQ1 部署2 架构3 常见消息模型3 1 基本消息队列 xff08 Basic Queue xff09 3 2 工作消息队列 xff08 Work Queue
  • Redis实战—黑马点评(一) 登录篇

    Redis实战 黑马点评 xff08 一 xff09 登录篇 来自黑马的redis课程的笔记 黑马程序员Redis入门到实战教程 xff0c 深度透析redis底层原理 43 redis分布式锁 43 企业解决方案 43 黑马点评实战项目
  • tigerVNC的简单使用教程(CentOS的远程桌面连接)

    tigerVNC的简单使用教程 xff08 CentOS的远程桌面连接 xff09 1 环境和软件准备 1 CentOS 6 3下 root 64 localhost rpm q tigervnc tigervnc server tiger
  • Redis实战—黑马点评(二)缓存篇

    Redis实战 黑马点评 xff08 二 xff09 缓存篇 目录 Redis实战 黑马点评 xff08 二 xff09 缓存篇1 什么是缓存1 1 缓存的作用和成本 2 添加 Redis 缓存3 缓存更新策略3 1 三种更新策略3 1 1
  • Reids实战—黑马点评(三)秒杀篇

    Reids实战 黑马点评 xff08 三 xff09 秒杀篇 来自黑马的redis课程的笔记 黑马程序员Redis入门到实战教程 xff0c 深度透析redis底层原理 43 redis分布式锁 43 企业解决方案 43 黑马点评实战项目