3.1 ZK客户端创建节点源码解析上(基于NIO)

2023-10-27

前言

众所周知!
Zookeeper是一个分布式协调工具,我们能利用它特性来做特定的事情,如,利用创建节点的唯一性和有序性可以用来实现分布式锁、leader选举、分布式队列(消息),利用临时节点及其事件监听可以用来做缓存、分布式配置中心、注册中心等,其实目前最常见的就是拿它用来做注册中心。那我们今天来看看为什么能利用他来做注册中心,实现原理是怎样的。

1. 为什么能用来做注册中心

很多小伙伴在公司肯定用过Dubbo,目前比较火的RPC框架,Dubbo的注册中心可以有很多种,如Consul、Zookeeper、Nacos和Redis等,那它为什么能同时支持这么多不同的框架,得益于它牛逼的SPI机制+URL驱动,贫僧扒Dubbo这块源码的时候惊呆了,还可以这么玩,有兴趣的可以去Dubbo官网然后再结合源码看看,绝对受益。

回归正题,其实Zookeeper在Dubbo中是用来记录生产者和消费者节点信息以及动态感知节点信息变化的,在Dubbo源码中可以看到其使用ZK客户端(Curator)与ZK服务端交互,Curator是对Zookeeper客户端的封装。

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
 

上面这个方法摘自Dubbo的ZookeeperRegistry#doRegister()方法,Dubbo服务注册时会在Zookeeper上创建一个节点,默认是临时节点,并且同时会添加一个监听事件,关于监听事件的代码上面没有贴出来。这个节点上会附带关于这个服务Provider的很多信息,如接口名、ip地址、端口等。

2. 创建节点

不要嫌弃前戏太长,下面可以回到Zookeeper源码里面来了,创建节点的时候都做了些什么事情,一起来扒扒看。

2.1 ZooKeeper#create()方法

    public String create(
        final String path,
        byte[] data,
        List<ACL> acl,
        CreateMode createMode,
        Stat stat,
        long ttl) throws KeeperException, InterruptedException {
        final String clientPath = path;
        //校验路径
        PathUtils.validatePath(clientPath, createMode.isSequential());
        EphemeralType.validateTTL(createMode, ttl);
        validateACL(acl);

        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        setCreateHeader(createMode, h);
        Create2Response response = new Create2Response();
        //封装请求
        Record record = makeCreateRecord(createMode, serverPath, data, acl, ttl);
        //提交请求
        ReplyHeader r = cnxn.submitRequest(h, record, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

这个create方法和Curator的入参是对应不上的,因为Curator对Zookeeper#create()方法有封装,可以看到在create方法中,先是对请求校验,然后封装请求,最后将请求提交到ClientCnxn的一个队列中去,关于CientCnxn在上篇文章中也提到了,现在看看它的submitRequest()方法里面做了什么:

ClientCnxn#submitRequest()

   public ReplyHeader submitRequest(
        RequestHeader h,
        Record request,
        Record response,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(
            h,
            r,
            request,
            response,
            null,
            null,
            null,
            null,
            watchRegistration,
            watchDeregistration);
        synchronized (packet) {
        	//如果设置了请求超时
            if (requestTimeout > 0) {
                // Wait for request completion with timeout
                waitForPacketFinish(r, packet);
            } else {
                // Wait for request completion infinitely
                while (!packet.finished) {
                   //无限等待
                    packet.wait();
                }
            }
        }
        if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
            sendThread.cleanAndNotifyState();
        }
        return r;
    }

上面的方法分为三个步骤:

  1. 创建一个Packet添加到ClientCnxn的outgoingQueue队列中
  2. 如果设置了请求超时时间,那么限时等待请求及响应处理完成,后面在处理响应的时候会唤醒阻塞在这里的线程,下面会讲到。
  3. 如果没有设置,则无限等待

2.2 ClientCnxn#queuePacket()方法

  public Packet queuePacket(
        RequestHeader h,
        ReplyHeader r,
        Record request,
        Record response,
        AsyncCallback cb,
        String clientPath,
        String serverPath,
        Object ctx,
        WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        //创建数据包
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                //添加到outgoingQueue队列
                outgoingQueue.add(packet);
            }
        }
        //告诉ClientCnxn有新消息来了
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }

上面有三个步骤:

  1. 创建数据包Packet

  2. 将创建好的Packet添加到ClientCnxn的outgoingQueue队列中,这个队列是专门放待发送的数据包的,后面ClientCnxnSocket的实现类会从这个队列中拿数据包来发送,这是个典型的生产者-消费者模型。

    private final LinkedBlockingDeque outgoingQueue = new LinkedBlockingDeque();
    这里要注意一下,ClientCnxnSocket的实现类(ClientCnxnSocketNIO)和CientCnxn中都有一个outgoingQueue队列,但是这两个都是引用同一个实例。可以看下SendThread#run()方法的第一行

  3. 通知ClientCnxn的通信类(ClientCnxnSocket实现类)有消息来了。

        @Override
        void packetAdded() {
            wakeupCnxn();
        }
    
        private synchronized void wakeupCnxn() {
            selector.wakeup();
        }
    

    唤醒了ClientCnxnSocketNIO#doTransport()方法中的select操作。

2.3 ClientCnxnSocketNIO#doTransport()方法

接下来重点讲一下ClientCnxnSocketNIO中的doTransport()方法

   void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        //上面的wakeup会唤醒阻塞在这里的select操作
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
            //注册写监听
                enableWrite();
            }
        }
        selected.clear();
    }

假设我们这是建立连接后创建的第一个节点,那么上面的select被唤醒后,会走到下面的这段:

        if (sendThread.getZkState().isConnected()) {
        //sendThread.tunnelAuthInProgress()判断与客户端与服务端的认证是不是还在进行中
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }

ClientCnxnSocketNIO#findSendablePacket()方法:

    private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) {
        if (outgoingQueue.isEmpty()) {
            return null;
        }
        // If we've already starting sending the first packet, we better finish
        //拿到队头的数据包
        if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
            return outgoingQueue.getFirst();
        }
        // Since client's authentication with server is in progress,
        // send only the null-header packet queued by primeConnection().
        // This packet must be sent so that the SASL authentication process
        // can proceed, but all other packets should wait until
        // SASL authentication completes.
        //等待身份验证完成,将空头数据包放到队头
        Iterator<Packet> iter = outgoingQueue.iterator();
        while (iter.hasNext()) {
            Packet p = iter.next();
            if (p.requestHeader == null) {
                // We've found the priming-packet. Move it to the beginning of the queue.
                iter.remove();
                outgoingQueue.addFirst(p);
                return p;
            } else {
                // Non-priming packet: defer it until later, leaving it in the queue
                // until authentication completes.
                LOG.debug("Deferring non-priming packet {} until SASL authentication completes.", p);
            }
        }
        return null;
    }

上面的方法主要是拿到排在outgoingQueue队列队头的数据包。同时如果身份认证没完成得等待身份认证完成,等待期间将空头报文全部移到队头,即优先发送空头数据包。还记得建立连接的时候发送的报文吗?这个时候发送的就是null-header packet。

outgoingQueue内容不为空时就会在ClientCnxnSocketNIO中的doTransport()方法中调用enableWrite()方法注册写事件:
ClientCnxnSocketNIO#enableWrite()


    synchronized void enableWrite() {
        int i = sockKey.interestOps();
        //是否注册过写事件
        if ((i & SelectionKey.OP_WRITE) == 0) {
            sockKey.interestOps(i | SelectionKey.OP_WRITE);
        }
    }

那接下来会走到doTransport()方法中的这一行了:

doIO(pendingQueue, cnxn);

ClientCnxnSocketNIO#doIO()

    void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isWritable()) {
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
		//省略部分代码
            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                //发送报文
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount.getAndIncrement();
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            		//省略部分代码
        }
    }

因为注册了写事件,那么会走到doIO()的sockKey.isWritable()判断里面来。这里分为这几个步骤:

  1. 从outgoingQueue拿到一个数据包(Packet):正在发送的优先>连接和认证时的空头报文次之>队头的业务报文最后
  2. 发送报文
  3. 将发送完的报文从outgoingQueue队列中移除
  4. 将发送完的报文添加到pendingQueue队列中,这个队列放的是等待服务端响应报文。

3. 读取响应

3.1 ClientCnxnSocketNIO#doIO()

