锁->分布式锁->准实时方案

2023-11-03

概述:并发量由低到高,单机到集群,java对锁、分布式锁、准实时方案的概要实现;全文以商品抢购为例。

目录

1. 锁

2.分布式锁

2.1高可用

2.2性能调优

3.准实时方案

3.1性能提升

3.2高可用

正文:

1.锁

lock和synchronized均可,单机;
实例:

import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 锁控制类
 * @author Administrator
 * @date 20230523
 */
@RestController
@RequestMapping("/locks")
public class LockController {

    /**
     * 商品库存
     */
    public static int productInventoryNums = 100;

    @PostMapping("/standalone")
    public synchronized String standAlone(){
        if(productInventoryNums > 0){
            productInventoryNums--;
        } else {
            return "fail";
        }
        return "success";
    }
}```
```

2.分布式锁

先结论再说原理(此处暂未涉及数据强一致性,放在原理中进行分析);

结论:

package com.xxxx.distributelocks.controller;

import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * 商品抢购
 * 并发
 * @author py
 * @date 20230526
 */
@RestController
@RequestMapping("/products")
public class ProductsController {

    @Autowired
    private Redisson redisson;
    /**
     * 商品剩余库存
     * 从数据库或缓存中读取
     */
    public static int productNums = 100;

    /**
     * 下单
     * @return
     */
    @PostMapping("/buyproduct")
    public String order(String productId){
        //获取锁
        RLock rLock = redisson.getLock(productId);
        try {
            //加锁
            rLock.lock();
            if(productNums > 0){
                productNums--;
            } else {
                //商品已售罄
                return "fail";
            }
            //更新库存
            //库存写入缓存或数据库
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            rLock.unlock();
        }
        return "success";
    }
}

此处可对部分场景进行优化,第一种是读多写少,或许这也是大多数互联网公司采用的方案;
1)读多写少

`

/**
     * 下单
     * @param productId
     * @return
     */
    @PostMapping("/buyproduct1")
    public String writeOrder(String productId){
        //获取锁
        RReadWriteLock rwLock = redisson.getReadWriteLock(productId);
        RLock writeLock = rwLock.writeLock();
        try {
            //此处为写场景,故加写锁
            writeLock.lock();
            if(productNums > 0){
                productNums--;
            } else {
                //商品已售罄
                return "fail";
            }
            //更新库存
            //库存写入缓存或数据库
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            writeLock.unlock();
        }
        return "success";
    }

    /**
     * 读商品
     * @param productId
     * @return
     */
    @PostMapping("/readproduct")
    public String readOrder(String productId){
        //获取锁
        RReadWriteLock rwLock = redisson.getReadWriteLock(productId);
        RLock readLock = rwLock.readLock();
        try {
            //此处为读场景,故加写锁
            readLock.lock();
            if(productNums > 0){
                return  productNums + "";
            } else {
                //商品已售罄
                return "已售罄";
            }
            //更新库存
            //库存写入缓存或数据库
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //释放锁
            readLock.unlock();
        }
        return "售罄";
    }

2)读多写多场景
既然读多写也多,那还读写缓存有什么意义,直接用binlog更新缓存不香吗?

原理:

原理概述:通过setnx命令更新redis后(因为redis命令执行的单线程与数据分布式存储的特性,是分布式锁可用的前提),更新之后定时更新加锁的key(也就是业内说的锁续命逻辑);

`

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        return this.evalWriteAsync(this.getRawName(),
 LongCodec.INSTANCE, command, 
"if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);",
 Collections.singletonList(
this.getRawName()), 
new Object[]{unit.toMillis(leaseTime), 
this.getLockName(threadId)}
);
    }

其中lua脚本本身命令的原子性操作;

