redis 十二. 分布式锁

2023-11-16

一. 分布式锁概述

  1. 一个靠谱的分布式锁需要具备的条件

1)独占: 同一时间内只允许一个线程获取到锁
2)高可用: 例如使用redis做分布式锁,不能因为一个节点挂了而造成获取释放锁失败的情况
3)防止死锁: 杜绝死锁,必须有超时控制,可撤销,有最终兜底跳出解决方案
4)不乱抢: 只允许自己的锁自己释放
5)重入性: 同一个节点,同一个线程获取到锁后,运行再次获取

  1. redis 实现分布式锁与zookeeper实现分布式锁的不同(如果要保证高可用使用zookeeper,如果保证高并发用redis)

zookeeper实现的锁是cp: zk基于有且仅有一个zonde节点实现,加锁成功就是建立一个节点,使用完成后自己删除,zk在同步数据时所有节点都同步成功后才返回成功,所以说是cp
redis集群版是ap(高可用+分区容错性):基于key是否存在+lua脚本实现,官网推荐redlock,redis同步是异步进行的先响应成功,通过异步线程去同步数据,所以说是ap,进而引出一个问题,在同步前主节点宕机,后选举出的主节点中没有锁数据,也就是数据不一致问题,redlock中可以通过多多节点同时上锁,都上锁成功才返回成功来解决这个问题

  1. redis实现锁的功能主要用到的命令
    在这里插入图片描述
  2. 注意点 setnx+expire两条命令是非原子性的,不安全
    在这里插入图片描述

二. redis 锁基础版示例

  1. 项目中引入依赖,配置连接redis服务器,配置获取RedisTemplate
		<!--SpringBoot与Redis整合依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
        <!-- jedis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.1.0</version>
        </dependency>
  1. 购物需求分析redis分布式锁,将商品数据存储到reids,通过商品id,获取reids中商品减库存
import org.redisson.Redisson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

@RestController
public class GoodController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private Jedis jedis;

    @Autowired
    private Redisson redisson;
    
    @GetMapping("/buyGoods")
    public String lockRedis(String goodsId) throws Exception {
        //1.获取唯一id,释放锁时根据该id判断释放的是否是对应的
        String value = UUID.randomUUID().toString();
        try {
            //2.获取锁,并判断是否获取成功,使用setIfAbsent()方法,对应redis中的setnx命令,并指定锁的失效时间
            //不用get()+set()原因是这两个方法不能保证原子性 
            boolean b = stringRedisTemplate.opsForValue().setIfAbsent("goods:", value, 10L, TimeUnit.SECONDS);
            if (!b) {
                return "获取锁失败";
            }
            //3.到此处说明获取锁成功,执行正常逻辑,获取商品数据,对商品进行减库操作
            String result = stringRedisTemplate.opsForValue().get("goods:" + goodsId);
            int goodsNum = null == result ? 0 : Integer.parseInt(result);
            if (0 == goodsNum) {
                return "商品以售空";
            }
            //减库,并写回redis
            int num = goodsNum - 1;
            stringRedisTemplate.opsForValue().set("goods:" + goodsId, String.valueOf(num));
            return "购买成功";
		
		//4.防止代码异常再finally中释放锁
        } finally {
            //5.防止出现异常等在finally中释放锁
            //防止a线程释放b线程锁的问题,通过唯一id判断
            /*if (stringRedisTemplate.opsForValue().get("goods:").equalsIgnoreCase(value)) {
                stringRedisTemplate.delete("goods:");
            }*/

            //6.在第5步中分别去获取指定key的值然后删除该key的值,代表释放指定锁
            //分为两步,不是原子性的,若在此时不是一个客户端,会误删,优化: 使用脚本
            String script ="if redis.call('get',KEYS[1]) == AVG[1] " +
                            "then" +
                                " return redis.call('del',KEYS[1]) " +
                            "else" +
                                " return 0 end";
            try{
                Object result = jedis.eval(script, Collections.singletonList("goods:"), Collections.singletonList(value));
                if ("1".equals(result.toString())) {
                    System.out.println("del lock success");
                }else {
                    System.out.println("del lock error");
                }
            }finally {
                if (null != jedis) {
                    jedis.close();
                }
            }
        }
    }
}
  1. 上述代码第六步中的lua脚本转换为java就是
    在这里插入图片描述
  2. 配置连接池,配置jedis
	/**
     * 配置redis连接池
     *
     * @return
     */
    @Bean
    public JedisPoolConfig jedisPoolConfig() {
        JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
        //最大空闲数,默认8
        jedisPoolConfig.setMaxIdle(8);
        //最大连接数,默认8
        jedisPoolConfig.setMaxTotal(8);
        //建立连接最大等待时间,单位毫秒,默认-1,永不超时不建议使用
        jedisPoolConfig.setMaxWaitMillis(10000);
        //逐出连接最小空闲时间(默认1800000毫秒)
        //可根据自身业务决定,一般默认值即可,也可以考虑使用下方JeidsPoolConfig中的配置。
        jedisPoolConfig.setMinEvictableIdleTimeMillis(1800000);
        //每次逐出检查时,逐出的最大数据,如果为负就是:1/abs(n),默认3,
        //可根据自身应用连接数进行微调,如果设置为 -1,就是对所有连接做空闲监测。
        jedisPoolConfig.setNumTestsPerEvictionRun(3);
        //逐出扫描间隔时间单位毫秒,如果为负,则不允许逐出现场,默认-1
        //建议设置,周期自行选择,也可以默认也可以使用下方JedisPoolConfig 中的配置。
        jedisPoolConfig.setTimeBetweenEvictionRunsMillis(-1);
        //是否从翅中取出连接进行检查,如果检查失败,则从池中去除并新取一个
        //默认false,业务量很大时候建议设置为false,减少一次ping的开销。
        jedisPoolConfig.setTestOnBorrow(false);
        //在空闲时检查有效性,默认false,建议开启
        jedisPoolConfig.setTestWhileIdle(true);
        return jedisPoolConfig;
    }

    @Bean
    public Jedis jedis(JedisPoolConfig jedisPoolConfig) {
        JedisPool jedisPool = new JedisPool(jedisPoolConfig, "411.100.63.107", 16379);
        return jedisPool.getResource();
    }
  1. 对上述代码解释,通过向redis中存储一个指定key代表锁,存储成功说明加锁成功,失败说明加锁失败

