Redisson源码-多线程之首个获取锁的线程加解锁流程

2023-11-01

Redisson源码-多线程之首个获取锁的线程加解锁流程

简介
当有多个线程同时去获取同一把锁时,第一个获取到锁的线程会进行加解锁,其他线程需订阅消息并等待锁释放。

以下源码分析基于redisson-3.17.6版本,不同版本源码会有些许不同需注意。

		 <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.17.6</version>
        </dependency>

当我们调用Redisson.lock()并且不设置锁时间时,我们进入RedissonLock的lock方法。

    public void lock() {
        try {
        	// -1L为锁时间,表示不限时
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
		// 获取当前线程id
        long threadId = Thread.currentThread().getId();
        // 尝试获取锁
        Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
        // lock acquired
        // 获取到锁直接返回
        if (ttl == null) {
            return;
        }
		// 订阅锁消息:当锁被释放的时候,会通过publish发布一条消息,通知其它等待这个锁的线程,锁已经释放。
        CompletableFuture<RedissonLockEntry> future = subscribe(threadId);
        pubSub.timeout(future);
        RedissonLockEntry entry;
        if (interruptibly) {
            entry = commandExecutor.getInterrupted(future);
        } else {
            entry = commandExecutor.get(future);
        }

        try {
        	// 不停尝试获取锁
            while (true) {
                ttl = tryAcquire(-1, leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message
                // 如果锁的过期时间>=0
                if (ttl >= 0) {
                    try {
                    	// 等待超时时间过去
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        if (interruptibly) {
                            throw e;
                        }
                        entry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                    }
                 // 锁过期时间<0,代表锁的超时时间未设置
                } else {
                    if (interruptibly) {
                        entry.getLatch().acquire();
                    } else {
                    	// 无限等待直至获取锁
                        entry.getLatch().acquireUninterruptibly();
                    }
                }
            }
        } finally {
            unsubscribe(entry, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }

我们按照代码逻辑先看一下尝试获取锁的代码

private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
    }
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
        RFuture<Long> ttlRemainingFuture;
        if (leaseTime > 0) {
        	// 指定了超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
        	// 未指定超时时间
            ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,
                    TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }
        CompletionStage<Long> f = ttlRemainingFuture.thenApply(ttlRemaining -> {
            // lock acquired
            // ttlRemaining为null, 本质就是加锁的LUA脚本中返回nil,表示获取锁成功
            if (ttlRemaining == null) {
                if (leaseTime > 0) {
                	// 如果设置了超时时间,则更新internalLockLeaseTime为指定的超时时间,并且不会启动看门狗
                    internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                	// 自动续期实现,开启看门狗机制
                    scheduleExpirationRenewal(threadId);
                }
            }
            return ttlRemaining;
        });
        return new CompletableFutureWrapper<>(f);
    }

tryLockInnerAsync方法里的代码是加锁的核心代码之一:

    <T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return nil; " +
                        "end; " +
                        "return redis.call('pttl', KEYS[1]);",
                Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
    }

先介绍一下lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key),相当于下图的HASH
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id,相当于下图的key
nil:相当于null

初次获取锁:
在这里插入图片描述
锁重入:
在这里插入图片描述

接下来我们详细看一下lua脚本的逻辑:

 		 // 通过exists指令判断需要加锁的key是否存在,如果不存在,说明还没被加锁,可以直接进行加锁
		"if (redis.call('exists', KEYS[1]) == 0) then " +
        // 通过hincrby指令往redis中插入一个哈希结构的数据,key[1]=加锁key   ARGV[2]=uuid+当前线程ID  1=锁的重入次数
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间:ARGV[1]=锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功:nil=null
        "return nil; " +
        "end; " +

        // 如下是可重入锁的逻辑
        // 通过hexists指令判断当前的锁是不是自己的,只有是自己的锁时,才支持可重入
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        // 通过hincrby指令更新hash结构的数据(锁结构数据),将value对应的可重入次数加一
        "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
        // 通过pexpire指令设置锁的过期时间
        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
        // 返回nil, 表示加锁成功
        "return nil; " +
        "end; " +

        // 如果当前已经有人获取了锁,并且这个锁不是自己的,那么将会执行pttl指令,返回当前锁剩余的过期时间
        "return redis.call('pttl', KEYS[1]);"

