分布式锁学习

2023-05-16

概述

分布式锁是控制分布式系统之间同步访问共享资源的一种方式。

在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。

分布式系统越来越普及,一个应用往往会部署在多台机器上(多节点),在有些场景中,为了保证数据不重复,要求在同一时刻,同一任务只在一个节点上运行,即保证某一方法同一时刻只能被一个线程执行。在单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。
 

以往的工作中看到或用到几种实现方案,有基于zk的,也有基于redis的。由于实现上逻辑不严谨,线上时不时会爆出几个死锁case。

目录

概述

一、分布式锁应该具备哪些条件

二、分布式锁常用的有三种实现方式:

1.基于zookeeper的分布式锁;

1.持久节点 (PERSISTENT)

2.持久节点顺序节点(PERSISTENT_SEQUENTIAL)

3.临时节点(EPHEMERAL)

4.临时顺序节点(EPHEMERAL_SEQUENTIAL)

Zookeeper分布式锁的原理

获取锁

释放锁

1.任务完成,客户端显示释放

2.任务执行过程中,客户端崩溃

3.方案:

4.缺点:

2.基于数据库的分布式锁;

1. 悲观锁

2.乐观锁—基于版本号

3.基于redis的分布式锁;

1、选用Redis实现分布式锁原因:

2、使用命令介绍:

3、实现思想:


一、分布式锁应该具备哪些条件

1、在分布式系统环境下,一个方法在同一时间只能被一个机器的一个线程执行; 
2、高可用的获取锁与释放锁; 
3、高性能的获取锁与释放锁; 
4、具备可重入特性; 
5、具备锁失效机制,防止死锁; 
6、具备非阻塞锁特性,即没有获取到锁将直接返回获取锁失败。

二、分布式锁常用的有三种实现方式:

目前几乎很多大型网站及应用都是分布式部署的,分布式场景中的数据一致性问题一直是一个比较重要的话题。分布式的CAP理论告诉我们“任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项。”所以,很多系统在设计之初就要对这三者做出取舍。在互联网领域的绝大多数的场景中,都需要牺牲强一致性来换取系统的高可用性,系统往往只需要保证“最终一致性”,只要这个最终时间是在用户可以接受的范围内即可。

 
在很多场景中,我们为了保证数据的最终一致性,需要很多的技术方案来支持,比如分布式事务、分布式锁等。有的时候,我们需要保证一个方法在同一时间内只能被同一个线程执行。

1.基于zookeeper的分布式锁;

让我们来回顾一下Zookeeper节点的概念:

Zookeeper的数据存储结构就像一棵树,这棵树由节点组成,这种节点叫做Znode。

Znode分为四种类型:

1.持久节点 (PERSISTENT)

默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在 。

2.持久节点顺序节点(PERSISTENT_SEQUENTIAL)

所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号:

3.临时节点(EPHEMERAL)

和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除:

 
「每日分享」如何用Zookeeper实现分布式锁

 「每日分享」如何用Zookeeper实现分布式锁

4.临时顺序节点(EPHEMERAL_SEQUENTIAL)

顾名思义,临时顺序节点结合和临时节点和顺序节点的特点:在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号;当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。

Zookeeper分布式锁的原理

Zookeeper分布式锁恰恰应用了临时顺序节点。具体如何实现呢?让我们来看一看详细步骤:

获取锁

首先,在Zookeeper当中创建一个持久节点ParentLock。当第一个客户端想要获得锁时,需要在ParentLock这个节点下面创建一个临时顺序节点 Lock1。

「每日分享」如何用Zookeeper实现分布式锁

之后,Client1查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock1是不是顺序最靠前的一个。如果是第一个节点,则成功获得锁。

「每日分享」如何用Zookeeper实现分布式锁

这时候,如果再有一个客户端 Client2 前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock2。

「每日分享」如何用Zookeeper实现分布式锁


 

Client2查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock2是不是顺序最靠前的一个,结果发现节点Lock2并不是最小的。

于是,Client2向排序仅比它靠前的节点Lock1注册Watcher,用于监听Lock1节点是否存在。这意味着Client2抢锁失败,进入了等待状态。

「每日分享」如何用Zookeeper实现分布式锁

这时候,如果又有一个客户端Client3前来获取锁,则在ParentLock下载再创建一个临时顺序节点Lock3。

「每日分享」如何用Zookeeper实现分布式锁

Client3查找ParentLock下面所有的临时顺序节点并排序,判断自己所创建的节点Lock3是不是顺序最靠前的一个,结果同样发现节点Lock3并不是最小的。

于是,Client3向排序仅比它靠前的节点Lock2注册Watcher,用于监听Lock2节点是否存在。这意味着Client3同样抢锁失败,进入了等待状态。

「每日分享」如何用Zookeeper实现分布式锁

这样一来,Client1得到了锁,Client2监听了Lock1,Client3监听了Lock2。这恰恰形成了一个等待队列,很像是Java当中ReentrantLock所依赖的

释放锁

释放锁分为两种情况:

1.任务完成,客户端显示释放

当任务完成时,Client1会显示调用删除节点Lock1的指令。

「每日分享」如何用Zookeeper实现分布式锁

2.任务执行过程中,客户端崩溃

获得锁的Client1在任务执行过程中,如果Duang的一声崩溃,则会断开与Zookeeper服务端的链接。根据临时节点的特性,相关联的节点Lock1会随之自动删除。

 「每日分享」如何用Zookeeper实现分布式锁

由于Client2一直监听着Lock1的存在状态,当Lock1节点被删除,Client2会立刻收到通知。这时候Client2会再次查询ParentLock下面的所有节点,确认自己创建的节点Lock2是不是目前最小的节点。如果是最小,则Client2顺理成章获得了锁。

「每日分享」如何用Zookeeper实现分布式锁

同理,如果Client2也因为任务完成或者节点崩溃而删除了节点Lock2,那么Client3就会接到通知。

「每日分享」如何用Zookeeper实现分布式锁

最终,Client3成功得到了锁。

方案:
 

可以直接使用zookeeper第三方库Curator客户端,这个客户端中封装了一个可重入的锁服务。

Curator提供的InterProcessMutex是分布式锁的实现。acquire方法用户获取锁,release方法用于释放锁。

GitHub - apache/curator: Apache Curator

 
缺点:

    性能上可能并没有缓存服务那么高。因为每次在创建锁和释放锁的过程中,都要动态创建、销毁瞬时节点来实现锁功能。ZK中创建和删除节点只能通过Leader服务器来执行,然后将数据同不到所有的Follower机器上。

    其实,使用Zookeeper也有可能带来并发问题,只是并不常见而已。考虑这样的情况,由于网络抖动,客户端可ZK集群的session连接断了,那么zk以为客户端挂了,就会删除临时节点,这时候其他客户端就可以获取到分布式锁了。就可能产生并发问题。这个问题不常见是因为zk有重试机制,一旦zk集群检测不到客户端的心跳,就会重试,Curator客户端支持多种重试策略。多次重试之后还不行的话才会删除临时节点。(所以,选择一个合适的重试策略也比较重要,要在锁的粒度和并发之间找一个平衡。)
 

