前言
众所周知!
Zookeeper是一个分布式协调工具,我们能利用它特性来做特定的事情,如,利用创建节点的唯一性和有序性可以用来实现分布式锁、leader选举、分布式队列(消息),利用临时节点及其事件监听可以用来做缓存、分布式配置中心、注册中心等,其实目前最常见的就是拿它用来做注册中心。那我们今天来看看为什么能利用他来做注册中心,实现原理是怎样的。
1. 为什么能用来做注册中心
很多小伙伴在公司肯定用过Dubbo,目前比较火的RPC框架,Dubbo的注册中心可以有很多种,如Consul、Zookeeper、Nacos和Redis等,那它为什么能同时支持这么多不同的框架,得益于它牛逼的SPI机制+URL驱动,贫僧扒Dubbo这块源码的时候惊呆了,还可以这么玩,有兴趣的可以去Dubbo官网然后再结合源码看看,绝对受益。
回归正题,其实Zookeeper在Dubbo中是用来记录生产者和消费者节点信息以及动态感知节点信息变化的,在Dubbo源码中可以看到其使用ZK客户端(Curator)与ZK服务端交互,Curator是对Zookeeper客户端的封装。
@Override
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
上面这个方法摘自Dubbo的ZookeeperRegistry#doRegister()方法,Dubbo服务注册时会在Zookeeper上创建一个节点,默认是临时节点,并且同时会添加一个监听事件,关于监听事件的代码上面没有贴出来。这个节点上会附带关于这个服务Provider的很多信息,如接口名、ip地址、端口等。
2. 创建节点
不要嫌弃前戏太长,下面可以回到Zookeeper源码里面来了,创建节点的时候都做了些什么事情,一起来扒扒看。
2.1 ZooKeeper#create()方法
public String create(
final String path,
byte[] data,
List<ACL> acl,
CreateMode createMode,
Stat stat,
long ttl) throws KeeperException, InterruptedException {
final String clientPath = path;
//校验路径
PathUtils.validatePath(clientPath, createMode.isSequential());
EphemeralType.validateTTL(createMode, ttl);
validateACL(acl);
final String serverPath = prependChroot(clientPath);
RequestHeader h = new RequestHeader();
setCreateHeader(createMode, h);
Create2Response response = new Create2Response();
//封装请求
Record record = makeCreateRecord(createMode, serverPath, data, acl, ttl);
//提交请求
ReplyHeader r = cnxn.submitRequest(h, record, response, null);
if (r.getErr() != 0) {
throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
}
if (stat != null) {
DataTree.copyStat(response.getStat(), stat);
}
if (cnxn.chrootPath == null) {
return response.getPath();
} else {
return response.getPath().substring(cnxn.chrootPath.length());
}
}
这个create方法和Curator的入参是对应不上的,因为Curator对Zookeeper#create()方法有封装,可以看到在create方法中,先是对请求校验,然后封装请求,最后将请求提交到ClientCnxn的一个队列中去,关于CientCnxn在上篇文章中也提到了,现在看看它的submitRequest()方法里面做了什么:
ClientCnxn#submitRequest()
public ReplyHeader submitRequest(
RequestHeader h,
Record request,
Record response,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(
h,
r,
request,
response,
null,
null,
null,
null,
watchRegistration,
watchDeregistration);
synchronized (packet) {
//如果设置了请求超时
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
//无限等待
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
上面的方法分为三个步骤:
- 创建一个Packet添加到ClientCnxn的outgoingQueue队列中
- 如果设置了请求超时时间,那么限时等待请求及响应处理完成,后面在处理响应的时候会唤醒阻塞在这里的线程,下面会讲到。
- 如果没有设置,则无限等待
2.2 ClientCnxn#queuePacket()方法
public Packet queuePacket(
RequestHeader h,
ReplyHeader r,
Record request,
Record response,
AsyncCallback cb,
String clientPath,
String serverPath,
Object ctx,
WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
//创建数据包
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
//添加到outgoingQueue队列
outgoingQueue.add(packet);
}
}
//告诉ClientCnxn有新消息来了
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
上面有三个步骤:
-
创建数据包Packet
-
将创建好的Packet添加到ClientCnxn的outgoingQueue队列中,这个队列是专门放待发送的数据包的,后面ClientCnxnSocket的实现类会从这个队列中拿数据包来发送,这是个典型的生产者-消费者模型。
private final LinkedBlockingDeque outgoingQueue = new LinkedBlockingDeque();
这里要注意一下,ClientCnxnSocket的实现类(ClientCnxnSocketNIO)和CientCnxn中都有一个outgoingQueue队列,但是这两个都是引用同一个实例。可以看下SendThread#run()方法的第一行
-
通知ClientCnxn的通信类(ClientCnxnSocket实现类)有消息来了。
@Override
void packetAdded() {
wakeupCnxn();
}
private synchronized void wakeupCnxn() {
selector.wakeup();
}
唤醒了ClientCnxnSocketNIO#doTransport()方法中的select操作。
2.3 ClientCnxnSocketNIO#doTransport()方法
接下来重点讲一下ClientCnxnSocketNIO中的doTransport()方法
void doTransport(
int waitTimeOut,
Queue<Packet> pendingQueue,
ClientCnxn cnxn) throws IOException, InterruptedException {
//上面的wakeup会唤醒阻塞在这里的select操作
selector.select(waitTimeOut);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
// Everything below and until we get back to the select is
// non blocking, so time is effectively a constant. That is
// Why we just have to do this once, here
updateNow();
for (SelectionKey k : selected) {
SocketChannel sc = ((SocketChannel) k.channel());
if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
if (sc.finishConnect()) {
updateLastSendAndHeard();
updateSocketAddresses();
sendThread.primeConnection();
}
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
doIO(pendingQueue, cnxn);
}
}
if (sendThread.getZkState().isConnected()) {
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
//注册写监听
enableWrite();
}
}
selected.clear();
}
假设我们这是建立连接后创建的第一个节点,那么上面的select被唤醒后,会走到下面的这段:
if (sendThread.getZkState().isConnected()) {
//sendThread.tunnelAuthInProgress()判断与客户端与服务端的认证是不是还在进行中
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
enableWrite();
}
}
ClientCnxnSocketNIO#findSendablePacket()方法:
private Packet findSendablePacket(LinkedBlockingDeque<Packet> outgoingQueue, boolean tunneledAuthInProgres) {
if (outgoingQueue.isEmpty()) {
return null;
}
// If we've already starting sending the first packet, we better finish
//拿到队头的数据包
if (outgoingQueue.getFirst().bb != null || !tunneledAuthInProgres) {
return outgoingQueue.getFirst();
}
// Since client's authentication with server is in progress,
// send only the null-header packet queued by primeConnection().
// This packet must be sent so that the SASL authentication process
// can proceed, but all other packets should wait until
// SASL authentication completes.
//等待身份验证完成,将空头数据包放到队头
Iterator<Packet> iter = outgoingQueue.iterator();
while (iter.hasNext()) {
Packet p = iter.next();
if (p.requestHeader == null) {
// We've found the priming-packet. Move it to the beginning of the queue.
iter.remove();
outgoingQueue.addFirst(p);
return p;
} else {
// Non-priming packet: defer it until later, leaving it in the queue
// until authentication completes.
LOG.debug("Deferring non-priming packet {} until SASL authentication completes.", p);
}
}
return null;
}
上面的方法主要是拿到排在outgoingQueue队列队头的数据包。同时如果身份认证没完成得等待身份认证完成,等待期间将空头报文全部移到队头,即优先发送空头数据包。还记得建立连接的时候发送的报文吗?这个时候发送的就是null-header packet。
outgoingQueue内容不为空时就会在ClientCnxnSocketNIO中的doTransport()方法中调用enableWrite()方法注册写事件:
ClientCnxnSocketNIO#enableWrite()
synchronized void enableWrite() {
int i = sockKey.interestOps();
//是否注册过写事件
if ((i & SelectionKey.OP_WRITE) == 0) {
sockKey.interestOps(i | SelectionKey.OP_WRITE);
}
}
那接下来会走到doTransport()方法中的这一行了:
doIO(pendingQueue, cnxn);
ClientCnxnSocketNIO#doIO()
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isWritable()) {
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
//省略部分代码
if (p != null) {
updateLastSend();
// If we already started writing p, p.bb will already exist
if (p.bb == null) {
if ((p.requestHeader != null)
&& (p.requestHeader.getType() != OpCode.ping)
&& (p.requestHeader.getType() != OpCode.auth)) {
p.requestHeader.setXid(cnxn.getXid());
}
p.createBB();
}
//发送报文
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
pendingQueue.add(p);
}
}
}
}
//省略部分代码
}
}
因为注册了写事件,那么会走到doIO()的sockKey.isWritable()判断里面来。这里分为这几个步骤:
- 从outgoingQueue拿到一个数据包(Packet):正在发送的优先>连接和认证时的空头报文次之>队头的业务报文最后
- 发送报文
- 将发送完的报文从outgoingQueue队列中移除
- 将发送完的报文添加到pendingQueue队列中,这个队列放的是等待服务端响应报文。
3. 读取响应
3.1 ClientCnxnSocketNIO#doIO()
还是这个doIO方法,其他的流程和上面发送的流程是一样的,select监听有没有读事件发生,有的话就会到这个doIO方法里面来。
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sock == null) {
throw new IOException("Socket is null!");
}
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
if (rc < 0) {
throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
+ Long.toHexString(sessionId)
+ ", likely server has closed socket");
}
//是否已经读完
if (!incomingBuffer.hasRemaining()) {
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
recvCount.getAndIncrement();
readLength();
//还没初始化
} else if (!initialized) {
readConnectResult();
enableRead();
if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
// Since SASL authentication has completed (if client is configured to do so),
// outgoing packets waiting in the outgoingQueue can now be sent.
enableWrite();
}
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
initialized = true;
} else {
//处理响应
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
}
}
}
会走到下面这行:
sendThread.readResponse(incomingBuffer);
3.2 SendThread#readResponse()
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
case PING_XID:
LOG.debug("Got ping response for session id: 0x{} after {}ms.",
Long.toHexString(sessionId),
((System.nanoTime() - lastPingSentNs) / 1000000));
return;
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
}
return;
case NOTIFICATION_XID:
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// convert from a server path to a client path
if (chrootPath != null) {
String serverPath = event.getPath();
if (serverPath.compareTo(chrootPath) == 0) {
event.setPath("/");
} else if (serverPath.length() > chrootPath.length()) {
event.setPath(serverPath.substring(chrootPath.length()));
} else {
LOG.warn("Got server path {} which is too short for chroot path {}.",
event.getPath(), chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
eventThread.queueEvent(we);
return;
default:
break;
}
// If SASL authentication is currently in progress, construct and
// send a response packet immediately, rather than queuing a
// response as with other packets.
if (tunnelAuthInProgress()) {
GetSASLRequest request = new GetSASLRequest();
request.deserialize(bbia, "token");
zooKeeperSaslClient.respondToServer(request.getToken(), ClientCnxn.this);
return;
}
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
try {
if (packet.requestHeader.getXid() != replyHdr.getXid()) {
packet.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
throw new IOException("Xid out of order. Got Xid " + replyHdr.getXid()
+ " with err " + replyHdr.getErr()
+ " expected Xid " + packet.requestHeader.getXid()
+ " for a packet with details: " + packet);
}
packet.replyHeader.setXid(replyHdr.getXid());
packet.replyHeader.setErr(replyHdr.getErr());
packet.replyHeader.setZxid(replyHdr.getZxid());
if (replyHdr.getZxid() > 0) {
lastZxid = replyHdr.getZxid();
}
if (packet.response != null && replyHdr.getErr() == 0) {
//反序列化
packet.response.deserialize(bbia, "response");
}
LOG.debug("Reading reply session id: 0x{}, packet:: {}", Long.toHexString(sessionId), packet);
} finally {
finishPacket(packet);
}
}
这里简单介绍一下:
- 上面见到的xid可以看做请求的标识,客户端每发送一次请求都会+1,放在Packet里面
-
zxid表示事务id,这里在客户端暂时用不到
接着就走到了:
finishPacket(packet)
ClientCnxn#finishPacket()
protected void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
//省略watch事件相关的代码
//Packet是否存在回调
if (p.cb == null) {
synchronized (p) {
p.finished = true;
//唤醒阻塞在ClientCnxn#submitRequest()的线程
p.notifyAll();
}
} else {
p.finished = true;
//添加到EventThread中的waitingEvents队列中去
eventThread.queuePacket(p);
}
}
上面忽略了watch事件的处理,后面专文分析,这个方法主要是响应的后置处理,如果Packet没有设置回调就直接完成并唤醒阻塞在ClientCnxn#submitRequest()方法中的线程,否则就扔到EventThread的waitingEvents队列中等待处理。
4. 处理回调
4.1 EventThread#queuePacket()
public void queuePacket(Packet packet) {
if (wasKilled) {
synchronized (waitingEvents) {
if (isRunning) {
waitingEvents.add(packet);
} else {
processEvent(packet);
}
}
} else {
waitingEvents.add(packet);
}
}
主要是这行,其他可暂时忽略:
waitingEvents.add(packet);
4.2 EventThread#run()
再看一下EventThread#run():
public void run() {
try {
isRunning = true;
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
//忽略其他代码
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
}
可以看到这里是一个循环一直从waitingEvents中拿数据去处理,这也是一个典型的生产者-消费者模型,processEvent()方法主要就是去处理Packed设置的回调了。
4.3 EventThread#processEvent()
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher.", t);
}
}
} else if (event instanceof LocalCallback) {
LocalCallback lcb = (LocalCallback) event;
if (lcb.cb instanceof StatCallback) {
((StatCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof DataCallback) {
((DataCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ACLCallback) {
((ACLCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof ChildrenCallback) {
((ChildrenCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof Children2Callback) {
((Children2Callback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null, null);
} else if (lcb.cb instanceof StringCallback) {
((StringCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.EphemeralsCallback) {
((AsyncCallback.EphemeralsCallback) lcb.cb).processResult(lcb.rc, lcb.ctx, null);
} else if (lcb.cb instanceof AsyncCallback.AllChildrenNumberCallback) {
((AsyncCallback.AllChildrenNumberCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, -1);
} else if (lcb.cb instanceof AsyncCallback.MultiCallback) {
((AsyncCallback.MultiCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx, Collections.emptyList());
} else {
((VoidCallback) lcb.cb).processResult(lcb.rc, lcb.path, lcb.ctx);
}
} else {
Packet p = (Packet) event;
int rc = 0;
String clientPath = p.clientPath;
if (p.replyHeader.getErr() != 0) {
rc = p.replyHeader.getErr();
}
if (p.cb == null) {
LOG.warn("Somehow a null cb got to EventThread!");
} else if (p.response instanceof ExistsResponse
|| p.response instanceof SetDataResponse
|| p.response instanceof SetACLResponse) {
StatCallback cb = (StatCallback) p.cb;
if (rc == 0) {
if (p.response instanceof ExistsResponse) {
cb.processResult(rc, clientPath, p.ctx, ((ExistsResponse) p.response).getStat());
} else if (p.response instanceof SetDataResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetDataResponse) p.response).getStat());
} else if (p.response instanceof SetACLResponse) {
cb.processResult(rc, clientPath, p.ctx, ((SetACLResponse) p.response).getStat());
}
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetDataResponse) {
DataCallback cb = (DataCallback) p.cb;
GetDataResponse rsp = (GetDataResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getData(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetACLResponse) {
ACLCallback cb = (ACLCallback) p.cb;
GetACLResponse rsp = (GetACLResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getAcl(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof GetChildrenResponse) {
ChildrenCallback cb = (ChildrenCallback) p.cb;
GetChildrenResponse rsp = (GetChildrenResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren());
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetAllChildrenNumberResponse) {
AllChildrenNumberCallback cb = (AllChildrenNumberCallback) p.cb;
GetAllChildrenNumberResponse rsp = (GetAllChildrenNumberResponse) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getTotalNumber());
} else {
cb.processResult(rc, clientPath, p.ctx, -1);
}
} else if (p.response instanceof GetChildren2Response) {
Children2Callback cb = (Children2Callback) p.cb;
GetChildren2Response rsp = (GetChildren2Response) p.response;
if (rc == 0) {
cb.processResult(rc, clientPath, p.ctx, rsp.getChildren(), rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof CreateResponse) {
StringCallback cb = (StringCallback) p.cb;
CreateResponse rsp = (CreateResponse) p.response;
if (rc == 0) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())));
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof Create2Response) {
Create2Callback cb = (Create2Callback) p.cb;
Create2Response rsp = (Create2Response) p.response;
if (rc == 0) {
cb.processResult(
rc,
clientPath,
p.ctx,
(chrootPath == null
? rsp.getPath()
: rsp.getPath().substring(chrootPath.length())),
rsp.getStat());
} else {
cb.processResult(rc, clientPath, p.ctx, null, null);
}
} else if (p.response instanceof MultiResponse) {
MultiCallback cb = (MultiCallback) p.cb;
MultiResponse rsp = (MultiResponse) p.response;
if (rc == 0) {
List<OpResult> results = rsp.getResultList();
int newRc = rc;
for (OpResult result : results) {
if (result instanceof ErrorResult
&& KeeperException.Code.OK.intValue()
!= (newRc = ((ErrorResult) result).getErr())) {
break;
}
}
cb.processResult(newRc, clientPath, p.ctx, results);
} else {
cb.processResult(rc, clientPath, p.ctx, null);
}
} else if (p.response instanceof GetEphemeralsResponse) {
EphemeralsCallback cb = (EphemeralsCallback) p.cb;
GetEphemeralsResponse rsp = (GetEphemeralsResponse) p.response;
if (rc == 0) {
cb.processResult(rc, p.ctx, rsp.getEphemerals());
} else {
cb.processResult(rc, p.ctx, null);
}
} else if (p.cb instanceof VoidCallback) {
VoidCallback cb = (VoidCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx);
}
}
} catch (Throwable t) {
LOG.error("Unexpected throwable", t);
}
}
}
上面这个方法就不去具体分析了,很简单,判断回调的类型再处理,这里就没有Dubbo的SPI那么优雅了。
这篇文章暂时分析到这儿,主要是讲解客户端在创建节点的时候做了哪些事情,关于服务端的创建工作咱们下一篇再去分析!