Netty入门-Channel

2023-11-03

目录

Channel详解

Channel的特点

Channel接口方法

ChannelOutboundInvoker接口

AttributeMap接口

ChannelHandler接口

ChannelInboundHandler接口

ChannelOutboundHandler接口

ChannelHandlerAdapter

ChannelInboundHandlerAdapor

ChannelOutboundHandlerAdapter

适配器的作用

ChannelPipeline接口

创建ChannelPipeline

ChannelHandlerContext接口


Channel详解

        Channel代表网络socket或能够进行IO操作的组件的连接关系。这些IO操作包括读、写、连接和绑定。Netty中的Channel提供了如下功能:

  • 查询当前Channel的状态。例如,是打开还是已连接状态等。
  • 提供Channel的参数配置。如接收缓冲区大小。
  • 提供支持的IO操作。如读、写连接和绑定
  • 提供ChannelPipeline。ChannelPipelin用于处理所有与Channel关联的IO事件和请求。

Channel的特点

1. 所有IO操作都是异步的

IO调用将立即返回,返回一个ChannelFuture实例。

2. Channel是分层的

3. 向下转型以下访问特定于传输的操作

4. 释放资源

Channel接口方法

public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();
    boolean isWritable();
    long bytesBeforeUnwritable();
    long bytesBeforeWritable();
    Channel.Unsafe unsafe();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    Channel read();
    Channel flush();
    public interface Unsafe {
        Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop var1, ChannelPromise var2);
        void bind(SocketAddress var1, ChannelPromise var2);
        void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
        void disconnect(ChannelPromise var1);
        void close(ChannelPromise var1);
        void closeForcibly();
        void deregister(ChannelPromise var1);
        void beginRead();
        void write(Object var1, ChannelPromise var2);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }
}
  • id()方法返回全局唯一的ChannelId
  • eventLoop()方法返回分配给该Channel的EventLoop,一个EventLoop就是一个线程,用来处理连接的生命周期中所发生的事件
  • parent()方法返回该Channel的父Channel
  • config()方法返回该Channel的ChannelConfig,其中包含了该Channel的所有配置设置支持热更新
  • pipeline()方法返回该Channel对应的ChannelPipeline
  • alloc方法返回分配给该Channel的ByteBufAllocator,可以用来分配ByteBuf

ChannelOutboundInvoker接口

声明了所有出站的网络操作:

package io.netty.channel;

import java.net.SocketAddress;

public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress var1);
    ChannelFuture connect(SocketAddress var1);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress var1, ChannelPromise var2);
    ChannelFuture connect(SocketAddress var1, ChannelPromise var2);
    ChannelFuture connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
    ChannelFuture disconnect(ChannelPromise var1);
    ChannelFuture close(ChannelPromise var1);
    ChannelFuture deregister(ChannelPromise var1);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object var1);
    ChannelFuture write(Object var1, ChannelPromise var2);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object var1, ChannelPromise var2);
    ChannelFuture writeAndFlush(Object var1);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable var1);
    ChannelPromise voidPromise();
}

ChannelFuture用于获取异步的结果,ChannelPromise是对ChannelFuture的扩展,支持写的操作。ChannelPromise也被称为可写的ChannelFuture。

AttributeMap接口

package io.netty.util;
public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> var1);
    <T> boolean hasAttr(AttributeKey<T> var1);
}

AttributeMap就是类似于Map的键值对,键就是AttributeKey类型,值是Attribute类型。

Netty提供了AttributeMap的默认实现类DefaultAttributeMap,与JDK中的ConcurrentHashMap相比,在高并发下DefaultAttributeMap可以更加节省内存。

package io.netty.util;

import io.netty.util.internal.ObjectUtil;

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

/**
 * Default {@link AttributeMap} implementation which not exibit any blocking behaviour on attribute lookup while using a
 * copy-on-write approach on the modify path.<br> Attributes lookup and remove exibit {@code O(logn)} time worst-case
 * complexity, hence {@code attribute::set(null)} is to be preferred to {@code remove}.
 */
public class DefaultAttributeMap implements AttributeMap {

