分布式锁之redis实现

2023-11-09

docker安装redis

拉取镜像

docker pull redis:6.2.6

查看镜像

87d429bb8dfa467baedf8733e62ac37b.png

启动容器并挂载目录

需要挂在的data和redis.conf自行创建即可

docker run --restart always -d -v /usr/local/docker/redis/redis.conf:/usr/local/etc/redis/redis.conf -v /usr/local/docker/redis/data:/data --name redis -p 6379:6379 redis:6.2.6 redis-server /usr/local/etc/redis/redis.conf

查看运行状态 

不要忘记开放端口6379

b8ff2272d9354ac39d198b5819e62aef.png

进入容器内部使用redis-cli

docker exec -it 13829d3f335a /bin/bash

redis-cli

[可选]用密码登录 

修改redis.conf配置文件,设置 requirepass xxxxx

spring boot 集成redis

添加依赖

      <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
      </dependency>

添加redis配置 


server.port= 10010

spring.datasource.driver-class-name= com.mysql.cj.jdbc.Driver
spring.datasource.url= jdbc:mysql://39.106.53.30:3306/lock_db?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=root
spring.datasource.password=root123456

spring.redis.host=39.106.53.30
spring.redis.port=6379

使用StringRedisTemplate

如果直接使用RedisTemplate使用的序列化器是jdk的,存的是二进制,使用StringRedisTemplate默认初始化序列化器就是String类型

    public StringRedisTemplate() {
        this.setKeySerializer(RedisSerializer.string());
        this.setValueSerializer(RedisSerializer.string());
        this.setHashKeySerializer(RedisSerializer.string());
        this.setHashValueSerializer(RedisSerializer.string());
    }

redis演示超卖问题

执行票数存入redis指令

set ticket 5000

 编写代码演示超卖问题

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        //获取redis中的票数
        String ticket = redisTemplate.opsForValue().get("ticket");

        if(ticket!= null && ticket.length() != 0){
            // 扣减票数

            Integer integer = Integer.valueOf(ticket);

            if(integer >0){
                redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
            }

        }

    }
}

 5000请求压测,结果为4895,发生了超卖问题

a7fc4d7f1ee548169fd7f08b0fc6ca0a.png
e1d1f4af1d704938b8188b32b6ccbc36.png
redis解决超卖问题 

解决方案

解决方案

  •         本地jvm锁(这种情况仅限单机,不做介绍)
  •         redis乐观锁 watch  multi exec(性能低)
  •         分布式锁(redis+lua手动实现或者通过redission实现)

redis乐观锁实现 

watch: 监控一个或者多个key,如果这些key在提交事务(exec)之前被其他用户修改过,那么事务将执行失败,需要重新获取最新数据重头操作

multi: 开启事务,使用该命令,标记一个事务块的开始,redis会将这些操作放入队列中

exec: 执行事务

720c867464d145e68e27ad877dc0f155.png

 乐观锁的代码需要包在SessionCallback中实现

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.core.RedisOperations;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        redisTemplate.execute(new SessionCallback<Object>() {
            @Override
            public  Object execute(RedisOperations redisOperations) throws DataAccessException {

                // 开启监听
                redisOperations.watch("ticket");
                //获取redis中的票数
                String ticket = redisTemplate.opsForValue().get("ticket");

                if(ticket!= null && ticket.length() != 0){
                    // 开启事务
                    redisOperations.multi();
                    Integer integer = Integer.valueOf(ticket);
                    // 扣减票数
                    redisOperations.opsForValue().set("ticket",String.valueOf(--integer));

                    // 提交事务
                    List exec = redisOperations.exec();

                    // 如果获取锁失败 ,重试
                    if(exec == null || exec.size() == 0){
                        try {
                            // 减少锁争抢,避免栈内存溢出
                            Thread.sleep(40);
                            sellTicket();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                }
                return null;
            }
        });


    }
}

 1000请求压测,结果为4000,没有发生超卖,但性能极低

b975be5edee2453597b57ccd66d557f1.pngredis实现分布式锁

分布式锁的实现方案中redis的实现主要思想就是独占排他使用,在redis中可以使用setnx命令进行独占排他使用

  • 加锁 setnx 
  • 解锁 del
  • 重试:递归(容易造成栈内存溢出),这里使用循环

 

package com.test.lockservice.service.impl;