从上面我们可以看出,redisson加锁的本质是通过执行lua脚本,返回nil(相当于null)或锁的剩余过期时间。如果返回并且未设置过期时间则开启看门狗机制。

接下来我们看下开启看门狗机制的代码

protected void scheduleExpirationRenewal(long threadId) {
        ExpirationEntry entry = new ExpirationEntry();
        // 将当前锁的名称和 ExpirationEntry 对象放入 EXPIRATION_RENEWAL_MAP中,并返回之前关联的对象(如果存在)。
        // 本质就是检查当前锁是否已开启自动续期。
        ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
        // 已开启
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        // 未开启
        } else {
            entry.addThreadId(threadId);
            try {
            	// 执行自动续期机制
                renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                	// 如果线程中断则解除自动续期机制避免死锁
                    cancelExpirationRenewal(threadId);
                }
            }
        }
    }
private void renewExpiration() {
		// 获取当前锁名称的 ExpirationEntry 对象。
        ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
        if (ee == null) {
            return;
        }
        // 构建Timeout任务去执行锁续期,本质是调用了netty框架中的newTimeout方法,相当于一个延迟定时任务。
        // 相当于每隔 过期时间/3 (默认10秒)毫秒,递归调用renewExpiration方法去执行锁续期直至锁被释放。
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
            	// 二次获取当前锁名称的 ExpirationEntry 对象。
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                // 获取 ExpirationEntry 对象中的第一个线程 ID。
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                // 锁续期
                CompletionStage<Boolean> future = renewExpirationAsync(threadId);
                future.whenComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getRawName() + " expiration", e);
                        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                        return;
                    }
                    // 续期成功
                    if (res) {
                        // reschedule itself
                        // 递归调用自身,不断续期
                        renewExpiration();
                    // 续期失败,表示锁被释放
                    } else {
                    	// 取消定时任务等操作
                        cancelExpirationRenewal(null);
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

下面我们看一下锁续期的核心代码:

protected CompletionStage<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getRawName()),
                internalLockLeaseTime, getLockName(threadId));
    }

该lua脚本的中的参数与上面相同:
KEYS[1]:锁住的对象(锁住的key)
ARGV[1]:锁的过期时间
ARGV[2]:UUID+当前线程id

下面我们看一下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
	// 如果是的话通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
	"return 1; " +
	"end; " +
	// 否则返回0
	"return 0;",

整理上文可以看出看门狗机制简单来说就是每隔 过期时间/3 毫秒去执行lua脚本,若锁未被释放则刷新其过期时间,直至锁被释放为止。

接下来我们再看下解锁的逻辑:

@Override
    public void unlock() {
        try {
        	// 释放锁
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
public RFuture<Void> unlockAsync(long threadId) {
		// 释放锁的核心代码
        RFuture<Boolean> future = unlockInnerAsync(threadId);

        CompletionStage<Void> f = future.handle((opStatus, e) -> {
        	// 取消看门狗机制(就是取消上文的定时任务等操作)
            cancelExpirationRenewal(threadId);

            if (e != null) {
                throw new CompletionException(e);
            }
            if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + threadId);
                throw new CompletionException(cause);
            }

            return null;
        });

        return new CompletableFutureWrapper<>(f);
    }

下面我们看一下解锁的核心代码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                        "return nil;" +
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                        "if (counter > 0) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                        "return 0; " +
                        "else " +
                        "redis.call('del', KEYS[1]); " +
                        "redis.call('publish', KEYS[2], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return nil;",
                Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
    }