    private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, DefaultAttribute[]> ATTRIBUTES_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, DefaultAttribute[].class, "attributes");
    private static final DefaultAttribute[] EMPTY_ATTRIBUTES = new DefaultAttribute[0];

    /**
     * Similarly to {@code Arrays::binarySearch} it perform a binary search optimized for this use case, in order to
     * save polymorphic calls (on comparator side) and unnecessary class checks.
     */
    private static int searchAttributeByKey(DefaultAttribute[] sortedAttributes, AttributeKey<?> key) {
        int low = 0;
        int high = sortedAttributes.length - 1;

        while (low <= high) {
            int mid = low + high >>> 1;
            DefaultAttribute midVal = sortedAttributes[mid];
            AttributeKey midValKey = midVal.key;
            if (midValKey == key) {
                return mid;
            }
            int midValKeyId = midValKey.id();
            int keyId = key.id();
            assert midValKeyId != keyId;
            boolean searchRight = midValKeyId < keyId;
            if (searchRight) {
                low = mid + 1;
            } else {
                high = mid - 1;
            }
        }

        return -(low + 1);
    }

    private static void orderedCopyOnInsert(DefaultAttribute[] sortedSrc, int srcLength, DefaultAttribute[] copy,
                                            DefaultAttribute toInsert) {
        // let's walk backward, because as a rule of thumb, toInsert.key.id() tends to be higher for new keys
        final int id = toInsert.key.id();
        int i;
        for (i = srcLength - 1; i >= 0; i--) {
            DefaultAttribute attribute = sortedSrc[i];
            assert attribute.key.id() != id;
            if (attribute.key.id() < id) {
                break;
            }
            copy[i + 1] = sortedSrc[i];
        }
        copy[i + 1] = toInsert;
        final int toCopy = i + 1;
        if (toCopy > 0) {
            System.arraycopy(sortedSrc, 0, copy, 0, toCopy);
        }
    }

    private volatile DefaultAttribute[] attributes = EMPTY_ATTRIBUTES;

    @SuppressWarnings("unchecked")
    @Override
    public <T> Attribute<T> attr(AttributeKey<T> key) {
        ObjectUtil.checkNotNull(key, "key");
        DefaultAttribute newAttribute = null;
        for (;;) {
            final DefaultAttribute[] attributes = this.attributes;
            final int index = searchAttributeByKey(attributes, key);
            final DefaultAttribute[] newAttributes;
            if (index >= 0) {
                final DefaultAttribute attribute = attributes[index];
                assert attribute.key() == key;
                if (!attribute.isRemoved()) {
                    return attribute;
                }
                // let's try replace the removed attribute with a new one
                if (newAttribute == null) {
                    newAttribute = new DefaultAttribute<T>(this, key);
                }
                final int count = attributes.length;
                newAttributes = Arrays.copyOf(attributes, count);
                newAttributes[index] = newAttribute;
            } else {
                if (newAttribute == null) {
                    newAttribute = new DefaultAttribute<T>(this, key);
                }
                final int count = attributes.length;
                newAttributes = new DefaultAttribute[count + 1];
                orderedCopyOnInsert(attributes, count, newAttributes, newAttribute);
            }
            if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
                return newAttribute;
            }
        }
    }

    @Override
    public <T> boolean hasAttr(AttributeKey<T> key) {
        ObjectUtil.checkNotNull(key, "key");
        return searchAttributeByKey(attributes, key) >= 0;
    }

    private <T> void removeAttributeIfMatch(AttributeKey<T> key, DefaultAttribute<T> value) {
        for (;;) {
            final DefaultAttribute[] attributes = this.attributes;
            final int index = searchAttributeByKey(attributes, key);
            if (index < 0) {
                return;
            }
            final DefaultAttribute attribute = attributes[index];
            assert attribute.key() == key;
            if (attribute != value) {
                return;
            }
            final int count = attributes.length;
            final int newCount = count - 1;
            final DefaultAttribute[] newAttributes =
                    newCount == 0? EMPTY_ATTRIBUTES : new DefaultAttribute[newCount];
            // perform 2 bulk copies
            System.arraycopy(attributes, 0, newAttributes, 0, index);
            final int remaining = count - index - 1;
            if (remaining > 0) {
                System.arraycopy(attributes, index + 1, newAttributes, index, remaining);
            }
            if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
                return;
            }
        }
    }

    @SuppressWarnings("serial")
    private static final class DefaultAttribute<T> extends AtomicReference<T> implements Attribute<T> {

        private static final AtomicReferenceFieldUpdater<DefaultAttribute, DefaultAttributeMap> MAP_UPDATER =
                AtomicReferenceFieldUpdater.newUpdater(DefaultAttribute.class,
                                                       DefaultAttributeMap.class, "attributeMap");
        private static final long serialVersionUID = -2661411462200283011L;

        private volatile DefaultAttributeMap attributeMap;
        private final AttributeKey<T> key;

        DefaultAttribute(DefaultAttributeMap attributeMap, AttributeKey<T> key) {
            this.attributeMap = attributeMap;
            this.key = key;
        }

        @Override
        public AttributeKey<T> key() {
            return key;
        }

        private boolean isRemoved() {
            return attributeMap == null;
        }

        @Override
        public T setIfAbsent(T value) {
            while (!compareAndSet(null, value)) {
                T old = get();
                if (old != null) {
                    return old;
                }
            }
            return null;
        }

        @Override
        public T getAndRemove() {
            final DefaultAttributeMap attributeMap = this.attributeMap;
            final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
            T oldValue = getAndSet(null);
            if (removed) {
                attributeMap.removeAttributeIfMatch(key, this);
            }
            return oldValue;
        }

        @Override
        public void remove() {
            final DefaultAttributeMap attributeMap = this.attributeMap;
            final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
            set(null);
            if (removed) {
                attributeMap.removeAttributeIfMatch(key, this);
            }
        }
    }
}