import com.test.lockservice.service.TicketService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

/**
 * @Author sl
 */
@Service

public class TicketServiceImpl implements TicketService {


    @Autowired
    private StringRedisTemplate redisTemplate;

    @Override
    public  void sellTicket(){

        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111")){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁操作
            redisTemplate.delete("lock");
        }

    }
}

压测1000,显示无超卖现象 

5629d991544d4b7c9b77472abca1c682.png042eaa1342124922a9d528dc660a16ec.png

添加过期时间防止死锁问题

当前代码存在问题,假如现在有4台服务器争抢锁,编号为1的服务器抢到了锁,但是没来得及释放锁,就宕机啦,其他2,3,4服务器就永远拿不到锁,这就是产生的死锁问题,解决方案是给锁添加过期时间来解决

4affdb3b239141e78d695451512263fb.png

要保证枷锁和设置过期时间具有原子性,否则加了锁,没来得及给过期时间就宕机啦,又会产生死锁问题

expire key 20指令和枷锁指令是两条指令不具有原子性,在这里使用 set key ex 20 nx命令设置过期时间来保证原子性

9d6becfe33a84ebf93433896a4d65bcf.png

添加过期时间和获取锁的原子性

redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)

 // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", "111",3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
 }

通过UUID防止误删

因为已经加了过期时间,如果加了3秒过期时间,第一个请求到了第3秒还没执行完毕,锁就失效了,这时第二个请求获取锁,执行1s的时候,第一个请求执行到del指令,就把第二个锁删除掉啦(误删)

解决方案:通过uuid标识是自己的锁,通过判断是自己的锁,在删除

84e258e5fb574c91a78b4244a41923d9.png

添加uuid防止误删

    public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 判断是自己的锁在删除
            if(uuid.equals(redisTemplate.opsForValue().get("lock"))){
                redisTemplate.delete("lock");
            }
            
        }

    }

使用Lua脚本解决防误删的原子性问题

判断和删除锁之间需要保证原子性第一个请求因为如果判断的时候,发现是自己的锁,然后此时锁超过了过期时间,此时,第二个请求获取到锁,第一个请求执行del指令,删除的是第二个请求的锁,所以需要在判断和删除锁之间保持原子性

解决方案:使用Lua脚本保证原子性,Lua脚本将多条命令一次性发给redis,redis单线程的特性可以保证原子性操作

Lua脚本介绍和redis执行Lua脚本 

Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能,编译后仅仅一百余K,可以很方便的嵌入别的程序里

菜鸟地址:https://www.runoob.com/lua/lua-variables.html

Lua脚本流程控制和变量定义

--[ 定义全局变量a 局部变量用local a --]
a = 100;
--[ 检查条件 --]
if( a < 20 )
then
   --[ if 条件为 true 时执行该语句块 --]
   print("a 小于 20" )
else
   --[ if 条件为 false 时执行该语句块 --]
   print("a 大于 20" )
end
print("a 的值为 :", a)

在redis中执行Lua脚本

redis中继承了Lua脚本,lua-time-limit参数现在脚本最长运行时间,默认是5秒,执行指令为:

eval script numkeys key [key ...] arg [arg ...]

numkeys:标识key的数量 不能省略

hello word

eval "return 'hello world'" 0

分支语句KEYS和ARGV必须大写

eval "if KEYS[1]==1 then return KEYS[1] else return  ARGV[1] end" 1 0 3 

4c5257c42e834f25958f88987fdfb151.png解决判断和删除之间的原子性问题

// 如果是自己的锁,则删除,否则返回0为false
if redis.call('get',KEYS[1]) == ARGV[1]
then
    return redis.call('del',KEYS[1])
else
    return 0
end

keys:lock