1)在第一步中获取一个唯一id,在释放锁时通过这个唯一id防止a线程释放了b线程锁的问题
2)在第二步中使用 setIfAbsent(key,value) 代替"get()+setNx()"尝试添加锁,保证原子性
3)在第三步中对库存进行减少操作,并更新库存
4)在第四步中使用finally,防止在释放锁以前代码出现异常锁无法释放
5)在第五步中(有问题,所以注释掉了使用第六步)获取锁,并判断value释放相等释放锁,但是获取锁,释放锁分为两步,不能保证原子性
6)在第六步中使用脚本,防止第五步释放锁不能保证原子性问题

  1. 提出上面代码中还存在的问题

1)上面设置了过期时间为10,你怎么确定在这个时间内业务逻辑能够正常执行完毕?
2)如果当前架构中使用的是redis集群,redis集群情况下,采用的异步通知模式,一个节点接收请求,然后通过主节点去异步通知其它节点,在获取锁(想reids中存储指定key)或释放锁(删除redis指定key),例如获取锁,一个节点接收到插入请求,在插入成功后,通知其它节点前,主节点宕机,在没有被通知到的子节点中重新选出了一个主节点,这个主节点中并没有这把锁,也不回进行通知,这个时间进来另外一个获取锁的请求打到了还未被通知的节点上,这时候发现没有,就会出现锁不住的情况

  1. 解决上面的问题:

三. redis 锁进阶 Redlock

  1. 复习redis集群下与ZooKeeper集群下各自的同步流程

1)redis集群AP: 在集群环境中多个redis 分为master主库与Salve从库,当Salve启动后会连接到master主库,并发送一个sync命令,master接收到该命令后台启动一个进程,收集接收到的操作数据指令缓存为快照文件,当缓存完毕后,将这个文件发送给所有连接到该master的Salve从库,Salve将文件接收保存到磁盘上,然后加载到内存中,后续master主库接收到的修改数据指令都会通过这个后台进程发送给Salve,是异步的
2)Zookeeper集群CP: Zookeeper集群下也是分为master与salve节点,假设向salve拿到存储请求后,会将信息同步给主节点,主节点通知将数据同步给其它节点,全部同步成功后,才会响应成功,假设当主节点宕机时,集群会重新选举,在选举期间整个zookeepre是不可用的,只有轩主成功后才可以继续使用
3)总结: ZooKeeper为了保证数据一致性牺牲了可靠性,而redis集群是保证高可用,在选择时根据需求,是想要高一致性,还是高可用,例如金额相关对一致性要求极为严格的可以选择ZooKeeper,例如抢购等保证高可用的可以选择redis

  1. 使用 Redisson 中封装好的锁也就是Redlock
  2. 集群环境下增加引入Redisson 依赖,配置Redisson,做分布式锁
		<!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.13.4</version>
        </dependency>
  1. 配置redisson注入到容器
	/**
     * 单机版
     * @return
     */
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://443.100.63.107:16379").setDatabase(0);
        return (Redisson) Redisson.create(config);
    }

    /**
     * 集群版
     * @return
     */
    @Bean
    public RedissonClient getRedisson() {
        //redi集群地址
        String cluster="10.10.1.1:7000,10.10.1.1:7001,10.10.1.1:7002,10.10.1.1:7003,10.10.1.1:7004,10.10.1.1:7005";
        String[] nodes = cluster.split(",");
        //redisson版本是3.5,集群的ip前面要加上“redis://”,不然会报错,3.2版本可不加
        for (int i = 0; i < nodes.length; i++) {
            nodes[i] = "redis://" + nodes[i];
        }

        Config config = new Config();
        
        //SentinelServersConfig serverConfig = config.useSentinelServers()
        //useSentinelServers() 与 useClusterServers() 前者要指定masterName 
        //调用 setMasterName("masterName")
        config.useClusterServers() //这是用的集群server
                .addNodeAddress(nodes)
                .setScanInterval(2000) //设置集群状态扫描时间
                .setPassword("password")
                .setTimeout(3000)
                .setMasterConnectionPoolSize(8)
                .setSlaveConnectionPoolSize(8)
                .setSlaveConnectionMinimumIdleSize(1)
                .setMasterConnectionMinimumIdleSize(1);;
        RedissonClient redisson = Redisson.create(config);
        //可通过打印redisson.getConfig().toJSON().toString()来检测是否配置成功
        return redisson;
    }
  1. 使用示例
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import redis.clients.jedis.Jedis;