还是这个doIO方法,其他的流程和上面发送的流程是一样的,select监听有没有读事件发生,有的话就会到这个doIO方法里面来。

  void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            //是否已经读完
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                    //还没初始化
                } else if (!initialized) {
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                	//处理响应
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
    }

会走到下面这行:

sendThread.readResponse(incomingBuffer);

3.2 SendThread#readResponse()


        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            replyHdr.deserialize(bbia, "header");
            switch (replyHdr.getXid()) {
            case PING_XID:
                LOG.debug("Got ping response for session id: 0x{} after {}ms.",
                    Long.toHexString(sessionId),
                    ((System.nanoTime() - lastPingSentNs) / 1000000));
                return;
              case AUTHPACKET_XID:
                LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    changeZkState(States.AUTH_FAILED);
                    eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));
                    eventThread.queueEventOfDeath();
                }
              return;
            case NOTIFICATION_XID:
                LOG.debug("Got notification session id: 0x{}",
                    Long.toHexString(sessionId));
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if (serverPath.compareTo(chrootPath) == 0) {
                        event.setPath("/");
                    } else if (serverPath.length() > chrootPath.length()) {
                        event.setPath(serverPath.substring(chrootPath.length()));
                     } else {
                         LOG.warn("Got server path {} which is too short for chroot path {}.",
                             event.getPath(), chrootPath);
                     }
                }

                WatchedEvent we = new WatchedEvent(event);
                LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                eventThread.queueEvent(we);
                return;
            default:
                break;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia, "token");
                zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                }
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
                                          + " with err " + replyHdr.getErr()
                                          + " expected Xid " + packet.requestHeader.getXid()
                                          + " for a packet with details: " + packet);
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                if (packet.response != null && replyHdr.getErr() == 0) {
                //反序列化
                    packet.response.deserialize(bbia, "response");
                }

                LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
            } finally {
                finishPacket(packet);
            }
        }

这里简单介绍一下:

  1. 上面见到的xid可以看做请求的标识,客户端每发送一次请求都会+1,放在Packet里面
  2. zxid表示事务id,这里在客户端暂时用不到

接着就走到了:

finishPacket(packet)

ClientCnxn#finishPacket()

    protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
      	//省略watch事件相关的代码
		//Packet是否存在回调
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                //唤醒阻塞在ClientCnxn#submitRequest()的线程
                p.notifyAll();
            }
        } else {
            p.finished = true;
            //添加到EventThread中的waitingEvents队列中去
            eventThread.queuePacket(p);
        }
    }

上面忽略了watch事件的处理,后面专文分析,这个方法主要是响应的后置处理,如果Packet没有设置回调就直接完成并唤醒阻塞在ClientCnxn#submitRequest()方法中的线程,否则就扔到EventThread的waitingEvents队列中等待处理。

4. 处理回调

4.1 EventThread#queuePacket()

        public void queuePacket(Packet packet) {
            if (wasKilled) {
                synchronized (waitingEvents) {
                    if (isRunning) {
                        waitingEvents.add(packet);
                    } else {
                        processEvent(packet);
                    }
                }
            } else {
                waitingEvents.add(packet);
            }
        }

主要是这行,其他可暂时忽略:

waitingEvents.add(packet);

4.2 EventThread#run()

再看一下EventThread#run():

        public void run() {
            try {
                isRunning = true;
                while (true) {
                    Object event = waitingEvents.take();
                    if (event == eventOfDeath) {
                        wasKilled = true;
                    } else {
                        processEvent(event);
                    }
                    //忽略其他代码
                }
            } catch (InterruptedException e) {
                LOG.error("Event thread exiting due to interruption", e);
            }

        }

可以看到这里是一个循环一直从waitingEvents中拿数据去处理,这也是一个典型的生产者-消费者模型,processEvent()方法主要就是去处理Packed设置的回调了。

