Netty源码------Pipeline详细分析

2023-11-01

Netty源码------Pipeline详细分析

目录

Netty源码------Pipeline详细分析

1、Channel 与ChannelPipeline

2、再探ChannelPipeline 的初始化:

3、ChannelInitializer 的添加

4、自定义ChannelHandler 的添加过程

5、ChannelHandler 默认命名规则

6、Pipeline 的事件传播机制

6.1 Outbound 事件传播方式

6.2 Inbound 事件传播方式

6.3 Pipeline 事件传播小结

7、Handler 的各种姿势

8、Channel 的生命周期


1、Channel 与ChannelPipeline

  相信大家都已经知道,在Netty 中每个Channel 都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:

  通过上图我们可以看到, 一个Channel 包含了一个ChannelPipeline , 而ChannelPipeline 中又维护了一个由ChannelHandlerContext 组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext 中又关联着一个ChannelHandler。图示给了我们一个对ChannelPipeline 的直观认识,但是实际上Netty 实现的Channel 是否真的是这样的呢?我们继续用源码说话。在前我们已经知道了一个Channel 的初始化的基本过程,下面我们再回顾一下。下面的代码是AbstractChannel 构造器:

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

  AbstractChannel 有一个pipeline 字段,在构造器中会初始化它为DefaultChannelPipeline 的实例。这里的代码就印证了一点:每个Channel 都有一个ChannelPipeline。接着我们跟踪一下DefaultChannelPipeline 的初始化过程,首先进入到DefaultChannelPipeline 构造器中:

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }

  在DefaultChannelPipeline 构造器中, 首先将与之关联的Channel 保存到字段channel 中。然后实例化两个ChannelHandlerContext:一个是HeadContext 实例head,另一个是TailContext 实例tail。接着将head 和tail 互相指向, 构成一个双向链表。

  特别注意的是:我们在开始的示意图中head 和tail 并没有包含ChannelHandler,这是因为HeadContext 和TailContext继承于AbstractChannelHandlerContext 的同时也实现了ChannelHandler 接口了,因此它们有Context 和Handler的双重属性。

2、再探ChannelPipeline 的初始化:

  前面的学习我们已经对ChannelPipeline 的初始化有了一个大致的了解,不过当时重点没有关注ChannelPipeline,因此没有深入地分析它的初始化过程。那么下面我们就来看一下具体的ChannelPipeline 的初始化都做了哪些工作吧。先回顾一下,在实例化一个Channel 时,会伴随着一个ChannelPipeline 的实例化,并且此Channel 会与这个ChannelPipeline相互关联,这一点可以通过NioSocketChannel 的父类AbstractChannel 的构造器予以佐证:

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

  当实例化一个NioSocketChannel 是,其pipeline 字段就是我们新创建的DefaultChannelPipeline 对象。可以看到,在DefaultChannelPipeline 的构造方法中,将传入的channel 赋值给字段this.channel,接着又实例化了两个特殊的字段:tail 与head,这两个字段是一个双向链表的头和尾。其实在DefaultChannelPipeline 中,维护了一个以AbstractChannelHandlerContext 为节点的双向链表,这个链表是Netty 实现Pipeline 机制的关键。再回顾一下head和tail 的类层次结构:

  从类层次结构图中可以很清楚地看到,head 实现了ChannelInboundHandler与ChannelOutboundHandler,而tail 实现了ChannelOutboundHandler 接口,并且它们都实现了ChannelHandlerContext 接口, 因此可以说head 和tail 即是一个ChannelHandler,又是一个ChannelHandlerContext。接着看HeadContext与TailContext 构造器中的代码:

HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.HEAD_NAME, false, true);
            this.unsafe = pipeline.channel().unsafe();
            this.setAddComplete();
}
TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, (EventExecutor)null, DefaultChannelPipeline.TAIL_NAME, true, false);
            this.setAddComplete();
}

  我们可以看到,链表中head 是一个ChannelOutboundHandler,而tail 则是一个ChannelInboundHandler。它调用了父类AbstractChannelHandlerContext 的构造器,并传入参数inbound = false,outbound = true。而TailContext 的构造器与HeadContext 的相反,它调用了父类AbstractChannelHandlerContext 的构造器,并传入参数inbound = true,outbound = false。即header 是一个OutBoundHandler,而tail 是一个InBoundHandler。

3、ChannelInitializer 的添加

  前面我们已经分析过Channel 的组成,其中我们了解到,最开始的时候ChannelPipeline 中含有两个ChannelHandlerContext(同时也是ChannelHandler),但是这个Pipeline 并不能实现什么特殊的功能,因为我们还没有给它添加自定义的ChannelHandler。通常来说,我们在初始化Bootstrap,会添加我们自定义的ChannelHandler,就以我们具体的客户端启动代码片段来举例:

 

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
  @Override
  protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline pipeline = ch.pipeline();
    pipeline.addLast(new ChatClientHandler(nickName));
  }
});

 

  上面代码的初始化过程,相信大家都不陌生。在调用handler 时,传入了ChannelInitializer 对象,它提供了一个initChannel()方法给我我们初始化ChannelHandler。最后将这个匿名的Handler保存到AbstractBootstrap中。那么这个初始化过程是怎样的呢?下面我们来揭开它的神秘面纱。

  ChannelInitializer 实现了ChannelHandler,那么它是在什么时候添加到ChannelPipeline 中的呢?通过代码跟踪,我们发现它是在Bootstrap 的init()方法中添加到ChannelPipeline 中的,其代码如下(以客户端为例):

 