先说明一下该lua脚本的中的参数:
KEYS[1]:锁住的对象(锁住的key)
KEYS[2]:监听该锁的频道
ARGV[1]:解锁消息
ARGV[2]:锁的过期时间
ARGV[3]:UUID+当前线程id

下面我们看下这段lua脚本的逻辑:

	// 通过hexists指令判断当前的锁是不是自己的
	"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
	// 不是自己的则返回nil(相当于null)
	"return nil;" +
	"end; " +
	// 则使用 hincrby 指令将字段锁的可重入次数减去 1,即减少持有锁的线程数。
	"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
	// 如果结果 > 0,代表解锁成功,但是锁仍然存在
	"if (counter > 0) then " +
	// 通过pexpire指令设置锁的过期时间
	"redis.call('pexpire', KEYS[1], ARGV[2]); " +
	// 返回0
	"return 0; " +
	"else " +
	// 如果结果 < 0 ,代表持有锁的线程数为0,这时需要完全释放锁,通过del指令删除指定key的锁
	"redis.call('del', KEYS[1]); " +
	// 通过publish指令向订阅该锁的频道发送解锁消息
	"redis.call('publish', KEYS[2], ARGV[1]); " +
	// 返回1
	"return 1; " +
	"end; " +
	"return nil;",

以上我们解释说明了单个线程在不等待锁的情况下,直接获取锁,对锁进行续期和解锁的代码逻辑,可以看出加解锁本质上都是通过lua脚本去执行,当有多个线程同时去获取锁时,第一个获取到锁的线程会按照此逻辑执行,其他线程需订阅消息并等待锁释放。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Redisson源码-多线程之首个获取锁的线程加解锁流程 的相关文章