@RestController
public class GoodController {

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Autowired
    private Redisson redisson;

    @GetMapping("/buyGoods")
    public String buyGoods(String goodsId) throws Exception {
        //1.通过 Redisson 对指定key加锁(注意点相同锁key相同)
        RLock redissonLock = redisson.getLock("goods:");
        //2.加锁
        redissonLock.lock();
        //3.加锁并指定失效时间
        //redissonLock.lock(10,TimeUnit.SECONDS);

		//4.tryLock(waitTime, leaseTime, 时间单位)
        //waitTime: 抢锁时等待时间,正常情况下3秒
        //leaseTime: 获取到锁后的锁失效时间,正常情况下300秒
        redissonLock.tryLock(3, 300, TimeUnit.SECONDS);
  
        try {
            //4.获取到锁的线程获取商品对商品数量进行减少操作
            String result = stringRedisTemplate.opsForValue().get("goods:" + goodsId);
            int goodsNumber = result == null ? 0 : Integer.parseInt(result);
            if (goodsNumber <= 0) {
                return "商品已经售罄";
            }
            int realNumber = goodsNumber - 1;
            //5.更新商品数量
            stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
            return "成功秒杀商品,此时还剩余:" + realNumber + "件";

        } finally {
            //6.防止发生异常通过finally释放锁资源
            //7.redissonLock.isLocked()判断当前是否持有锁
            //redissonLock.isHeldByCurrentThread()//判断当前持有的锁是否是当前线程下的
            if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()) {
                redissonLock.unlock();
            }
        }
    }
}
  1. 方法摘要

Redisson getLock(“lockKey”) 对指定key加锁
lock() 获取锁 / lock(10,TimeUnit.SECONDS) 获取锁并指定失效时间
tryLock(waitTime, leaseTime, 时间单位)
isLocked() 判断是否还持有锁
isHeldByCurrentThread() 判断持有的锁是否是当前线程下的
unlock() 释放锁

四. Redlock 分析

  1. 对 redlock的解释: 多个服务器间,保证同一时间段内,只有一个请求,防止数据出现并发安全问题,该锁在java中通过Redisson实现,使用时需要引入Redisson依赖,主要是针对上面使用setNx+指定key做锁时,出现的锁超时问题,与集群环境下异步同步,主节点宕机无法锁无法同步问题
  2. 中文网
  3. 设计理念:(集群redis计算公式: number= 2*宕机台数+1, 宕机多少台后不影响正常使用)
    在这里插入图片描述

解决集群环境master宕机数据不一致锁不住的问题

  1. 首先上面提到过集群架构redis台数计算公式:2*允许宕机台数+1=不影响正常使用的机器总台数
  2. 上面设计理念中提到过: 集群环境中使用Redlock解决数据不一致情况,舍弃主从节点架构,通过多个节点去获取锁,获取锁的个数是n/2+1(n表示允许宕机台数)=至少获取锁个数,才表示获取锁成功
  3. 使用示例: 项目中引入Redisson依赖
  4. redis集群环境不同服务器地址配置到不同的redisson中,注入到容器
	@Bean
    public RedissonClient redissonClient1() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress("redis://第一台redis地址")
                .setTimeout(3000)
                .setConnectionPoolSize(64)
                .setConnectionMinimumIdleSize(24);
        serverConfig.setPassword("Password");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress("redis://第二台redis地址")
                .setTimeout(3000)
                .setConnectionPoolSize(64)
                .setConnectionMinimumIdleSize(24);
        serverConfig.setPassword("Password");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer()
                .setAddress("redis://第三台redis地址")
                .setTimeout(3000)
                .setConnectionPoolSize(64)
                .setConnectionMinimumIdleSize(24);
        serverConfig.setPassword("Password");
        return Redisson.create(config);
    }
  1. 使用示例