void init(Channel channel) throws Exception {
        ChannelPipeline p = channel.pipeline();
        p.addLast(new ChannelHandler[]{this.config.handler()});
     。。。。。。
}
//AbstractBootstrapConfig
public final ChannelHandler handler() {
  return this.bootstrap.handler();
}
//AbstractBootstrap
final ChannelHandler handler() {
        return this.handler;
}

  从上面的代码可见,将handler()返回的ChannelHandler 添加到Pipeline 中,而handler()返回的其实就是我们在初始化Bootstrap 时通过handler()方法设置的ChannelInitializer 实例,因此这里就是将ChannelInitializer 插入到了Pipeline的末端。此时Pipeline 的结构如下图所示:

  这时候,有小伙伴可能就有疑惑了,我明明插入的是一个ChannelInitializer 实例,为什么在ChannelPipeline 中的双向链表中的元素却是一个ChannelHandlerContext 呢?我们继续去源码中寻找答案。

  刚才,我们提到,在Bootstrap 的init()方法中会调用p.addLast()方法,将ChannelInitializer 插入到链表的末端:

 

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized(this) {
            checkMultiplicity(handler);
            newCtx = this.newContext(group, this.filterName(name, handler), handler);
       this.addLast0(newCtx);
 }
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, this.childExecutor(group), name, handler);
}

 

  addLast()有很多重载的方法,我们只需关注这个比较重要的方法就行。上面的addLast()方法中,首先检查ChannelHandler 的名字是否是重复,如果不重复,则调用newContex()方法为这个Handler 创建一个对应的DefaultChannelHandlerContext 实例,并与之关联起来(Context 中有一个handler 属性保存着对应的Handler 实例)。为了添加一个handler 到pipeline 中,必须把此handler 包装成ChannelHandlerContext。因此在上面的代码中我们可以看到新实例化了一个newCtx 对象,并将handler 作为参数传递到构造方法中。那么我们来看一下实例化的DefaultChannelHandlerContext 到底有什么玄机吧。首先看它的构造器:

 

DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        } else {
            this.handler = handler;
        }
    }

 

  在DefaultChannelHandlerContext 的构造器中,调用了两个很有意思的方法:isInbound()与isOutbound(),这两个方法是做什么的呢?从源码中可以看到,当一个handler 实现了ChannelInboundHandler 接口,则isInbound 返回true;类似地,当一个handler 实现了ChannelOutboundHandler 接口,则isOutbound 就返回true。而这两个boolean 变量会传递到父类AbstractChannelHandlerContext 中,并初始化父类的两个字段:inbound 与outbound。那么这里的ChannelInitializer 所对应的DefaultChannelHandlerContext 的inbound 与outbound 字段分别是什么呢? 那就看一下ChannelInitializer 到底实现了哪个接口不就行了?如下是ChannelInitializer 的类层次结构图:

  从类图中可以清楚地看到,ChannelInitializer 仅仅实现了ChannelInboundHandler 接口,因此这里实例化的DefaultChannelHandlerContext 的inbound = true,outbound = false。兜了一圈,不就是inbound 和outbound 两个字段嘛,为什么需要这么大费周折地分析一番?其实这两个字段关系到pipeline 的事件的流向与分类,因此是十分关键的,不过我在这里先卖个关子, 后面我们再来详细分析这两个字段所起的作用。至此, 我们暂时先记住一个结论:ChannelInitializer 所对应的DefaultChannelHandlerContext 的inbound =true,outbound = false。当创建好Context 之后,就将这个Context 插入到Pipeline 的双向链表中

 

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = this.tail.prev;
        newCtx.prev = prev;
        newCtx.next = this.tail;
        prev.next = newCtx;
        this.tail.prev = newCtx;
    }

   添加完ChannelInitializer的Pipeline现在是长这样的:

 4、自定义ChannelHandler 的添加过程

  前面我们已经分析了ChannelInitializer 是如何插入到Pipeline 中的,接下来就来探讨ChannelInitializer 在哪里被调用,ChannelInitializer 的作用以及我们自定义的ChannelHandler 是如何插入到Pipeline 中的。先简单复习一下Channel 的注册过程:

  1. 首先在AbstractBootstrap 的initAndRegister()中,通过group().register(channel),调用MultithreadEventLoopGroup 的register()方法。
  2. 在MultithreadEventLoopGroup 的register()中调用next()获取一个可用的SingleThreadEventLoop,然后调用它的register()方法。
  3. 在SingleThreadEventLoop 的register()方法中,通过channel.unsafe().register(this, promise)方法获取channel的unsafe()底层IO 操作对象,然后调用它的register()。
  4. 在AbstractUnsafe 的register()方法中,调用register0()方法注册Channel 对象。
  5. 在AbstractUnsafe 的register0()方法中,调用AbstractNioChannel 的doRegister()方法。
  6. AbstractNioChannel 的doRegister()方法调用javaChannel().register(eventLoop().selector, 0, this)将Channel对应的Java NIO 的SockerChannel 对象注册到一个eventLoop 的Selector 中,并且将当前Channel 作为attachment。

  而我们自定义ChannelHandler 的添加过程,发生在AbstractUnsafe 的register0()方法中,在这个方法中调用了pipeline.fireChannelRegistered()方法,其代码实现如下:

 

