zookeeper选举流程源码分析

2023-10-27

zookeeper选举流程源码分析

选举的代码主要是在QuorumPeer.java这个类中。

它有一个内部枚举类,用来表示当前节点的状态。

   public enum ServerState {
        LOOKING, FOLLOWING, LEADING, OBSERVING;
    }

LOOKING: 当前节点在选举过程中

FOLLOWING:当前节点是从节点

LEADING: 当前节点是主节点

OBSERVING: 当前节点是观察者状态,这种状态的节点不参与选举的投票。

QuorumPeer有个run方法,就是用来根据当前节点不同的状态,进行不同的处理。

下面看下这段代码主要的框架

   @Override
    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
 				// 这里是注册jmx消息,不用关注
 				//下面就是选举的框架代码了

        try {
      	//running 表示当前节点的状态,只要在运行过程中,就会一直根据当前节点的状态进行不同的处理
            while (running) {
              //getPeerState()用来获取当前节点的状态,就是上面提到的枚举类。
              //下面就会根据不同的状态进行不同的处理
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");
										......
                     //选举就是调用下面的这行代码来完成的。
                      //后面我们也就单独就这个代码来进行分析
                            setCurrentVote(makeLEStrategy().lookForLeader());
										......
                    break;
                case OBSERVING:
                  	......
                    //按照观察者的逻辑进行处理  
                    break;
                case FOLLOWING:
       							......
                    //按照从节点的逻辑进行处理
                    break;
                case LEADING:
										......
                    //按照主节点的逻辑进行处理
                    break;
                }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
					......
        }
    }

上面代码的逻辑还是比较清楚的,就是一直在这几种状态之间处理。

每种状态的处理逻辑基本都是如下

                    try {
                      //处理业务逻辑,正常情况下,会一直在这里。
                      //除非当前的状态逻辑已经处理完毕,如LOOKING,或者抛出了异常,这时就需要重置状态
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                     	//重置状态
                      updateServerState();
                    }

下面我们看看上面选举的这行代码 setCurrentVote(makeLEStrategy().lookForLeader());。这行代码会调用具体执行选举的类执行具体的选举操作,并返回对应的投票信息,并设置成当前的投票信息。

默认的选举的是FastLeaderElection,对应的选举逻辑就在lookForLeader方法中。下面我们就直接去看看FastLeaderElectionlookForLeader方法吧。

选举的主要逻辑就是告诉其他节点。我是谁,我选谁做为主节点。