4.3 EventThread#processEvent()

  private void processEvent(Object event) {
            try {
                if (event instanceof WatcherSetEventPair) {
                    // each watcher will process the event
                    WatcherSetEventPair pair = (WatcherSetEventPair) event;
                    for (Watcher watcher : pair.watchers) {
                        try {
                            watcher.process(pair.event);
                        } catch (Throwable t) {
                            LOG.error("Error while calling watcher.", t);
                        }
                    }
                } else if (event instanceof LocalCallback) {
                    LocalCallback lcb = (LocalCallback) event;
                    if (lcb.cb instanceof StatCallback) {
                        ((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
                    } else if (lcb.cb instanceof DataCallback) {
                        ((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
                    } else if (lcb.cb instanceof ACLCallback) {
                        ((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
                    } else if (lcb.cb instanceof ChildrenCallback) {
                        ((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
                    } else if (lcb.cb instanceof Children2Callback) {
                        ((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
                    } else if (lcb.cb instanceof StringCallback) {
                        ((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
                    } else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) {
                        ((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null);
                    } else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) {
                        ((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1);
                    } else if (lcb.cb instanceof AsyncCallback.MultiCallback) {
                        ((AsyncCallback.MultiCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, Collections.emptyList());
                    } else {
                        ((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
                    }
                } else {
                    Packet p = (Packet) event;
                    int rc = 0;
                    String clientPath = p.clientPath;
                    if (p.replyHeader.getErr() != 0) {
                        rc = p.replyHeader.getErr();
                    }
                    if (p.cb == null) {
                        LOG.warn("Somehow a null cb got to EventThread!");
                    } else if (p.response instanceof ExistsResponse
                               || p.response instanceof SetDataResponse
                               || p.response instanceof SetACLResponse) {
                        StatCallback cb = (StatCallback) p.cb;
                        if (rc == 0) {
                            if (p.response instanceof ExistsResponse) {
                                cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
                            } else if (p.response instanceof SetDataResponse) {
                                cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
                            } else if (p.response instanceof SetACLResponse) {
                                cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
                            }
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null);
                        }
                    } else if (p.response instanceof GetDataResponse) {
                        DataCallback cb = (DataCallback) p.cb;
                        GetDataResponse rsp = (GetDataResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null, null);
                        }
                    } else if (p.response instanceof GetACLResponse) {
                        ACLCallback cb = (ACLCallback) p.cb;
                        GetACLResponse rsp = (GetACLResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null, null);
                        }
                    } else if (p.response instanceof GetChildrenResponse) {
                        ChildrenCallback cb = (ChildrenCallback) p.cb;
                        GetChildrenResponse rsp = (GetChildrenResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null);
                        }
                    } else if (p.response instanceof GetAllChildrenNumberResponse) {
                        AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb;
                        GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, -1);
                        }
                    } else if (p.response instanceof GetChildren2Response) {
                        Children2Callback cb = (Children2Callback) p.cb;
                        GetChildren2Response rsp = (GetChildren2Response) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null, null);
                        }
                    } else if (p.response instanceof CreateResponse) {
                        StringCallback cb = (StringCallback) p.cb;
                        CreateResponse rsp = (CreateResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(
                                rc,
                                clientPath,
                                p.ctx,
                                (chrootPath == null
                                    ? rsp.getPath()
                                    : rsp.getPath().substring(chrootPath.length())));
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null);
                        }
                    } else if (p.response instanceof Create2Response) {
                        Create2Callback cb = (Create2Callback) p.cb;
                        Create2Response rsp = (Create2Response) p.response;
                        if (rc == 0) {
                            cb.processResult(
                                    rc,
                                    clientPath,
                                    p.ctx,
                                    (chrootPath == null
                                            ? rsp.getPath()
                                            : rsp.getPath().substring(chrootPath.length())),
                                    rsp.getStat());
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null, null);
                        }
                    } else if (p.response instanceof MultiResponse) {
                        MultiCallback cb = (MultiCallback) p.cb;
                        MultiResponse rsp = (MultiResponse) p.response;
                        if (rc == 0) {
                            List<OpResult> results = rsp.getResultList();
                            int newRc = rc;
                            for (OpResult result : results) {
                                if (result instanceof ErrorResult
                                    && KeeperException.Code.OK.intValue()
                                       != (newRc = ((ErrorResult) result).getErr())) {
                                    break;
                                }
                            }
                            cb.processResult(newRc, clientPath, p.ctx, results);
                        } else {
                            cb.processResult(rc, clientPath, p.ctx, null);
                        }
                    } else if (p.response instanceof GetEphemeralsResponse) {
                        EphemeralsCallback cb = (EphemeralsCallback) p.cb;
                        GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response;
                        if (rc == 0) {
                            cb.processResult(rc, p.ctx, rsp.getEphemerals());
                        } else {
                            cb.processResult(rc, p.ctx, null);
                        }
                    } else if (p.cb instanceof VoidCallback) {
                        VoidCallback cb = (VoidCallback) p.cb;
                        cb.processResult(rc, clientPath, p.ctx);
                    }
                }
            } catch (Throwable t) {
                LOG.error("Unexpected throwable", t);
            }
        }

    }

上面这个方法就不去具体分析了,很简单,判断回调的类型再处理,这里就没有Dubbo的SPI那么优雅了。
这篇文章暂时分析到这儿,主要是讲解客户端在创建节点的时候做了哪些事情,关于服务端的创建工作咱们下一篇再去分析!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

3.1 ZK客户端创建节点源码解析上(基于NIO) 的相关文章

  • #QGIS源码官方编译指南

    QGIS源码官方编译指南 将QGIS官方的编译指南windows部分翻译一下 供大家参考 这个版本是QGIS源代码工程中2017年4月30日最后修改的版本 对应QGIS 2 99 也就是即将发布为QGIS 3 0的版本 翻译 Jacory
  • INFO zookeeper.ClientCnxn: Opening socket connection to server***/192.168.80.151:2181. Will not

    at org apache zookeeper ClientCnxnSocketNIO doTransport ClientCnxnSocketNIO java 361 at org apache zookeeper ClientCnxn
  • 【零知ESP8266教程】快速入门5-使用按键来控制你的灯

    上节课 我们已经学习了如何制作一个简易交通灯 那么如何去控制一个LED的亮或者灭呢 此次试验采用按键来控制我们的LED 实现LED的简单控制 一 工具原料 电脑 windows系统 ESP8266开发板 micro usb线 LED灯一个
  • zookeeper3.4.6集群部署

    在安装Zookeeper之前 首先需要确保的就是主机名称 可选 hosts都已经更改 并且JDK成功安装 1 安装Zookeeper 使用命令 tar zxvf 命令将gz压缩文件解压 笔者Zookeeper的安装目录为 home Hado
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • zookeeper学习草稿纸

    指令重排序 https baijiahao baidu com s id 1701616903992143186 wfr spider for pc JVM JDK JRE 静态方法为什么不能调用非静态成员 重载和重写的区别 可变参数 基本
  • QGis二次开发 -- 源码编译终极篇

    由于是开源软件 QGis版本迭代比较快 在保持long term release版本的基础上 每个月都会有一个monthly release的新版本发布 源码工程变化快速 给想要上手编译开发的新人朋友带来了一些困惑 我之前分别写过QGis1
  • 【HBZ分享】数仓里面的概念-宽表-维度表-事实表概念讲解

    数仓概念 1 度量值 可被统计的 比如 次数 销量 营销额 订单表中的下单金额等可以统计的值叫度量值 2 维度表 1 对事实描述的信息 每一张表都对应现实世界中的一个对象或概念 比如 用户 商品 日期 地区维度 2 比如要分析商品的销售情况
  • kafka的安装和使用

    ZooKeeper简介 ZooKeeper 是一个为分布式应用所设计的分布的 开源的 java 协调服务 分布式的应用可以建立在同步配置管理 选举 分布式锁 分组和命名等服务的更高级别的实现的基础之上 ZooKeeper 意欲设计一个易于编
  • Spring源码深度解析:文章目录

    文章目录 序号 内容 链接地址 1 一 Spring整体架构和源码环境搭建 https blog csdn net wts563540 article details 126686645 2 二 手写模拟Spring https blog
  • 投资捕鱼游戏市场的如何避雷?以及研发技术问题。

    随着国内捕鱼市场在姚记科技 波克城市 途游等捕鱼龙头的深耕下 整个产品的研发 运营门槛都了非常大的提高 对于目前想要研发出一款具有竞争力的产品和版本 投入低于500万的资金很难出有竞争力的产品 加上运营门槛的提高 运营成本至少需要准备500
  • Zookeeper(三)—分布式锁实现

    一 独占锁原理 独占锁是利用zk同一目录下不能创建多个相同名称的节点这个特性 来实现分布式锁的功能 竞争锁的分布式系统 都在zk根目录下创建一个名为lock的节点 创建节点成功的系统 说明抢到了这把锁 没有创建成功的系统 说明这个节点已经被
  • ZooKeeper踩坑

    一 下载安装包时要下载文件名中带有bin的安装包 否则会报错 找不到或无法加载主类 org apache zookeeper server quorum QuorumPeerMain Error contacting service 这是由
  • SpringCloud使用Zookeeper作为服务注册发现中心

    本篇文章主要记录SpringCloud使用Zookeeper作为服务注册发现中心 通过服务提供者和消费者为例 来真正掌握zk注册中心 目录 一 搭建服务提供者 1 创建cloud provider payment8004项目 2 修改配置
  • 什么是医院绩效管理系统?它有哪些功能特点?

    医院绩效 定义 医院工作量绩效方案 是一套以 工作量 RBRVS 相对价值比率 为核算基础 以工作岗位 技术含量 风险程度 服务数量等业绩为主要依据 以工作效率和效益 工作质量 患者满意度等指标为综合考核体系 综合计量和评价的绩效分配体系
  • 开源Cloudreve云盘系统源码/ 支持本地储存+对接各大对象储存/带云盘系统安装教程/公私兼备网盘系统

    源码介绍 Cloudreve云盘系统源码 它不仅支持本地储存 而且还对接各大对象储存 附带云盘系统安装教程 轻松搭建个人网盘 拥有美观界面 云盘系统安装教程 公私兼备网盘系统 多功能仿百度网盘源码 测试环境 PHP7 1 MYSQL5 6
  • 医院绩效核算系统源码,java语言开发

    医院绩效考核系统全套源码 医院绩效核算系统源码 java语言开发 医院绩效考核系统可根据工作绩效考核管理规定 配置相应的绩效考核模型 从工作量统计 核算维度 核算权重三方面计算工作绩效 利用数据处理和数据分析的支撑作用 实现对工作量统计和绩
  • 终于找到了最新版的Zookeeper入门级教程,建议收藏!

    小熊学Java https javaxiaobear cn 1 分布式一致性 1 CAP 理论 CAP 理论指出对于一个分布式计算系统来说 不可能同时满足以下三点 一致性 在分布式环境中 一致性是指数据在多个副本之间是否能够保持一致的特性
  • 医院绩效核算系统源码,java语言开发

    医院绩效考核系统全套源码 医院绩效核算系统源码 java语言开发 医院绩效考核系统可根据工作绩效考核管理规定 配置相应的绩效考核模型 从工作量统计 核算维度 核算权重三方面计算工作绩效 利用数据处理和数据分析的支撑作用 实现对工作量统计和绩
  • 通过 Jquery 从 Datebox 中清除日期

    下面的代码在日期框模式弹出窗口中显示一个按钮 但我想清除单击该按钮时的日期 我尝试了很多东西 但无法通过 jQuery 方法做到这一点

随机推荐

  • Hugo themes Doit 合并 tags , categories 为检索页

    Hugo themes Doit 合并 tags categories 为检索页 原文 总觉得 tags categories 等页面可以合并成为一页 这样检索起来更方便一些 成果 https www ftls xyz retrieval
  • 编译程序和解释程序有什么区别?

    1 编译程序和解释程序的区别 编译型是使用编译器编译后生成计算机硬件可直接执行的指令 解释型是在运行时才由解释器逐语句去执行 编译型代表 C C C Java 解释型代表 html javascript 区别有很多 说说常见的几个 编译型语
  • MATLAB bp神经网络预测代码

    清除变量 清楚变量 claer clc 导入数据 变量个数较少可以自己输入变量 变量数目较大时建议采用读取Excel并保存成 mat的方法来导入数据 读取 m数据 以data mat为例 load data mat load data1 m
  • React路由

    安装 npm i react router dom S 导入 import BrowserRouter as Router Route Link from react router dom HashRouter与BrowserRouter
  • iOS基础教程-SQLite数据库操作(二简单实例学生信息增删改查数据库操作)

    学生名单管理界面实现数据库的增删改操作 关于UI部分 我们使用storyboard简单完成 在上一篇文章中有详细的描述iOS基础教程 SQLite数据库操作 一 StoryBoard操作 SQLite操作前准备工作 本篇结束以后 就可以完成
  • webpack设置分包

    Webpack中设置分包 code splitting 是一种优化技术 它允许将你的代码分割成多个小块 以便在不同的页面或情境中按需加载 这可以显著减小初始加载的资源大小 提高网页性能 Webpack提供了几种方式来设置分包 其中最常见的是
  • 搞懂 API ,地图 API 制作方法分享

    地图 API 是一种基于 Web 开发的应用程序编程接口 可以用于创建和展示地图及地理信息 以下是一些地图 API 制作的方法 选择地图 API 平台 目前市场上有很多地图 API 平台供选择 比如 Google Maps API 百度地图
  • WorkTool企微机器人自动接收图片回传(方案三)

    自动接收图片并上传到服务器 仅适用企业微信应用 前言 WorkTool企微机器人可以接收客户群的消息 但接收图片一直是个问题 前面也介绍过两种图片接收方案 但都会影响运行效率 并且不能达到100 的图片接收率 实测95 本方案三是通过企微官
  • 2023年测试之路,从功能测试进阶测试开发工程师,突破内卷...

    目录 导读 前言 一 Python编程入门到精通 二 接口自动化项目实战 三 Web自动化项目实战 四 App自动化项目实战 五 一线大厂简历 六 测试开发DevOps体系 七 常用自动化测试工具 八 JMeter性能测试 九 总结 尾部小
  • QT5.9.6和VS2015的配置使用

    要做界面但是MFC实在是 所以果断尝试QT实现界面化 QT5 9 6 VS2015 opencv2 4 13 所用软件 QT5 9 6 VS2015 参考地址 Qt5 7 VS2015 环境搭建https blog csdn net lia
  • kafka常用命令汇总

    新建topic bin kafka topics sh zookeeper localhost 2181 create replication factor 1 partitions 1 topic test service 删除topic
  • Qt发送端用自定义结构体发送,接收端QByteArray接收

    Qt TCP UDP 一端用自定义结构体发送消息 一端用QByteArray接收消息 用自定义结构体发送消息 void TcpServer timeOut QDateTime nowTime QDateTime currentDateTim
  • select top语句 mysql_SQL SELECT TOP 语句

    SELECT TOP 子句用于规定要返回的记录的数目 SELECT TOP 子句对于拥有数千条记录的大型表来说 是非常有用的 注释 并非所有的数据库系统都支持 SELECT TOP 子句 SQL Server MS Access 语法 SE
  • SSE3和SSSE3 Intrinsics各函数介绍

    SIMD相关头文件包括 include
  • string类常见用法

    需要包含头文件和命名空间 include
  • Linux高级命令05:压缩和解压缩命令

    学习目标 能够使用tar命令完成文件的压缩和解压缩 1 压缩格式的介绍 Linux默认支持的压缩格式 gz bz2 zip 说明 gz和 bz2的压缩包需要使用tar命令来压缩和解压缩 zip的压缩包需要使用zip命令来压缩 使用unzip
  • mysql连接中的Access denied for user ‘root’@‘localhost’ 和Unknown database问题解决

    1 端口端口 看看是否3306有被占用 主要原因 2 密码加双引号试试 3 unknown database的时候 一定看看服务中是否有其他的mysql服务在运行 要关掉其他的
  • STM32G030Cx HAL库Flash擦除或编程操作出错的解决办法

    STM32G030Cx HAL库Flash擦除与编程操作 例程说明 一 宏定义及变量 二 获取页 三 写数据 双字 四 调用验证 例程说明 STM32G0芯片对Flash操作容易出错 经常出现擦除失败或编程失败的情况 故有此记录 STM32
  • Jetbrains系列开发工具日常配置与使用

    文章目录 IntelliJ IDEA 各种常用配置 常见问题 PyCharm 常见问题 IntelliJ IDEA IDEA的确是Java开发利器 之前一直用Eclipse 后来实习单位都用IDEA 就慢慢转了 开始还不习惯 后来就不禁被其
  • 3.1 ZK客户端创建节点源码解析上(基于NIO)

    ZK客户端创建节点源码解析上 前言 1 为什么能用来做注册中心 2 创建节点 2 1 ZooKeeper create 方法 2 2 ClientCnxn queuePacket 方法 2 3 ClientCnxnSocketNIO doT