private void register0(ChannelPromise promise) {
        boolean firstRegistration = this.neverRegistered;
        AbstractChannel.this.doRegister();
        this.neverRegistered = false;
        AbstractChannel.this.registered = true;
        AbstractChannel.this.pipeline.invokeHandlerAddedIfNeeded();
        this.safeSetSuccess(promise);
        AbstractChannel.this.pipeline.fireChannelRegistered();
}
public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(this.head);
        return this;
}

 

再看AbstractChannelHandlerContext 的invokeChannelRegistered()方法:

 

static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRegistered();
        } else {
            executor.execute(new Runnable() {
                public void run() {
                    next.invokeChannelRegistered();
                }
            });
        }
    }

 

  很显然,这个代码会从head 开始遍历Pipeline 的双向链表,然后 findContextInbound()  找到第一个属性inbound 为true 的ChannelHandlerContext 实例。看代码:

 

public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            DefaultChannelPipeline.this.invokeHandlerAddedIfNeeded();
            ctx.fireChannelRegistered();
}
public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(this.findContextInbound());
        return this;
}

 

  想起来了没?我们在前面分析ChannelInitializer 时,花了大量的篇幅来分析了inbound和outbound 属性,现在这里就用上了。回想一下,ChannelInitializer 实现了ChannelInboudHandler,因此它所对应的ChannelHandlerContext 的inbound 属性就是true,因此这里返回就是ChannelInitializer 实例所对应的ChannelHandlerContext 对象,如下图所示:

  当获取到inbound 的Context 后,就调用它的invokeChannelRegistered()方法:

 

private void invokeChannelRegistered() {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelRegistered(this);
            } catch (Throwable var2) {
                this.notifyHandlerException(var2);
            }
        } else {
            this.fireChannelRegistered();
        }
}

 

  我们已经知道,每个ChannelHandler 都和一个ChannelHandlerContext 关联,我们可以通过ChannelHandlerContext获取到对应的ChannelHandler。因此很明显,这里handler()返回的对象其实就是一开始我们实例化的ChannelInitializer 对象,并接着调用了ChannelInitializer 的channelRegistered()方法。看到这里, 应该会觉得有点眼熟了。ChannelInitializer 的channelRegistered()这个方法我们在一开始的时候已经接触到了,但是我们并没有深入地分析这个方法的调用过程。下面我们来看这个方法中到底有什么玄机,继续看代码:

 

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
  protected abstract void initChannel(C ch) throws Exception;
    @Override
    @SuppressWarnings("unchecked")
    public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        if (initChannel(ctx)) {
            ctx.pipeline().fireChannelRegistered();
            removeState(ctx);
        } else {
            ctx.fireChannelRegistered();
        }
    }private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
}

 

  initChannel((C) ctx.channel())这个方法我们也很熟悉,它就是我们在初始化Bootstrap 时,调用handler 方法传入的匿名内部类所实现的方法:

protected void initChannel(SocketChannel ch) throws Exception {
      ChannelPipeline pipeline = ch.pipeline();
      pipeline.addLast("handler", new MyClient());
}

  因此,当调用这个方法之后, 我们自定义的ChannelHandler 就插入到了Pipeline,此时Pipeline 的状态如下图所示:

 

  当添加完成自定义的ChannelHandler 后,在finally 代码块会删除自定义的ChannelInitializer,也就是remove(ctx)最终调用ctx.pipeline().remove(this),因此最后的Pipeline 的状态如下:(这里图有误!

  至此,自定义ChannelHandler 的添加过程也分析得差不多了。

5、ChannelHandler 默认命名规则

  不知道大家注意到没有,pipeline.addXXX 都有一个重载的方法,例如addLast()它有一个重载的版本是:ChannelPipeline addLast(String name, ChannelHandler handler);第一个参数指定添加的handler 的名字(更准确地说是ChannelHandlerContext 的名字,说成handler 的名字更便于理解)。那么handler 的名字有什么用呢?如果我们不设置name,那么handler 默认的名字是怎样呢?带着这些疑问,我们依旧还是去源码中找到答案。还是以addLast()方法为例:

public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        return addLast(null, name, handler);
    }

  这个方法会调用重载的addLast()方法:

 

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);return this;
    }

 

  第一个参数被设置为null,我们不用关心它。第二参数就是这个handler 的名字。看代码可知,在添加一个handler之前,需要调用checkMultiplicity()方法来确定新添加的handler 名字是否与已添加的handler 名字重复。

  如果我们调用的是如下的addLast()方法:ChannelPipeline addLast(ChannelHandler... handlers);那么Netty 就会调用generateName()方法为新添加的handler 自动生成一个默认的名字:

 

