2. ZK客户端与服务端建立连接的过程(基于NIO)

2023-10-27

在上一篇《客户端启动源码分析》文章中讲到了客户端会使用两个线程(SendThread和EventThread)去协调处理客户端与服务端的通信和watchers事件的回调,原本打算在这篇文章去分析这两个线程是怎么相互纠缠的。但是写着写着发现在客户端连接就花了很大的篇幅,不如这篇把标题改成ZK客户端与服务端建立连接的过程,那我在下一篇文章中再去分析SendThread和EventThread。当然这篇文章中也介绍了SendThread在客户端建立连接过程中发挥的作用。

引例

首先还是由第一篇文章中的Test来作为例子

public class ZooKeeperTestClient extends ZKTestCase implements Watcher {
    protected String hostPort = "127.0.0.1:22801";
    protected static final String dirOnZK = "/test_dir";
    protected String testDirOnZK = dirOnZK + "/" + Time.currentElapsedTime();


    private void create_get_stat_test() throws IOException, InterruptedException, KeeperException {
        ZooKeeper zk = new ZooKeeper(hostPort, 10000, this);
        String parentName = testDirOnZK;
        String nodeName = parentName + "/create_with_stat_tmp";
        deleteNodeIfExists(zk, nodeName);
        deleteNodeIfExists(zk, nodeName + "_2");
        Stat stat = new Stat();
        //创建一个持久节点
        zk.create(nodeName, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
        assertNotNull(stat);
        assertTrue(stat.getCzxid() > 0);
        assertTrue(stat.getCtime() > 0);
        zk.close();
    }


    public synchronized void process(WatchedEvent event) {
        try {
            System.out.println("Got an event " + event.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

先把涉及到的几个类的类图放出来,后面阅读的时候可做参考

类图:

1. 启动SendThread

在上一篇文章中最后讲到了客户端启动的时候调用SendThread#start()方法

    public void start() {
        //负责客户端和服务端的通信
        sendThread.start();
        //主要负责在客户端回调注册的Watchers进行通知处理
        eventThread.start();
    }

sendThread是一个线程,并且是ClientCnxn的内部类,条件反射地想到SendThread肯定有一个run方法,找到它:

        @Override
        public void run() {
			//省略部分代码
            while (state.isAlive()) {
               //省略部分代码
           }
        }

State#isAlive()

        public boolean isAlive() {
            return this != CLOSED && this != AUTH_FAILED;
        }

2. 状态初始化

可以看到run方法里面去监听了网络状态,这个state是由一个全局变量去标识的,只要状态不是关闭和认证失败的状态就会一直循环在那里,那么状态是什么时候初始化的呢,这要回到创建Zookeeper实例的时候:

ClientCnxn#changeZkState()

   
   
    volatile States state = States.NOT_CONNECTED;
    
   synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
            if (!state.isAlive() && newState == States.CONNECTING) {
                throw new IOException(
                        "Connection has already been closed and reconnection is not allowed");
            }
            // It's safer to place state modification at the end.
            state = newState;
        }

由上面的流程知道,状态默认是NOT_CONNECTED,但在ZooKeeper实例化的时候就将状态(States)置为CONNECTING了,现在可以把SendThread的run方法拿出来。

public void run{
            while (state.isAlive()) {
                try {
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            serverAddress = hostProvider.next(1000);
                        }
                        onConnecting(serverAddress);
                        //开始连接服务
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                  }
                  //省略其他判断逻辑
            }
         }

由于初始状态是CONNECTING,那么首先会进入到第一个判断去连接服务:

3. 开始连接

请注意,接下来会在ClientCnxn和ClientCnxnSocketNIO两个类中跳来跳去,请抓稳!

ClientCnxn#startConnect()

  private void startConnect(InetSocketAddress addr) throws IOException {
            // initializing it for new connection
            changeZkState(States.CONNECTING);
            logStartConnect(addr);
			//省略部分代码
			//连接服务端
            clientCnxnSocket.connect(addr);
        }

connect方法是ClientCnxnSocket中的抽象方法,子类ClientCnxnSocketNIO中实现了这个方法:

ClientCnxnSocketNIO#connect()

   @Override
    void connect(InetSocketAddress addr) throws IOException {
        SocketChannel sock = createSock();
        try {
            registerAndConnect(sock, addr);
        } catch (UnresolvedAddressException | UnsupportedAddressTypeException | SecurityException | IOException e) {
            LOG.error("Unable to open socket to {}", addr);
            sock.close();
            throw e;
        }
        //是否初始化完成(是否连接成功)
        initialized = false;

        /*
         * Reset incomingBuffer
         */
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }


    void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
        sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
        //建立socket连接
        boolean immediateConnect = sock.connect(addr);
        if (immediateConnect) {
            sendThread.primeConnection();
        }
    }

连接成功后又会去调用SendThread#primeConnection()方法:

SendThread#primeConnection()

        void primeConnection() throws IOException {
            LOG.info(
                "Socket connection established, initiating session, client: {}, server: {}",
                clientCnxnSocket.getLocalSocketAddress(),
                clientCnxnSocket.getRemoteSocketAddress());
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            //构造连接请求
            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
            //讲请求报文添加到outgoingQueue队列
            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
            //告知ClientCnxnSocket连接请求已经发送
            clientCnxnSocket.connectionPrimed();
            LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
        }

ClientCnxnSocketNIO#connectionPrimed():

   void connectionPrimed() {
        sockKey.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
 

好了,这里先暂停一下,咱们总结一下上面过程做了哪些事情:

  1. 初始化状态为CONNECTING
  2. 建立Socket连接
  3. 构造连接请求Packet
  4. 发送请求报文
  5. 将ClientCnxnSocketNIO的全局变量sockKey置为SelectionKey.OP_READ | SelectionKey.OP_WRITE,即设置读写事件的监听,因为后面需要监听服务端的返回,并且会影响到SendThread的run方法后面的逻辑。

4. 处理服务端连接响应

上面只是分析了SendThread#run()方法的一部分,这时候只是建立了Socket连接,但是还不能发送读写请求,接下来继续分析run方法剩下的部分:
SendThread#run()

public void run(){
	//省略部分代码,上面文章中已经分析了一部分,还有一部分这篇文章可忽略
   clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}

又跑到了ClientCnxnSocketNIO#doTransport()方法:


   @Override
    void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        //等待服务端返回
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        selected.clear();
    }

很简单地会想到服务端响应之后会走到:

doIO(pendingQueue, cnxn);

看看这个方法里面做了什么:

    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                 	//第一次接受服务端的响应肯定会走到这else if里面来
                } else if (!initialized) {
               		 //读取服务端返回的结果
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
					//省略部分代码
                    initialized = true;
                } 
         		  //省略部分代码
            }
        }

    }   