ChannelHandler接口

package io.netty.channel;

import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


public interface ChannelHandler {

    /**
     * Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
     */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
     * implement the method there.
     */
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
     */
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}


 Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
 its ChannelPipeline

ChannelHandler本身没有提供什么方法,可以使用其子类:

  • ChannelInboundHandler:处理入站IO事件
  • ChannelOutboundHandler:处理出站IO事件
  • ChannelHandlerAdapter:采用适配器模式的ChannelHandler适配器

ChannelInboundHandler接口

package io.netty.channel;

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
     */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
     */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} is now active
     */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /**
     * Invoked when the current {@link Channel} has read a message from the peer.
     */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if an user event was triggered.
     */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     */
    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

ChannelOutboundHandler接口

package io.netty.channel;

import java.net.SocketAddress;


public interface ChannelOutboundHandler extends ChannelHandler {
    /**
     * Called once a bind operation is made.
     *
     * @param ctx           the {@link ChannelHandlerContext} for which the bind operation is made
     * @param localAddress  the {@link SocketAddress} to which it should bound
     * @param promise       the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception    thrown if an error occurs
     */
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a connect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the connect operation is made
     * @param remoteAddress     the {@link SocketAddress} to which it should connect
     * @param localAddress      the {@link SocketAddress} which is used as source on connect
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    /**
     * Called once a disconnect operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the disconnect operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a close operation is made.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Called once a deregister operation is made from the current registered {@link EventLoop}.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the close operation is made
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    /**
     * Intercepts {@link ChannelHandlerContext#read()}.
     */
    void read(ChannelHandlerContext ctx) throws Exception;

    /**
    * Called once a write operation is made. The write operation will write the messages through the
     * {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
     * {@link Channel#flush()} is called
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the write operation is made
     * @param msg               the message to write
     * @param promise           the {@link ChannelPromise} to notify once the operation completes
     * @throws Exception        thrown if an error occurs
     */
    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    /**
     * Called once a flush operation is made. The flush operation will try to flush out all previous written messages
     * that are pending.
     *
     * @param ctx               the {@link ChannelHandlerContext} for which the flush operation is made
     * @throws Exception        thrown if an error occurs
     */
    void flush(ChannelHandlerContext ctx) throws Exception;
}

ChannelHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerMask.Skip;
import io.netty.util.internal.InternalThreadLocalMap;
import java.util.Map;

public abstract class ChannelHandlerAdapter implements ChannelHandler {
    boolean added;
    public ChannelHandlerAdapter() {
    }
    protected void ensureNotSharable() {
        if (this.isSharable()) {
            throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
        }
    }
    public boolean isSharable() {
        Class<?> clazz = this.getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = (Boolean)cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    }
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    }
    /** @deprecated */
    @Skip
    @Deprecated
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelHandlerAdaptor常用的两个子类,分别是ChannelInboundHandlerAdapor、ChannelOutboundHandlerAdatper

ChannelInboundHandlerAdapor

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
    public ChannelInboundHandlerAdapter() {
    }

    @Skip
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}

ChannelOutboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
    public ChannelOutboundHandlerAdapter() {
    }
    @Skip
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }
    @Skip
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }
    @Skip
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.disconnect(promise);
    }
    @Skip
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.close(promise);
    }
    @Skip
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }
    @Skip
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }
    @Skip
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }
    @Skip
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}

适配器的作用

使用适配器是因为适配器的子类不需要实现父类中的所有方法,按需覆盖适配器的方法即可。

ChannelPipeline接口

ChannelPipeline接口设计采用了责任链模式,底层采用双向链表的数据结构,将链上个各个处理器串联起来。

 *                                                 I/O Request
 *                                            via {@link Channel} or
 *                                        {@link ChannelHandlerContext}
 *                                                      |
 *  +---------------------------------------------------+---------------+
 *  |                           ChannelPipeline         |               |
 *  |                                                  \|/              |
 *  |    +---------------------+            +-----------+----------+    |
 *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  .               |
 *  |               .                                   .               |
 *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
 *  |        [ method call]                       [method call]         |
 *  |               .                                   .               |
 *  |               .                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  |               |                                  \|/              |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
 *  |    +----------+----------+            +-----------+----------+    |
 *  |              /|\                                  |               |
 *  +---------------+-----------------------------------+---------------+
 *                  |                                  \|/
 *  +---------------+-----------------------------------+---------------+
 *  |               |                                   |               |
 *  |       [ Socket.read() ]                    [ Socket.write() ]     |
 *  |                                                                   |
 *  |  Netty Internal I/O Threads (Transport Implementation)            |
 *  +-------------------------------------------------------------------+
public interface ChannelPipeline
        extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {

    ChannelPipeline addFirst(String name, ChannelHandler handler);

    ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addLast(String name, ChannelHandler handler);

    ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);

    ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);

    ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);

    ChannelPipeline addFirst(ChannelHandler... handlers);

    ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline addLast(ChannelHandler... handlers);

    ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);

    ChannelPipeline remove(ChannelHandler handler);

    ChannelHandler remove(String name);

    <T extends ChannelHandler> T remove(Class<T> handlerType);

    ChannelHandler removeFirst();

    ChannelHandler removeLast();

    ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);

    ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

    <T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
                                         ChannelHandler newHandler);

    ChannelHandler first();

    ChannelHandlerContext firstContext();

    ChannelHandler last();

    ChannelHandlerContext lastContext();

    ChannelHandler get(String name);

    <T extends ChannelHandler> T get(Class<T> handlerType);

    ChannelHandlerContext context(ChannelHandler handler);

    ChannelHandlerContext context(String name);

    ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);

    Channel channel();

    List<String> names();

    Map<String, ChannelHandler> toMap();

    @Override
    ChannelPipeline fireChannelRegistered();

    @Override
    ChannelPipeline fireChannelUnregistered();

    @Override
    ChannelPipeline fireChannelActive();

    @Override
    ChannelPipeline fireChannelInactive();

    @Override
    ChannelPipeline fireExceptionCaught(Throwable cause);

    @Override
    ChannelPipeline fireUserEventTriggered(Object event);

    @Override
    ChannelPipeline fireChannelRead(Object msg);

    @Override
    ChannelPipeline fireChannelReadComplete();

    @Override
    ChannelPipeline fireChannelWritabilityChanged();

    @Override
    ChannelPipeline flush();
}