import lombok.extern.slf4j.Slf4j;
import org.redisson.RedissonRedLock;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;

@RestController
@Slf4j
public class RedLockController {

    public static final String CACHE_KEY_REDLOCK = "ATGUIGU_REDLOCK";

    @Autowired
    RedissonClient redissonClient1;

    @Autowired
    RedissonClient redissonClient2;

    @Autowired
    RedissonClient redissonClient3;

    @GetMapping(value = "/redlock")
    public void getlock() {
        //1.通过多个redis节点获取锁
        RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
        RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
        RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
        //三个节点都获取到锁,才表示获取锁成功
        RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
        boolean isLockBoolean;
        try {
            //2.tryLock()尝试获取锁
            //waitTime 抢锁的等待时间,正常情况下 等3秒
            //leaseTime就是redis key的续时时间,正常情况下5分钟300秒。
            isLockBoolean = redLock.tryLock(3, 300, TimeUnit.SECONDS);
            log.info("线程{},是否拿到锁:{} ",Thread.currentThread().getName(),isLockBoolean);
            if (isLockBoolean) {
                System.out.println(Thread.currentThread().getName()+"\t"+"---come in biz");
                //业务逻辑,忙10分钟
                try { TimeUnit.MINUTES.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
            }else{
                System.out.println("获取锁失败");
            }
        } catch (Exception e) {
            log.error("redlock exception ",e);
        } finally {
            //无论如何, 最后都要解锁
            //由于锁的可重入性,加几次锁就要释放几次,具体查看锁的可重入性分析
            redLock.unlock();
            redLock.unlock();
            redLock.unlock();
        }
    }
}
  1. 解释上述代码:

1)不同redis服务器配置到不同的redisson中
2)获取锁时对多个redisson同时去获取,都获取到才表示获取锁成功,也就是分别对三个redis节点添加代表锁的key,都添加成功才表示获取锁成功,这样在后续操作中有一台机器如果宕机,不会影响到锁

锁的定时续期

  1. 怎么对锁设置超时时间,指定时间内无法释放怎么办
  2. watchdog看门狗: 在redisson中提供了一个额外的守护线程,定期检查主线程是否还持有锁,如果有则延长锁的过期时间,源码中设置检查时间为(每1/3锁的时间检查一次)

watchdog源码分析

  1. 查看Redisson 中的 lock()获取锁方法发现Redisson中实现了 "java.util.concurrent.locks " JUC 并发包下的接口,查看在Redisson中对该接口的实现类RedissonLock,发现