由上面分析过的代码知道initialized的初始值为false,不行可以去上面找,在ClientCnxnSocketNIO#connect() 中

所以后面走到了readConnectResult()中,处理服务端的相应:

ClientCnxnSocket#readConnectResult()


    void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b)).append(",");
            }
            buf.append("]");
            if (LOG.isTraceEnabled()) {
                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
            }
        }

        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        //反序列化
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }

ClientCnxn#onConnected():

       void onConnected(
            int _negotiatedSessionTimeout,
            long _sessionId,
            byte[] _sessionPasswd,
            boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            //省略部分代码
            //读写客户端不能与只读服务端建立连接
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }

            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
            seenRwServerBefore |= !isRO;
            LOG.info(
                "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
                clientCnxnSocket.getRemoteSocketAddress(),
                Long.toHexString(sessionId),
                negotiatedSessionTimeout,
                (isRO ? " (READ-ONLY mode)" : ""));
            KeeperState eventState = (isRO) ? KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, eventState, null));
        }

主要是这一行:

changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);

这里就将状态置为CONNECTED了,后面就可以在SendThread里面响应其他的请求了啦。

这里再小小总结一下:

  1. 读取服务端的响应数据并反序列化
  2. 判断服务端的状态是否是ReadOnly的状态
  3. 如果不是ReadOnly状态就将状态置为CONNECTED

好了以上大概就是整个客户端与服务端建立连接的过程了,当然ClientCnxnSocket默认实现类由两个,本偏只是就ClientCnxnSocketNIO去分析,ClientCnxnSocketNIO是基于NIO的实现,还有另一个是基于Netty的实现,有兴趣的可以看看,后面有时间的话也会去分析。

5. 流程图

附赠整个流程图
流程:
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

2. ZK客户端与服务端建立连接的过程(基于NIO) 的相关文章

  • vtk创建点

    使用vtk库创建三维空间中的点 引言 开发环境 示例一 项目结构 实现代码 运行效果 示例二 项目结构 实现代码 运行效果 总结 引言 本文仅适合初学者 本文不提供vtk动态库的生成 以及在QtCreator中的引进vtk时的配置 本文先由
  • 牛人项目失败的总结

    tom lt 遇到的失败项目比较多 让人郁闷 仔细分析原因 主要在于 1 项目开始需求不明确 领导决定动手 就开始启动项目 造成和客户需要差距太大 导致失败 2 需求变更没有控制 客户提出新的需求 或者改变原来的需求 没有一个好的控制流程
  • 【模式识别4】YOLO目标检测数据集xml格式转txt格式

    YOLO目标检测数据集xml格式转txt格式 1 转换前的xml格式 2 xml格式转txt格式代码 2 1 源代码 2 2 需要修改的地方 3 转换后的txt格式 代码资源 voc2txt py 1 转换前的xml格式 如果我们使用Lab

