基于redis实现分布式锁

2023-11-03

目录

基本实现

​编辑防死锁

防误删

使用lua保证删除原子性

可重入锁 

加锁脚本

解锁脚本

代码实现

使用及测试 

 自动续期

总结


基本实现

借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同时有多个客户端发 送setnx命令,只有一个客户端可以成功,返回1(true);其他的客户端返回0(false)。

  • 1. 多个客户端同时获取锁(setnx)
  • 2. 获取成功,执行业务逻辑,执行完成释放锁(del)
  • 3. 其他客户端等待重试

改造StockService方法:

@Service
public class StockService {
    @Autowired
    private StockMapper stockMapper;
    @Autowired
    private LockMapper lockMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;
    public void checkAndLock() {
        // 加锁,获取锁失败重试
        while (!this.redisTemplate.opsForValue().setIfAbsent("lock", 

"xxx")){
            try {
                Thread.sleep(100);
           } catch (InterruptedException e) {
                e.printStackTrace();
           }
       }
        // 先查询库存是否充足
        Stock stock = this.stockMapper.selectById(1L);
        // 再减库存
        if (stock != null && stock.getCount() > 0){
            stock.setCount(stock.getCount() - 1);
            this.stockMapper.updateById(stock);
       }
        // 释放锁
        this.redisTemplate.delete("lock");
   }
}

其中,加锁:

// 加锁,获取锁失败重试
while (!this.redisTemplate.opsForValue().setIfAbsent("lock", "xxx")){
    try {
        Thread.sleep(100);
   } catch (InterruptedException e) {
        e.printStackTrace();
   }
}

解锁:

// 释放锁

this.redisTemplate.delete("lock");

使用Jmeter压力测试如下:

 查看mysql数据库:

防死锁

 解决:给锁设置过期时间,自动释放锁。 设置过期时间两种方式:

1. 通过expire设置过期时间(缺乏原子性:如果在setnx和expire之间出现异常,锁也无法释放)

2. 使用set指令设置过期时间:set key value ex 3 nx(既达到setnx的效果,又设置了过期时间)

 

压力测试肯定也没有问题。

问题:可能会释放其他服务器的锁。 场景:如果业务逻辑的执行时间是7s。执行流程如下

1. index1业务逻辑没执行完,3秒后锁被自动释放。

2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

3. index3获取到锁,执行业务逻辑

4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只 执行1s就被别人释放。 最终等于没锁的情况。

解决:setnx获取锁时,设置一个指定的唯一值(例如:uuid);释放前获取这个值,判断是否自己的 锁

防误删

实现如下:

问题:删除操作缺乏原子性。 场景:

1. index1执行删除时,查询到的lock值确实和uuid相等

2. index1执行删除前,lock刚好过期时间已到,被redis自动释放

3. index2获取了lock 4. index1执行删除,此时会把index2的lock删除

解决方案:没有一个命令可以同时做到判断 + 删除,所有只能通过其他方式实现(LUA脚本)

使用lua保证删除原子性

删除LUA脚本:

if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', 
KEYS[1]) else return 0 end

代码实现:

public void checkAndLock() {
    // 加锁,获取锁失败重试
    String uuid = UUID.randomUUID().toString();
    while (!this.redisTemplate.opsForValue().setIfAbsent("lock", uuid, 3, 

TimeUnit.SECONDS)){
        try {
            Thread.sleep(50);
       } catch (InterruptedException e) {
            e.printStackTrace();
       }
   }
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
   }
    // 释放锁
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return 
redis.call('del', KEYS[1]) else return 0 end";
    this.redisTemplate.execute(new DefaultRedisScript<>(script, 

Long.class), Arrays.asList("lock"), uuid);
}

 压力测试:

可重入锁 

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代 码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继 续往下执行。

用一段 Java 代码解释可重入:

public synchronized void a() {
    b();
}

public synchronized void b() {
    // pass
}

假设 X 线程在 a 方法获取锁之后,继续执行 b 方法,如果此时不可重入,线程就必须等待锁释放,再次争抢锁。

锁明明是被 X 线程拥有,却还需要等待自己释放锁,然后再去抢锁,这看起来就很奇怪,我释放我自己~