private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
private String generateName(ChannelHandler handler) {
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

  而generateName()方法会接着调用generateName0()方法来实际生成一个新的handler 名字

private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
}

  默认命名的规则很简单,就是用反射获取handler 的simpleName 加上"#0",因此我们自定义ChatClientHandler 的名字就是"ChatClientHandler#0"。

6、Pipeline 的事件传播机制

  前面章节中,我们已经知道AbstractChannelHandlerContext 中有inbound 和outbound 两个boolean 变量,分别用于标识Context 所对应的handler 的类型,即:

  1. inbound 为true 是,表示其对应的ChannelHandler 是ChannelInboundHandler 的子类。
  2. outbound 为true 时,表示对应的ChannelHandler 是ChannelOutboundHandler 的子类。

  这里大家肯定还有很多疑惑,不知道这两个字段到底有什么作用? 这还要从ChannelPipeline 的事件传播类型说起。Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。如下是从Netty 官网针对这两个事件的说明:

  从上图可以看出,inbound 事件和outbound 事件的流向是不一样的,inbound 事件的流行是从下至上,而outbound刚好相反,是从上到下。并且inbound 的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法,而outbound 方法的的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。例如:ChannelHandlerContext的fireChannelRegistered()调用会发送一个ChannelRegistered 的inbound 给下一个ChannelHandlerContext,而ChannelHandlerContext 的bind()方法调用时会发送一个bind 的outbound 事件给下一个ChannelHandlerContext。

    Inbound 事件传播方法有:

 

public interface ChannelInboundHandler extends ChannelHandler {
    void channelRegistered(ChannelHandlerContext var1) throws Exception;

    void channelUnregistered(ChannelHandlerContext var1) throws Exception;

    void channelActive(ChannelHandlerContext var1) throws Exception;

    void channelInactive(ChannelHandlerContext var1) throws Exception;

    void channelRead(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelReadComplete(ChannelHandlerContext var1) throws Exception;

    void userEventTriggered(ChannelHandlerContext var1, Object var2) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext var1) throws Exception;

    void exceptionCaught(ChannelHandlerContext var1, Throwable var2) throws Exception;
}

 

  Outbound 事件传播方法有:

 

public interface ChannelOutboundHandler extends ChannelHandler {
    void bind(ChannelHandlerContext var1, SocketAddress var2, ChannelPromise var3) throws Exception;

    void connect(ChannelHandlerContext var1, SocketAddress var2, SocketAddress var3, ChannelPromise var4) throws Exception;

    void disconnect(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void close(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void deregister(ChannelHandlerContext var1, ChannelPromise var2) throws Exception;

    void read(ChannelHandlerContext var1) throws Exception;

    void write(ChannelHandlerContext var1, Object var2, ChannelPromise var3) throws Exception;

    void flush(ChannelHandlerContext var1) throws Exception;
}

  大家应该发现了规律:inbound 类似于是事件回调(响应请求的事件),而outbound 类似于主动触发(发起请求的事件)。注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用Context 对应的传播方法 fireXXX,例如:

 

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
  @Override
  public void channelActive(ChannelHandlerContext ctx) throws Exception {
    System.out.println("连接成功");
    ctx.fireChannelActive();
  }
}

6.1 Outbound 事件传播方式

  Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过Outbound 事件进行通知。Outbound 事件的传播方向是tail -> customContext -> head。我们接下来以connect 事件为例,分析一下Outbound 事件的传播机制。首先,当用户调用了Bootstrap 的connect()方法时,就会触发一个Connect 请求事件,我们就发现AbstractChannel 的connect()其实由调用了DefaultChannelPipeline 的connect()方法:

public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
}

  而pipeline.connect()方法的实现如下:

public final ChannelFuture connect(SocketAddress remoteAddress) {
        return tail.connect(remoteAddress);
    }

  可以看到,当outbound 事件(这里是connect 事件)传递到Pipeline 后,它其实是以tail 为起点开始传播的。而tail.connect()其实调用的是AbstractChannelHandlerContext 的connect()方法

 

public ChannelFuture connect(
            final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {

        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);return promise;
    }

 

  findContextOutbound()方法顾名思义,它的作用是以当前Context 为起点,向Pipeline 中的Context 双向链表的前端寻找第一个outbound 属性为true 的Context(即关联ChannelOutboundHandler 的Context),然后返回。findContextOutbound()方法代码实现如下:

 