argv: uuid
 public  void sellTicket(){

        String uuid = UUID.randomUUID().toString();
        // setnx 排他使用,如果获取锁不成功,则重试
        while(!redisTemplate.opsForValue().setIfAbsent("lock", uuid,3, TimeUnit.SECONDS)){
            try {
                Thread.sleep(40);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        try {
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数

                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            String script="if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";

            this.redisTemplate.execute(new DefaultRedisScript<>(script,Boolean.class), Collections.singletonList("lock"),uuid);

        }

    }

 压测1000 显示无超卖现象

daf44ce416a4420294d1bd923028b763.png

4b48b5d2be32427c9ec2bf1033598291.pnghash+Lua解决锁的可重复入问题

由于上述加锁命令使用了 SETNX ,一旦键存在就无法再设置成功,这就导致后续同一线程内继续加 锁,将会加锁失败。当一个线程执行一段代码成功获取锁之后,继续执行时,又遇到加锁的子任务代码,可重入性就保证线程能继续执行,而不可重入就是需要等待锁释放之后,再次获取锁成功,才能继续往下执行

第一个就是锁的重入问题

当前方法a获取锁,在方法之中调用b方法,b方法也需要获取锁,这个时候造成了死锁问题,采用hash+Lua脚本解决

第二个就是锁的自动续期问题:后续会解决续期问题

探讨ReentrantLock的可重入原理

ReentrantLock继承了aqs,aqs是锁的基石

可重入锁加锁流程

  • CAS获取锁,如果没有线程占用锁(state==0),加锁成功并记录当前线程是有锁线程
  • 如果state的值不为0,说明锁已经被占用。则判断当前线程是否是有锁线程,如果是则重入 (state + 1)
  • 否则加锁失败,入队等待

可重入锁解锁流程

  • 判断当前线程是否是有锁线程,不是则抛出异常
  • 对state的值减1之后,判断state的值是否为0,为0则解锁成功,返回true
  • 如果减1后的值不为0,则返回false

hash+Lua实现可重复入锁

参照ReentrantLock中的非公平可重入锁实现分布式可重入锁: hash + lua脚本
加锁

  •     判断锁是否存在 (exists),则直接获取锁 hset key field value
  •     如果锁存在则判断是否自己的锁 (hexists),如果是自己的锁则重入: hincrby key field increment
  •     否则重试:递归 循环
加锁
如果锁不存在或者这是自己的锁,就通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次
数加1

if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 
then 
	redis.call('hincrby',KEYS[1], ARGV[1], 1)
	redis.call('expire',KEYS[1],ARGV[2])
	return 1
else 
	return 0
end

keys lock
argv uuid 30


解锁
判断 hash set 可重入 key 的值是否等于 0
如果为 nil 代表 自己的锁已不存在,在尝试解其他线程的锁,解锁失败
如果为 0 代表 可重入次数被减 1
如果为 1 代表 该可重入 key 解锁成功
1 代表解锁成功,锁被释放
0 代表可重入次数被减 1
null 代表其他线程尝试解锁,解锁失败
if redis.call('hexists',KEYS[1],ARGV[1])==0 
then 
	return nil 
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 
then 
	return 0 
else 
	redis.call('del',KEYS[1]) 
	return 1 
end

keys lock
argv uuid

exists判断lock是否存在,hexists lock uuid 判断filed是否存在

通过hincrby(不存在就新增并加1,存在就加1)获取锁或者锁次数加1,hincrby命令,如果增加的key filed 不存在则新增并加1

ef39ebc7bb9840cd8169db5e279dc80a.png

加锁工具类

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.UUID;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;
    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        System.out.println(script);
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
    }
}

测试可重入 

@Override
    public  void checkAndLock(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        // 查询票数
        Ticket ticket = ticketMapper.selectOne(new QueryWrapper<Ticket>().eq("sell_company", "12306"));
        // 判断不为空和票数大于0
        if(ticket!=null&& ticket.getCount() > 0){
            ticket.setCount(ticket.getCount()-1);
            ticketMapper.updateById(ticket);
        }
        // 测试可重入
        testRepeatEntry();
        lock.unlock();
    }

    public void testRepeatEntry(){
        RedisLock lock = new RedisLock(redisTemplate, "lock");
        lock.lock();
        System.out.println("redis分布式锁测试可重入");
        lock.unlock();
    }

 压测1000,未发现超卖问题,并解决可重入的问题

7dfd3282120641e2a34d09da730a07c6.png

d920dd12a6cb4e7dbce13ea0c1b15000.png锁的自动续期

如果在锁还在使用过程中,锁还未使用完,就失效了,也就产生了锁如何自动添加过期时间的问题 

实现方案: 定时器 + Lua脚本定时续期


自动续期

if redis.call('hexists',KEYS[1],ARGV[1])==1
then
	redis.call('expire',KEYS[1],ARGV[2]) 
	return 1 
else 
	return 0 
end