创建ChannelPipeline

ChannelPipeline数据管道是与Channel通道绑定的,一个Channel通道对应一个ChannelPipeline,ChannelPipeline是在Channel初始化时被创建的。

ChannelHandlerContext接口

ChannelHandlerContext接口是联系ChannelHandler与其ChannelPipeline之间的纽带。

每当有ChannelHandler添加到ChannelPipeline中时,都会常见ChannelHandlerContext。ChannelHandlerContext的主要功能是管理它所关联的ChannelHandler和在同一个ChannelPipeline中的其他ChannelHandler之间的交互。例如,ChannelHandlerContext可以通知ChannelPipeline中的下一个ChannelHandler开始执行及动态修改其所属的ChannelPipeline。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Netty入门-Channel 的相关文章

  • 使用 TLS 1.2 将客户端连接到 TCP 服务器

    我尝试将设备连接到 Net 4 5 2 服务器 但没有成功 它是设备打开的 TCP 连接 使用 TLS 1 2 在服务器端 我有一个 TCP 服务器的标准 Net 实现 SslStream包裹着DotNetty https github c
  • Netty-无法访问类 jdk.internal.misc.Unsafe

    当我将 Java 从 8 升级到 11 时 我收到来自 Netty 的错误 jdk internal misc Unsafe 详细信息如下 我知道这是一条调试级别消息 我可以更改日志级别以忽略它 但我不确定当我忽略它时是否会出现其他问题 例
  • Spring Integration 通道统计指标

    不知怎的 我没有捕捉到 Spring Integration Metrics 内容 我想要的是关于每秒有多少消息通过消息通道 最小和最大吞吐量是多少的统计输出 如果我使用newTicketChannel getSendRate 然后我变成以
  • 每个连接的 Netty 多线程

    我是 Netty 新手 我想开发一个服务器 旨在接收来自可能少数 假设最多有 2 个 客户端的请求 但是每个客户端都会不断地向服务器发送许多请求 服务器必须处理此类请求并响应客户端 因此 在这里我假设即使我配置了多个工作线程 它也可能没有用
  • 泄漏:ByteBuf.release() 在被垃圾收集之前没有被调用。 Spring Reactor TcpServer

    我正在使用reactor core 1 1 0 RELEASE reactor net 1 1 0 RELEASE 正在使用netty all 4 0 18 Final reactor spring context 1 1 0 RELEAS
  • 直接通道使用与使用代理?

    正如标题所暗示的 我试图理解为什么在 WCF 中有时人们选择 生成代理 而不是使用 ChannelFactory 手动创建新的通道实例 我已经看过每一种的例子 但还没有真正找到任何解释为什么你会选择其中一种 老实说 我只与渠道合作过Chan
  • netty中非阻塞通道中的SO_TIMEOUT

    如果通道在超时毫秒内未收到读取 响应 SO TIMEOUT 是否会使非阻塞通道过期 bootstrap group workerGroup channel NioSocketChannel class handler channelInit
  • 通道是否通过引用隐式传递

    gotour 有这个频道示例 https tour golang org concurrency 2 https tour golang org concurrency 2 package main import fmt func sum
  • 如何知道Netty ByteBuf中是否没有数据可读取?

    我是 Netty 新手 文件传输的问题让我困惑了好几天 我想发送image文件从客户端到服务器 下面的代码是可执行的 但只有我shutdown服务器强制我可以正常打开收到的图像文件 否则 显示 您似乎没有查看此文件的权限 检查权限并重试 所
  • 客户端 ECC SSL 证书包含“未知命名曲线”

    问题背景 我正在一个现有的库中工作 该库在远程服务器上使用 SSL 和 netty 框架 我遇到 SSL TLS 握手错误 错误如下 javax net ssl SSLProtocolException java io IOExceptio
  • netty ChannelInboundHandlerAdapter 将帧裁剪为 ~1500 字节

    我已经实现了一个服务器应用程序 它使用 netty 框架通过 ChannelInblundHandlerAdapter 读取传入的字节 如标题所示 我的问题是 我不定期地从客户端获取内容 我认为这些内容在 1 500 字节后被剪切 例如 在
  • Spring WebClient:SSLEngine 已关闭

    我们使用 Spring boot 版本 2 3 1 也使用 WebClient 我的网络客户端配置 private val client WebClient init val sslCtx SslContextBuilder forClie
  • 是否可以将多个通道复用为一个通道?

    这个想法是在切片中拥有可变数量的通道 将通过它们接收到的每个值推送到单个通道中 并在最后一个输入通道关闭后关闭该输出通道 类似这样 但对于两个以上的通道数 func multiplex cin1 cin2 cout chan int n 2
  • netty DefaultChannelPipeline 异常捕获

    不幸的是 我不明白 netty 服务器的输出 BUILD SUCCESSFUL Total time 3 seconds Jul 27 2014 2 04 44 AM io netty handler logging LoggingHand
  • Java 互操作——Netty + Clojure

    我正在尝试通过 clojure 使用 netty 我可以启动服务器 但是它无法初始化接受的套接字 下面分别是错误消息和代码 有谁知道什么是 或可能是错误的 我相信问题在于 Channels pipeline server handler T
  • netty 4.x.x 中的 UDP 广播

    我们需要使用 Netty 4 0 0 二进制文件通过 UDP 通道广播对象 Pojo 在 Netty 4 0 0 中 它允许我们仅使用 DatagramPacket 类来发送 UDP 数据包 此类仅接受 ByteBuf 作为参数 还有其他方
  • 匿名结构和空结构

    http play golang org p vhaKi5uVmm http play golang org p vhaKi5uVmm package main import fmt var battle make chan string
  • HashedWheelTimer 与 ScheduledThreadPoolExecutor 相比以获得更高的性能

    我正在考虑如果您需要在一台机器上的 jvm 内尽可能快地调度大量 非阻塞 任务 则应使用哪种计时器实现 我学过ScheduledThreadPoolExecutor and HashedWheelTimer来源 轮计时器一般文档 和以下是基
  • select 语句是否保证通道选择的顺序?

    继从这个答案 https stackoverflow com a 25795236 274460 如果一个 goroutine 在两个通道上进行选择 是否保证通道的选择顺序与其发送的顺序相同 我对发送者是单线程的情况特别感兴趣 例如 是否保
  • Spring WebFlux Netty SSL 与自签名证书错误

    我正在尝试使用服务器端的自签名证书通过本地主机中的 https 访问在 Netty 上运行的 Spring Boot 应用程序 My application properties看起来像这样 server ssl enabled true