private AbstractChannelHandlerContext findContextOutbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
    }

 

  当我们找到了一个outbound 的Context 后,就调用它的invokeConnect()方法,这个方法中会调用Context 其关联的ChannelHandler 的connect()方法

 

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        } else {
            connect(remoteAddress, localAddress, promise);
        }
    }

 

  如果用户没有重写ChannelHandler 的connect()方法,那么会调用ChannelOutboundHandlerAdapter 的connect()实现

public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

  我们看到,ChannelOutboundHandlerAdapter 的connect()仅仅调用了ctx.connect(),而这个调用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect这样的循环中,直到connect 事件传递到DefaultChannelPipeline 的双向链表的头节点,即head 中。为什么会传递到head 中呢?回想一下,head 实现了ChannelOutboundHandler,因此它的outbound 属性是true。因为head 本身既是一个ChannelHandlerContext,又实现了ChannelOutboundHandler 接口,因此当connect()消息传递到head 后,会将消息转递到对应的ChannelHandler 中处理,而head 的handler()方法返回的就是head 本身:

public ChannelHandler handler() {
      return this;
}

  因此最终connect()事件是在head 中被处理。head 的connect()事件处理逻辑如下:

public void connect(
                ChannelHandlerContext ctx,
                SocketAddress remoteAddress, SocketAddress localAddress,
                ChannelPromise promise) {
            unsafe.connect(remoteAddress, localAddress, promise);
        }

  到这里, 整个connect()请求事件就结束了。下图中描述了整个connect()请求事件的处理过程:

  我们仅仅以connect()请求事件为例,分析了outbound 事件的传播过程,但是其实所有的outbound 的事件传播都遵循着一样的传播规律,小伙伴们可以试着分析一下其他的outbound 事件,体会一下它们的传播过程。

6.2 Inbound 事件传播方式

  Inbound 事件和Outbound 事件的处理过程是类似的,只是传播方向不同。Inbound 事件是一个通知事件,即某件事已经发生了,然后通过Inbound 事件进行通知。Inbound 通常发生在Channel的状态的改变或IO 事件就绪。Inbound 的特点是它传播方向是head -> customContext -> tail。上面我们分析了connect()这个Outbound 事件,那么接着分析connect()事件后会发生什么Inbound 事件,并最终找到Outbound 和Inbound 事件之间的联系。当connect()这个Outbound 传播到unsafe 后,其实是在AbstractNioUnsafe的connect()方法中进行处理的:

 

public final void connect(
                final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
       boolean wasActive = isActive();
       if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
       } else {
       }
}

 

  在AbstractNioUnsafe 的connect()方法中,首先调用doConnect()方法进行实际上的Socket 连接,当连接上后会调用fulfillConnectPromise()方法

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }
        }

  我们看到,在fulfillConnectPromise()中,会通过调用pipeline().fireChannelActive()方法将通道激活的消息(即Socket 连接成功)发送出去。而这里,当调用pipeline.fireXXX 后,就是Inbound 事件的起点。因此当调用pipeline().fireChannelActive()后,就产生了一个ChannelActive Inbound 事件,我们就从这里开始看看这个Inbound事件是怎么传播的?

public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

  果然, 在fireChannelActive()方法中,调用的是head.invokeChannelActive(),因此可以证明Inbound 事件在Pipeline中传输的起点是head。那么,在head.invokeChannelActive()中又做了什么呢?

 

static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

 

  接下去的调用流程是:

private void invokeChannelActive() {
        if (this.invokeHandler()) {
            try {
                ((ChannelInboundHandler)this.handler()).channelActive(this);
            } catch (Throwable var2) {
                this.notifyHandlerException(var2);
            }
        } else {
            this.fireChannelActive();
        }

}
public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();
            this.readIfIsAutoRead();
        }
public ChannelHandlerContext fireChannelActive() {
        AbstractChannelHandlerContext next = this.findContextInbound();
        invokeChannelActive(next);
        return this;
    }

 

  上面的代码应该很熟悉了。回想一下在Outbound 事件(例如connect()事件)的传输过程中时,我们也有类似的操作:

  1. 首先调用findContextInbound(),从Pipeline 的双向链表中中找到第一个属性inbound 为true 的Context,然后将其返回。
  2. 调用Context 的invokeChannelActive()方法.

  invokeChannelActive()方法源码如下:

 

private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

 

  这个方法和Outbound 的对应方法(如:invokeConnect()方法)如出一辙。与Outbound 一样,如果用户没有重写channelActive() 方法,那就会调用ChannelInboundHandlerAdapter 的channelActive()方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

  同样地, 在ChannelInboundHandlerAdapter 的channelActive()中,仅仅调用了ctx.fireChannelActive()方法,因此就会进入Context.fireChannelActive() -> Connect.findContextInbound() -> nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()这样的循环中。同理,tail 本身既实现了ChannelInboundHandler 接口,又实现了ChannelHandlerContext 接口,因此当channelActive()消息传递到tail 后,会将消息转递到对应的ChannelHandler 中处理,而tail 的handler()返回的就是tail 本身:

public ChannelHandler handler() {
            return this;
        }

  因此channelActive Inbound 事件最终是在tail 中处理的,我们看一下它的处理方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
        }

  TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 处理方法时就会发现,它们的实现都是空的。可见,如果是Inbound,当用户没有实现自定义的处理器时,那么默认是不处理的。下图描述了Inbound事件的传输过程:

6.3 Pipeline 事件传播小结

  Outbound 事件总结:

  • Outbound 事件是请求事件(由connect()发起一个请求,并最终由unsafe 处理这个请求)。
  • Outbound 事件的发起者是Channel。
  • Outbound 事件的处理者是unsafe。
  • Outbound 事件在Pipeline 中的传输方向是tail -> head。
  • 在ChannelHandler 中处理事件时,如果这个Handler 不是最后一个Handler,则需要调用ctx 的方法(如:ctx.connect()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。
  • Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()

  

Inbound 事件总结:

  • Inbound 事件是通知事件,当某件事情已经就绪后,通知上层。
  • Inbound 事件发起者是unsafe。
  • Inbound 事件的处理者是Channel,如果用户没有实现自定义的处理方法,那么Inbound 事件默认的处理者是TailContext,并且其处理方法是空实现。Inbound 事件在Pipeline 中传输方向是head -> tail。
  • 在ChannelHandler 中处理事件时,如果这个Handler 不是最后一个Handler,则需要调用ctx.fireIN_EVT()事件(如:ctx.fireChannelActive()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。
  • Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().

  outbound 和inbound 事件设计上十分相似,并且Context 与Handler 直接的调用关系也容易混淆,因此我们在阅读这里的源码时,需要特别的注意。

7、Handler 的各种姿势

  ChannelHandlerContext

  每个ChannelHandler 被添加到ChannelPipeline 后,都会创建一个ChannelHandlerContext 并与之创建的ChannelHandler 关联绑定。ChannelHandlerContext 允许ChannelHandler 与其他的ChannelHandler 实现进行交互。ChannelHandlerContext 不会改变添加到其中的ChannelHandler,因此它是安全的。下图描述了ChannelHandlerContext、ChannelHandler、ChannelPipeline 的关系:

 

8、Channel 的生命周期:

  Netty 有一个简单但强大的状态模型,并完美映射到ChannelInboundHandler 的各个方法。下面是Channel 生命周期中四个不同的状态:

  1. channelUnregistered() Channel已创建,还未注册到一个EventLoop上
  2. channelRegistered() Channel已经注册到一个EventLoop上
  3. channelActive() Channel是活跃状态(连接到某个远端),可以收发数据
  4. channelInactive() Channel未连接到远端

  一个Channel 正常的生命周期如下图所示。随着状态发生变化相应的事件产生。这些事件被转发到ChannelPipeline中的ChannelHandler 来触发相应的操作。

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty源码------Pipeline详细分析 的相关文章

  • HTTP 状态 404 - 请求的资源不可用

    在使用 MyEclipse IDE 中的 Tomcat 服务器和 Struts 2 框架时 我遇到了反复出现的问题 我将我的程序作为服务器应用程序运行 当它运行时 默认的index jsp 文件将成功打开 但应用程序的其他过去都不起作用 当
  • Google 地图查询返回的 JSON 包含像 \x26 这样的编码字符(如何解码?)

    在 Java 应用程序中 我获取 JSON 来自 Google 地图 其中包含以下字符 x26我想将其转换为其原始字符 据我所知 这是一个 UTF 8 表示法 但我不完全确定 在源 JSON 中 可能会出现各种编码字符 例如 x3c div
  • 检查双精度值的等于和不等于条件

    我在比较两者时遇到困难double values using and 我创建了 6 个双变量并尝试进行比较If健康 状况 double a b c d e f if a b c d e f My code here in case of t
  • Android - 除了普通 SSL 证书之外还验证自签名证书

    我有一个通过 SSL 调用 Web 服务的 Android 应用程序 在生产中 我们将拥有由受信任的 CA 签名的普通 SSL 证书 但是 我们需要能够支持自签名证书 由我们自己的 CA 签名 我已经成功实施了接受自签名证书的建议解决方案
  • 服务器到 Firebase HTTP POST 结果为响应消息 200

    使用 Java 代码 向下滚动查看 我使用 FCM 向我的 Android 发送通知消息 当提供正确的服务器密钥令牌时 我收到如下所示的响应消息 之后从 FCM 收到以下响应消息 Response 200 Success Message m
  • 使用 Java 在浏览器中下载 CSV 文件

    我正在尝试在 Web 应用程序上添加一个按钮 单击该按钮会下载一个 CSV 文件 该文件很小 大小仅约 4KB 我已经制作了按钮并附加了一个侦听器 文件也准备好了 我现在唯一需要做的就是创建单击按钮时下载 csv 文件的实际事件 假设 fi
  • 所有junit测试后的清理

    在我的项目中 我必须在所有测试之前进行一些存储库设置 这是使用一些棘手的静态规则来完成的 然而 在所有测试之后我不知道如何进行清理 我不想保留一些神奇的静态数字来引用所有测试方法的数量 我应该一直维护它 最受赞赏的方法是添加一些侦听器 该侦
  • 想要开发像 Facebook 这样的网站 - 处理数百万个请求 - 高性能 [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我想用 Java 开发一个像 Fac
  • 尝试在没有 GatewayIntent 的情况下访问消息内容

    我希望每当我写一条打招呼的消息时 机器人都会在控制台中响应一条消息 但它只是给我一个错误 JDA MainWS ReadThread WARN JDA Attempting to access message content without
  • 参数动态时如何构建 JPQL 查询?

    我想知道是否有一个好的解决方案来构建基于过滤器的 JPQL 查询 我的查询太 富有表现力 我无法使用 Criteria 就像是 query Select from Ent if parameter null query WHERE fiel
  • 如何在java中使jpeg无损?

    有没有人可以告诉我如何使用编写 jpeg 文件losslessjava中的压缩 我使用下面的代码读取字节来编辑字节 WritableRaster raster image getRaster DataBufferByte buffer Da
  • tomcat 过滤所有 web 应用程序

    问题 我想对所有网络应用程序进行过滤 我创建了一个过滤器来监视对 apache tomcat 服务器的请求 举例来说 它称为 MyFilter 我在 netbeans 中创建了它 它创建了 2 个独立的目录 webpages contain
  • 从 Java 日历迁移到 Joda 日期时间

    以前 当我第一次设计股票应用相关软件时 我决定使用java util Date表示股票的日期 时间信息 后来我体会到了大部分方法java util Date已弃用 因此 很快 我重构了所有代码以利用java util Calendar 然而
  • 让JScrollPane控制多个组件

    对于我的应用程序 我正在设计一个脚本编辑器 目前我有一个JPanel其中包含另一个JPanel保存行号 位于左侧 以及JTextArea用于允许用户输入代码 位于右侧 目前 我已经实施了JScrollPane on the JTextAre
  • 不兼容的类型:在 java netbeans 中对象无法转换为 String

    我试图在我的项目中使用对象数组 但出现错误 incompatible types Object cannot be converted to String 在这一行 ST1 new String emt1 emt2 emt3 emt4 现在
  • 如何在android sdk上使用PowerMock

    我想为我的 android 项目编写一些单元测试和仪器测试 然而 我遇到了一个困扰我一段时间的问题 我需要模拟静态方法并伪造返回值来测试项目 经过一些论坛的调查 唯一的方法是使用PowerMock来模拟静态方法 这是我的 gradle 的一
  • 来自客户端的超时 Web 服务调用

    我正在使用 RestEasy 客户端调用网络服务 一项要求是 如果调用运行时间超过 5 秒 则中止 超时调用 我如何使用 RestEasy 客户端实现这一目标 我只看到服务器端超时 即如果在一定时间内未完成请求 Rest Easy 网络服务
  • Spock模拟inputStream导致无限循环

    我有一个代码 gridFSFile inputStream bytes 当我尝试这样测试时 given def inputStream Mock InputStream def gridFSDBFile Mock GridFSDBFile
  • Spring Boot MSSQL Kerberos 身份验证

    目前在我的春季靴子中application properties文件中 我指定以下行来连接到 MSSql 服务器 spring datasource url jdbc sqlserver localhost databaseName spr
  • 尝试使用带有有效购买令牌的 Java Google Play Developer API v3 检索应用内购买信息时出现错误请求(无效值)

    当使用 Java Google Play Developer API 版本 3 并请求有效购买令牌的购买信息时 我收到以下异常 API 调用返回 400 Bad Request 响应以及以下消息 code 400 errors domain

随机推荐

  • OMP算法笔记

    OMP算法笔记 OMP算法整理 以备自己后期查阅 集合了几篇博主的文章 1 数理知识基础 投影矩阵 详见 作者 nineheaded bird 来源 CSDN 原文 https blog csdn net tengweitw article
  • sudo java not found_ubuntu终端sudo java提示“command not found”解决办法

    我在ubuntu 12 04里想启动一个java程序 sudo java jar xxx jar 但是结果提示sudo java command not found Ubuntu下用sudo运行java程序时 要注意此时用户目录为 root
  • python单元测试:unittest

    1 unittest 简介 unittest是python内置的用于测试代码的模块 无需安装 使用简单方便 unittest case的运行流程 写好一个完整的TestCase 多个TestCase 由TestLoder被加载到TestSu
  • 【雷达原理】基本雷达方程的推导

    基本雷达方程 一 研究目的 二 推导过程 1 基本雷达方程常用的表达形式 2 计算案例 3 仿真代码 参考文献 一 研究目的 雷达方程定量地描述了作用距离与雷达参数及目标特性之间的关系 研究雷达方程主要有以下作用 1 根据雷达参数来估算雷达
  • Flutter CustomScrollView要点

    在上篇文章中我们学了SingleChildScrollView这个滑动控件 现在我们学习一下CustomScrollView这个控件 CustomScrollView这个控件是针对多个滚动布局的组件 比如顶部一个GridView 底部又来一
  • pycharm连接远程服务器,报错No such file or directory

    参考文章 pycharm远程连接服务器运行代码加调试 解决 1 2 是因为映射路径与服务器上执行文件的路径不符导致的上述错误 在第二步中将Deployment path 改为服务器上要执行文件的相应路径即可
  • tsconfig.json编译选项配置说明

    incremental 增量编译 默认在 composite为true的为true 否则false 语言和环境 target 编译目标 默认 es3 reactNamespace react的命名空间 默认React lib TypeScr
  • 时钟抖动(Clock Jitter)和时钟偏斜(Clock Skew)

    系统时序设计中对时钟信号的要求是非常严格的 因为我们所有的时序计算都是以恒定的时钟信号为基准 但实际中时钟信号往往不可能总是那么完美 会出现抖动 Jitter 和偏移 Skew 问题 所谓抖动 jitter 就是指两个时钟周期之间存在的差值
  • TMS系统

    运输管理系统 Transportation Management System 英文缩写 TMS 是一种 供应链 分组下的 基于网络的 操作软件 它能通过多种方法和其他相关的操作一起提高物流的管理能力 包括管理装运单位 指定企业内 国内和国
  • OpenCV-Python 图像平滑处理2:blur函数及滤波案例

    一 图像平滑处理简介 图像平滑处理的基本概念非常直观 它使用滤波器模板确定的邻域内像素的平均 加权平均灰度值代替图像中每个像素的值 平滑线处理滤波器也称均值滤波器 所有系数都相等 非加权平均 的空间均值滤波器也称为盒状滤波器 在 OpenC
  • Docker不香吗?为什么还要用k8s

    本文禁止转载 目录 容器化时代来了 容器化技术的尖刀武器 Docker 横空出世 Docker怎么用 编排系统的需求催生 k8s k8s与Docker Swarm江湖恩怨 k8s是做什么用的 K8s 架构和组件 Docker与k8s 难舍难
  • oracle数据库中有哪些时间类型?

    1 date类型 用于存储 年月日时分秒 在数据库中 固定存储为7个字节 2 timestamp类型 除了保存 年月日时分秒 还有小数秒 小数秒默认6位 例如 timestamp 6 表示秒的小数点后面 可以存储6位数字 3 区别 通常在数
  • 【论文评审】怎样审稿?

    文章目录 说明 翻译 整理后的内容 英文原版内容 说明 本博客素材来源于网址 http web njit edu bieber review html 内容为英文 可以打开上述网页后在网站上翻译着看 本博客对上述网页内容进行翻译整理 翻译
  • 浅谈C语言中断处理机制

    一 中断机制 1 实现中断响应和中断返回 当CPU收到中断请求后 能根据具体情况决定是否响应中断 如果CPU没有更急 更重要的工作 则在执行完当前指令后响应这一中断请求 CPU中断响应过程如下 首先 将断点处的PC值 即下一条应执行指令的地
  • 查找 综述类文献

    转自http blog sina com cn s blog 4ded56360100wlwo html 查找 综述类文献 的方法有三种 第一种 直接用内容关键字 overview survey 这样的词 在各大数据库 google中直接搜
  • (转)Spring项目整合MyBatis-Plus

    目录 1 简介 2 项目整合 2 1 MyBatis的配置 2 2 MyBatis Plus配置 1 简介 MyBatis Plus 是基于Mybatis的一个工具 它是对MyBatis的增强 在其基础上只进行增强 不进行改变 是为了简化M
  • 直流无刷电机(BLDC)的相电流和反电动势是同相的吗?

    在直流无刷电机中 电机转子上的永磁体与定子上的线圈之间存在磁场相互作用 当定子上的线圈被通电时 会产生一个旋转磁场 这个旋转磁场与转子上的永磁体磁场相互作用 从而使得转子开始旋转 同时 在直流无刷电机中 当转子旋转时 会在定子上的线圈中感应
  • ElementUI给表格加上输入框和单位

    关键代码 可以使用判断的语句进行判断添加
  • 垃圾回收机制

    Garbage Collection简称为GC 垃圾回收 工作原理 当程序员创建对象时 GC就开始监视这个对象地址 大小以及使用情况 通常GC采用有向图的方式记录并管理堆中的对象 通过这种方式确定哪些对象是可达的 哪些是不可达的 当确定对象
  • Netty源码------Pipeline详细分析

    Netty源码 Pipeline详细分析 目录 Netty源码 Pipeline详细分析 1 Channel 与ChannelPipeline 2 再探ChannelPipeline 的初始化 3 ChannelInitializer 的添