leaseTime 默认为 -1

	public void lock() {
        try {
        	//继续查看该lock方法
            this.lock(-1L, (TimeUnit)null, false);
        } catch (InterruptedException var2) {
            throw new IllegalStateException();
        }
    }
  1. 继续向下查看lock方法 ,在该方法中调用了tryAcquire(-1L, leaseTime, unit, threadId),在不设置leaseTime的情况下默认传递了-1
	private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        //查看tryAcquire()方法
        Long ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
        if (ttl != null) {
            RFuture<RedissonLockEntry> future = this.subscribe(threadId);
            if (interruptibly) {
                this.commandExecutor.syncSubscriptionInterrupted(future);
            } else {
                this.commandExecutor.syncSubscription(future);
            }

            try {
                while(true) {
                    ttl = this.tryAcquire(-1L, leaseTime, unit, threadId);
                    if (ttl == null) {
                        return;
                    }

                    if (ttl >= 0L) {
                        try {
                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        } catch (InterruptedException var13) {
                            if (interruptibly) {
                                throw var13;
                            }

                            ((RedissonLockEntry)future.getNow()).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                        }
                    } else if (interruptibly) {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquire();
                    } else {
                        ((RedissonLockEntry)future.getNow()).getLatch().acquireUninterruptibly();
                    }
                }
            } finally {
                this.unsubscribe(future, threadId);
            }
        }
    }
  1. 查看tryAcquire()方法最终会执行到tryAcquireAsync(),在该方法内部会判断传递的leaseTime是否等于-1(默认情况下-1,也就是不设置情况下),如果传递了说明设置了超时时间, 如果未传递没有设置超时时间执行tryLockInnerAsync()时设置了一个lockWatchdogTimeout = 30000L,了解到默认不设置超时时间情况下watchdog超时时间为30秒,并且该方法执行会返回一个RFuture,这是一个FutureTask,通过该FutureTask实现延时续命
 private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
 		//1.在我们获取锁时如果传递了leaseTime 不等于-1,走该流程
        if (leaseTime != -1L) {
            return this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
        	//2.如果未传递leaseTime走该流程,执行getLockWatchdogTimeout()拿了一个超时时间30000L
        	//并且该方法执行会返回一个RFuture<Long>,这是一个FutureTask,通过该FutureTask实现延时续命
            RFuture<Long> ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
            //3.通过返回的RFuture执行 onComplete()
            ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
                if (e == null) {
                    if (ttlRemaining == null) {
                    	//4.内部执行scheduleExpirationRenewal()方法,插入了一个检查过期时间的定时任务线程
                        this.scheduleExpirationRenewal(threadId);
                    }

                }
            });
            return ttlRemainingFuture;
        }
    }
  1. 查看获取检查线程的scheduleExpirationRenewal()方法,在该方法中最先会执行renewExpiration(),查看该方法
	private void scheduleExpirationRenewal(long threadId) {
        RedissonLock.ExpirationEntry entry = new RedissonLock.ExpirationEntry();
        RedissonLock.ExpirationEntry oldEntry = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            //首先执行的该方法
            this.renewExpiration();
        }

    }

	//renewExpiration()方法插入执行定时任务,定时检查超时时间,执行方法内部的
	private void renewExpiration() {
        RedissonLock.ExpirationEntry ee = (RedissonLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.get(this.getEntryName());
        if (ee != null) {
            Timeout task = this.commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            	//开启一个线程
                public void run(Timeout timeout) throws Exception {
                    RedissonLock.ExpirationEntry ent = (RedissonLock.ExpirationEntry)RedissonLock.EXPIRATION_RENEWAL_MAP.get(RedissonLock.this.getEntryName());
                    if (ent != null) {
                        Long threadId = ent.getFirstThreadId();
                        if (threadId != null) {
                            RFuture<Boolean> future = RedissonLock.this.renewExpirationAsync(threadId);
                            future.onComplete((res, e) -> {
                                if (e != null) {
                                    RedissonLock.log.error("Can't update lock " + RedissonLock.this.getName() + " expiration", e);
                                } else {
                                    if (res) {
                                    	//2.续时,每次续时30秒
                                        RedissonLock.this.renewExpiration();
                                    }

                                }
                            });
                        }
                    }
                }
                //1.internalLockLeaseTime /3l: 也就是传进来的leaseTime超时时间默认30秒/3l=10秒
                //也就是当前开启的检查超时时间的定时任务执行的定时时间,默认10秒,默认情况下相当于每10秒中会执行一次
            }, this.internalLockLeaseTime / 3L, TimeUnit.MILLISECONDS);
            ee.setTimeout(task);
        }
    }
  1. watchdog总结: 默认情况下获取到锁成功后持有锁时间为30秒,每十秒扫描一次判断是否还持有锁,如果持有续时,每次默认续时30秒,否则传递leaseTime情况下每leaseTime/3检查一次,每次续时leaseTime秒

1)查看Redisson 中的 lock()获取锁方法发现Redisson实现了"java.util.concurrent.locks " JUC 并发包下的接口实现类RedissonLock,查看lock源码
2)在lock()方法中调用了tryAcquire(-1L, leaseTime, unit, threadId),发现在不设置leaseTime的情况下默认传递了-1
3)查看tryAcquire()方法,在该方法中首先会判断leaseTime是不是等于-1,如果等于-1,执行tryLockInnerAsync()时拿了一个lockWatchdogTimeout = 30000L ,并且该方法执行会返回一个RFuture,这是一个FutureTask,到这里确认到锁持有锁的默认时间为30秒
4)上面拿到了一个RFuture,通过返回的RFuture执行 onComplete(),内部调用了scheduleExpirationRenewal()方法,插入了一个检查过期时间的定时任务线程
5)查看scheduleExpirationRenewal()方法,内部调用renewExpiration(),该方法中内部run了一个线程,该线程是个定时任务,定时时间为"internalLockLeaseTime / 3L",internalLockLeaseTime 也就是刚刚传递下来的30秒,也就是默认情况下每10秒执行一次该定时任务,检查是否还持有锁,如果持有续时30秒

锁的可重入性分析

  1. 根据上面分析的在lock()方法中会调用到tryAcquireAsync(),在该方法内部会执行tryLockInnerAsync(),该方法中有一段lua脚本,
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        this.internalLockLeaseTime = unit.toMillis(leaseTime);
        return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, command, 
        		"if (redis.call('exists', KEYS[1]) == 0) then " +
                	"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
                	"return nil;" +
                " end; " +
        		"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                	"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                	"redis.call('pexpire', KEYS[1], ARGV[1]); " +
                	"return nil; end; return redis.call('pttl', KEYS[1]);", Collections.singletonList(this.getName()), this.internalLockLeaseTime, this.getLockName(threadId));
    }
  1. 对lua脚本解释:

1)首先判断添加的key是否存在"redis.call(‘exists’, KEYS[1])" 等于0为不存在
2)如果不存在进行加锁"redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)", 其中"KEYS[1]"也就是加锁的key,"ARGV[1]"表示加锁的客户端id,后面的"1"为加锁次数
3)"redis.call(‘pexpire’, KEYS[1], ARGV[1]); " 表示设置过期时间,前面通过看门狗了解到默认30秒
4) 如果判断锁已经存在执行下一个if,会判断是否是当前线程,如果是当前线程会增加加锁次数
5)如果锁已经存在并且不是当前线程,会返回过期时间ttl
在这里插入图片描述

  1. 了解到锁的可重入性,在我们加锁时,加几次锁就要释放几次(前面多客户端分别加锁情况下)

释放锁分析

  1. 查看unlock()方法内部调用了unlockAsync(),首先发现释放锁是异步执行的,在该方法中通过执行unlockInnerAsync()去释放锁,并拿到一个RFuture可以理解为FutureTask,
  2. 通过拿到的FutureTask中执行cancelExpirationRenewal() 取消watchdog获取锁是插入的定时续期的定时任务
	public void unlock() {
        try {
        	//1.查看该方法中的unlockAsync()
            this.get(this.unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException var2) {
            if (var2.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException)var2.getCause();
            } else {
                throw var2;
            }
        }
    }

	public RFuture<Void> unlockAsync(long threadId) {
        RPromise<Void> result = new RedissonPromise();
        //1.unlockInnerAsync()释放锁逻辑
        RFuture<Boolean> future = this.unlockInnerAsync(threadId);
        future.onComplete((opStatus, e) -> {
        	//2.取消watchdog看门狗续期的定时任务
            this.cancelExpirationRenewal(threadId);
            if (e != null) {
                result.tryFailure(e);
            } else if (opStatus == null) {
                IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + this.id + " thread-id: " + threadId);
                result.tryFailure(cause);
            } else {
                result.trySuccess((Object)null);
            }
        });
        return result;
    }
  1. 查看 unlockInnerAsync(),该方法内部又是一堆lua脚本,通过脚本删除redis中的key释放锁,重点里面有一个"local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1)",这个就是重入锁释放时,累减操作,所以重入锁时加了几次,就需要释放几次
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return this.evalWriteAsync(this.getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;", Arrays.asList(this.getName(), this.getChannelName()), LockPubSub.UNLOCK_MESSAGE, this.internalLockLeaseTime, this.getLockName(threadId));
    }

在这里插入图片描述

另外还有几个api

在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

