ZooKeeper-Curator-InterProcessMutex分布式锁源码

2023-11-07


注意:临时节点下不能创建临时子节点

InterProcessMutex(可重入互斥锁)

具体流程图:
在这里插入图片描述

	// 入口1
    @Override
    public void acquire() throws Exception {
        if (!internalLock(-1, null)) {
            throw new IOException("Lost connection while trying to acquire lock: " + basePath);
        }
    }
	//入口2
    @Override
    public boolean acquire(long time, TimeUnit unit) throws Exception {
        return internalLock(time, unit);
    }

    private boolean internalLock(long time, TimeUnit unit) throws Exception {
        /*
           Note on concurrency: a given lockData instance
           can be only acted on by a single thread so locking isn't necessary
        */

        Thread currentThread = Thread.currentThread();

		//可重入,是通过lockCount(AtomicInteger)实现的,加锁-递增,解锁-递减
        LockData lockData = threadData.get(currentThread);
        if (lockData != null) {
            // re-entering
            lockData.lockCount.incrementAndGet();
            return true;
        }
		//第一次加锁,需要在ZooKeeper创建临时顺序节点
        String lockPath = internals.attemptLock(time, unit, getLockNodeBytes());
        if (lockPath != null) {
            LockData newLockData = new LockData(currentThread, lockPath);
            threadData.put(currentThread, newLockData);
            return true;
        }

        return false;
    }
	org.apache.curator.framework.recipes.locks.LockInternals#attemptLock

	//下面创建临时顺序节点后的 事件监听器
    private final CuratorWatcher revocableWatcher = new CuratorWatcher() {
       	@Override
        public void process(WatchedEvent event) throws Exception {
        	// 如果事件类型 = 没有数据更改
            if ( event.getType() == Watcher.Event.EventType.NodeDataChanged ) {
                checkRevocableWatcher(event.getPath());
            }
        }
    };

	//检查可撤销的事件监听器
    private void checkRevocableWatcher(String path) throws Exception {
        RevocationSpec entry = revocable.get();
        if (entry != null) {
            try {
                byte[] bytes = client.getData().usingWatcher(revocableWatcher).forPath(path);
                if (Arrays.equals(bytes, REVOKE_MESSAGE)) {
                    entry.getExecutor().execute(entry.getRunnable());
                }
            } catch (KeeperException.NoNodeException ignore) {
                // ignore
            }
        }
    }
    
	String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception {
        final long startMillis = System.currentTimeMillis();
        final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
        final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
        int retryCount = 0;

        String ourPath = null;
        boolean hasTheLock = false;
        boolean isDone = false;
        while (!isDone) {
            isDone = true;

            try {
                // 创建临时顺序节点
                ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
                // 如果获得锁成功,hasTheLock = true
                hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
            } catch (KeeperException.NoNodeException e) {
                // gets thrown by StandardLockInternalsDriver when it can't find the lock node
                // this can happen when the session expires, etc. So, if the retry allows, just try it all again
                if (client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper())) {
                    isDone = false;
                } else {
                    throw e;
                }
            }
        }
		//获得锁成功,返回路径,否则返回null
        if ( hasTheLock ) {
            return ourPath;
        }

        return null;
    }
    
 	private final Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            client.postSafeNotify(LockInternals.this);
        }
    };

	org.apache.curator.framework.CuratorFramework#postSafeNotify
	default CompletableFuture<Void> postSafeNotify(Object monitorHolder) {
        return runSafe(() -> {
            synchronized (monitorHolder) {
           		// 唤醒所有线程
                monitorHolder.notifyAll();
            }
        });
    }

	private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception {
        boolean haveTheLock = false;
        boolean doDelete = false;
        try {
            if (revocable.get() != null) {
            	//添加可撤销事件监听器
                client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
            }
			// 状态=启动,并未获得锁
			// InterProcessMutex两种请求锁:acquire()/acquire(long time, TimeUnit unit)
			// 释放本地锁,线程处于等待状态
            while ((client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock) {
            	//获得排序后的子节点集合
                List<String> children = getSortedChildren();
                //获得节点名
                String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash 加1-包含斜杠
				
				//通过节点 是否 子节点集合中的首个,判断是否获得锁
                PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
                if (predicateResults.getsTheLock()) {
                    haveTheLock = true;
                } else {
                	// 完整路径
                    String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
					//加本地静态类锁
                    synchronized (this) {
                        try {
                            // use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
                            //使用getData()而不是exists()来避免留下不需要的监视者,这是一种资源泄漏类型
                            //添加事件监控器(请求安全通知事件监听),可看上方watcher
                            client.getData().usingWatcher(watcher).forPath(previousSequencePath);
                            // 等待millisToWait 时间,超时删除节点
                            if (millisToWait != null) {
                                millisToWait -= (System.currentTimeMillis() - startMillis);
                                startMillis = System.currentTimeMillis();
                                // 如果超时,在finally代码块,删除节点
                                if (millisToWait <= 0) {
                                    doDelete = true;    // timed out - delete our node
                                    break;
                                }
								
                                wait(millisToWait);//释放本地锁
                            } else {
                                wait();//释放本地锁
                            }
                        } catch (KeeperException.NoNodeException e) {
                            // it has been deleted (i.e. lock released). Try to acquire again
                        }
                    }
                }
            }
        } catch (Exception e) {
            ThreadUtils.checkInterrupted(e);
            doDelete = true;
            throw e;
        } finally {
            if (doDelete) {
                deleteOurPath(ourPath);
            }
        }
        return haveTheLock;
    }
	org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver

	// 判断是否获得锁,通过判断 节点名 是否 children(子节点s)中第一个节点,通过maxLeases判断(默认为1)
	@Override
    public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception {
        // 节点名在children中的下表
        int ourIndex = children.indexOf(sequenceNodeName);
        // 校验ourIndex<0
        validateOurIndex(sequenceNodeName, ourIndex);
		// ourIndex < maxLeases = true,即表明是children首个节点,获得锁成功
        boolean getsTheLock = ourIndex < maxLeases;
        // 如果获得锁失败,从子节点集合中获得 上一个下标的节点名;pathToWatch 即要监听的节点名
        String pathToWatch = getsTheLock ? null : children.get(ourIndex - maxLeases);

        return new PredicateResults(pathToWatch, getsTheLock);
    }

    @Override
    public String createsTheLock(CuratorFramework client, String path, byte[] lockNodeBytes) throws Exception {
        String ourPath;
        // 创建临时顺序节点(CreateMode.EPHEMERAL_SEQUENTIAL)
        // lockNodeBytes不为空,即需要存数据
        if ( lockNodeBytes != null ) {
            ourPath = client.create().creatingParentContainersIfNeeded().
            	withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            	.forPath(path, lockNodeBytes);
        } else {
            ourPath = client.create().creatingParentContainersIfNeeded().
            	withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
            	.forPath(path);
        }
        return ourPath;
    }

ConnectionStateListener

应用场景:当客户端因为网络等原因断开连接,当网络恢复后,zk客户端与集群会重新连接,但是恢复连接后,临时节点并不会自动复原。
这种情况下我们可以监听重连事件,在重连后手动恢复临时节点。
用于实现客户端与zk集群连接的状态发生变化时执行回调监听功能。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

ZooKeeper-Curator-InterProcessMutex分布式锁源码 的相关文章

  • SPNEGO 密码身份验证问题

    我已将我的应用程序配置为通过 SPNEGO 与 Websphere 使用 Kerberos 身份验证 这是详细信息 krb5 conf libdefaults default realm ABC MYCOMPANY COM default
  • 如何在url请求中发送数组

    我的要求如下 我想给出演员姓名 开始日期 结束日期并获取他在该时期出演的所有电影 因此 我的服务请求是这样的 http localhost 8080 MovieDB GetJson name Actor startDate 20120101
  • 最快的高斯模糊实现

    如何以最快的速度实施高斯模糊 http en wikipedia org wiki Gaussian blur算法 我要用Java来实现它 所以GPU http en wikipedia org wiki Graphics processi
  • 无法访问“不安全”java方法的java表达式语言

    我正在开发一个项目 让用户向服务器提交小 脚本 然后我将执行这些脚本 有很多脚本语言可以嵌入到Java程序中 例如mvel ognl uel clojure rhino javascript等 但是 据我所知 它们都允许脚本编写者调用Jav
  • 记录共享和映射的诊断上下文

    据我所知 其他人做了什么来解决 Commons Logging 项目 针对 NET 和 Java 不支持映射或嵌套诊断上下文这一事实 执行摘要 我们选择直接使用实现者日志框架 在我们的例子中为 log4j 长答案 您是否需要一个抽象日志框架
  • 这个等待通知线程语义的真正目的是什么?

    我刚刚遇到一些代码 它使用等待通知构造通过其其他成员方法与类中定义的线程进行通信 有趣的是 获取锁后 同步范围内的所有线程都会在同一锁上进行定时等待 请参见下面的代码片段 随后 在非同步作用域中 线程执行其关键函数 即 做一些有用的事情1
  • 具有 JPA 持久性的 Spring 状态机 - 存储库使用

    我试图弄清楚如何轻松使用 Spring 状态机 包括使用 JPA 进行持久化 这是我正在处理的问题 不兼容的数据类型 工厂和持久性 在程序的某个时刻 我想使用连接到用户的状态机 有用于此目的的存储库 项目spring statemachin
  • 如何在不反编译的情况下更改已编译的.class文件?

    我想更改 class 文件方法 我安装 JD Eclipse Decompiler 并打开 class 文件 我添加了一些代码并保存 class 文件 但是 class 文件没有改变 我不知道如何使用反编译器 如果可能的话 如何在不使用反编
  • Java 中 JButton 的击键/热键

    最初我使用 JMenu 并建立热键以使用加速器工作 它运行得很好 现在我想在 JButton 中实现相同的行为 但我陷入困境 这是我编写的代码 请分享您的想法 以便我可以走上正确的道路 import javax swing import j
  • C 与 C++ 中的 JNI 调用不同?

    所以我有以下使用 Java 本机接口的 C 代码 但是我想将其转换为 C 但不知道如何转换 include
  • 使用单独的线程在java中读取和写入文件

    我创建了两个线程并修改了 run 函数 以便一个线程读取一行 另一个线程将同一行写入新文件 这种情况会发生直到整个文件被复制为止 我遇到的问题是 即使我使用变量来控制线程一一执行 但线程的执行仍然不均匀 即一个线程执行多次 然后控制权转移
  • BadPaddingException:无效的密文

    我需要一些帮助 因为这是我第一次编写加密代码 加密代码似乎工作正常 但解密会引发错误 我得到的错误是 de flexiprovider api exceptions BadPaddingException 无效的密文 in the 解密函数
  • 如何减去两个 XmlGregorianCalendar 对象来创建一个 Duration 对象?

    我想计算两个时间之间的差值XmlGregorianCalendar对象 从而创建一个Duration object 但我还没有找到执行减法的干净方法 你会怎么做 那应该是 DatatypeFactory newDuration xgc2 t
  • javax.media.jai 类的公共下载?

    这是一个非常简单的问题 我一直在寻找可以下载 javax media jai 库的地方 我找到了 jai imageio 库 但是我发现的所有其他 jai 内容要么已经过时 2008 年及之前 然后我遇到了登录屏幕 是否有 javax me
  • 让 Hibernate 和 SQL Server 与 VARCHAR 和 NVARCHAR 良好配合

    我目前正在大型数据库的某些表中启用 UTF 8 字符 这些表已经是 MS SQL 类型 NVARCHAR 此外 我还有几个使用 VARCHAR 的字段 Hibernate 与 JDBC 驱动程序的交互存在一个众所周知的问题 例如 参见在 h
  • Axis2 错误:要输出的文本中的空白字符 (0x4) 无效

    我创建了一个 Java 客户端 使用 Axis2 1 7 6 作为代码生成器与 SOAP Web 服务进行交互 问题在于客户端的某些输入抛出异常并显示以下消息 org apache axis2 AxisFault Invalid white
  • spring data jpa复合键重复键记录插入导致更新

    我有一个具有复合键的实体 我试图通过使用 spring data jpa 存储库到 mysql 数据库来持久化它 如下所示 Embeddable public class MobileVerificationKey implements S
  • Java/MongoDB 按日期查询

    我将一个值作为 java util Date 存储在我的集合中 但是当我查询以获取两个特定日期之间的值时 我最终得到的值超出了范围 这是我的代码 插入 BasicDBObject object new BasicDBObject objec
  • Java中单例的其他方式[重复]

    这个问题在这里已经有答案了 只是我在考虑编写单例类的其他方法 那么这个类是否被认为是单例类呢 public class MyClass static Myclass myclass static myclass new MyClass pr
  • mybatis:使用带有 XML 配置的映射器接口作为全局参数

    我喜欢使用 XML 表示法来指定全局参数 例如连接字符串 我也喜欢 Mapper 注释 当我尝试将两者结合起来时 我得到这个例外 https stackoverflow com questions 4263832 type interfac

随机推荐

  • 如何给信号加噪声,matlab

    Matlab信号上叠加噪声和信噪比的计算 http www ilovematlab cn thread 54155 1 1 html 出处 MATLAB中文论坛 在信号处理中经常需要把噪声叠加到信号上去 在叠加噪声时往往需要满足一定的信噪比
  • 深度学习模型学习笔记

    我作为新手的一些学习笔记 backbone 主干网络 主干网络大多时候指的是提取特征的网络 其作用就是提取图片中的信息 供后面的网络使用 这些网络经常使用的是Resnet VGG等 而不是我们自己设计的网络 因为这些网络已经证明了在分类等问
  • Centos LVS DR模式详细搭建过程

    目录 前言 1 1 LVS环境组网 2 2 ipvsadm安装前准备 2 3 httpd与ipvsadm下载 3 4 LVS负载均衡配置 4 5 真实WEB服务器配置及arp抑制 5 6 LVS负载均衡测试 6 7 附 arp抑制参数 7
  • Unity 鼠标拖拽控制旋转物体

    Unity 鼠标拖拽控制旋转物体 需求 鼠标左键拖拽 控制物体旋转 摄像机固定不动 可以使用下面的代码 但是当摄像机移动到物体背后时 出现拖拽旋转方向与摄像机在正面时相反的问题 gameObj transform Rotate transf
  • FISCO BCOS 区块链(一)

    目录 一 搭建 FISCO BCOS 链 1 安装 openssl curl 依赖 2 安装 build chain 脚本 二 配置控制台 1 下载java jdk 2 安装控制台 3 拷贝配置文件以及证书 4 启动控制台 三 Java S
  • linux crontab 文件位置和日志位置

    一 文件位置 位置一般在 var spool cron 下 如果你是root用户 那下面有个root文件 建议日常备份 避免误删除导致crontab 文件丢失 二 日志文件位置 默认情况下 crontab中执行的日志写在 var log下
  • python 使用 with open() as 读写文件的操作方法

    读文件 要以读文件的模式打开一个文件对象 使用Python内置的open 函数 传入文件名和标示符 1 gt gt gt f open E python python test txt r 标示符 r 表示读 这样 我们就成功地打开了一个文
  • R语言倾向性评分:加权

    之前已经介绍过倾向性评分匹配 propensity score matching 倾向性评分回归和分层 R语言倾向性评分 回归和分层 R语言倾向性评分 匹配 今天继续介绍倾向性评分最后一个重要的部分 倾向性评分加权 主要介绍两种加权方法 逆
  • Anaconda环境下配置GPU版本pytorch和tensorflow、安装pycharm较全教程,避免踩坑

    一 安装Anaconda 1 安装anaconda的教程 1条消息 Anaconda的下载及安装 详细图文教程 伏城无嗔的博客 CSDN博客 anaconda如何下载https blog csdn net qq 45281807 artic
  • Linux开发板ping通WSL2并且能够通过无线网连接外网(虚拟机同样适用)

    Linux开发板ping通WSL2并且能够通过无线网连接外网 虚拟机同样适用 前言 安装Hyper V 创建虚拟网卡 将WSL的网络桥接 共享WIFI 修改共享网段的IP VMWare虚拟机 参考 前言 由于笔者入手了一块linux开发板
  • Linux下tty驱动框架

    在Linux下编写串口驱动程序 需要使用Linux提供的tty和serial驱动模块 以下是一个简单的tty串口驱动程序示例 使用C语言编写 include
  • 生成子集——位向量法

    生成0 n序列的子集 对于0 n的每一个值在集合中都有存在和不存在两种状态 所以递归每个值的存在状态即可生成子集 include
  • pyspark环境搭建,连接hive

    pyspark环境搭建 连接hive 一 环境搭建 1 1环境 1 1 1 集群环境 1 1 2 系统环境配置 1 1 3 host文件配置 1 1 4hive hadoop的配置文件 1 1 5 pyspark安装 2 1环境测试 二 项
  • 从windows下切换到Linux

    想从windows下切换到Linux 感觉好难 虽然早有这个想法 但是真正做起来后感觉还是有点难度 主要是命令行的界面感觉不习惯 感觉没有了鼠标就什么都不会干了 尤其是在vi下的时候 连上下左右都不会了 但是换了vim tiny后感觉我又会
  • ML算法基础——分类模型评估与调参

    文章目录 一 分类模型评估 1 准确率 2 精确率和召回率 2 1 混淆矩阵 2 2 精确率 Precision 与召回率 Recall 2 3 分类模型评估API 2 4 贝叶斯模型评估实例 二 模型的选择与调优 1 交叉验证 2 网格搜
  • SLVS-EC接口学习

    SLVS summarize 一 概述 SLVS EC高速串行接口技术 在CIS和DSP 数字信号处理器 之间实现了高帧率的宽带像素数据传输 SLVS EC引入了一个优化的数据包格式和控制协议 几乎没有冗余 而且结构简单 仅由两层组成 LI
  • Windows下部署Appium教程(Android App自动化测试框架搭建)

    摘要 1 appium是开源的移动端自动化测试框架 2 appium可以测试原生的 混合的 以及移动端的web项目 3 appium可以测试ios android firefox os 4 appium是跨平台的 可以用在osx windo
  • 【SpringCloudAlibaba学习 01】创建父工程项目

    文章目录 SpringBoot SpringCloud SpringCloudAlibaba版本对应关系 第一步 创建父工程 第二步 配置父工程Maven依赖 第三步 本地项目上传Git仓库 这一步可不做 1 登录自己的Git 2 进入个人
  • 详解深度语义匹配模型DSSM

    所谓语义匹配 就是在语义上衡量文本的相似度 在产业界有很多的应用需求 例如 在FAQ场景中需要计算用户输入与标问之间的相似度来寻找合适的答案 本文介绍一种经典的语义匹配技术 DSSM 主要用于语料的召回和粗排 作者 编辑 小Dream哥 1
  • ZooKeeper-Curator-InterProcessMutex分布式锁源码

    这里写自定义目录标题 InterProcessMutex 可重入互斥锁 ConnectionStateListener 注意 临时节点下不能创建临时子节点 InterProcessMutex 可重入互斥锁 具体流程图 入口1 Overrid