随机推荐

  • 路由交换-华为usg6000防火墙上配置内网外网通过公网ip访问http服务

    源nat是将私网地址转换为公网地址 实现内部网络访问外网 目的dnat是将对公网访问Ip转换为内网ip 实现外部网络访问内网资源 目的nat的实现有多种方式 一对一转换 带端口和不带端口的转换 最常用的就是使用带端口的一对多转换 即我们常说
  • Levinson-Durbin快速递推法功率谱估计(Python实现版)

    Levinson Durbin快速递推法功率谱估计是在Yule Walker方程法之上建立的 如果对于Yule Walker方程法不熟悉的话可以参考我的一篇博客 Yule Walker方程法参数化谱估计 Python实现版 声明 博客原本在
  • 文件上传漏洞upload-libs pass5

    文件上传漏洞upload libs pass4 首先查看源码 无法使用空格和大小写绕过 且黑名单过滤了 htaccess 查看提示 利用readme php文件 因为没有过滤ini文件 创建 text ini和一句话木马文件 内容为 aut
  • HIVE厂牌艺人_Labelwarts Vol. 2:洛杉矶天才厂牌 Odd Future Records 的开始到结束

    We re F kin Radical been F kin Awesome 我们太TMD激进 太TMD耀眼 Talked a lotta sh t so far words you re at a loss 说着一大堆胡话 让你们都不知所
  • 将ant design pro打包的JS分离出去

    通过analyze分析发现其实react dom并不算小 有100多kb 所以就想把它单独引用 于是就在config ts增加 externals react window React react dom window ReactDOM b
  • 利用python3 生成密码本

    一 思路 1 把密码中含有哪些字符串都放入一个迭代器中 2 确定生成的密码是几位数的 3 将生成的所有密码写入一个文件里面 二 代码 import itertools as its 迭代器 words 1234567890 生成密码本的位数
  • 3.2 Python图像的频域图像增强-高通和低通滤波器

    3 2 Python图像的频域图像增强 高通和低通滤波器 文章目录 3 2 Python图像的频域图像增强 高通和低通滤波器 1 算法原理 1 1理想滤波器 1 2巴特沃斯滤波器 1 3指数滤波器 2 代码 3 效果 1 算法原理 高通和低
  • Mongodb笔记六:排序与限制输出

    一 排序 db collectionname find sort key1 1 key 1 这里的1代表升序 1代表降序 如 对所有人按年龄升序排序 降序排序 二 索引 索引是特殊的数据结构 索引存储在一个易于遍历读取的数据集合中 索引是对
  • FFmpeg中RTSP客户端拉流测试代码

    之前在https blog csdn net fengbingchun article details 91355410中给出了通过LIVE555实现拉流的测试代码 这里通过FFmpeg来实现 代码量远小于LIVE555 实现模块在liba
  • 蓝桥杯每日一题——手算题·空间

    本题为填空题 只需要算出结果后 在代码中使用输出语句将所填结果输出即可 小蓝准备用 256MB 的内存空间开一个数组 数组的每个元素都是 3232 位 二进制整数 如果不考虑程序占用的空间和维护内存需要的辅助空间 请问 56MB 的空间可以
  • [阶段二] 4. MySQL的基本操作

    mysql的基本操作 数据插入 INSERT 语句可以向数据表写入数据 可以是一条记录 也可以是多条记录 INSERT INTO 数据表名称 字段1 字段2 VALUES 值1 值2 插入一条记录 INSERT INTO 数据表名称 字段1
  • 分析工具 nvprof简介

    nvprof 是一个可用于Linux Windows和OS X的命令行探查器 使用 nvprof myApp 运行我的应用程序 我可以快速看到它所使用的所有内核和内存副本的摘要 摘要将对同一内核的所有调用组合在一起 显示每个内核的总时间和总
  • 十六进制转二进制

    public static String hexToBinary String hex if hex null hex length 2 0 return null String bString String tmp for int i 0
  • Visual Studio(VS) 编程推荐字体和主题设置

    首先是字体 工具 gt 选项 gt 环境 gt 字体和颜色 具体图如下 选择Consolas的原因 Consolas算是最常见的编码字体了 在很多的编译软件都是这个字体 而且在这个字体下的中英文标点和半角圆角符号也能有比较明显的区别 至于字
  • Java 集合 - Map 接口

    文章目录 1 概述 2 常用 API 3 遍历 Map 集合 4 HashMap 和 Hashtable 5 LinkedHashMap 6 TreeMap 7 Properties 8 Set 集合与 Map 集合的关系 9 总结 1 概
  • C++11/14之模板全特化,偏特化

    目录 模板全特化 偏特化 类模板特化 类模板全特化 a 常规全特化 b 特化成员函数而不是模板 类模板偏特化 局部特化 a 模板参数数量 b 模板参数范围 int const int 比int小 函数模板特化 函数模板全特化 函数模板偏特化
  • LayerNorm的理解

    LayerNorm计算公式 y x E x
  • C语言实现多级反馈队列调度算法

    include
  • java架构师进阶之路

    Java架构师 应该算是一些Java程序员们的一个职业目标了吧 很多码农码了五六年的代码也没能成为架构师 那成为Java架构师要掌握哪些技术呢 总体来说呢 有两方面 一个是基础技术 另一个就是组织能力和提出解决方案能力了 如果你是想成为Ja
  • Netty入门-Channel

    目录 Channel详解 Channel的特点 Channel接口方法 ChannelOutboundInvoker接口 AttributeMap接口 ChannelHandler接口 ChannelInboundHandler接口 Cha