这里没有选用线程池的原因在于释放锁之后没有取消定时任务的方法,所以选用jdk自带的

Timer作为定时任务 

package com.test.lockservice.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.*;

public class RedisLock {

    private StringRedisTemplate redisTemplate;

    private String lockName;

    private String uuid;

    private Integer expire = 30;

    @SuppressWarnings("all")
    private static final Timer timer = new Timer();

    private static final ThreadLocal<String> THREAD_LOCAL = new ThreadLocal<>();

    public RedisLock(StringRedisTemplate redisTemplate, String lockName) {
        this.redisTemplate = redisTemplate;
        this.lockName = lockName;
        this.uuid = THREAD_LOCAL.get();
        if (uuid == null) {
            this.uuid = UUID.randomUUID().toString();
            THREAD_LOCAL.set(uuid);
        }
        this.expire = expire;
    }

    public void lock(){
        this.lock(expire);
    }

    public void lock(Integer expire){
        this.expire = expire;
        String script = "if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1], ARGV[1]) == 1 then redis.call('hincrby',KEYS[1], ARGV[1], 1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        if (!this.redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Collections.singletonList(lockName), uuid, expire.toString())){
            try {
                // 没有获取到锁,重试
                Thread.sleep(60);
                lock(expire);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        // 自动续期
        renewExpire();

    }

    public void unlock(){
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==0 then return nil elseif redis.call('hincrby',KEYS[1],ARGV[1],-1)>0 then return 0 else redis.call('del',KEYS[1]) return 1 end";
        /**
         * 如果返回值没有使用Boolean,spring-data-redis 进行类型转换时将会把 null 转为 false,这就会影响我们逻辑判断
         * 所以返回类型只好使用 Long:null-解锁失败;0-重入次数减1;1-解锁成功
         */
        Long result = this.redisTemplate.execute(new DefaultRedisScript<>(script, Long.class),  Collections.singletonList(lockName), uuid);

        // 如果未返回值,代表尝试解其他线程的锁
        if (result == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by lockName: " + lockName + " with request: " + uuid);
        } else if (result == 1) {
            THREAD_LOCAL.remove();
        }
        // 释放锁成功
        this.uuid = null;
    }


    @SuppressWarnings("all")
    private void renewExpire() {
        String script = "if redis.call('hexists',KEYS[1],ARGV[1])==1 then redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (uuid != null) {
                    redisTemplate.execute(new DefaultRedisScript<>(script, Boolean.class), Arrays.asList(lockName), RedisLock.this.uuid, expire.toString());
                    renewExpire();
                }
            }
        },expire * 1000 / 3);
    }
}

红锁算法 

利用红锁算法解决集群下锁的问题:

  • 1、应用程序获取当前系统时间
  • 2、应用程序以相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过了一定时间(小于过期时间)没有获取到锁,则放弃,尽快从其他节点获取锁,避免一个节点宕机阻塞
  • 3、计算锁的消耗时间= 客户端当前时间-step1中的事件,获取锁的时间小于总的锁定时间,并且半数以上节点获取锁成功,认为获取锁成功
  • 4、如果获取锁失败,对所有节点释放锁

redis分布式锁小结

redis分布式锁最开始采用setnex+Lua脚本的方式,我们发现存在不可重入的问题,于是使用hash+Lua脚本解决可重入问题,并解决了自动续期问题,但是还存在一个重要问题,就是redis集群部署所带来的并发问题,所以使用Redission作为最终的分布式锁解决方案

redis集群状态下的问题:
  • 客户端A从master获取到锁
  • 在master将锁同步到slave之前,master宕掉了
  • slave节点被晋级为master节点
  • 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁

redisson中的分布式锁  

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅 提供了一系列的分布式的Java常用对象,还提供了许多分布式服务
Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上

Redisson引入依赖

 <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.11.2</version>
</dependency>

Redission配置 

package com.test.lockservice.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author sl
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();
//        config.useClusterServers()
        config.useSingleServer().setAddress("redis://39.106.53.30:6379").setPassword("12345");
        return Redisson.create(config);
    }
}

Redission使用