2.基于数据库的分布式锁;

1. 悲观锁

基于数据库的实现方式的核心思想是:在数据库中创建一个表,表中包含方法名等字段,并在方法名字段上创建唯一索引,想要执行某个方法,就使用这个方法名向表中插入数据,成功插入则获取锁,执行完成后删除对应的行数据释放锁。

(1)创建一个表:

DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
  `desc` varchar(255) NOT NULL COMMENT '备注信息',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';


(2)想要执行某个方法,就使用这个方法名向表中插入数据:

INSERT INTO method_lock (method_name, desc) VALUES ('methodName', '测试的methodName');


因为我们对method_name做了唯一性约束,这里如果有多个请求同时提交到数据库的话,数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁,可以执行方法体内容。

(3)成功插入则获取锁,执行完成后删除对应的行数据释放锁:

delete from method_lock where method_name ='methodName';

2.乐观锁—基于版本号

乐观锁常常用于分布式系统对数据库某张特定表执行update操作。考虑线上选座的场景,用户A和B同时选择了某场次电影的一个座位,都去将座位的状态设置为已售。

设想这样的执行序列:
1、用户A判断该座位为未售状态;
2、用户B判断该座位为未售状态;
3、用户A执行update座位为已售;
4、用户B执行update座位为已售。

这样会出现同一个座位售出两次的情况,解决方案是在这张数据库表中增加一个版本号的字段。执行操作前读取当前数据库表中的版本号,在执行update语句时将版本号放在where语句中,如果更新了记录则说明成功,如果没有更新记录,则说明此次update失败。

加了乐观锁的执行序列:
1、用户A查询该座位,得到该座位是未售状态,版本号是5;
2、用户B查询该座位,得到该座位是未售状态,版本号是5;
3、用户A执行update语句将座位状态更新为已售,版本号更新为6;
4、用户B执行update语句时此时这个座位的记录版本号为6,没有版本号为5的这个座位的记录,执行失败。

(1)创建一个表:

DROP TABLE IF EXISTS `method_lock`;
CREATE TABLE `method_lock` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `method_name` varchar(64) NOT NULL COMMENT '锁定的方法名',
  `state` tinyint NOT NULL COMMENT '1:未分配;2:已分配',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  `version` int NOT NULL COMMENT '版本号',
  `PRIMARY KEY (`id`),
  UNIQUE KEY `uidx_method_name` (`method_name`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COMMENT='锁定中的方法';

(2)先获取锁的信息

     select id, method_name, state,version from method_lock where state=1 and method_name='methodName';

(3)占有锁
       

update t_resoure set state=2, version=2, update_time=now() where method_name='methodName' and state=1 and version=2;

如果没有更新影响到一行数据,则说明这个资源已经被别人占位了。
 

优点:乐观锁的性能高于悲观锁,并不容易出现死锁。

缺点:乐观锁只能对一张表的数据进行加锁,如果是需要对多张表的数据操作加分布式锁,基于版本号的乐观锁是办不到的。

3.基于redis的分布式锁;

1、选用Redis实现分布式锁原因:

(1)Redis有很高的性能;
(2)Redis命令对此支持较好,实现起来比较方便

2、使用命令介绍:

(1)SETNX

SETNX key val:当且仅当key不存在时,set一个key为val的字符串,返回1;若key存在,则什么都不做,返回0。


(2)expire

expire key timeout:为key设置一个超时时间,单位为second,超过这个时间锁会自动释放,避免死锁。


(3)delete

delete key:删除key


在使用Redis实现分布式锁的时候,主要就会使用到这三个命令。

3、实现思想:

(1)获取锁的时候,使用setnx加锁,并使用expire命令为锁添加一个超时时间,超过该时间则自动释放锁,锁的value值为一个随机生成的UUID,通过此在释放锁的时候进行判断。

(2)获取锁的时候还设置一个获取的超时时间,若超过这个时间则放弃获取锁。

(3)释放锁的时候,通过UUID判断是不是该锁,若是该锁,则执行delete进行锁释放。
 

1)加锁实现

SETNX 可以直接加锁操作,比如说对某个关键词foo加锁,客户端可以尝试
SETNX foo.lock <current unix time>

如果返回1,表示客户端已经获取锁,可以往下操作,操作完成后,通过
DEL foo.lock

命令来释放锁。
如果返回0,说明foo已经被其他客户端上锁,如果锁是非堵塞的,可以选择返回调用。如果是堵塞调用调用,就需要进入以下个重试循环,直至成功获得锁或者重试超时。理想是美好的,现实是残酷的。仅仅使用SETNX加锁带有竞争条件的,在某些特定的情况会造成死锁错误。

2)处理死锁

在上面的处理方式中,如果获取锁的客户端端执行时间过长,进程被kill掉,或者因为其他异常崩溃,导致无法释放锁,就会造成死锁。所以,需要对加锁要做时效性检测。因此,我们在加锁时,把当前时间戳作为value存入此锁中,通过当前时间戳和Redis中的时间戳进行对比,如果超过一定差值,认为锁已经时效,防止锁无限期的锁下去,但是,在大并发情况,如果同时检测锁失效,并简单粗暴的删除死锁,再通过SETNX上锁,可能会导致竞争条件的产生,即多个客户端同时获取锁。

C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,获得foo.lock的时间戳,通过比对时间戳,发现锁超时。
C2 向foo.lock发送DEL命令。
C2 向foo.lock发送SETNX获取锁。
C3 向foo.lock发送DEL命令,此时C3发送DEL时,其实DEL掉的是C2的锁。
C3 向foo.lock发送SETNX获取锁。

此时C2和C3都获取了锁,产生竞争条件,如果在更高并发的情况,可能会有更多客户端获取锁。所以,DEL锁的操作,不能直接使用在锁超时的情况下,幸好我们有GETSET方法,假设我们现在有另外一个客户端C4,看看如何使用GETSET方式,避免这种情况产生。

C1获取锁,并崩溃。C2和C3调用SETNX上锁返回0后,调用GET命令获得foo.lock的时间戳T1,通过比对时间戳,发现锁超时。
C4 向foo.lock发送GESET命令,
GETSET foo.lock <current unix time>
并得到foo.lock中老的时间戳T2

如果T1=T2,说明C4获得时间戳。
如果T1!=T2,说明C4之前有另外一个客户端C5通过调用GETSET方式获取了时间戳,C4未获得锁。只能sleep下,进入下次循环中。

现在唯一的问题是,C4设置foo.lock的新时间戳,是否会对锁产生影响。其实我们可以看到C4和C5执行的时间差值极小,并且写入foo.lock中的都是有效时间错,所以对锁并没有影响。
为了让这个锁更加强壮,获取锁的客户端,应该在调用关键业务时,再次调用GET方法获取T1,和写入的T0时间戳进行对比,以免锁因其他情况被执行DEL意外解开而不知。以上步骤和情况,很容易从其他参考资料中看到。客户端处理和失败的情况非常复杂,不仅仅是崩溃这么简单,还可能是客户端因为某些操作被阻塞了相当长时间,紧接着 DEL 命令被尝试执行(但这时锁却在另外的客户端手上)。也可能因为处理不当,导致死锁。还有可能因为sleep设置不合理,导致Redis在大并发下被压垮。最为常见的问题还有

GET返回nil时应该走那种逻辑?

第一种走超时逻辑
C1客户端获取锁,并且处理完后,DEL掉锁,在DEL锁之前。C2通过SETNX向foo.lock设置时间戳T0 发现有客户端获取锁,进入GET操作。
C2 向foo.lock发送GET命令,获取返回值T1(nil)。
C2 通过T0>T1+expire对比,进入GETSET流程。
C2 调用GETSET向foo.lock发送T0时间戳,返回foo.lock的原值T2
C2 如果T2=T1相等,获得锁,如果T2!=T1,未获得锁。

第二种情况走循环走setnx逻辑
C1客户端获取锁,并且处理完后,DEL掉锁,在DEL锁之前。C2通过SETNX向foo.lock设置时间戳T0 发现有客户端获取锁,进入GET操作。
C2 向foo.lock发送GET命令,获取返回值T1(nil)。
C2 循环,进入下一次SETNX逻辑

两种逻辑貌似都是OK,但是从逻辑处理上来说,第一种情况存在问题。当GET返回nil表示,锁是被删除的,而不是超时,应该走SETNX逻辑加锁。走第一种情况的问题是,正常的加锁逻辑应该走SETNX,而现在当锁被解除后,走的是GETST,如果判断条件不当,就会引起死锁,很悲催,我在做的时候就碰到了,具体怎么碰到的看下面的问题

GETSET返回nil时应该怎么处理?

C1和C2客户端调用GET接口,C1返回T1,此时C3网络情况更好,快速进入获取锁,并执行DEL删除锁,C2返回T2(nil),C1和C2都进入超时处理逻辑。
C1 向foo.lock发送GETSET命令,获取返回值T11(nil)。
C1 比对C1和C11发现两者不同,处理逻辑认为未获取锁。
C2 向foo.lock发送GETSET命令,获取返回值T22(C1写入的时间戳)。
C2 比对C2和C22发现两者不同,处理逻辑认为未获取锁。

此时C1和C2都认为未获取锁,其实C1是已经获取锁了,但是他的处理逻辑没有考虑GETSET返回nil的情况,只是单纯的用GET和GETSET值就行对比,至于为什么会出现这种情况?一种是多客户端时,每个客户端连接Redis的后,发出的命令并不是连续的,导致从单客户端看到的好像连续的命令,到Redis server后,这两条命令之间可能已经插入大量的其他客户端发出的命令,比如DEL,SETNX等。第二种情况,多客户端之间时间不同步,或者不是严格意义的同步。

时间戳的问题

我们看到foo.lock的value值为时间戳,所以要在多客户端情况下,保证锁有效,一定要同步各服务器的时间,如果各服务器间,时间有差异。时间不一致的客户端,在判断锁超时,就会出现偏差,从而产生竞争条件。
锁的超时与否,严格依赖时间戳,时间戳本身也是有精度限制,假如我们的时间精度为秒,从加锁到执行操作再到解锁,一般操作肯定都能在一秒内完成。这样的话,我们上面的CASE,就很容易出现。所以,最好把时间精度提升到毫秒级。这样的话,可以保证毫秒级别的锁是安全的。

分布式锁的问题

1:必要的超时机制:获取锁的客户端一旦崩溃,一定要有过期机制,否则其他客户端都降无法获取锁,造成死锁问题。
2:分布式锁,多客户端的时间戳不能保证严格意义的一致性,所以在某些特定因素下,有可能存在锁串的情况。要适度的机制,可以承受小概率的事件产生。
3:只对关键处理节点加锁,良好的习惯是,把相关的资源准备好,比如连接数据库后,调用加锁机制获取锁,直接进行操作,然后释放,尽量减少持有锁的时间。
4:在持有锁期间要不要CHECK锁,如果需要严格依赖锁的状态,最好在关键步骤中做锁的CHECK检查机制,但是根据我们的测试发现,在大并发时,每一次CHECK锁操作,都要消耗掉几个毫秒,而我们的整个持锁处理逻辑才不到10毫秒,可以查看没有选择做锁的检查。
5:sleep学问,为了减少对Redis的压力,获取锁尝试时,循环之间一定要做sleep操作。但是sleep时间是多少是门学问。需要根据自己的Redis的QPS,加上持锁处理时间等进行合理计算。
6:至于为什么不使用Redis的muti,expire,watch等机制,可以查一参考资料,找下原因。

锁测试数据

未使用sleep
第一种,锁重试时未做sleep。单次请求,加锁,执行,解锁时间 


可以看到加锁和解锁时间都很快,当我们使用

ab -n1000 -c100 'http://sandbox6.wanke.etao.com/test/test_sequence.php?tbpm=t'
AB 并发100累计1000次请求,对这个方法进行压测时。


我们会发现,获取锁的时间变成,同时持有锁后,执行时间也变成,而delete锁的时间,将近10ms时间,为什么会这样?
1:持有锁后,我们的执行逻辑中包含了再次调用Redis操作,在大并发情况下,Redis执行明显变慢。
2:锁的删除时间变长,从之前的0.2ms,变成9.8ms,性能下降近50倍。
在这种情况下,我们压测的QPS为49,最终发现QPS和压测总量有关,当我们并发100总共100次请求时,QPS得到110多。当我们使用sleep时

使用Sleep时

单次执行请求时


我们看到,和不使用sleep机制时,性能相当。当时用相同的压测条件进行压缩时 

 


获取锁的时间明显变长,而锁的释放时间明显变短,仅是不采用sleep机制的一半。当然执行时间变成就是因为,我们在执行过程中,重新创建数据库连接,导致时间变长的。同时我们可以对比下Redis的命令执行压力情况 

上图中细高部分是为未采用sleep机制的时的压测图,矮胖部分为采用sleep机制的压测图,通上图看到压力减少50%左右,当然,sleep这种方式还有个缺点QPS下降明显,在我们的压测条件下,仅为35,并且有部分请求出现超时情况。不过综合各种情况后,我们还是决定采用sleep机制,主要是为了防止在大并发情况下把Redis压垮,很不行,我们之前碰到过,所以肯定会采用sleep机制。

3.2  Redisson分布式锁的实现

1)Redisson 分布式重入锁用法

Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例:

// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
    /**
     * 4.尝试获取锁
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        //成功获得锁,在这里处理业务
    }
} catch (Exception e) {
    throw new RuntimeException("aquire lock fail");
}finally{
    //无论如何, 最后都要解锁
    rLock.unlock();
}

redisson这个框架重度依赖了Lua脚本和Netty,代码很牛逼,各种Future及FutureListener的异步、同步操作转换。

自己先思考下,如果要手写一个分布式锁组件,怎么做?肯定要定义2个接口:加锁、解锁;大道至简,redisson的作者就是在加锁和解锁的执行层面采用Lua脚本,逼格高,而且重要有原子性保证啊。当然,redisson的作者毕竟牛逼,加锁和解锁过程中还巧妙地利用了redis的发布订阅功能,后面会讲到。下面先对加锁和解锁Lua脚本了解下。

2} 加锁&解锁Lua脚本

加锁、解锁Lua脚本是redisson分布式锁实现最重要的组成部分。首先不看代码,先研究下Lua脚本都是什么逻辑


1、加锁Lua脚本

  • 脚本入参
参数   示例值含义
KEY个数1KEY个数
KEYS [1]my_first_lock_name锁名
ARGV [1]60000持有锁的有效时间:毫秒
ARGV [2]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID
  • 脚本内容
  • -- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('hset', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
     
    -- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间
    if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
        redis.call('hincrby', KEYS[1], ARGV[2], 1);
        redis.call('pexpire', KEYS[1], ARGV[1]);
        return nil;
    end;
     
    -- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间
    return redis.call('pttl', KEYS[1]);

  • 脚本解读

Q:返回nil、返回剩余过期时间有什么目的? 
A:当且仅当返回nil,才表示加锁成功;客户端需要感知加锁是否成功的结果 

2、解锁Lua脚本

参数示例值含义
KEY个数2KEY个数
KEYS [1]my_first_lock_name锁名
KEYS [2]redisson_lock__channel:{my_first_lock_name}解锁消息PubSub频道
ARGV [1]0redisson定义0表示解锁消息
ARGV [2]30000设置锁的过期时间;默认值30秒
ARGV [3]58c62432-bb74-4d14-8a00-9908cc8b828f:1唯一标识;同加锁流程
  • 脚本内容
    -- 若锁不存在:则直接广播解锁消息,并返回1
    if (redis.call('exists', KEYS[1]) == 0) then
        redis.call('publish', KEYS[2], ARGV[1]);
        return 1; 
    end;
     
    -- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
    if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
        return nil;
    end; 
     
    -- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
    local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); 
    if (counter > 0) then 
        -- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
        redis.call('pexpire', KEYS[1], ARGV[2]); 
        return 0; 
    else 
        -- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
        redis.call('del', KEYS[1]); 
        redis.call('publish', KEYS[2], ARGV[1]); 
        return 1;
    end;
     
    return nil;

  • 脚本解读

Q1:广播解锁消息有什么用? 
A:是为了通知其他争抢锁阻塞住的线程,从阻塞中解除,并再次去争抢锁。

Q2:返回值0、1、nil有什么不一样? 
A:当且仅当返回1,才表示当前请求真正触发了解锁Lua脚本;但客户端又并不关心解锁请求的返回值,好像没什么用?

源码搞起


1、加锁流程源码
解锁流程相对比较简单,完全就是执行解锁Lua脚本,无额外的代码逻辑,直接看org.redisson.RedissonLock#unlock代码

 @Override
    public void unlock() {
        // 执行解锁Lua脚本,这里传入线程id,是为了保证加锁和解锁是同一个线程,避免误解锁其他线程占有的锁
        Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }
 
// 见org.redisson.RedissonLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; " +
            "end;" +
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
 
}

加锁&解锁流程串起来


上面结合Lua脚本和源码,分别分析了加锁流程和解锁流程。下面升级下挑战难度,模拟下多个线程争抢锁会是怎样的流程。示意图如下,比较关键的三处已用红色字体标注。

概括下整个流程

1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。

2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;

3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁

其他料


Q1:订阅频道名称(如:redisson_lock__channel:{my_first_lock_name})为什么有大括号? 
A: 
1.在redis集群方案中,如果Lua脚本涉及多个key的操作,则需限制这些key在同一个slot中,才能保障Lua脚本执行的原子性。否则运行会报错Lua script attempted to access a non local key in a cluster node . channel; 2.HashTag是用{}包裹key的一个子串,若设置了HashTag,集群会根据HashTag决定key分配到哪个slot;HashTag不支持嵌套,只有第一个左括号{和第一个右括号}里面的内容才当做HashTag参与slot计算;通常,客户端都会封装这个计算逻辑。

// 见org.redisson.cluster.ClusterConnectionManager#calcSlot
@Override
public int calcSlot(String key) {
    if (key == null) {
        return 0;
    }
 
    int start = key.indexOf('{');
    if (start != -1) {
        int end = key.indexOf('}');
        key = key.substring(start+1, end);
    }
 
    int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
    log.debug("slot {} for {}", result, key);
    return result;
}

3.在解锁Lua脚本中,操作了两个key:一个是锁名my_lock_name,一个是解锁消息发布订阅频道redisson_lock__channel:{my_first_lock_name},按照上面slot计算方式,两个key都会按照内容my_first_lock_name来计算,故能保证落到同一个slot

Q2:redisson代码几乎都是以Lua脚本方式与redis服务端交互,如何跟踪这些脚本执行过程? 
A:启动一个redis客户端终端,执行monitor命令以便在终端上实时打印 redis 服务器接收到的命令;然后debug执行redisson加锁/解锁测试用例,即可看到代码运行过程中实际执行了哪些Lua脚本

eg:上面整体流程示意图的测试用例位:
 

@RunWith(SpringRunner.class)
@SpringBootTest
public class RedissonDistributedLockerTest {
 
    private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class);
 
    @Resource
    private DistributedLocker distributedLocker;
 
    private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
 
    private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();
 
    @Test
    public void tryLockUnlockCost() throws Exception {
        StopWatch stopWatch = new StopWatch("加锁解锁耗时统计");
        stopWatch.start();
        for (int i = 0; i < 10000; i++) {
            String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
            Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
            Assert.assertTrue(optLocked.isPresent());
            optLocked.get().unlock();
        }
        stopWatch.stop();
        log.info(stopWatch.prettyPrint());
    }
 
    @Test
    public void tryLock() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
        Assert.assertTrue(optLocked2.isPresent());
 
        optLocked.get().unlock();
    }
 
    /**
     * 模拟2个线程争抢锁:A先获取到锁,A释放锁后,B再获得锁
     */
    @Test
    public void tryLock2() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
                    countDownLatch.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
        Assert.assertTrue(optLocked.isPresent());
 
        log.info("A已获得锁:thread={}", currentThreadId());
        countDownLatch.countDown();
 
        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());
 
        Optional<LockResource> lockResource2 = submit.get();
        Assert.assertTrue(lockResource2.isPresent());
 
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            lockResource2.get().unlock();
            log.info("B已释放锁:thread={}", currentThreadId());
        });
    }
 
    /**
     * 模拟3个线程争抢锁:A先获取到锁,A释放锁后,B和C同时争抢锁
     */
    @Test
    public void tryLock3() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString().replace("-", "");
 
        log.info("A尝试获得锁:thread={}", currentThreadId());
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
 
        if (optLocked.isPresent()) {
            log.info("A已获得锁:thread={}", currentThreadId());
        }
        Assert.assertTrue(optLocked.isPresent());
 
        CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
                    cyclicBarrier.await();
                    log.info("B尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
                    cyclicBarrier.await();
                    log.info("C尝试获得锁:thread={}", currentThreadId());
                    return distributedLocker.tryLock(key, 600000, 600000);
                }
        );
 
        optLocked.get().unlock();
        log.info("A已释放锁:thread={}", currentThreadId());
 
        CountDownLatch countDownLatch = new CountDownLatch(2);
        executorServiceB.submit(() -> {
            log.info("B已获得锁:thread={}", currentThreadId());
            try {
                submitB.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("B已释放锁:thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        executorServiceC.submit(() -> {
            log.info("C已获得锁:thread={}", currentThreadId());
            try {
                submitC.get().get().unlock();
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            log.info("C已释放锁:thread={}", currentThreadId());
            countDownLatch.countDown();
        });
 
        countDownLatch.await();
    }
 
    private static Long currentThreadId() {
        return Thread.currentThread().getId();
    }
 
    @Test
    public void tryLockWaitTimeout() throws Exception {
        String key = "mock-key:" + UUID.randomUUID().toString();
 
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
        Assert.assertTrue(optLocked.isPresent());
 
        Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
                long now = System.currentTimeMillis();
                Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
                long cost = System.currentTimeMillis() - now;
                log.info("cost={}", cost);
                return optLockedAgain;
        }).exceptionally(th -> {
            log.error("Exception: ", th);
            return Optional.empty();
        }).join();
 
        Assert.assertTrue(!optLockResource.isPresent());
    }
 
    @Test
    public void tryLockWithLeaseTime() throws Exception {
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLocked.isPresent());
 
        // 可重入
        Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
        Assert.assertTrue(optLockedAgain.isPresent());
    }
 
    /**
     * 模拟1000个并发请求枪一把锁
     */
    @Test
    public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
        int totalThread = 1000;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    tryAcquireLockTimes.getAndIncrement();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                    }
                }
            });
        }
        executor.awaitTermination(15, TimeUnit.SECONDS);
 
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == 1);
    }
 
    @Test
    public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
        int totalThread = 100;
        String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
        AtomicInteger tryAcquireLockTimes = new AtomicInteger(0);
        AtomicInteger acquiredLockTimes = new AtomicInteger(0);
 
        ExecutorService executor = Executors.newFixedThreadPool(totalThread);
        for (int i = 0; i < totalThread; i++) {
            executor.submit(new Runnable() {
 
                @Override
                public void run() {
                    long now = System.currentTimeMillis();
                    Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
                    long cost = System.currentTimeMillis() - now;
                    log.info("tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
                    if (optLocked.isPresent()) {
                        acquiredLockTimes.getAndIncrement();
                        // 主动释放锁
                        optLocked.get().unlock();
                    }
                }
            });
        }
        executor.awaitTermination(20, TimeUnit.SECONDS);
 
        log.info("tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
        Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
        Assert.assertTrue(acquiredLockTimes.get() == totalThread);
    }
 
}
 
 
public interface DistributedLocker {
 
    Optional<LockResource> tryLock(String lockKey, int waitTime);
 
    Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
 
}
 
public interface LockResource {
 
    void unlock();
 
}
 
 

执行的Lua脚本如下:

加锁:redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000); 
解锁:redissonClient.getLock("my_first_lock_name").unlock();

# 线程A
## 1.1.1尝试获取锁 -> 成功
1568357723.205362 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357723.205452 [0 lua] "exists" "my_first_lock_name"
1568357723.208858 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
1568357723.208874 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
 
# 线程B
### 2.1.1尝试获取锁,未获取到,返回锁剩余过期时间
1568357773.338018 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338161 [0 lua] "exists" "my_first_lock_name"
1568357773.338177 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357773.338197 [0 lua] "pttl" "my_first_lock_name"
 
 
## 2.1.1.3 添加订阅(非Lua脚本) -> 订阅成功
1568357799.403341 [0 127.0.0.1:56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
## 2.1.1.4 再次尝试获取锁 -> 未获取到,返回锁剩余过期时间
1568357830.683631 [0 127.0.0.1:56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684371 [0 lua] "exists" "my_first_lock_name"
1568357830.684428 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357830.684485 [0 lua] "pttl" "my_first_lock_name"
 
 
# 线程A
## 3.1.1 释放锁并广播解锁消息,0代表解锁消息
1568357922.122454 [0 127.0.0.1:56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123645 [0 lua] "exists" "my_first_lock_name"
1568357922.123701 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
1568357922.123741 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
1568357922.123775 [0 lua] "del" "my_first_lock_name"
1568357922.123799 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
 
 
# 线程B
## 监听到解锁消息消息 -> 释放信号量,阻塞被解除;4.1.1.1 再次尝试获取锁  -> 获取成功
1568357975.015206 [0 127.0.0.1:56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568357975.015579 [0 lua] "exists" "my_first_lock_name"
1568357975.015633 [0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
1568357975.015721 [0 lua] "pexpire" "my_first_lock_name" "600000"
 
## 4.1.1.3 取消订阅(非Lua脚本)
1568358031.185226 [0 127.0.0.1:56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
 
 
# 线程B
## 5.1.1 释放锁并广播解锁消息
1568358255.551896 [0 127.0.0.1:56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552125 [0 lua] "exists" "my_first_lock_name"
1568358255.552156 [0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
1568358255.552200 [0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
1568358255.552258 [0 lua] "del" "my_first_lock_name"
1568358255.552304 [0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
 
 

需要特别注意的是,RedissonLock 同样没有解决 节点挂掉的时候,存在丢失锁的风险的问题。而现实情况是有一些场景无法容忍的,所以 Redisson 提供了实现了redlock算法的 RedissonRedLock,RedissonRedLock 真正解决了单点失败的问题,代价是需要额外的为 RedissonRedLock 搭建Redis环境。

所以,如果业务场景可以容忍这种小概率的错误,则推荐使用 RedissonLock, 如果无法容忍,则推荐使用 RedissonRedLock。
 

四、Redlock算法

1. Redis 官网对 redLock 算法的介绍大致如下:

The Redlock algorithm

在分布式版本的算法里我们假设我们有N个Redis master节点,这些节点都是完全独立的,我们不用任何复制或者其他隐含的分布式协调机制。之前我们已经描述了在Redis单实例下怎么安全地获取和释放锁。我们确保将在每(N)个实例上使用此方法获取和释放锁。在我们的例子里面我们把N设成5,这是一个比较合理的设置,所以我们需要在5台机器上面或者5台虚拟机上面运行这些实例,这样保证他们不会同时都宕掉。为了取到锁,客户端应该执行以下操作:

        1.获取当前Unix时间,以毫秒为单位。

        2.依次尝试从5个实例,使用相同的key和具有唯一性的value(例如UUID)获取锁。当向Redis请求获取锁时,客户端应该设置一个尝试从某个Reids实例获取锁的最大等待时间(超过这个时间,则立马询问下一个实例),这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为10秒,则超时时间应该在5-50毫秒之间。这样可以避免服务器端Redis已经挂掉的情况下,客户端还在死死地等待响应结果。如果服务器端没有在规定时间内响应,客户端应该尽快尝试去另外一个Redis实例请求获取锁。

        3.客户端使用当前时间减去开始获取锁时间(步骤1记录的时间)就得到获取锁消耗的时间。当且仅当从大多数(N/2+1,这里是3个节点)的Redis节点都取到锁,并且使用的总耗时小于锁失效时间时,锁才算获取成功。

        4.如果取到了锁,key的真正有效时间 = 有效时间(获取锁时设置的key的自动超时时间) - 获取锁的总耗时(询问各个Redis实例的总耗时之和)(步骤3计算的结果)。

        5.如果因为某些原因,最终获取锁失败(即没有在至少 “N/2+1 ”个Redis实例取到锁或者“获取锁的总耗时”超过了“有效时间”),客户端应该在所有的Redis实例上进行解锁(即便某些Redis实例根本就没有加锁成功,这样可以防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。

用 Redisson 实现分布式锁(红锁 RedissonRedLock)及源码分析(实现三)

这里以三个单机模式为例,需要特别注意的是他们完全互相独立,不存在主从复制或者其他集群协调机制。

Config config1 = new Config();
config1.useSingleServer().setAddress("redis://172.0.0.1:5378").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient1 = Redisson.create(config1);
 
Config config2 = new Config();
config2.useSingleServer().setAddress("redis://172.0.0.1:5379").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient2 = Redisson.create(config2);
 
Config config3 = new Config();
config3.useSingleServer().setAddress("redis://172.0.0.1:5380").setPassword("a123456").setDatabase(0);
RedissonClient redissonClient3 = Redisson.create(config3);
 
/**
 * 获取多个 RLock 对象
 */
RLock lock1 = redissonClient1.getLock(lockKey);
RLock lock2 = redissonClient2.getLock(lockKey);
RLock lock3 = redissonClient3.getLock(lockKey);
 
/**
 * 根据多个 RLock 对象构建 RedissonRedLock (最核心的差别就在这里)
 */
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
 
try {
    /**
     * 4.尝试获取锁
     * waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
     * leaseTime   锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
     */
    boolean res = redLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
    if (res) {
        //成功获得锁,在这里处理业务
    }
} catch (Exception e) {
    throw new RuntimeException("aquire lock fail");
}finally{
    //无论如何, 最后都要解锁
    redLock.unlock();
}

最核心的变化就是需要构建多个 RLock ,然后根据多个 RLock 构建成一个 RedissonRedLock,因为 redLock 算法是建立在多个互相独立的 Redis 环境之上的(为了区分可以叫为 Redission node),Redission node 节点既可以是单机模式(single),也可以是主从模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。这就意味着,不能跟以往这样只搭建 1个 cluster、或 1个 sentinel 集群,或是1套主从架构就了事了,需要为 RedissonRedLock 额外搭建多几套独立的 Redission 节点。 比如可以搭建3个 或者5个 Redission节点,具体可看视资源及业务情况而定。

下图是一个利用多个 Redission node 最终 组成 RedLock分布式锁的例子,需要特别注意的是每个 Redission node 是互相独立的,不存在任何复制或者其他隐含的分布式协调机制。


# Redisson 实现redlock算法源码分析(RedLock)

 
加锁核心代码

org.redisson.RedissonMultiLock#tryLock
 

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
    long newLeaseTime = -1;
    if (leaseTime != -1) {
        newLeaseTime = unit.toMillis(waitTime)*2;
    }
    
    long time = System.currentTimeMillis();
    long remainTime = -1;
    if (waitTime != -1) {
        remainTime = unit.toMillis(waitTime);
    }
    long lockWaitTime = calcLockWaitTime(remainTime);
    /**
     * 1. 允许加锁失败节点个数限制(N-(N/2+1))
     */
    int failedLocksLimit = failedLocksLimit();
    /**
     * 2. 遍历所有节点通过EVAL命令执行lua加锁
     */
    List<RLock> acquiredLocks = new ArrayList<>(locks.size());
    for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
        RLock lock = iterator.next();
        boolean lockAcquired;
        /**
         *  3.对节点尝试加锁
         */
        try {
            if (waitTime == -1 && leaseTime == -1) {
                lockAcquired = lock.tryLock();
            } else {
                long awaitTime = Math.min(lockWaitTime, remainTime);
                lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
            }
        } catch (RedisResponseTimeoutException e) {
            // 如果抛出这类异常,为了防止加锁成功,但是响应失败,需要解锁所有节点
            unlockInner(Arrays.asList(lock));
            lockAcquired = false;
        } catch (Exception e) {
            // 抛出异常表示获取锁失败
            lockAcquired = false;
        }
        
        if (lockAcquired) {
            /**
             *4. 如果获取到锁则添加到已获取锁集合中
             */
            acquiredLocks.add(lock);
        } else {
            /**
             * 5. 计算已经申请锁失败的节点是否已经到达 允许加锁失败节点个数限制 (N-(N/2+1))
             * 如果已经到达, 就认定最终申请锁失败,则没有必要继续从后面的节点申请了
             * 因为 Redlock 算法要求至少N/2+1 个节点都加锁成功,才算最终的锁申请成功
             */
            if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                break;
            }
 
            if (failedLocksLimit == 0) {
                unlockInner(acquiredLocks);
                if (waitTime == -1 && leaseTime == -1) {
                    return false;
                }
                failedLocksLimit = failedLocksLimit();
                acquiredLocks.clear();
                // reset iterator
                while (iterator.hasPrevious()) {
                    iterator.previous();
                }
            } else {
                failedLocksLimit--;
            }
        }
 
        /**
         * 6.计算 目前从各个节点获取锁已经消耗的总时间,如果已经等于最大等待时间,则认定最终申请锁失败,返回false
         */
        if (remainTime != -1) {
            remainTime -= System.currentTimeMillis() - time;
            time = System.currentTimeMillis();
            if (remainTime <= 0) {
                unlockInner(acquiredLocks);
                return false;
            }
        }
    }
 
    if (leaseTime != -1) {
        List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
        for (RLock rLock : acquiredLocks) {
            RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
            futures.add(future);
        }
        
        for (RFuture<Boolean> rFuture : futures) {
            rFuture.syncUninterruptibly();
        }
    }
 
    /**
     * 7.如果逻辑正常执行完则认为最终申请锁成功,返回true
     */
    return true;
}

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分布式锁学习 的相关文章

  • [控制算法]

    常用控制算法 0 博览众长 0 1 视频 1 DR CAN b站 0 2 文章 1 控制算法整理 0 3 传统 VS 现代控制算法 1 传统 传统控制算法 xff1a PID xff0c 模糊 xff0c 神经网络控制算法 2 现代 现代控
  • 七、输入/输出流--streambuffer类介绍--

    缓冲区类 类模板定义为basic streambuf xff0c 由 lt iostream gt 给出 xff1a 1 stream缓冲区 通常stream不负责实际读写操作 xff0c 而是stream buffer实现streambu
  • 七、输入/输出流--streambuffer类介绍--自定义缓冲区

    基本上没看懂 xff0c 那个大神如果可以的话 xff0c 推荐一点相关资料 xff0c 真的不太明白这个缓冲区的内部原理 3 自定义缓冲区 缓冲区有basic streambuf定义 xff0c 针对字型为char和wchar标准库提供了
  • 串口通信学习(GPS模块)2021.5.10

    GPS串口通信学习实践 2021 5 10 1 串口通信简介1 1 波特率1 2 数据位1 3 停止位1 4 奇偶校验位 2 GPS模块串口通信配置2 1 驱动安装2 2 插入GPS模块2 3 GPS模块串口通信数据简介 3 Java实现G
  • Lambda表达式的使用

    对于jdk1 8其实并不是那么熟悉 xff0c 但是要学习这一点 xff0c 对以后工作有好处 xff0c 接下来开始学习jdk1 8在Android studio的配置以及lambda表达式的使用吧 Lambda表达式 jdk1 8中新增
  • ROS实现无人机目标跟踪/物体跟随/循迹

    无人机自主物体跟随 循迹 1 物体跟踪1 1 实现思路1 2 代码示例 2 自主寻线 本实验采用ROS和OpenCV实现功能 xff0c 实验平台采用Parrot的Bebop2无人机ROS部分的学习可以参考我的专栏 xff1a ROS学习记
  • vs code中项目的基本配置--include路径、运行参数、debug配置

    1 安装C C 43 43 for Visual Studio Code 点击左边扩展栏图标 gt 搜索C C 43 43 gt 安装 gt Reload xff1a 安装完成之后 xff0c 打开你的包含c 43 43 的文件夹 xff0
  • CMakeLists.txt编写规范模板及CMake常用变量

    文章目录 基本语法规则常见CMakeLists txt中指令剖析从VS项目配置过程理解CMakeLists内容CMake中常用变量汇总常用CMakeLists文件模板 基础模板使用OpenCV库CMakeLists文件模板使用PCL库CMa
  • C++ 多线程detach()操作的坑以及传参

    detach 的作用是将子线程和主线程的关联分离 xff0c 也就是说detach 后子线程在后台独立继续运行 xff0c 主线程无法再取得子线程的控制权 xff0c 即使主线程结束 xff0c 子线程未执行也不会结束 当主线程结束时 xf
  • 条件变量中的唤醒丢失问题分析

    本文是在其他作者博文的基础上进行了部分补充 原文 xff1a https zhuanlan zhihu com p 55123862 0 前言 条件变量 xff08 condition variable xff09 和互斥锁 xff08 m
  • 无人车传感器 IMU与GPS数据融合进行定位机制

    前言 上一次的分享里 xff0c 我介绍了GPS的原理 xff08 三角定位 xff09 及特性 xff08 精度 频率 xff09 xff0c 同时也从无人车控制的角度 xff0c 讨论了为什么仅有GPS无法满足无人车的定位要求 为了能让
  • C++类对象的赋值与=运算符重载

    本文主要介绍C 43 43 中的赋值运算符重载函数 xff08 operator 61 xff09 的相关知识 1 概述 1 1 why 首先介绍为什么要对赋值运算符 61 进行重载 某些情况下 xff0c 当我们编写一个类的时候 xff0
  • 微信小程序开发入门(一)

    所有示例的完整代码 xff0c 都可以从 GitHub 的代码仓库下载 一 小程序是什么 xff1f 学习小程序之前 xff0c 先简单说一下 xff0c 它到底是什么 字面上讲 xff0c 小程序就是微信里面的应用程序 xff0c 外部代
  • rtp载荷H264解包过程分析,ffmpeg解码qt展示

    网络抽象层单元 NALU NALU头 NALU 头 由1个byte组成 它的语法如下 43 43 0 1 2 3 4 5 6 7 43 43 43 43 43 43 43 43 43 F NRI Type 43 43 F 1 个比特 for
  • CPU上下文切换、进程上下文、中断上下文

    由于Linux是一个多任务操作系统 xff0c 能够支持远大于CPU数量的任务同时运行 当然 xff0c 这些任务实际上并不是真的在同时运行 xff0c 而是由CPU进行调度 xff0c 将时间分片 xff0c 每个任务占用1个时间片 xf
  • Gstreamer概述

    1 什么是GStreamer GStreamer 是用来构建流媒体应用的开源多媒体框架 framework xff0c 其基本设计思想来自于俄勒冈 Oregon 研究生学院有关视频管道的创意 同时也借鉴了DirectShow的设计思想 其目
  • grbl学习之旅---serial篇

    serial c和serial h文件是实现了通过串行端口发送和接受字节的功能 首先是serial h中定义了基本函数和常量大小 xff1a ifndef RX BUFFER SIZE define RX BUFFER SIZE 128 定
  • ip rule,ip route,iptables 三者之间的关系

    以一例子来说明 xff1a 公司内网要求192 168 0 100 以内的使用 10 0 0 1 网关上网 xff08 电信 xff09 xff0c 其他IP使用 20 0 0 1 xff08 网通 xff09 上网 首先要在网关服务器上添
  • 什么是Zero-copy零拷贝

    考虑这样 种常 的情形 xff1a 你需要将静态内容 xff08 类似图 件 xff09 展 给 户 那么这个情形就意味着你需要先将静态内容从磁盘中拷贝出来放到 个内存buf中 xff0c 然后将这个buf通过socket传输给 户 xff
  • 图解协程原理

    前言 协程 Coroutines xff0c 是 Kotlin 最神奇 的特性 xff0c 没有之一 本文会以图解 43 动画的形式解释 Kotlin 协程的原理 看完本文后 xff0c 你会发现 xff0c 原来协程也没有那么难 本文要求

随机推荐

  • ubuntu 16.04下安裝和配置ros

    書上和網上關於ubuntu下安裝ros的文章很多 xff0c 但是很多介紹的不完整 xff0c 並且ubuntu和ros之間其實是有版本對應關系的 xff0c 並不是所有的ros都能安裝到所有的ubuntu上 xff0c xff08 很多書
  • Ubuntu 16.04安装docker详细步骤

    因需要安装opendronemap 而这个依赖于docker 所以记录了一下安装docker的步骤 比较简单 通过apt的docker官方源安装最新的Docker CE Community Edition xff0c 即Docker社区版
  • 在本地shell脚本中ssh到远程服务器并执行命令

    shell远程执行 xff1a 经常需要远程到其他节点上执行一些shell命令 xff0c 如果分别ssh到每台主机上再去执行很麻烦 xff0c 因此能有个集中管理的方式就好了 一下介绍两种shell命令远程执行的方法 前提条件 xff1a
  • Catkin_make执行过程

    这是一个比较复杂的问题 xff0c 但是有时候会有莫名其妙的编译错误 xff0c 在找错误的过程中会非常需要了解这个过程 1 模板文件 首先说一下 in文件 在catkin的目录中有许多 in文件 这些都是模板文件 xff0c 以 opt
  • Docker用yum安装步骤

    Docker用yum安装步骤 一 安装docker xff08 完整版 xff09 1 Docker 要求 CentOS 系统的内核版本高于 3 10 uname r 2 使用 root 权限登录 Centos 确保 yum 包更新到最新
  • 1024,如果全世界程序员都消失了,会怎样?

    这两天 xff0c 有一个话题引起了程序员的广泛讨论 xff1a 年薪80W程序员相亲被鄙视 某知名互联网社区 xff0c 一网友发帖 xff0c 自己年薪80W去相亲 xff0c 竟然被鄙视不如在二本学校教书的大学老师 估计令他没想到的是
  • 非线性控制1.1——稳定与跟踪问题概念

    一 非线性控制系统的两大任务 1 稳定 xff08 或称调节 xff09 问题 稳定问题是要使得闭环系统的状态稳定在一个平衡点附近 对于稳定问题 xff0c 系统的输出不一定要有具体的物理意义 xff0c 此时可以借助输入 输出状态线性化方
  • 在 linux ubuntu 18.04 上运行QQ音乐

    在 linux ubuntu 18 04 上运行QQ音乐 我使用的组合为 ubuntu 18 04 43 wine stable 3 6 43 QQ音乐17 63 xff0c 未在其它平台做过尝试 一直想在ubuntu上好好听音乐 xff0
  • 非线性控制1.0——自适应控制和鲁棒控制

    1 鲁棒控制和自适应控制的联系与区别 鲁棒控制是以目的定义的控制方法集合 xff0c 自适应控制是以手段定义的控制方法集合 xff0c 这两种控制都是为了应对 当数学模型不能精确表示实际系统的情况下 狭义的鲁棒控制是指H2 xff0c Hi
  • 非线性控制2.0——鲁棒控制之H无穷控制器设计

    一 基本概念 对于图1所示系统 xff0c u为控制输入 xff0c y为测量输出 xff0c z为被调输出 xff0c w为干扰输入 xff0c 由输入u xff0c w到输出y xff0c z的传递函数G成为增广被控对象 xff0c 控
  • 非线性控制1.0——控制理论生态及结构

    一 控制理论地图 二 控制理论发展及结构 上图应用于 xff1a https www zhihu com people xiang yi 55 49 answers
  • 四旋翼飞行器——飞行原理

    1 机械结构 旋翼对称分布在机体的前后 左右四个方向 xff0c 四个旋翼处于同一高度平面 xff0c 且四个旋翼的结构和半径都相同 xff0c 四个电机对称的安装在飞行器的支架端 xff0c 支架中间空间安放飞控板 结构形式如图 1 1所
  • 四旋翼飞行器——非线性化模型

    一 建模思路 该模型目标控制量是机体相对于地面坐标系的线速度的三个分量Vx xff0c Vy xff0c Vz xff0c 而我们能控的实质上只有四个电机的转速W1 xff0c W2 xff0c W3 xff0c W4 怎样由输入一步步推导
  • 故障诊断4—龙伯格状态观测器设计

    1 龙伯格状态观测器概念 已经线性系统模型如下 xff1a 当系统状态量难以获取 xff0c 但实际控制中又需要利用系统状态量时 xff0c 如何通过输入量和输出量重构系统状态量 xff1f 这便是龙伯格状态观测器设计初衷 xff0c 将实
  • 故障诊断5——状态观测器和输出观测器

    1 状态观测器分类 在现代控制理论中 xff0c 控制系统的基本结构和经典控制理论一样 xff0c 仍然是由受控对象和反馈控制器两部分构成的闭环系统 不过在经典理论中习惯于采用输出反馈 xff0c 而在现代控制理论中则更多地采用状态反馈 由
  • GPS漂移和定位不准确的解决办法

    解决GPS漂移主要从两方面入手 xff1a 一 主系统的设计主要减少在近距离内对GPS信号的干扰 二 软件处理 软件处理主要集中在导航软件处完成 xff0c 导航软件会将坐标定位在道路之内 xff0c 如果GPS接收到的信号超出道路的半径范
  • AI---是什么?可以做什么?

    1 AI的项目简单介绍 图像识别 描述 xff1a 给定图片 xff0c 识别图片中有什么 xff1f 算法 xff1a KNN CNN 情感分析 描述 xff1a 判断文本包含的情感是正面 负面还是中性 关键 xff1a 文本如何表示成向
  • realsense的安装问题

    realsense的安装问题 0 旁白1 SDK的安装2 python开发包的安装3 nodejs开发包的安装方法1 xff1a 方法2 xff1a 接手一位同事的realsense相关项目 xff0c 先配置一个环境 xff0c 出现不少
  • 二叉排序树的删除

    xff08 网上讲二叉排序树删除的资料很少 xff0c 这篇很不错 xff01 转自 xff1a http bbs csdn net topics 110010437 xff09 二叉排序树的删除 xff1a 对于一般的二叉树来说 xff0
  • 分布式锁学习

    概述 分布式锁是控制分布式系统之间同步访问共享资源的一种方式 在分布式系统中 xff0c 常常需要协调他们的动作 如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源 xff0c 那么访问这些资源的时候 xff0c 往往需要互斥来