public Vote lookForLeader() throws InterruptedException {
				......
        try {
  					//recvset用来保存投票信息,
          //key表示选民身份,也就是这个票是谁投的(注意:每个节点只会有一个有效的投票,后面的投票会覆盖掉之前的投票)
          //value用来表示具体投票的内容
            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();

            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
						
            int notTimeout = finalizeWait;

            synchronized(this){
              	//每次投票前,会先更新这个logicalclock逻辑时钟,这个用来表示当前是第几次选举了,对比投票信息的时候会用到,这个很关键
                logicalclock.incrementAndGet();
              //首先给自己投一票
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info("New election. My id =  " + self.getId() +
                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));
          	//发送投票信息
            sendNotifications();

在上面的代码,首先会把logicalclock+1,表示当前是启动后的第几轮选取,这个参数是保存在内存中的,也就是每次启动都会从0开始。

那会不会出现节点之间logicalclock不同的情况呢,这个情况是有可能会出现的。不过后面选举过程中,相互发送消息也就会发送logicalclock,会和自己的logicalclock比较,进行修正。

在开始选举的时候,首先会给自己投一票。

会调用sendNotifications方法将投票者(自己)的信息和投票信息发出去。

会发送这些信息:

  • proposedLeader : 选举的主节点
  • proposedZxid: 选举的节点zxid,这个字段是long类型,前面32 bit表示epoch,后面32bit表示事务id
  • logicalclock:投票者的逻辑时钟
  • QuorumPeer.ServerState.LOOKING:投票者的状态(投票的状态肯定是looking)
  • sid: 投票者的id
  • proposedEpoch :选举节点的epoch,也就是proposedZxid的前面32 bit

下面看看具体的选举代码

					//如果当前节点一直是looking,且服务没有停止,就会一直进行选举流程
            while ((self.getPeerState() == ServerState.LOOKING) &&
                    (!stop)){
								// 获取其他节点发送过来的消息
                Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

      					//如果没有收到消息,就去检查下和其他节点的连接是否正常,尽力使消息能发送。
                if(n == null){
     						......
                } 
                // 验证收到消息的节点和它选举的主节点是否有效
                else if (validVoter(n.sid) && validVoter(n.leader)) {
     								//这里就会根据收到消息的节点状态进行分别进行处理
     								// 比如自己是后加入进来的,这时就已经有了leader节点,对应的也就有follow节点
     								// 也有可能大家都刚启动,或者主节点挂掉了,这时大家都会又会是looking状态
                    switch (n.state) {
                    // 如果对方节点是投票状态
                    case LOOKING:
                        // If notification > current, replace and send messages out
                        //首先比较logicalclock,如果对方的logicalclock比自己的大,就修正自己的`logicalclock`,同时清空自己的票箱,重新计票
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            recvset.clear();
                            // 这里会比较票的信息,如果对方选的leader节点的比自己大,就推举对方选的leader节点,否则还是将票投给自己
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                            // 如果自己的logicalclock 比对方的大,直接忽略对方的票
                        } else if (n.electionEpoch < logicalclock.get()) {
														......
                            break;
                             // 如果logicalclock相等,那就直接比较自己当前选出来的leader和对方选出来的leader进行比较,如果自己的大,就不做处理,如果对方的大,就更新自己的票,重新投票
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

												//在这里将对方的票扔进投票箱
                        // don't care about the version if it's in LOOKING state
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        
                        //这里就对投票进行统计了,如果过半,就要设置leader了,不过在这之前,会再等一个时间,看看其他节点是否有选出更适合的leader。
                        //如果没有,那就设置对方节点选出来的leader为主节点,对比下leader是不是自己,如果是自己,就将自己的状态修改为leader,否则就修改成follow。同时保存当前leader信息
                       

                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        //这种状态的节点是不参与投票的,所以对它的发送的投票信息进行忽略。
                        break;
                    case FOLLOWING:
                    case LEADING:
//如果对方是following或者leading,说明当前已经有主节点了,在这里就直接统计票数信息,并验证根据票数信息统计出来的leader节点和回应自己消息的自称leader节点 是不是同一个,如果是同一个,说明信息是吻合的,就会去设置自己的节点状态。需要注意的是,投票信息不但会发送给其他节点,也会给自己发送。所以这里会判断对方节点是否是当前节点。
													......
                        break;
                    default:
										......
                    }
                } else {
								......
                }
            }
            return null;
        } finally {
				......
        }
    }

比较节点大小也比较简单。

((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));

首先比较epoch,其次比较zxid,最后比较myid。

myid就是我们在zookeeper每个节点中设置myid文件中对应的值。

zxid是两部分,前32bit epoch,后32 bit 事务序号。在一个节点成为leader节点后,首先会将epoch的值+1,同时将事务序号设置成0。zxid是持久化写入文件的,所以重启也不会丢失。

logicalclock在内存中,所以每次启动都会从0开始。

给其他节点发送投票消息的时候,也会给自己发送,其他节点是通过网络发送,给自己是直接放到接收投票信息的队列。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

zookeeper选举流程源码分析 的相关文章