然后再看锁续命逻辑
`

public RFuture tryLockAsync() {
return this.tryLockAsync(Thread.currentThread().getId());
}

private  RFuture tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture ttlRemainingFuture;
        if (leaseTime != -1L) {
            ttlRemainingFuture = this.tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        } else {
            ttlRemainingFuture = this.tryLockInnerAsync(waitTime, this.internalLockLeaseTime, TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        }

        CompletionStage<Long> f = ttlRemainingFuture.thenApply((ttlRemaining) -> {
            if (ttlRemaining == null) {
                if (leaseTime != -1L) {
                    this.internalLockLeaseTime = unit.toMillis(leaseTime);
                } else {
                    this.scheduleExpirationRenewal(threadId);
                }
            }

            return ttlRemaining;
        });
        return new CompletableFutureWrapper(f);
    }
protected void scheduleExpirationRenewal(long threadId) {
        RedissonBaseLock.ExpirationEntry entry = new RedissonBaseLock.ExpirationEntry();
        RedissonBaseLock.ExpirationEntry oldEntry = (RedissonBaseLock.ExpirationEntry)EXPIRATION_RENEWAL_MAP.putIfAbsent(this.getEntryName(), entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);

            try {
                this.renewExpiration();
            } finally {
                if (Thread.currentThread().isInterrupted()) {
                    this.cancelExpirationRenewal(threadId);
                }

            }
        }

    }

添加监听定时回调逻辑;

与netty中监听回调有异曲同工之妙,netty中是虽然并没有减少总处理时间,但却提高了吞吐量,将线程尽可能活跃起来;

3.准实时方案

此处暂时分析原理与优化方案,代码实例后续补充;
准实时方案,顾名思义,服务降级,在业务允许的延迟范围内进行计算,比如消息队列存储消息,storm、flink等下游消费,计算输出;

优化方案:

此处针对部分场景说明;
第一步:数据分流
数据分流,比如可根据用户id强制将每个用户的计算数据分到指定机器上;
第二步:降级、加锁
200K缓存或200条记录,200毫秒超时入库等读写降级;
单机可用sychronized或lock;
第三步:锁优化
客户端并发持续上升,单机扛不住,计算单机计算量,多台机器分布式锁计算,锁优化如上。

数据一致性

很多业务场景要求数据抗风险能力极强,比如商品交易金钱一致性等;
异常情况:如redis节点在写入数据后还未来得及同步到从节点时主节点突然宕机情况,此时从节点需晋升为主节点但却丢失锁数据;
至于解决方案,业内通常采用的有:
1)zookeeper
2)redlock
原理概要:
1)其中zookeeper集群相对redis集群不同之处在于zookeeper集群在主从同步时只有超过半数节点同步成功后才会将请求响应回客户端,并且从节点在晋升为主节点时一定是同步成功锁的节点晋升成功,而redis是主节点写入成功后则给客户端响应结果,并且从晋升主的过程中投票机制只能大概率保证数据偏移量最大节点晋升成功;
故如果对性能要求高建议redis,对数据一致性要求高建议zookeeper;
2)redlock在业内虽存在部分争议,但其实现仍可圈可点,其实现原理为采用多台完全独立的redis节点(通常为奇数),只有半数机器同步成功后才响应到客户端,此处与zookeeper原理大同小异;

因个人水平有限,时间精力有限,后续提升待完善,抛砖引玉,望大家多多提出改善之处;

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

锁->分布式锁->准实时方案 的相关文章

  • 按下按钮并在java中的新窗口中打开文件

    我创建了一个 JFrame 并放置了一个文本字段和按钮 在文本字段中我放置了从文本文件读取的名称 我知道我想单击按钮并打开一个已知窗口 我想在其中放置名称 其他信息来自同一个文件 这是我的代码 这是我的主框架 package Fronten
  • 如何在java中将数组值排序为循环格式?

    我的数组值如下 String value 1 2 3 4 5 6 7 8 9 10 假设如果我将值 5 传递给 tat 数组 它应该按如下顺序排序 5 6 7 8 9 10 1 2 3 4 怎么办 有人帮忙吗 感谢你 你需要的就是所谓的轮换
  • 如何在 JavaFX 中连接可观察列表?

    我所说的串联是指获得一个新列表 该列表侦听所有串联部分的更改 方法的目的是什么FXCollections concat ObservableList
  • 两个整数乘积的模

    我必须找到c c a b mod m a b c m 是 32 位整数 但 a b 可以超过 32 位 我正在尝试找出一种计算 c 的方法 而不使用 long 或任何 gt 32 位的数据类型 有任何想法吗 如果m是质数 事情可以简化吗 注
  • 如何在 JPQL 或 HQL 中进行限制查询?

    在 Hibernate 3 中 有没有办法在 HQL 中执行相当于以下 MySQL 限制的操作 select from a table order by a table column desc limit 0 20 如果可能的话 我不想使用
  • 如何在 Java 中向时间戳添加/减去时区偏移量?

    我正在使用 JDK 8 并且玩过ZonedDateTime and Timestamp很多 但我仍然无法解决我面临的问题 假设我得到了格式化的Timestamp在格林威治标准时间 UTC 我的服务器位于某处 假设它设置为Asia Calcu
  • Android studio - 如何保存先前活动中选择的数据

    这是我的代码片段 这Textview充当按钮并具有Onclicklistner在他们 当cpu1000时Textview单击它会导致cpu g1000其代码如下所示的类 public class Game 1000 extends AppC
  • Runtime.exec 处理包含多个空格的参数

    我怎样才能进行以下运行 public class ExecTest public static void main String args try Notice the multiple spaces in the argument Str
  • 断言 Kafka 发送有效

    我正在使用 Spring Boot 编写一个应用程序 因此要写信给 Kafka 我这样做 Autowired private KafkaTemplate
  • 如何检查某个元素是否存在于一组项目中?

    In an ifJava中的语句如何检查一个对象是否存在于一组项目中 例如 在这种情况下 我需要验证水果是苹果 橙子还是香蕉 if fruitname in APPLE ORANGES GRAPES Do something 这是一件非常微
  • 如何在java中将日期格式从YYMMDD更改为YYYY-MM-DD? [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我从机器可读代码中获取日期格式为 YYMMDD 如何将其更改为 YYYY MM DD 例如我收到 871223 YYMMDD 我想把它改成
  • Sun 在 EDT 之外做 GUI 工作的演示?

    我正在看SplashDemo java http download oracle com javase tutorial uiswing examples misc SplashDemoProject src misc SplashDemo
  • 如何仅从 Firestore 获取最新更新的数据?

    在 Firestore 上发现任何更改时始终获取整个文档 如何只获取最近更新的数据 这是我的数据 我需要在第一次加载时在聊天中按对象顺序 例如 2018 09 17 30 40 msg和sendby 并且如果数据更新则仅获取新的msg和se
  • Java Applet 中的 Apache FOP - 未找到数据的 ImagePreloader

    我正在研究成熟商业产品中的一个问题 简而言之 我们使用 Apache POI 库的一部分来读取 Word DOC 或 DOCX 文件 并将其转换为 XSL FO 以便我们可以进行标记替换 然后 我们使用嵌入到 Java 程序中的 FOP 将
  • Akka 与现有 java 项目集成的示例

    如果我已经有现有的javaWeb 应用程序使用spring and servlet容器 将 Akka 集成到其中的正确方法是什么 就像我将会有Actor1 and Actor2互相沟通的 开始使用这些演员的切入点是什么 例如 1 把它放在那
  • 在Java中运行bat文件并等待

    您可能会认为从 Java 启动 bat 文件是一项简单的任务 但事实并非如此 我有一个 bat 文件 它对从文本文件读取的值循环执行一些 sql 命令 它或多或少是这样的 FOR F x in CD listOfThings txt do
  • 为什么\0在java中不同系统中打印不同的输出

    下面的代码在不同的系统中打印不同的输出 String s hello vsrd replace 0 System out println s 当我在我的系统中尝试时 Linux Ubuntu Netbeans 7 1 它打印 When I
  • java 中的蓝牙 (J2SE)

    我是蓝牙新手 这就是我想做的事情 我想获取连接到我的电脑上的蓝牙的设备信息并将该信息写入文件中 我应该使用哪个 api 以及如何实现 我遇到了 bluecove 但经过几次搜索 我发现 bluecove 不能在 64 位电脑上运行 我现在应
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • 由 Servlet 容器提供服务的 WebSocket

    上周我研究了 WebSockets 并对如何使用 Java Servlet API 实现服务器端进行了一些思考 我没有花费太多时间 但在使用 Tomcat 进行一些测试时遇到了以下问题 如果不修补容器或至少对 HttpServletResp

随机推荐

  • 这是最简单的java输出表情

    public static void main String args TODO Auto generated method stub System out println o 执行结果 不要质疑欧 我们java就是这么简单 适合刚入jav
  • 【面试总结】AI音频降噪方向相关面试题总结

    前情提要 相同的内容我也发布在了知乎上 由于本人也参与过AI音频降噪的相关项目 所以在面试的过程中也有很多相关的问题 这里提前吐槽一下 虽然Rnnoise这个模型效果不怎么好 但是这个方案相当于是这个领域的开辟的工程方案 所以有相当多的人会
  • java三种方式实现文件的上传

    1 实现文件的上传可以有好多途径 最简单的就是用sun公司提供的File类 可以简单的实现文件的上传和显示 try InputStream stream file getInputStream 把文件读入 Savefilepath requ
  • 两年外包生涯做完,感觉自己废了一半....

    先说一下自己的情况 大专生 17年通过校招进入湖南某软件公司 干了接近2年的点点点 今年年上旬 感觉自己不能够在这样下去了 长时间呆在一个舒适的环境会让一个人堕落 而我已经在一个企业干了五年的功能测试 已经让我变得不思进取 谈了1年的女朋友
  • java boolean类型占多少字节

    今天面试问到了这个问题 java中boolean类型到底占多少字节呢 到网上搜了下 最后采用了这个答案 答 我的结论是 1 boolean a true 这个a在JVM中占4个字节即 32位 2 boolean b new boolean
  • 【js基础】怎样理解闭包

    在JavaScript中 在ES6出现之前 只有函数作用域和全局作用域 在正常情况下 外界是无法访问函数内部变量的 但是在函数中 如果我们返回了另一个函数 这个返回的函数使用了外层函数的变量 那么外界能够通过返回的函数 获取外界函数内部的变
  • STM32下载程序的三种方法(串口、ST-LINK、 ST-LINK Utility)

    ST LINK v2接线及下载程序 ST Link V2 ST Link v2是STM8 STM32系列单片机的在线仿真器和下载器 STM8采用SWIM接口模式 STM32采用的是SWD接口模式 因此ST Link出生就带有两种接口模式 S
  • Vue-i18n框架学习总结

    Vue框架 Vue i18n学习总结 1 概述 Vue I18n 是 Vue js 的国际化插件 格局比较大 具体怎么解释还是不太好说 直接看用法就能明白 简单说一下为什么叫这个名字 internationalization i 中间的18
  • Unity Toggle组件踩坑使用笔记

    项目中需要用到排序功能 两种排序 一个型号 一个是评分 当用户点击型号或者评分的时候 物品列表中的物品需要重新排序 有点类似游戏中的背包 希望武器按照品质或者强化等级排序 最简单的方法是制作两个Button 同属同一个View 通过中介者模
  • LiveCharts遇到的问题及解决

    LiveCharts遇到的问题及解决 LiveCharts遇到的问题及解决 1 如何设置横纵轴分隔符为虚线 2 如何添加横纵轴线 1 如何设置横纵轴分隔符为虚线
  • cadence 旋转快捷键_cadence原理图快捷键整理

    Allegro Design Entry CIS 原理图 1 shift 鼠标滚轮 左右移动 2 Ctrl 鼠标滚轮 放大缩小 3 Alt 鼠标滚轮 上下移动 4 按下鼠标滚轮可任意方向拖动图纸 可以一直保持按下状态或者按一下松开 5 CT
  • vscode 标签的使用

    使用标签就可以快速跳转到某一段代码 十分方便 安装 首先 我们需要安装 设置快捷键 shift command p 调出命令行 输入bookmark 即可看到标签的相关指令 生成一个标签 设置一个你喜欢的快捷键 这代表 在光标所在的行上添加
  • LeetCode 5926. 买票需要的时间

    有 n 个人前来排队买票 其中第 0 人站在队伍 最前方 第 n 1 人站在队伍 最后方 给你一个下标从 0 开始的整数数组 tickets 数组长度为 n 其中第 i 人想要购买的票数为 tickets i 每个人买票都需要用掉 恰好 1
  • 软件显示获取服务器更新失败,闪耀暖暖获取更新服务器失败的解决方法

    今天是闪耀暖暖国服正式上线的日子 很多玩家都想第一时间进入游戏试玩 但是频繁有玩家出现网络连接失败的提示 这可愁坏了很多玩家 那么出现这个问题我们要怎么解决呢 下面就跟我一起来看看闪耀暖暖获取更新服务器失败的解决方法吧 一 官方服务器超载
  • unzip 错误 checkdir error: cannot create ctchain

    在mac中用unzip命令解压时出现下面错误 may Desktop SO unzip chain zip Archive chain zip checkdir error cannot create ctchain Illegal byt
  • 函数(1)

    目录 一 函数是什么 二 函数的分类 库函数 自定义函数 三 函数的参数 实际参数 实参 形式参数 形参 四 函数的调用 传值调用 传址调用 五 结束语 本章需要了解的重点主要包括以下几点 1 函数是什么 2 库函数 3 自定义函数 4 函
  • Day123.ElasticSearch:CAP定理、集群搭建、架构原理及分片、倒排索引、面试题

    目录 一 CAP定理 二 ES集群 1 搭建集群 2 head 插件安装 3 集群测试 4 核心概念 二 架构原理及分片 一 ElasticSearch 分片 二 分片控制 三 分片原理 1 倒排索引 2 文档搜索 3 近实时搜索 缓存传递
  • Vue组件缓存之keep-alive正确使用姿势

    先来看一个项目中的需求 作为苦逼的前端开发者 我们无时无刻都要面对产品经理提的各种需求 比如下图这个场景 场景 从首页的点击导航进入列表页 列表页点击列表进入 该 数据详情页 从详情页返回 希望列表页缓存 不重新渲染数据 这样会提高用户体验
  • ROCKCHIP-Rv1126安装ARM64-ARCH-ARM-DEBIAN系统

    1 deboot qemu arm debian 64位 内核文件系统 安装依赖软件 sudo apt get install debian archive keyring gcc aarch64 linux gnu bison flex
  • 锁->分布式锁->准实时方案

    概述 并发量由低到高 单机到集群 java对锁 分布式锁 准实时方案的概要实现 全文以商品抢购为例 目录 1 锁 2 分布式锁 2 1高可用 2 2性能调优 3 准实时方案 3 1性能提升 3 2高可用 正文 1 锁 lock和synchr