可重入性就可以解决这个尴尬的问题,当线程拥有锁之后,往后再遇到加锁方法,直接将加锁次数加 1,然后再执行方法逻辑。退出加锁方法之后,加锁次数再减 1,当加锁次数为 0 时,锁才被真正的释 放。 可以看到可重入锁最大特性就是计数,计算加锁的次数。所以当可重入锁需要在分布式环境实现时,我们也就需要统计加锁次数。

解决方案:redis + Hash

加锁脚本

Redis 提供了 Hash (哈希表)这种可以存储键值对数据结构。所以我们可以使用 Redis Hash 存储的 锁的重入次数,然后利用 lua 脚本判断逻辑。

if (redis.call('exists', KEYS[1]) == 0 or 
    redis.call('hexists', KEYS[1], ARGV[1]) == 1) 
then
    redis.call('hincrby', KEYS[1], ARGV[1], 1);
    redis.call('expire', KEYS[1], ARGV[2]);
    return 1;
else
 return 0;
end

假设值为:KEYS:[lock], ARGV[uuid, expire]如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次 数加1。

解锁脚本

-- 判断 hash set 可重入 key 的值是否等于 0
-- 如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
-- 如果为 0 代表 可重入次数被减 1
-- 如果为 1 代表 该可重入 key 解锁成功
if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then 
    return nil; 
elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then 
    return 0; 
else 
    redis.call('del', KEYS[1]); 
    return 1; 
end;

这里之所以没有跟加锁一样使用 Boolean ,这是因为解锁 lua 脚本中,三个返回值含义如下:

1 代表解锁成功,锁被释放

0 代表可重入次数被减 1

null 代表其他线程尝试解锁,解锁失败

代码实现

由于加解锁代码量相对较多,这里可以封装成一个工具类:

 具体实现:

public class RedisDistributeLock{


    private StringRedisTemplate redisTemplate;

    //线程局部变量,可以在线程内共享参数
    private String lockName;
    private String static uuid;
    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public DistributedRedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (StringUtils.isBlank(uuid)) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
    }

    public void lock() {
        this.lock(expire);
    }

    public void lock(Integer expire) {
        this.expire = expire;
        String script = "if (redis.call('exists', KEYS[1]) == 0 or" +
                "redis.call('hexists', KEYS[1], ARGV[1]) == 1)" +
                "        then" +
                "  redis.call('hincrby', KEYS[1], ARGV[1], 1);" +
                "  redis.call('expire', KEYS[1], ARGV[2]);" +
                "  return 1;" +
                "else" +
                "        return 0;" +
                " end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                Arrays.asList(lockName), uuid, expire.toString())) {
            try {
                Thread.sleep(60);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //没有获取到锁重试
            lock(expire);
        }
    }

    public void unlock() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 0) then" +
                "  return nil; " +
                "elseif(redis.call('hincrby', KEYS[1], ARGV[1], -1) > 0) then" +
                "  return 0; " +
                "else" +
                "  redis.call('del', KEYS[1]);" +
                "  return 1;" +
                "end;";
        //如果返回值没有使用Boolean,Spring-data-redis 进行类型转换时将会把 null
        //转为 false,这就会影响我们逻辑判断
        //所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功。
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>
                (script, Long.class), Arrays.asList(lockName), uuid);
        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }

}

使用及测试 

在业务代码中使用:

public void checkAndLock() {
    // 加锁,获取锁失败重试
    RedisDistributeLock lock = new RedisDistributeLock(this.redisTemplate, 
"lock");
    lock.lock();
    // 先查询库存是否充足
    Stock stock = this.stockMapper.selectById(1L);
    // 再减库存
    if (stock != null && stock.getCount() > 0){
        stock.setCount(stock.getCount() - 1);
        this.stockMapper.updateById(stock);
   }
    // this.testSubLock();
    // 释放锁
    lock.unlock();
}

 测试:

 测试可重入性:

 自动续期

lua脚本:

if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then 
    redis.call('expire', KEYS[1], ARGV[2]); 
    return 1; 
else 
    return 0; 