redis 十二. 分布式锁 的相关文章

  • Stackexchange.redis 缺乏“WAIT”支持

    我在客户端应用程序正在使用的负载均衡器后面有 3 个 Web API 服务器 我正在使用这个库来访问具有一个主服务器和几个从服务器的 Redis 集群 目前不支持 WAIT 操作 我需要此功能来存储新创建的用户会话并等待它复制到所有从属服务
  • PooledRedisClientManager 未释放连接

    我将 json 数据列表存储在 redis 中并使用 ServiceStack c 客户端访问它 我本质上是在管理自己的外键 我在其中存储zrangeid 我使用应用程序内部的接口从zrange然后从 Redis 获取底层 json 对象并
  • redis集群不断打印日志WSA_IO_PENDING

    当我启动redis集群的所有redis服务器时 所有这些服务器不断打印类似WSA IO PENDING clusterWriteDone的日志 9956 03 Feb 18 17 25 044 WSA IO PENDING writing
  • 如何设置 Celery 以通过 ssl 与 Azure Redis 实例对话

    使用 的伟大答案 如何在microsoft azure上的django项目中配置celery redis https stackoverflow com questions 39616701 how to configure celery
  • Docker-compose Predis 不通过 PHP 连接

    我正在尝试使用 docker compose 将 PHP 与 redis 连接 docker compose yml version 2 services redis image redis 3 2 2 php image company
  • Redis INCRBY 有限制

    我想知道是否有一种方法可以通过我的应用程序的单次往返在 Redis 中执行此操作 对于给定的键K 其可能值V是范围内的任意整数 A B 基本上 它有上限和下限 When an INCRBY or DECRBY发出命令 例如INCRBY ke
  • 如何批量删除Redis中数十万个带有特殊字符的key

    我们有一个包含数十万个 Redis 键的列表 其中包含各种特殊字符 我们希望批量删除它们 对于这个问题上的类似问题 有一些很好的答案 如何使用 Redis 自动删除与模式匹配的键 https stackoverflow com questi
  • 如何将 ActionController::Live 与 Resque + Redis 一起使用(用于聊天应用程序)

    我正在尝试为我的 Rails 应用程序构建聊天功能 我在用ActionController Live Puma Resque Redis为了这 所以基本上在这种情况下 redissubscribe方法正在后台运行 使用resque 到目前为
  • redis 阻塞直到 key 存在

    我是 Redis 新手 想知道是否有办法能够await get通过它的键来获取值 直到该键存在 最小代码 async def handler data await self fetch key async def fetch key ret
  • 如何测试我的 Redis 缓存是否正常工作?

    我已经安装了 django redis cache 和 redis py 我遵循了 Django 的缓存文档 据我所知 以下设置就是我所需要的 但我如何判断它是否正常工作 设置 py CACHES default BACKEND redis
  • SignalR 无法连接到 SSL 上的 Azure Redis

    我目前在 Azure 上托管我的 redis 缓存服务器 并让 signalR 依赖它作为骨干 使用以下内容 GlobalHost DependencyResolver UseRedis 服务器 端口 密码 eventKey 这可以在端口
  • 如何在Redis中进行持久化存储?

    关闭redis服务器后 使用set存储的值被破坏 在这里我找到了使用持久性存储的方法 有人帮助我 如何使用javascript实现这一点 我想将客户端的一些值存储在 redis 数据库中 并且必须在其他客户端中使用该值 您需要配置 Redi
  • Node Js:Redis 作业在完成其任务后未完成

    希望你们做得很好 我在我的 Nodejs 项目中实现了 BullMQ Bull 的下一个主要版本 来安排发送电子邮件的作业 例如 发送忘记密码请求的电子邮件 所以 我编写了如下所示的代码 用户服务 await resetPasswordJo
  • Lua中按字符分割字符串

    我有像这样的字符串 ABC DEF 我需要将它们分开 字符并将两个部分分别分配给一个变量 在 Ruby 中 我会这样做 a b ABC DEF split 显然Lua没有这么简单的方法 经过一番挖掘后 我找不到一种简短的方法来实现我所追求的
  • Laravel 所有会话 ID 与 Redis 驱动程序

    在我的应用程序中 我希望允许某些用户能够注销除他 她之外的所有其他用户 当会话驱动程序设置为文件时 我已经完成了此功能 但现在我使用 redis 作为会话驱动程序 并且我无法找到任何方法来列出所有当前会话 就像我在文件时所做的那样司机 问题
  • redis 2.8.7 Linux Sentinel环境配置问题,如何使其自启动,应该订阅什么?

    现在我们尝试使用 redis 2 8 7 作为缓存存储 来自使用 booksleeve 客户端的 NET Web 应用程序 目前看来这是一个非常有趣和令人兴奋的任务 redis 文档非常好 但由于缺乏真正的实践经验 我确实有几个关于如何正确
  • Scala 使用的 Redis 客户端库建议

    我正在计划使用 Scala 中的 Redis 实例进行一些工作 并正在寻找有关使用哪些客户端库的建议 理想情况下 如果存在一个好的库 我希望有一个为 Scala 而不是 Java 设计的库 但如果现在这是更好的方法 那么仅使用 Java 客
  • 使用redis进行树形数据结构

    我需要为基于树的键值开发一个缓存系统 与Windows注册表编辑器非常相似 其中缓存键是字符串 表示树中到值的路径 可以是原始类型 int string bool double 等 或子树本身 例如 key root x y z w val
  • 如何使用 Redis 自动删除与模式匹配的键

    在我的 Redis DB 中 我有很多prefix
  • 如何使用redis发布/订阅

    目前我正在使用node js和redis来构建应用程序 我使用redis的原因是因为发布 订阅功能 该应用程序只是在用户进入用户或离开房间时通知经理 function publishMsg channel mssage redisClien