随机推荐

  • 男人必读 --看了永不后悔,女人想看也可以进去……

    1 事业永远第一 虽然金钱不是万能的 但没有钱是万万不能的 虽然这句话很俗 但绝对有道理 所以30岁之前 请把你大部分精力放在你的事业上 事业远比爱情重要 如果说事业都不能永恒 那么爱情只能算是昙花一现 记 得那首 没有钱你会爱我吗 的歌吗
  • 基于Koa的微信第三方平台使用及搭建说明(一)

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 前言 公司内部使用说明文档 记录一下 目的在于使内部人员了解和熟悉项目 不在于搭建教程 看不明白的就别看了 一 关于 Node Node是什么 官网上给出的定义是 一个搭建
  • Mac 好用的shell终端

    1 搜索关键词 my zsh 2 http ohmyz sh
  • 【docker】docker-compose实战之MySQL安装与配置

    唠嗑部分 首先说一下 MySQL是否适合容器化 数据库首先要解决数据的持久化问题 以确保服务宕机之后数据不会丢失 docker提供的数据卷虽然可以让数据保存在宿主机上 但是容器的 volumn数据卷的设计是围绕 Union FS镜像层提供持
  • scratch颜色实验(R+G+B)/自制素材/少儿编程scratch教研教案课件课程素材脚本

    scratch颜色实验 wmv
  • 在线接口文档管理工具推荐,支持在线测试,生成漂亮的http文档

    易文档 英文名叫easydoc 是新出来的文档管理平台 跟市面的那些有所不同 这个不管是编写体验还是预览 专业性和美观性都会大大超越其他的 看下他的预览效果 市面很多http接口文档的编写都是直接写markdown文档 这种编写起来特别麻烦
  • VBA 向文件写入编码为UTF-8的数据

    Sub Test 需要引入 ActiveX Data Objects ADO 组件 即 Microsoft ActiveX Data Objects 2 5 Library 或者 Microsoft ActiveX Data Objects
  • qt右键弹出菜单的一些实现方法

    在qt中 关于右键弹出菜单 有几种实现方法 1 在窗口初始化时 修改指定控件的右键菜单策略 然后把右键点击信号和某个槽函数连接 设置btnNew按钮的右键菜单策略 ui gt btnNew gt setContextMenuPolicy Q
  • 有趣的telnet站点

    这个都能做电影 不知道他们花了多长的时间 telnet towel blinkenlights nl 下面这个是其他的一些站点 我没有看过 留个链接 http www telnet org htm places htm
  • 记第一次拆机

    两周前的周日 也不知道抽了什么风萌生了拆笔记本电脑的想法 可能是由于从购买之后风扇都没清理过的原因吧 笔记本电脑品牌联想 型号G480 2013年大一暑假购置的机子 也就是普通的上网本吧 玩游戏确实有点卡 比如剑灵 好了 言归正传 下面开始
  • 卡西欧计算机的闹铃怎么取消,卡西欧g-shock怎么关闹钟

    卡西欧g shock怎么关闹钟 连续按mode按键 通常是左下角按键 直至液晶屏幕出现al1 al2 al3等等字样 然后 第二显示屏会显示 on 按 adjust 就可以改为 of 就关闭了 卡西欧g shock闹钟怎么设置 1 按MOD
  • 一文搞定Postman(菜鸟必看)

    什么是Postman Postman是一个可扩展的 API 测试工具 可以快速集成到 CI CD 管道中 它于 2012 年作为 Abhinav Asthana 的一个副项目启动 旨在简化测试和开发中的 API 工作流程 API 代表应用程
  • java高并发多线程架构_java架构师指南 高并发和多线程的区别

    高并发和多线程 总是被一起提起 给人感觉两者好像相等 那它们之间究竟有什么区别呢 1 多线程 多线程是java的特性 也是java架构师必须掌握的一项技术 因为现在cpu都是多核多线程的 可以同时执行多个任务 为了提高JVM的执行效率 Ja
  • 搭建Obsidian+picGo+Lsky Pro图床

    搭建Obsidian picGo Lsky Pro图床 0 前言 去年心血来潮买了个小主机 搭建了家庭服务器 安装了PVE系统 散热拉胯 性能不足目前只创建了个黑群晖系统 搭建一个图床 方便日常笔记工作 1 软件 1 1 Obsidian
  • 浅谈App的性能优化

    浅谈App的性能优化 2018 01 02 说到 Android 系统手机 大部分人的印象是用了一段时间就变得有点卡顿 有些程序在运行期间莫名其妙的出现崩溃 打开系统文件夹一看 发现多了很多文件 然后用手机管家 APP 不断地进行清理优化
  • Git第十讲 Git如何正确使用log快速查找内容/提交

    在Git中 你可以使用不同的命令来快速查找指定内容或指定提交 下面我将介绍两种常用的方法 快速查找指定内容 要快速查找包含特定内容的文件或代码行 可以使用 git grep 命令 它类似于常见的 grep 命令 但是专门用于搜索Git仓库中
  • 以太坊交易确认数如何获取

    以太坊和比特币一样 都有一个最长链的概念 因此也有一个交易确认数的概念 当一个以太坊交易所在区块被新加入区块链时 该交易的确认数为1 之后每增加一个区块 该交易的确认数加1 显然 一个以太坊交易的确认数越多 就意味着该交易在区块链中埋的越深
  • html css js实现抽奖,原生(纯)js+html+css实现移动端抽奖转盘系统

    这是我前个月使用纯javascript html写出的一个抽奖转盘系统 按理来说 我应该在当时做完这个小系统 就应该立即写bike总结才对 但是本人之前没有在网上写博客的习惯 平时总结更加习惯写在纸上 但是现在发现卸载网上可能更好 博客中有
  • 【第26篇】Swin Transformer

    文章目录 摘要 1 简介 2 相关工作 3 方法 3 1 整体架构 3 2 基于移动窗口的自注意力 3 3 架构变体 4 实验 4 1 ImageNet 1K 上的图像分类 4 2 COCO 上的物体检测 4 3 ADE20K 上的语义分割
  • 2. ZK客户端与服务端建立连接的过程(基于NIO)

    ZK客户端与服务端建立连接的过程 引例 1 启动SendThread 2 状态初始化 3 开始连接 4 处理服务端连接响应 5 流程图 在上一篇 客户端启动源码分析 文章中讲到了客户端会使用两个线程 SendThread和EventThre