@Autowired
    private RedissonClient redissonClient;


    public void userRedisson(){
        // 获取锁
        RLock lock = redissonClient.getLock("lock");
        try {
            // 加锁
            lock.lock();
            //获取redis中的票数
            String ticket = redisTemplate.opsForValue().get("ticket");

            if(ticket!= null && ticket.length() != 0){
                // 扣减票数
                Integer integer = Integer.valueOf(ticket);

                if(integer >0){
                    redisTemplate.opsForValue().set("ticket",String.valueOf(--integer));
                }
            }
        } finally {
            // 解锁
            lock.unlock();
        }
    }

1000并发压测,发现并无超卖问题 

60440ac508af4380ba2ecc5de16754fa.png

c017a7803ee2469c9e89014ef00362c7.png

RLock原理

 RLock对象实现了 java.util.concurrent.locks.Lock 接口,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗检查锁的超时时间 是30秒钟,也可以通过修改 Config.lockWatchdogTimeout 来另行指定

  • RLock 对象完全符合Java的Lock规范。也就是说只有拥有锁的进程才能解锁,其他进程解锁则会抛出 IllegalMonitorStateException 错误
  • 另外Redisson还通过加锁的方法提供了 leaseTime 的参数来指定加锁的时间。超过这个时间后锁便自动解开了

其实Redisson底层的实现思路同样是hash+Lua脚本的实现方式,在源码中可以看到,下面列举一下加锁的源码

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
    

 公平锁 

基于Redis的Redisson分布式可重入公平锁也是实现了 java.util.concurrent.locks.Lock 接口的一 种 RLock 对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了 当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队 列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个 线程都处于等待状态,那么后面的线程会等待至少25秒
public void useFairLock() {
        RLock fairLock = redissonClient.getFairLock("fairLock");
//        fairLock.lock();

        // 10秒钟以后自动解锁
        // 无需调用unlock方法手动解锁
        fairLock.lock(10, TimeUnit.SECONDS);
        System.out.println("加锁成功"+Thread.currentThread().getName());

        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
//        boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
//        fairLock.unlock();
    }


加锁成功http-nio-10010-exec-5
加锁成功http-nio-10010-exec-10

可以看到,公平锁会维护一个队列,按发送顺序依次加锁

22641eac255c4cd78315c6cda75863ee.png 

联锁

   在多个redis实例上获取锁,联锁所有的锁都上锁成功才算成功

  @Override
    public void useMutiLock() {

        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        //联锁所有的锁都上锁成功才算成功
        RedissonMultiLock redissonMultiLock = new RedissonMultiLock(lock1);
        redissonMultiLock.lock();
        System.out.println("业务内容");
        redissonMultiLock.unlock();
    }

红锁

在多个节点上加锁,大部分节点获取锁成功就算成功


   public void useRedLock() {
        RLock lock1 = redissonClient.getLock("lock1");
//        RLock lock2 = redissonClient.getLock("lock2");
        RedissonRedLock readLock = new RedissonRedLock(lock1);
        // 红锁在大部分节点上加锁成功就算成功
        readLock.lock();
        System.out.println("业务内容");
        readLock.unlock();
    }

读写锁

对读和写上锁,RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口,读-读不阻塞

 public void useReadWriteLock() {
        /**
         * 读-读 不阻塞 读-写 阻塞 写-写 阻塞
         * RReadWriteLock实现了java.util.concurrent.locks.ReadWriteLock接口
         */
        RReadWriteLock rwlock = redissonClient.getReadWriteLock("readWrite");
        // 最常见的读锁
        rwlock.readLock().lock();
        // 写锁
        rwlock.writeLock().lock();
        // 10秒钟以后自动解锁无需调用unlock方法手动解锁
        rwlock.readLock().lock(10, TimeUnit.SECONDS);

        rwlock.writeLock().lock(10, TimeUnit.SECONDS);
        // 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
        // boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);

        rwlock.readLock().unlock();
        rwlock.writeLock().unlock();
    }

 信号量

资源限流并发工具类,java.util.concurrent.semaphore是单机版限流,RSemaphore是分布式限流,下面的Semaphore会始终限流3个资源

单机版 

package com.test.lockservice.service.impl;

import java.util.concurrent.Semaphore;

/**
 * @Author sl
 */
public class SemaphoreTest {
    public static void main(String[] args) {
        // 3个有限资源
        Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < 6; i++) {
            new Thread(()->{
                try{
                    // 获取资源
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢到车位");
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName()  +"离开车位");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    // 释放资源
                    semaphore.release();
                }

            }).start();
        }

    }
}