随机推荐

  • C++万能头文件#include<bits/stdc++.h>

    include
  • 发放金币(循环)

    分享一下个人思路 如果拿1金币 可以拿1天 2金币拿两天 n金币拿n天 也就是说 我们要拿n金币 从当前的天数开始循环n次 每次拿n 每次拿完之后天数 1 include
  • 微信小程序之获取用户位置权限(拒绝后提醒)

    小编推荐 Fundebug专注于JavaScript 微信小程序 微信小游戏 Node js和Java实时BUG监控 真的是一个很好用的bug监控费服务 众多大佬公司都在使用 微信小程序获取用户当前位置有三个方式 1 wx getLocat
  • Google guava之Table简介说明

    转自 Google guava之Table简介说明 下文笔者讲述guava中Table集合的简介说明 如下所示 guava之Table集合简介 Table集合 用于存储数据表 类似于Map
  • 报错异常:java.lang.NoClassDefFoundError

    一 问题背景 由原先的jdk1 8升级至jdk20 启动项目登录后台出现报错问题 org springframework web util NestedServletException Handler dispatch failed nes
  • HttpClient介绍

    本文内容整理自 https blog csdn net w372426096 article details 82713315 HttpClient相比传统JDK自带的URLConnection 增加了易用性和灵活性 它不仅使客户端发送Ht
  • WIN10-22H2专业版_电脑维修人员专用装机系统镜像【04.20更新】

    WIN10 22H2专业版是由站长亲自封装的电脑维修人员专用装机系统镜像 系统干净无广告 稳定长效不卡顿 适合电脑维修店用来维修电脑重装系统 此版本是WIN10系统里非常稳定的正式版本之一 适合在维修电脑时重装系统或者大批量装机使用 本次封
  • 服务器入门

    GPU工作站 服务器 1 cdot 1 1 型号 AMD宵龙 RTX3090 为例 内存类型 REG 内存 8个DIMM DDR4插槽 3200高频内存 gt system长时间稳定运作 存储 8个 3 5英寸驱动 8块3 5存硬盘 2个N
  • C++ 11 新特性之统一初始化语法

    C 之前的初始化语法很乱 有四种初始化方式 而且每种之前甚至不能相互转换 让人有种剪不断 理还乱的感觉 曾经去面试 就有人问我string有几种初始化方式 当时就说出了两种 fuck 面试官还得意的说 你连基本的初始化方式都记不清 还做啥2
  • Qt 中 connect 函数实现信号与槽函数的连接

    目录 简介 connect 函数原型 代码示例 自定义信号和槽函数 信号和槽函数的线程安全性 总结 简介 Qt 是一个功能强大的跨平台应用程序开发框架 其提供了 connect 函数用于信号和槽的连接 实现了对象之间的通信 本文将介绍 co
  • RUST中所有权/生命周期/借用本质探讨

    本书github链接 深入RUST标准库内核 本文摘自以上链接的书籍 如果对本文中涉及的若干知识想进一步了解 请阅读此书 RUST在定义一个变量时 实际上把变量在逻辑上分成了两个部分 变量的内存块与变量内容 变量类型定义了内存块内容的格式
  • vue 源码解读

    前端基本 0基础 尝试从代码入手 不会的直接搜索就行了 成功添加页面 test33 1 在 src config menu js 源码24行增加 title Test33 name Test33 icon 2 在 src routers r
  • pycharm的git密码错误

    原文地址 https www cnblogs com wangjian941118 p 10721650 html pycharm使用gitlab输错密码解决办法 这个问题困扰我两周了 今天抱着试一试的想法 随手搜了一下 出现了新的结果 就
  • 去银行写代码是什么体验?

    最近在知乎上的一个回答火了 关于如何学习操作系统的 分享给大家 如何学会操作系统这门课程 一线互联网岗位和银行 国企还是有点区别的 这篇文章 讲详细讲一讲银行或者金融科技的相关问题 包括面试 待遇等等 虽然前阵子网传几大互联网公司都去掉了大
  • ChatGPT解决了我的出行规划焦虑

    我的五一出行规划 五一旅游季又将到来 许多人为了规划理想的行程而苦恼 需要投入相当时间来筛选各种信息 然而 现在有了Chat GPT 安排美好旅途变得异常简单 只要您告诉GPT您的日期和目的地 不到30秒就可以生成个性化的行程攻略 同时还可
  • APP兼容性测试如何测试?

    随着 APP 应用范围越来越广 用户群体越来越大 终端设备的型号也越来越多 移动终端碎片化加剧 使得 APP兼容性测试成为测试质量保障必须要考虑的环节 APP兼容性的测试主要包含系统兼容 产商ROM 兼容性 屏幕分辨率兼容 网络兼容 其他兼
  • PHP+nginx完成大文件下载处理

    最近在板子上做文件下载的处理 需求相对简单 一个下载请求过来 根据请求的数据决定给那些文件回去 于是采用了php nginx的方式来处理 尝试 nginx用来处理下载请求 拿到请求以后 调用配置好的php文件 php文件中对请求的参数做处理
  • actuator--基础--6.1--端点解析--health端点

    actuator 基础 6 1 端点解析 health端点 代码位置 https gitee com DanShenGuiZu learnDemo tree master actuator learn actuator01 1 health
  • java队列模拟_Java模拟队列

    用Java模拟队列的出队和进队 1 代码 Java 代码 package com stackANDqueue import java io DataInputStream import java io IOException 循环队列的入队
  • redis 十二. 分布式锁

    目录 一 分布式锁概述 二 redis 锁基础版示例 三 redis 锁进阶 Redlock 四 Redlock 分析 解决集群环境master宕机数据不一致锁不住的问题 锁的定时续期 watchdog源码分析 锁的可重入性分析 释放锁分析