end

 在RedisDistributeLock中添加renewExpire方法:

    private static final Timer TIMER = new Timer();

    /**
     * 开启定时器,自动续期
     */
    private void renewExpire() {
        String script = "if(redis.call('hexists', KEYS[1], ARGV[1]) == 1) then " +
                "redis.call('expire', KEYS[1], ARGV[2]); " +
                "return 1; " +
                "else " +
                "return 0; end";
        TIMER.schedule(new TimerTask() {
            @Override
            public void run() {
                //如果uuid为空,则终止定时任务
                if (StringUtils.isNotBlank(uuid)) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class),
                            Arrays.asList(lockName), RedisDistributeLock.this.uuid,
                            expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }

 在lock方法中使用:

 在unlock方法中添加红框中的代码:

总结


特征:
    1.独占排他:setnx
    2.防死锁:
        redis客户端程序获取到锁之后,立马宕机。给锁添加过期时间
        不可重入:可重入 
    3.防误删:
        先判断是否自己的锁才能删除
    4.原子性:
        加锁和过期时间之间
        判断和释放锁之间
    5.可重入性:hash + lua脚本 
    6.自动续期:Timer定时器 + lua脚本 


锁操作:

    1.加锁:
        1.setnx:独占排他   死锁、不可重入、原子性 
        2.set k v ex 30 nx:独占排他、死锁         不可重入 
        3.hash + lua脚本:可重入锁
            1.判断锁是否被占用(exists),如果没有被占用则直接获取锁(hset/hincrby)并设置过期时间(expire)
            2.如果锁被占用,则判断是否当前线程占用的,如果是则重入(hincrby)并重置过期时间(expire)
            3.否则获取锁失败,将来代码中重试
        4.Timer定时器 + lua脚本:实现锁的自动续期
    


    2.解锁 
        1.del:导致误删
        2.先判断再删除同时保证原子性:lua脚本
        3.hash + lua脚本:可重入 
            1.判断当前线程的锁是否存在,不存在则返回nil,将来抛出异常
            2.存在则直接减1(hincrby -1),判断减1后的值是否为0,为0则释放锁(del),并返回1
            3.不为0,则返回0

       3.重试:递归 循环  

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

基于redis实现分布式锁 的相关文章

  • 仅当尚未设置时才进行原子设置

    仅当尚未在 Redis 中设置时 是否有办法执行原子设置 具体来说 我正在创建一个像 myapp user user email 这样的用户 并且希望 Redis 在 user email 已被占用时返回错误 而不是默默地替换旧值 比如声明
  • 如何在节点redis客户端上设置读取超时?

    在 github 上我没有看到读取超时的选项 https github com NodeRedis node redis https github com NodeRedis node redis There s connect timeo
  • 如何统计 Redis 流中未读或已确认的消息?

    使用 Redis 5 0 3 假设我们创建一个名为streamy和一个消费群体consumers XGROUP CREATE streamy consumers MKSTREAM 然后向其中添加一些消息 XADD streamy messa
  • Redis INCRBY 有限制

    我想知道是否有一种方法可以通过我的应用程序的单次往返在 Redis 中执行此操作 对于给定的键K 其可能值V是范围内的任意整数 A B 基本上 它有上限和下限 When an INCRBY or DECRBY发出命令 例如INCRBY ke
  • 如何将 ActionController::Live 与 Resque + Redis 一起使用(用于聊天应用程序)

    我正在尝试为我的 Rails 应用程序构建聊天功能 我在用ActionController Live Puma Resque Redis为了这 所以基本上在这种情况下 redissubscribe方法正在后台运行 使用resque 到目前为
  • Caffeine Expiry 中如何设置多个过期标准?

    我正在使用 Caffeine v2 8 5 我想创建一个具有可变到期时间的缓存 基于 值的创建 更新以及 该值的最后一次访问 读取 无论先发生什么都应该触发该条目的删除 缓存将成为三层值解析的一部分 The key is present i
  • 通过 StackExchange.Redis 连接到 Redis Servier

    我尝试使用以下方法制作一个测试项目Redis https redis io服务器 通过 Virtual Box 安装在 Linux Ubuntu 虚拟机上 Linux 机器通过 Virtual Box 的桥接适配器与本地网络连接 Virtu
  • socket.io 广播功能 & Redis pub/sub 架构

    如果有人能帮助我解决一个小疑问 我将不胜感激 使用socket io广播功能和在Redis上使用pub sub设计架构有什么区别 例如 在另一个示例中 node js 服务器正在侦听 socket io 针对 键 模型 todo 和值 数据
  • 使用 Sentinels 升级 Redis 的最佳实践?

    我有 3 个 Redis 节点 由 3 个哨兵监视 我进行了搜索 文档似乎不清楚如何最好地升级此类配置 我目前使用的是 3 0 6 版本 我想升级到最新的 5 0 5 我对这方面的程序有几个疑问 升级两个大版本可以吗 我在我们的暂存环境中执
  • 如何使 Redis 缓存中数据层次结构(树)的部分内容无效

    我有一些产品数据 需要在 Redis 缓存中存储多个版本 数据由 JSON 序列化对象组成 获取普通 基本 数据的过程很昂贵 将其定制为不同版本的过程也很昂贵 因此我想缓存所有版本以尽可能进行优化 数据结构看起来像这样 BaseProduc
  • redis dump.rdb / 保存小文件

    Context 我正在使用redis 数据库小于 100 MB 但是 我想进行每日备份 我也在 Ubuntu Server 12 04 上运行 当输入 redis cli save 我不知道 dump rdb 保存到哪里 因为 redis
  • Amazon Elasticache Redis 集群 - 无法获取端点

    我需要获取 Amazon Elasticache 中 Redis 集群的终端节点 以下代码适用于 Memcached 集群 但不适用于 Redis import com amazonaws auth AWSCredentials impor
  • 如何将“.csv”数据文件导入Redis数据库

    如何将 csv 数据文件导入 Redis 数据库 csv 文件中包含 id 时间 纬度 经度 列 您能否向我建议导入 CSV 文件并能够执行空间查询的最佳方法 这是一个非常广泛的问题 因为我们不知道您想要什么数据结构 您期望什么查询等等 为
  • Laravel 异常队列最大尝试次数超出

    我创建了一个应用程序来向多个用户发送电子邮件 但在处理大量收件人时遇到问题 该错误出现在failed jobs table Illuminate Queue MaxAttemptsExceededException App Jobs ESe
  • 节点应用程序之间共享会话?

    我目前有两个独立的节点应用程序在两个不同的端口上运行 但共享相同的后端数据存储 我需要在两个应用程序之间共享用户会话 以便当用户通过一个应用程序登录时 他们的会话可用 并且他们似乎已登录到另一个应用程序 在本例中 它是一个面向公众的网站和一
  • docker-compose:容器之间的 Redis 连接被拒绝

    我正在尝试设置一个 docker compose 文件 该文件旨在替换运行多个进程 RQ 工作线程 RQ 仪表板和 Flask 应用程序 的单个 Docker 容器解决方案导师 http supervisord org 主机系统是 Debi
  • 如何配置Lettuce Redis集群异步连接池

    我正在配置我的生菜重新分配池 当我按照官方文档配置时 连接池无法正常初始化 无法获取连接 官方文档指出 RedisClusterClient clusterClient RedisClusterClient create RedisURI
  • 超出 Redis 连接/缓冲区大小限制

    在对我们的应用程序服务器进行压力测试时 我们从 Redis 中得到以下异常 ServiceStack Redis RedisException 无法连接到 redis host 6379 处的 redis 实例 gt System Net
  • ServiceStack.Redis:无法连接:sPort:

    我经常得到 ServiceStack Redis 无法连接 sPort 0 或 ServiceStack Redis 无法连接 sPort 50071 或其他端口号 当我们的网站比较繁忙时 就会出现这种情况 Redis 本身看起来很好 CP
  • 没有适用于机器人的 Laravel 会话

    我在大型 Laravel 项目和 Redis 存储方面遇到问题 我们将会话存储在 Redis 中 我们已经有 28GB 的 RAM 然而 它的运行速度仍然相对较快 达到了极限 因为我们有来自搜索引擎机器人的大量点击 每天超过 250 000

随机推荐