Thread-1抢到车位
Thread-0抢到车位
Thread-4抢到车位
Thread-0离开车位
Thread-1离开车位
Thread-4离开车位
Thread-3抢到车位
Thread-5抢到车位
Thread-2抢到车位
Thread-5离开车位
Thread-3离开车位
Thread-2离开车位

分布式版

  public void useSemaphore() {
        /**
         * RSemaphore 采用了与java.util.concurrent.semaphore相似的接口
         * 资源限流信号量, 3个资源 6个线程, semaphore是单机版限流,RSemaphore是分布式限流
         */
        RSemaphore semaphore = redissonClient.getSemaphore("semaphore");
        try{
            semaphore.acquire();
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }

闭锁(CountDownLatch

CountDownLatch并发工具类,一个线程等待一组线程结束是一个做减法的倒计时器,RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法,

 单机版

package com.test.lockservice.service.impl;

import java.util.concurrent.CountDownLatch;

/**
 * @Author sl
 */
public class CountDownLatchTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t上完自习");
                countDownLatch.countDown();
            },String.valueOf(i)).start();
        }
        // 班长等待所有线程同学走完在锁门
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t班长离开,锁门");
    }
}

1	上完自习
3	上完自习
4	上完自习
5	上完自习
2	上完自习
6	上完自习
main	班长离开,锁门

顺道介绍一下CyclicBarrier并发工具类,与CountDownLatch正好相反,它做的是加法

package com.test.lockservice.service.impl;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Author sl
 */
public class CyclicBarrierTest {

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
            System.out.println("集齐了卡片,开始召唤神龙");
        });

        for (int i = 0; i < 7; i++) {
            String s = String.valueOf(i);
            new Thread(()->{
                System.out.println(Thread.currentThread().getName() + "\t 收集到第"+s+"卡片");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();

        }
    }
}

0	 收集到第0卡片
6	 收集到第6卡片
2	 收集到第2卡片
1	 收集到第1卡片
5	 收集到第5卡片
4	 收集到第4卡片
3	 收集到第3卡片
集齐了卡片,开始召唤神龙

