Contents
Channel in detail
Characteristics of Channel
Channel interface methods
ChannelOutboundInvoker interface
AttributeMap interface
ChannelHandler interface
ChannelInboundHandler interface
ChannelOutboundHandler interface
ChannelHandlerAdapter
ChannelInboundHandlerAdapter
ChannelOutboundHandlerAdapter
The role of adapters
ChannelPipeline interface
Creating a ChannelPipeline
ChannelHandlerContext interface
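Channel in detail
A Channel represents a connection to a network socket, or to any component capable of I/O operations such as read, write, connect, and bind. A Netty Channel provides the following:
- The current state of the Channel, for example whether it is open or connected.
- The configuration parameters of the Channel, such as the receive buffer size.
- The I/O operations the Channel supports: read, write, connect, and bind.
- The ChannelPipeline, which handles all I/O events and requests associated with the Channel.
Characteristics of Channel
1. All I/O operations are asynchronous
   Any I/O call returns immediately with a ChannelFuture instance, which is notified when the operation succeeds, fails, or is canceled.
2. Channels are hierarchical
   A Channel can have a parent depending on how it was created. For example, a SocketChannel accepted by a ServerSocketChannel returns that ServerSocketChannel from parent().
3. Downcast to access transport-specific operations
   Some transports expose operations of their own; downcast the Channel to the relevant subtype to call them.
4. Release resources
   Call close() once you are done with a Channel so that its resources are released.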
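The first point, that an I/O call returns at once and reports its outcome later, can be pictured with a small JDK-only model. This is only a sketch: `AsyncWriteSketch`, its `write` method and the in-memory `wire` buffer are hypothetical stand-ins, and `CompletableFuture` plays the role that ChannelFuture plays in Netty.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a channel; this is not a Netty class.
final class AsyncWriteSketch {
    // One I/O thread, loosely playing the role of an event loop.
    private final ExecutorService ioThread = Executors.newSingleThreadExecutor();
    // Pretend "wire": only the I/O thread appends to it.
    private final StringBuilder wire = new StringBuilder();

    // Returns immediately; the future completes after the I/O thread has appended the message.
    CompletableFuture<Integer> write(String msg) {
        return CompletableFuture.supplyAsync(() -> {
            wire.append(msg);
            return msg.length();
        }, ioThread);
    }

    // Safe to call after the returned future has completed.
    String written() {
        return wire.toString();
    }

    // Counterpart of "release resources": stop the I/O thread.
    void close() {
        ioThread.shutdown();
    }

    public static void main(String[] args) {
        AsyncWriteSketch channel = new AsyncWriteSketch();
        CompletableFuture<Integer> future = channel.write("ping");
        // Register a callback instead of blocking on the result.
        future.thenAccept(n -> System.out.println("wrote " + n + " chars"));
        future.join(); // only so that the demo does not exit early
        channel.close();
    }
}
```

The caller never waits inside `write`; it decides separately whether to attach a callback or to block on the returned future.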
Channel interface methods
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
    ChannelId id();
    EventLoop eventLoop();
    Channel parent();
    ChannelConfig config();
    boolean isOpen();
    boolean isRegistered();
    boolean isActive();
    ChannelMetadata metadata();
    SocketAddress localAddress();
    SocketAddress remoteAddress();
    ChannelFuture closeFuture();
    boolean isWritable();
    long bytesBeforeUnwritable();
    long bytesBeforeWritable();
    Channel.Unsafe unsafe();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
    Channel read();
    Channel flush();

    public interface Unsafe {
        RecvByteBufAllocator.Handle recvBufAllocHandle();
        SocketAddress localAddress();
        SocketAddress remoteAddress();
        void register(EventLoop eventLoop, ChannelPromise promise);
        void bind(SocketAddress localAddress, ChannelPromise promise);
        void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
        void disconnect(ChannelPromise promise);
        void close(ChannelPromise promise);
        void closeForcibly();
        void deregister(ChannelPromise promise);
        void beginRead();
        void write(Object msg, ChannelPromise promise);
        void flush();
        ChannelPromise voidPromise();
        ChannelOutboundBuffer outboundBuffer();
    }
}
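- id() returns the globally unique ChannelId.
- eventLoop() returns the EventLoop assigned to the Channel. An EventLoop is bound to a single thread and handles all events that occur during the lifetime of the connection.
- parent() returns the parent Channel, or null if the Channel has no parent.
- config() returns the Channel's ChannelConfig, which holds all of its configuration settings and allows them to be changed at runtime.
- pipeline() returns the ChannelPipeline that belongs to the Channel.
- alloc() returns the ByteBufAllocator assigned to the Channel, which can be used to allocate ByteBuf instances.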
ChannelOutboundInvoker interface
This interface declares all outbound network operations:
package io.netty.channel;

import java.net.SocketAddress;

public interface ChannelOutboundInvoker {
    ChannelFuture bind(SocketAddress localAddress);
    ChannelFuture connect(SocketAddress remoteAddress);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);
    ChannelFuture disconnect();
    ChannelFuture close();
    ChannelFuture deregister();
    ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise);
    ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    ChannelFuture disconnect(ChannelPromise promise);
    ChannelFuture close(ChannelPromise promise);
    ChannelFuture deregister(ChannelPromise promise);
    ChannelOutboundInvoker read();
    ChannelFuture write(Object msg);
    ChannelFuture write(Object msg, ChannelPromise promise);
    ChannelOutboundInvoker flush();
    ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
    ChannelFuture writeAndFlush(Object msg);
    ChannelPromise newPromise();
    ChannelProgressivePromise newProgressivePromise();
    ChannelFuture newSucceededFuture();
    ChannelFuture newFailedFuture(Throwable cause);
    ChannelPromise voidPromise();
}
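ChannelFuture is used to obtain the result of an asynchronous operation. ChannelPromise extends ChannelFuture with the ability to set that result, which is why it is also called a writable ChannelFuture.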
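The split between a read-only future and a writable promise can be sketched in plain Java. Everything here is hypothetical (`ResultFuture`, `ResultPromise`, `SimpleResultPromise`); the method names are only loosely modeled on Netty's and the sketch ignores cancellation and executors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Read-only view: callers can observe the outcome but cannot set it.
interface ResultFuture {
    boolean isDone();
    boolean isSuccess();
    Throwable cause();
    ResultFuture addListener(Consumer<ResultFuture> listener);
}

// Writable extension: whoever performs the operation completes it.
interface ResultPromise extends ResultFuture {
    boolean trySuccess();
    boolean tryFailure(Throwable cause);
}

final class SimpleResultPromise implements ResultPromise {
    private final List<Consumer<ResultFuture>> listeners = new ArrayList<>();
    private boolean done;
    private Throwable cause;

    @Override public synchronized boolean isDone() { return done; }
    @Override public synchronized boolean isSuccess() { return done && cause == null; }
    @Override public synchronized Throwable cause() { return cause; }

    @Override
    public ResultFuture addListener(Consumer<ResultFuture> listener) {
        boolean notifyNow;
        synchronized (this) {
            notifyNow = done;
            if (!done) {
                listeners.add(listener);
            }
        }
        if (notifyNow) {
            listener.accept(this); // already completed: notify right away
        }
        return this;
    }

    @Override public boolean trySuccess() { return complete(null); }

    @Override public boolean tryFailure(Throwable cause) {
        return complete(Objects.requireNonNull(cause, "cause"));
    }

    // Completes at most once; returns false if the result was already set.
    private boolean complete(Throwable failure) {
        List<Consumer<ResultFuture>> toNotify;
        synchronized (this) {
            if (done) {
                return false;
            }
            done = true;
            cause = failure;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Consumer<ResultFuture> listener : toNotify) {
            listener.accept(this);
        }
        return true;
    }
}
```

Code that starts an operation would hand out the object typed as `ResultFuture`, keeping the completing methods to itself.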
AttributeMap interface
package io.netty.util;

public interface AttributeMap {
    <T> Attribute<T> attr(AttributeKey<T> key);
    <T> boolean hasAttr(AttributeKey<T> key);
}
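An AttributeMap is a Map-like collection of key-value pairs in which the keys are of type AttributeKey and the values are of type Attribute.
Netty provides a default implementation, DefaultAttributeMap. Compared with the JDK's ConcurrentHashMap, DefaultAttributeMap is intended to use less memory under high concurrency. As its Javadoc notes, lookups do not block, and modifications use a copy-on-write approach.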
package io.netty.util;
import io.netty.util.internal.ObjectUtil;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
/**
* Default {@link AttributeMap} implementation which not exibit any blocking behaviour on attribute lookup while using a
* copy-on-write approach on the modify path.<br> Attributes lookup and remove exibit {@code O(logn)} time worst-case
* complexity, hence {@code attribute::set(null)} is to be preferred to {@code remove}.
*/
public class DefaultAttributeMap implements AttributeMap {
private static final AtomicReferenceFieldUpdater<DefaultAttributeMap, DefaultAttribute[]> ATTRIBUTES_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultAttributeMap.class, DefaultAttribute[].class, "attributes");
private static final DefaultAttribute[] EMPTY_ATTRIBUTES = new DefaultAttribute[0];
/**
* Similarly to {@code Arrays::binarySearch} it perform a binary search optimized for this use case, in order to
* save polymorphic calls (on comparator side) and unnecessary class checks.
*/
private static int searchAttributeByKey(DefaultAttribute[] sortedAttributes, AttributeKey<?> key) {
int low = 0;
int high = sortedAttributes.length - 1;
while (low <= high) {
int mid = low + high >>> 1;
DefaultAttribute midVal = sortedAttributes[mid];
AttributeKey midValKey = midVal.key;
if (midValKey == key) {
return mid;
}
int midValKeyId = midValKey.id();
int keyId = key.id();
assert midValKeyId != keyId;
boolean searchRight = midValKeyId < keyId;
if (searchRight) {
low = mid + 1;
} else {
high = mid - 1;
}
}
return -(low + 1);
}
private static void orderedCopyOnInsert(DefaultAttribute[] sortedSrc, int srcLength, DefaultAttribute[] copy,
DefaultAttribute toInsert) {
// let's walk backward, because as a rule of thumb, toInsert.key.id() tends to be higher for new keys
final int id = toInsert.key.id();
int i;
for (i = srcLength - 1; i >= 0; i--) {
DefaultAttribute attribute = sortedSrc[i];
assert attribute.key.id() != id;
if (attribute.key.id() < id) {
break;
}
copy[i + 1] = sortedSrc[i];
}
copy[i + 1] = toInsert;
final int toCopy = i + 1;
if (toCopy > 0) {
System.arraycopy(sortedSrc, 0, copy, 0, toCopy);
}
}
private volatile DefaultAttribute[] attributes = EMPTY_ATTRIBUTES;
@SuppressWarnings("unchecked")
@Override
public <T> Attribute<T> attr(AttributeKey<T> key) {
ObjectUtil.checkNotNull(key, "key");
DefaultAttribute newAttribute = null;
for (;;) {
final DefaultAttribute[] attributes = this.attributes;
final int index = searchAttributeByKey(attributes, key);
final DefaultAttribute[] newAttributes;
if (index >= 0) {
final DefaultAttribute attribute = attributes[index];
assert attribute.key() == key;
if (!attribute.isRemoved()) {
return attribute;
}
// let's try replace the removed attribute with a new one
if (newAttribute == null) {
newAttribute = new DefaultAttribute<T>(this, key);
}
final int count = attributes.length;
newAttributes = Arrays.copyOf(attributes, count);
newAttributes[index] = newAttribute;
} else {
if (newAttribute == null) {
newAttribute = new DefaultAttribute<T>(this, key);
}
final int count = attributes.length;
newAttributes = new DefaultAttribute[count + 1];
orderedCopyOnInsert(attributes, count, newAttributes, newAttribute);
}
if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
return newAttribute;
}
}
}
@Override
public <T> boolean hasAttr(AttributeKey<T> key) {
ObjectUtil.checkNotNull(key, "key");
return searchAttributeByKey(attributes, key) >= 0;
}
private <T> void removeAttributeIfMatch(AttributeKey<T> key, DefaultAttribute<T> value) {
for (;;) {
final DefaultAttribute[] attributes = this.attributes;
final int index = searchAttributeByKey(attributes, key);
if (index < 0) {
return;
}
final DefaultAttribute attribute = attributes[index];
assert attribute.key() == key;
if (attribute != value) {
return;
}
final int count = attributes.length;
final int newCount = count - 1;
final DefaultAttribute[] newAttributes =
newCount == 0? EMPTY_ATTRIBUTES : new DefaultAttribute[newCount];
// perform 2 bulk copies
System.arraycopy(attributes, 0, newAttributes, 0, index);
final int remaining = count - index - 1;
if (remaining > 0) {
System.arraycopy(attributes, index + 1, newAttributes, index, remaining);
}
if (ATTRIBUTES_UPDATER.compareAndSet(this, attributes, newAttributes)) {
return;
}
}
}
@SuppressWarnings("serial")
private static final class DefaultAttribute<T> extends AtomicReference<T> implements Attribute<T> {
private static final AtomicReferenceFieldUpdater<DefaultAttribute, DefaultAttributeMap> MAP_UPDATER =
AtomicReferenceFieldUpdater.newUpdater(DefaultAttribute.class,
DefaultAttributeMap.class, "attributeMap");
private static final long serialVersionUID = -2661411462200283011L;
private volatile DefaultAttributeMap attributeMap;
private final AttributeKey<T> key;
DefaultAttribute(DefaultAttributeMap attributeMap, AttributeKey<T> key) {
this.attributeMap = attributeMap;
this.key = key;
}
@Override
public AttributeKey<T> key() {
return key;
}
private boolean isRemoved() {
return attributeMap == null;
}
@Override
public T setIfAbsent(T value) {
while (!compareAndSet(null, value)) {
T old = get();
if (old != null) {
return old;
}
}
return null;
}
@Override
public T getAndRemove() {
final DefaultAttributeMap attributeMap = this.attributeMap;
final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
T oldValue = getAndSet(null);
if (removed) {
attributeMap.removeAttributeIfMatch(key, this);
}
return oldValue;
}
@Override
public void remove() {
final DefaultAttributeMap attributeMap = this.attributeMap;
final boolean removed = attributeMap != null && MAP_UPDATER.compareAndSet(this, attributeMap, null);
set(null);
if (removed) {
attributeMap.removeAttributeIfMatch(key, this);
}
}
}
}
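To show the shape of the API rather than the storage strategy, here is a deliberately simplified typed key-to-cell map. `TypedKey` and `TypedAttributeMap` are hypothetical names, and the sketch is backed by a `ConcurrentHashMap` with `AtomicReference` cells purely for brevity, so it does not reproduce the sorted-array, copy-on-write design of the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical typed key; compared by identity, so declare each key once as a constant.
final class TypedKey<T> {
    private final String name;

    TypedKey(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }
}

// Minimal model: every key maps to a holder cell, and the cell carries the value.
final class TypedAttributeMap {
    private final ConcurrentHashMap<TypedKey<?>, AtomicReference<?>> cells =
            new ConcurrentHashMap<>();

    // Returns the cell for the key, creating an empty one on first access.
    @SuppressWarnings("unchecked")
    <T> AtomicReference<T> attr(TypedKey<T> key) {
        return (AtomicReference<T>) cells.computeIfAbsent(key, k -> new AtomicReference<T>());
    }

    // True once a cell has been created for the key, even if its value is still null.
    <T> boolean hasAttr(TypedKey<T> key) {
        return cells.containsKey(key);
    }
}
```

Because the key carries the value type, `map.attr(USER_NAME).get()` needs no cast at the call site, which is the main convenience of the AttributeKey and Attribute pairing.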
ChannelHandler interface
package io.netty.channel;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
public interface ChannelHandler {
/**
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
*/
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
*/
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*
* @deprecated if you want to handle this event you should implement {@link ChannelInboundHandler} and
* implement the method there.
*/
@Deprecated
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
/**
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
*/
@Inherited
@Documented
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface Sharable {
// no value
}
}
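According to its Javadoc, a ChannelHandler handles an I/O event or intercepts an I/O operation, and forwards it to the next handler in its ChannelPipeline.
ChannelHandler itself declares very few methods. In practice you work with its subtypes:
- ChannelInboundHandler: handles inbound I/O events
- ChannelOutboundHandler: handles outbound I/O operations
- ChannelHandlerAdapter: a ChannelHandler base class that follows the adapter pattern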
ChannelInboundHandler interface
package io.netty.channel;
/**
* {@link ChannelHandler} which adds callbacks for state changes. This allows the user
* to hook in to state changes easily.
*/
public interface ChannelInboundHandler extends ChannelHandler {
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
*/
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
*/
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
*/
void channelActive(ChannelHandlerContext ctx) throws Exception;
/**
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
*/
void channelInactive(ChannelHandlerContext ctx) throws Exception;
/**
* Invoked when the current {@link Channel} has read a message from the peer.
*/
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
/**
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
*/
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if an user event was triggered.
*/
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
/**
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
*/
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
/**
* Gets called if a {@link Throwable} was thrown.
*/
@Override
@SuppressWarnings("deprecation")
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
ChannelOutboundHandler interface
package io.netty.channel;
import java.net.SocketAddress;
public interface ChannelOutboundHandler extends ChannelHandler {
/**
* Called once a bind operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the bind operation is made
* @param localAddress the {@link SocketAddress} to which it should bound
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a connect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the connect operation is made
* @param remoteAddress the {@link SocketAddress} to which it should connect
* @param localAddress the {@link SocketAddress} which is used as source on connect
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
/**
* Called once a disconnect operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the disconnect operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a close operation is made.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Called once a deregister operation is made from the current registered {@link EventLoop}.
*
* @param ctx the {@link ChannelHandlerContext} for which the close operation is made
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
/**
* Intercepts {@link ChannelHandlerContext#read()}.
*/
void read(ChannelHandlerContext ctx) throws Exception;
/**
* Called once a write operation is made. The write operation will write the messages through the
* {@link ChannelPipeline}. Those are then ready to be flushed to the actual {@link Channel} once
* {@link Channel#flush()} is called
*
* @param ctx the {@link ChannelHandlerContext} for which the write operation is made
* @param msg the message to write
* @param promise the {@link ChannelPromise} to notify once the operation completes
* @throws Exception thrown if an error occurs
*/
void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;
/**
* Called once a flush operation is made. The flush operation will try to flush out all previous written messages
* that are pending.
*
* @param ctx the {@link ChannelHandlerContext} for which the flush operation is made
* @throws Exception thrown if an error occurs
*/
void flush(ChannelHandlerContext ctx) throws Exception;
}
ChannelHandlerAdapter
package io.netty.channel;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerMask.Skip;
import io.netty.util.internal.InternalThreadLocalMap;
import java.util.Map;
public abstract class ChannelHandlerAdapter implements ChannelHandler {
boolean added;
public ChannelHandlerAdapter() {
}
protected void ensureNotSharable() {
if (this.isSharable()) {
throw new IllegalStateException("ChannelHandler " + this.getClass().getName() + " is not allowed to be shared");
}
}
public boolean isSharable() {
Class<?> clazz = this.getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = (Boolean)cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
}
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
}
/** @deprecated */
@Skip
@Deprecated
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelHandlerAdapter has two commonly used subclasses: ChannelInboundHandlerAdapter and ChannelOutboundHandlerAdapter.
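The isSharable() check in the listing above boils down to a cached reflective lookup of a runtime annotation. A JDK-only sketch of that idea follows; `ShareableMarker`, `MarkerAwareHandler` and the two handler classes are hypothetical, and the cache here is one shared map rather than the thread-local cache the listing uses.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical marker annotation, standing in for ChannelHandler.Sharable.
@Inherited
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@interface ShareableMarker {
}

abstract class MarkerAwareHandler {
    // Caches the reflective lookup per class so it runs only once for each handler type.
    private static final Map<Class<?>, Boolean> CACHE = new ConcurrentHashMap<>();

    boolean isSharable() {
        return CACHE.computeIfAbsent(getClass(),
                clazz -> clazz.isAnnotationPresent(ShareableMarker.class));
    }

    // Mirrors ensureNotSharable(): stateful handlers can call this to reject sharing.
    void ensureNotSharable() {
        if (isSharable()) {
            throw new IllegalStateException(getClass().getName() + " is not allowed to be shared");
        }
    }
}

// No per-connection state, so one instance could serve many pipelines.
@ShareableMarker
final class StatelessLogger extends MarkerAwareHandler {
}

// Holds per-connection state, so each pipeline needs its own instance.
final class ByteCounter extends MarkerAwareHandler {
    long bytesSeen;
}
```

The annotation itself enforces nothing; it only records the author's promise that the class is safe to share, and the framework reads that promise before allowing the same instance to be added more than once.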
ChannelInboundHandlerAdapter
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {
public ChannelInboundHandlerAdapter() {
}
@Skip
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Skip
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Skip
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
@Skip
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}
@Skip
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
@Skip
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
@Skip
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
ctx.fireUserEventTriggered(evt);
}
@Skip
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelWritabilityChanged();
}
@Skip
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.fireExceptionCaught(cause);
}
}
ChannelOutboundHandlerAdapter
package io.netty.channel;
import io.netty.channel.ChannelHandlerMask.Skip;
import java.net.SocketAddress;
public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {
public ChannelOutboundHandlerAdapter() {
}
@Skip
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.bind(localAddress, promise);
}
@Skip
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
@Skip
public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.disconnect(promise);
}
@Skip
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.close(promise);
}
@Skip
public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
ctx.deregister(promise);
}
@Skip
public void read(ChannelHandlerContext ctx) throws Exception {
ctx.read();
}
@Skip
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ctx.write(msg, promise);
}
@Skip
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
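The role of adapters
Adapters exist so that a subclass does not have to implement every method of the handler interface. The adapter supplies a default implementation for each method, and the subclass overrides only the ones it needs.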
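The same pattern in miniature, using only the JDK. `ConnectionListener`, `ConnectionListenerAdapter` and `DataRecorder` are hypothetical names. Note one difference: Netty's adapters shown above forward each event to the next handler by default, whereas this sketch uses empty method bodies for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface with several callbacks.
interface ConnectionListener {
    void opened(String id);
    void dataReceived(String id, String data);
    void closed(String id);
}

// Adapter: a do-nothing default for every callback.
class ConnectionListenerAdapter implements ConnectionListener {
    @Override public void opened(String id) { }
    @Override public void dataReceived(String id, String data) { }
    @Override public void closed(String id) { }
}

// The subclass overrides only the callback it cares about.
final class DataRecorder extends ConnectionListenerAdapter {
    final List<String> received = new ArrayList<>();

    @Override
    public void dataReceived(String id, String data) {
        received.add(id + ":" + data);
    }
}
```

Without the adapter, `DataRecorder` would have to spell out empty `opened` and `closed` methods itself, and every new callback added to the interface would break it.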
ChannelPipeline interface
ChannelPipeline follows the chain-of-responsibility pattern. Internally it uses a doubly linked list to connect the handlers on the chain.
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker, Iterable<Entry<String, ChannelHandler>> {
ChannelPipeline addFirst(String name, ChannelHandler handler);
ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addLast(String name, ChannelHandler handler);
ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler);
ChannelPipeline addBefore(String baseName, String name, ChannelHandler handler);
ChannelPipeline addBefore(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(String baseName, String name, ChannelHandler handler);
ChannelPipeline addAfter(EventExecutorGroup group, String baseName, String name, ChannelHandler handler);
ChannelPipeline addFirst(ChannelHandler... handlers);
ChannelPipeline addFirst(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline addLast(ChannelHandler... handlers);
ChannelPipeline addLast(EventExecutorGroup group, ChannelHandler... handlers);
ChannelPipeline remove(ChannelHandler handler);
ChannelHandler remove(String name);
<T extends ChannelHandler> T remove(Class<T> handlerType);
ChannelHandler removeFirst();
ChannelHandler removeLast();
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);
<T extends ChannelHandler> T replace(Class<T> oldHandlerType, String newName,
ChannelHandler newHandler);
ChannelHandler first();
ChannelHandlerContext firstContext();
ChannelHandler last();
ChannelHandlerContext lastContext();
ChannelHandler get(String name);
<T extends ChannelHandler> T get(Class<T> handlerType);
ChannelHandlerContext context(ChannelHandler handler);
ChannelHandlerContext context(String name);
ChannelHandlerContext context(Class<? extends ChannelHandler> handlerType);
Channel channel();
List<String> names();
Map<String, ChannelHandler> toMap();
@Override
ChannelPipeline fireChannelRegistered();
@Override
ChannelPipeline fireChannelUnregistered();
@Override
ChannelPipeline fireChannelActive();
@Override
ChannelPipeline fireChannelInactive();
@Override
ChannelPipeline fireExceptionCaught(Throwable cause);
@Override
ChannelPipeline fireUserEventTriggered(Object event);
@Override
ChannelPipeline fireChannelRead(Object msg);
@Override
ChannelPipeline fireChannelReadComplete();
@Override
ChannelPipeline fireChannelWritabilityChanged();
@Override
ChannelPipeline flush();
}
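The traversal order in the diagram (inbound events move from the first handler toward the last, outbound operations move in the opposite direction) falls out naturally from a doubly linked list. The following is a rough model, not Netty's implementation: `LinkedPipelineSketch`, `Direction`, `fireRead` and `write` are hypothetical, and every node forwards unconditionally, whereas a real handler decides whether to pass the event on.

```java
import java.util.ArrayList;
import java.util.List;

final class LinkedPipelineSketch {
    enum Direction { INBOUND, OUTBOUND }

    // One link in the chain; prev/next make the list doubly linked.
    private static final class Node {
        final String name;
        final Direction direction;
        Node prev;
        Node next;

        Node(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    private Node head;
    private Node tail;

    LinkedPipelineSketch addLast(String name, Direction direction) {
        Node node = new Node(name, direction);
        if (tail == null) {
            head = node;
        } else {
            node.prev = tail;
            tail.next = node;
        }
        tail = node;
        return this;
    }

    // Inbound events travel head -> tail and visit only inbound handlers.
    List<String> fireRead() {
        List<String> visited = new ArrayList<>();
        for (Node n = head; n != null; n = n.next) {
            if (n.direction == Direction.INBOUND) {
                visited.add(n.name);
            }
        }
        return visited;
    }

    // Outbound operations travel tail -> head and visit only outbound handlers.
    List<String> write() {
        List<String> visited = new ArrayList<>();
        for (Node n = tail; n != null; n = n.prev) {
            if (n.direction == Direction.OUTBOUND) {
                visited.add(n.name);
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        LinkedPipelineSketch pipeline = new LinkedPipelineSketch()
                .addLast("decoder", Direction.INBOUND)
                .addLast("encoder", Direction.OUTBOUND)
                .addLast("logic", Direction.INBOUND);
        System.out.println("inbound order:  " + pipeline.fireRead());
        System.out.println("outbound order: " + pipeline.write());
    }
}
```

Keeping both kinds of handler in one list is what lets a single `addLast` order define both directions at once.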
Creating a ChannelPipeline
A ChannelPipeline is bound to a Channel: each Channel has exactly one ChannelPipeline, and that pipeline is created when the Channel is initialized.
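The one-to-one binding can be expressed as a final field assigned in the constructor. This is a minimal sketch with hypothetical class names (`OwningChannel`, `OwnedPipeline`); it shows the ownership relationship only and says nothing about how Netty's own channel classes are structured.

```java
// Hypothetical pipeline that remembers the channel it belongs to.
final class OwnedPipeline {
    private final OwningChannel channel;

    OwnedPipeline(OwningChannel channel) {
        this.channel = channel;
    }

    OwningChannel channel() {
        return channel;
    }
}

// Hypothetical channel that creates its pipeline exactly once, during construction.
final class OwningChannel {
    private final OwnedPipeline pipeline;

    OwningChannel() {
        this.pipeline = new OwnedPipeline(this);
    }

    // Always returns the same pipeline for the lifetime of the channel.
    OwnedPipeline pipeline() {
        return pipeline;
    }
}
```

Because the field is final, the pipeline cannot be swapped out later; only the handlers inside it change.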
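ChannelHandlerContext interface
ChannelHandlerContext is the link between a ChannelHandler and its ChannelPipeline.
Whenever a ChannelHandler is added to a ChannelPipeline, a ChannelHandlerContext is created for it. Its main job is to manage the interaction between its own ChannelHandler and the other ChannelHandlers in the same ChannelPipeline. For example, a ChannelHandlerContext can notify the next ChannelHandler in the pipeline to start processing, and it can be used to modify the pipeline it belongs to at runtime.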
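Both abilities, notifying the next handler and changing the pipeline on the fly, can be illustrated with a small JDK-only model. The types below (`MiniHandler`, `MiniContext`, `MiniPipeline`) and the `removeSelf` method are hypothetical simplifications; the sketch is single-threaded and handles one kind of event only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler: receives its own context together with each message.
interface MiniHandler {
    void onMessage(MiniContext ctx, String msg);
}

// One context per added handler; it knows its neighbours in the chain.
final class MiniContext {
    final MiniHandler handler;
    MiniContext prev;
    MiniContext next;

    MiniContext(MiniHandler handler) {
        this.handler = handler;
    }

    // Notify the next handler in the chain, if there is one.
    void fireMessage(String msg) {
        if (next != null) {
            next.handler.onMessage(next, msg);
        }
    }

    // Unlink this context, and with it the handler, from the chain.
    // Not meant for the sentinel head and tail contexts.
    void removeSelf() {
        prev.next = next;
        next.prev = prev;
    }
}

final class MiniPipeline {
    // Sentinels at both ends, so user contexts always have a prev and a next.
    private final MiniContext head = new MiniContext((ctx, msg) -> ctx.fireMessage(msg));
    private final MiniContext tail = new MiniContext((ctx, msg) -> { });

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    // Wraps the handler in a new context and links it in just before the tail.
    MiniPipeline addLast(MiniHandler handler) {
        MiniContext ctx = new MiniContext(handler);
        ctx.prev = tail.prev;
        ctx.next = tail;
        tail.prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    void fireMessage(String msg) {
        head.handler.onMessage(head, msg);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        MiniPipeline pipeline = new MiniPipeline()
                // One-shot handler: handles the first message, then removes itself.
                .addLast((ctx, msg) -> {
                    log.add("handshake:" + msg);
                    ctx.removeSelf();
                    ctx.fireMessage(msg);
                })
                .addLast((ctx, msg) -> log.add("app:" + msg));
        pipeline.fireMessage("a");
        pipeline.fireMessage("b");
        System.out.println(log);
    }
}
```

The handler never touches the list directly. It works only through its context, which is the point of having a context object per handler.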