随机推荐

  • linux安装php7的方法

    1 安装依赖包 1 安装依赖包 1 yum install y gcc gcc c make zlib zlib devel pcre pcre devel libjpeg libjpeg devel libpng libpng devel
  • linux中的USB摄像头驱动(应用层)(基于V4L2)

    V4L2 是 Video4Linux2 的缩写 是 Linux 内核中的一个视频设备驱动接口 USB V4L2 初始化流程 1 打开设备节点 open 2 配置参数 分辨率 fps 格式 ioctl 3 请求分配帧缓存 gt 地址映射 4
  • Remember The Word-Trie

    题目 UVaLive 3942 include
  • C++11新特性探究:显式override和final

    C 中 我们一般可以以基类声明纯虚函数 然后让派生类继承并重写这个虚函数 用 override表示显示覆盖基类方法 但一直没有提供一种方法来阻止派生类继承基类的虚函数 C 11标准引入了final说明符 很好的解决了上面的问题 final告
  • M1芯片安装Photoshop 2021原生激活安装教程(附云盘下载)已支持M1芯片big sur系统 ARM M1处理器安装PS2021解决方案教程

    M1芯片MAC用户 收到2021年4月第二波更新 适配M1芯片MAC版的PS 2021原生JP终于来了 转眼购买M1芯片版的macbook pro已经3个多月了 之前因为这款芯片的特殊性 一直没能用上完美的PS 之前在网上找的PS安装大部分
  • Qt自定义类型的注册与使用

    使用C 时 经常会自定义各种类型的结构体 而在Qt中 用信号传递新的类型时 这时就需要将这种类型进行注册了 在使用Qt进行应用程序开发时 经常要自定义数据类型 而且在需要的时候还要把这些数据放到QVariant中去 因为QVariant是一
  • Navicat for MySQL 连接 Mysql 8.0.11 出现1251- Client does not support authentication protocol 错误

    1 打开cmd 进入mysql 2 依次输入以下命令 ALTER USER root localhost IDENTIFIED BY password PASSWORD EXPIRE NEVER 修改加密规则 ALTER USER root
  • 浅谈matlab数学建模中@符号-----函数句柄

    很多刚开始学数学建模的小伙伴第一次在matlab程序中遇到 这个符号都不知道是什么意思 如 f myfunction 或者 fun sin 其实这种用法叫创建函数句柄 当我们在万能的matlab帮助文档搜索函数句柄 什么花里胡哨的 打开即劝
  • zlib导入到工程与数据压缩

    一 将zlib导入到工程中 1 编译zlib库后 会生成一个ZlibDllRelease文件夹 Release版本 和一个ZlibDllDebug文件夹 Debug版本 使用zlib库 共需4个文件 zlib 1 2 8的根目录下的zcon
  • vue-cli-service serve --open启动项目时打开浏览器失效无法打开浏览器解决方法

    当在vue项目中的package json文件修改启动命令时 发现也无法自动打开浏览器 scripts serve vue cli service serve open build vue cli service build npm run
  • Python正则表达式模块(re)简介

    author skate time 2014 10 13 Python正则表达式模块 re 简介 一 Python中转义字符 正则表达式使用反斜杠 来代表特殊形式或用作转义字符 这里跟Python的语法冲突 因此 Python用 表示正则表
  • nginx 使用教程

    一 win7 nginx 常用命令 1 启动nginx start nginx 2 nginx 服务器重启命令 关闭 nginx s reload 修改配置后重新加载生效 nginx s reopen 重新打开日志文件 nginx t c
  • RabbitMQ应用之消息堆积、消息丢失、有序消费、重复消费

    文章目录 前言 一 消息堆积 1 消息堆积的产生与影响 2 消息堆积的解决方案 二 消息丢失 1 情景 2 解决方案 三 有序消费 1 情景 2 解决方案 四 重复消费 1 情景 2 解决方案 前言 最近接触了多线程和MQ等性能相关的内容
  • HTML 特殊符号编码对照表

    特殊符号 命名实体 十进制编码 特殊符号 命名实体 十进制编码 特殊符号 命名实体 十进制编码 Alpha 913 Beta 914 Gamma 915 Delta 916 Epsilon 917 Zeta 918 Eta 919 Thet
  • 工业互联网产业链全景图深度分析

    工业互联网领域有哪些投资机会 新基建 是与传统基建相对应 结合新一轮科技革命和产业变革特征 面向国家战略需求 为经济社会的创新 协调 绿色 开放 共享发展提供底层支撑的具有乘数效应的战略性 网络型基础设施 其中 新基建 包括5G基建 特高压
  • openwrt 修改feeds.conf.default为GitHub源

    lede和openwrt合并之后 lede官网挂了 git openwrt org 也访问不了 只好去github上找最新源码 git clone https github com openwrt openwrt git 复制代码 最新的l
  • 嵌入式linux 配置usb otg,嵌入式系统设计中的USB OTG方案

    速外设操作时最大为80mA TD1120整个芯片支持功率节省模式 包括主机控制器以及外设控制器的延缓模式以使功率消耗最小化www cechina cn 延长系统电池寿命 对于移动设备来说 电池寿命是很关键的性能 接口性能表现 USB数据传输
  • Bitmap之压缩方案

    文章目录 前言 1 基础知识 1 1色彩模式 1 2四种模式的区别 1 3具体对比 1 4bitmap内存占用大小计算方式 1 5图片存在的形式 1 6BitampFactory加载Bitmap对象的方式 2 压缩方案 2 1采样率压缩 2
  • Bugku 计算器

    首先打开题目链接 发现一个式子 但答案有三位数 而只能输入一个数字 直接F12查看原代码 发现maxlenthen 1 maxlenthen意思是文件域可接受的字符数量的上限 可输入字符串最大的长度 容质 所以把1改为3就好啦 然后得到fl
  • Redisson源码-多线程之首个获取锁的线程加解锁流程

    Redisson源码 多线程之首个获取锁的线程加解锁流程 简介 当有多个线程同时去获取同一把锁时 第一个获取到锁的线程会进行加解锁 其他线程需订阅消息并等待锁释放 以下源码分析基于redisson 3 17 6版本 不同版本源码会有些许不同