分布式版

 public void useCountDownLatch() {
        /**
         * RCountDownLatch 采用了与java.util.concurrent.CountDownLatch 相似的接口和用法
         * 一个线程 等待一组线程完事
         * 班长等待所有同学走出门口在锁门 CountDownLatch是单机版 RCountDownLatch是分布式版
         */
        RCountDownLatch latch = redissonClient.getCountDownLatch("anyCountDownLatch");
        latch.trySetCount(6);
        latch.countDown();
        try{
            latch.await();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

关于zookeeper实现分布式锁,在本专栏zookeeper章节中做了简单介绍,就是创建临时顺序节点,值最小的就是锁。 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

分布式锁之redis实现 的相关文章

随机推荐

  • 软件测试分类

    一 软件测试的分类 1 按开发阶段 单元测试 集成测试 系统测试 验收测试 2 按测试实施组织 第三方 3 按测试执行方式 静态测试 动态测试 4 按是否查看代码 黑盒测试 白盒测试 灰盒测试 5 按是否手工执行划分 手工测试 自动化测试
  • subList方法踩坑记录

    前言 现有一个简化的需求 假设有一个点位列表 1 2 4 5 现在需要将该列表从中间分成两份 序列1 1 2 序列2 4 5 分开后在序列1的末尾添加一个元素 3 变成 1 2 3 问题复现 在编写下面的测试代码运行后 结果并不是想要的 p
  • 这可能是,Flutter 中最“强悍”的内存泄漏检测方案......

    近两年来 无论是创新型应用还是老牌旗舰型应用 都在或多或少地使用 Flutter 技术 然而 目前 Flutter 业务团队反馈最普遍的问题是 Flutter 内存占用过高 Flutter 内存占用过高原因比较复杂 需另开一个主题才能说清楚
  • git rebase和merge区别

    一 概述 merge和rebase 标题上的两个命令 merge和rebase都是用来合并分支的 这里不解释rebase命令 以及两个命令的原理 详细解释参考这里 下面的内容主要说的是两者在实际操作中的区别 1 1 什么是分支 分支就是便于
  • sqli-labs第十七关(post-报错注入)

    进入到十七关后发现这关有点不同 上面多了一行提示 密码重新设置 咱也不懂啥意思 在试了写常规注入的方法后 没有发现什么可以利用的点 于是决定去看看代码 虽然我不会php 先看这一部分有sql语句的 从users表中搜索出uname和psw
  • linux查找服务器大文件,查找Linux系统中的大文件和大目录方法

    查找大文件 eg 查找当前目录下大于10MB的文件 复制代码 代码如下 find type f size 10000k exec ls lh awk print 8 5 Sample output kde share apps akrega
  • Zmap的原理(回复包校验机制)

    Nmap使用的方法是发送请求然后监听回应 虽然请求可以同步进行 但为了记录每一个未回应请求需要大量开销 导致速度下降 而ZMap使用了不同的方法 它发生的是无状态请求 发送之后就忽略了 它不需要记录未回应请求的名单 而是在发送的数据包中编码
  • java中两个list对象取交集、差集

    在一般操作中 对于list集合取交集 差集 并集 比较简单 网上有很多例子 如 今天我们来说一下对于两个list集合该如何取交集与并集 如下两个集合 groupEntityList saveEntities groupEntityList是
  • JS正则判断多个连续相同字符

    var reg1 w 1 1 g 判断2个连续字符 var reg2 w 1 2 g 判断3个连续字符 var reg3 w 1 1 g 判断3个连续字符 var str aa 123AaAaAAA3 str match reg1 输出 a
  • SpringBoot学习笔记(一):先跑懂再说

    一 Spring Boot 入门 1 Spring Boot 简介 简化Spring应用开发的一个框架 整个Spring技术栈的一个大整合 J2EE开发的一站式解决方案 2 Spring Boot HelloWorld 一个功能 浏览器发送
  • 怎么样才能开期权账户

    为了保护投资者权益 上交所设定了50万的准入门槛 挡着了很多想入手期权交易的小伙伴 如果资金不够50万 那么有什么办法能零门槛参与期权呢 下文给大家介绍怎么样才能开期权账户的知识点 本文来自 期权酱 一 期权开户要什么条件 1 申请开户时保
  • android webview setwebviewclient,android – setWebViewClient和setWebChromeClient之间有什么区别?...

    从 source code Instance of WebViewClient that is the client callback private volatile WebViewClient mWebViewClient Instan
  • OpenCSV web下载csv文件demo

    OpenCSV web下载csv文件demo pom xml
  • 嵌入式Linux&Android开发-LCD屏幕调试

    目录 一 简介 二 开发流程 三 硬件说明 四 电子特性 五 关注启动时序 六 关注引脚 七 屏参适配 7 1 DTS 驱动配置 7 2 屏参配置 案例一 7 3屏参配置 案例二 7 4 屏参配置 案例三 7 5 屏参配置 案例四 7 6
  • 单元测试、集成测试、系统测试、验收测试

    本文是按照开发阶段划分测试技术 单元测试 单元测试是对软件组成单元进行测试 目的是检验软件基本组成单元的正确性 测试对象是软件设计的最小单位 模块 又称为模块测试 单元测试的实质是代码测代码 测试阶段 编码后或者编码前 TDD 编码前属于测
  • 树莓派笔记4:树莓派游戏机

    这次记录比较轻松的内容 将树莓派做成 游戏主机 当然这个主机只是具备模拟器功能而已 可以模拟街机 FC等平台上的游戏 最早要在树莓派上玩模拟器游戏需要手动安装和配置不同的模拟器 而现在国外很多爱好者专门制作了定制化的系统 直接把系统烧到树莓
  • latex插图\begin{minipage}强制左移\hspace命令

    事情是这样的 我在latex中插图 上面一张图是排列整整齐齐的图片 下面一张图就是我绘制的概率密度图 在使用latex插图的时候 因为概率密度图的纵坐标是有title的 所以会显得不整齐 如下图所示 在includegraphics前面添加
  • Inkscape 捕捉图标翻译

  • Docker Portainer 安装与报错处理

    安装docker 管理器 Portainer 最近在看spring cloud alibaba的时候 觉得docker是肯定要用的 然后找了个管理的docker的东东 比较方便的查询docker的情况 直接看操作吧 root localho
  • 分布式锁之redis实现

    docker安装redis 拉取镜像 docker pull redis 6 2 6 查看镜像 启动容器并挂载目录 需要挂在的data和redis conf自行创建即可 docker run restart always d v usr l