随机推荐

  • sqlserver不能直接create table as select ......

    在sqlserver 下想复制一张表的 想到oracle下直接create table xxx as select from 即可 但是结果却是错误的 baidu一下发现 sqlserver的语法是 select into tablenew
  • 十分钟入门以太和Opensea测试网批量发行NFT实战

    一 环境准备 1 注意 需合理上网 2 准备素材 准备一套多个属性元素的不一样的图层素材 比如10张背景图 10张face图 10张眼睛图层 10张头发图层等 每张图特性不一样 像素大小一样 比如500 500 背景透明 这样就可以随机组合
  • python 基础知识汇总

    hellow一 填空题 1 Python安装扩展库常用的是 工具 pip 2 Python标准库math中用来计算平方根的函数是 sqrt 3 Python程序文件扩展名主要有 和两种 其中后者常用于GUI程序 py pyw 4 Pytho
  • 用pointnet++分类自己的点云数据

    目录 一 简单介绍pointnet 1 1 三维数据的表示方法 1 2 pointnet算法 1 3 pointnet 算法的提出 二 pointnet 如何运行自己的数据集 2 1 确定数据集的基本情况 2 2 以点云分割为例 2 2 1
  • android从assets文件夹中读取xml文件

    Context getAssets openXmlResourceParser fileNameString 出现FileNotFoundException 认为assets文件夹的文件须用open 打开 不能用方法openXmlResou
  • Kafka框架学习笔记 尚硅谷

    Kafka框架学习笔记 尚硅谷 因为本人不是大数据方向的 但是公司一个项目用到了kafka 我就学习一下 如果笔记有什么不对的地方 敬请谅解 文章目录 Kafka框架学习笔记 尚硅谷 因为本人不是大数据方向的 但是公司一个项目用到了kafk
  • 深圳市人力资源和社会保障局关于用人单位招用就业困难人员申请补贴和奖励有关事项的通知

    各有关单位 为鼓励用人单位招用就业困难人员 促进困难群体就业 根据 深圳市人民政府关于进一步完善就业援助政策的通知 深府 2013 60号 深圳市人民政府关于做好当前和今后一段时期就业创业工作的实施意见 深府规 2017 12号 有关规定
  • android 异步加载图片缩略图

    建一个AsyncLoadedImage类继承AsyncTask异步加载类 调用publishProgress方法更新onProgressUpdate贮存缩略图信息到Adapter 监听Adapter Change实现异步加载缩略图 main
  • Go基础(复杂类型):函数的闭包

    函数的闭包 Go 函数可以是一个闭包 闭包是一个函数值 它引用了其函数体之外的变量 该函数可以访问并赋予其引用的变量的值 换句话说 该函数被 绑定 在了这些变量上 例如 函数 adder 返回一个闭包 每个闭包都被绑定在其各自的 sum 变
  • OpenCV快速傅里叶变换(FFT)用于图像和视讯流的模糊检测

    OpenCV快速傅里叶变换 FFT 用于图像和视频流的模糊检测 翻译自 OpenCV Fast Fourier Transform FFT for blur detection in images and video streams 原文链
  • PHP实现一个简单的登录和注册,以及实现方法和页面

    下面是一个简单的PHP代码示例 实现了登录和注册功能 首先 创建一个名为index php的文件 用于显示登录和注册表单 h2 登录 h2
  • Linux 安装JDK17

    1 官网下载JDK17 这里我们下载的是 x64 Compressed Archive版本 2 解压tar 文件 进入文件下载目录 自己定义 我这里都放在software cd usr local softwar 解压tar文件 tar v
  • 【毕业项目】自主设计HTTP

    博客介绍 运用之前学过的各种知识 自己独立做出一个HTTP服务器 自主设计WEB服务器 背景 目标 描述 技术特点 项目定位 开发环境 WWW介绍 网络协议栈介绍 网络协议栈整体 网络协议栈细节 与http相关的重要协议 HTTP背景知识补
  • 最强自动化测试框架Playwright(11)- 录制视频

    视频 使用playwright 您可以录制测试视频 录制视频 视频在测试结束时在浏览器上下文关闭时保存 如果手动创建浏览器上下文 请确保等待 browser context close context browser new context
  • 分布式任务调度可选方案

    1 除了基于jvm的java之处 新接触一个JVM语言 SCALA 一种同时面向脚本和面向函数的语言 spark大数据框架是基于scala语言 照着网络教程 简单的写了几个例子 感觉object class与java语境中还是有一定的差异
  • 2023美赛F题完整数据代码模型成品文章思路-- Green GDP

    论文摘要 模型和其他部分内容如下 摘要 现行的以GDP为核心的国民经济核算体系 由于忽略非市场产出 环境破坏 资源浪费方面的有关计算 这样的指标并不完整 由于经济活动中 对资源消耗和对环境的负面影响越来越大 而长期忽略这种负面影响的后果 高
  • Hexo在多台电脑上提交和更新

    文章目录 1 博客搭建 2 创建一个新文件夹new 用于上传hexo博客目录 3 github上创建hexo分支并设置为默认分支 创建hexo分支 将hexo分支设置为默认分支 4 进入新建的文件夹中git clone 再上传相关文件至he
  • navicat for mysql 连接 mysql 出现1251错误

    navicat for mysql下载地址 链接 https pan baidu com s 1Nh2ippFKHrWXnzPx hda8g 密码 fumf 客户端使用navicat for mysql 本地安装了mysql 8 0 但是在
  • CVE-2023-35843:NocoDB任意文件读取漏洞复现

    文章目录 NocoDB 存在任意文件读取漏洞CVE 2023 35843 0x01 前言 0x02 漏洞描述 0x03 影响范围 0x04 漏洞环境 0x05 漏洞复现 1 访问漏洞环境 2 构造POC 3 复现 0x06修复建议 Noco
  • zookeeper选举流程源码分析

    zookeeper选举流程源码分析 选举的代码主要是在QuorumPeer java这个类中 它有一个内部枚举类 用来表示当前节点的状态 public enum ServerState LOOKING FOLLOWING LEADING O