本文是基于Netty4.1.x,一般在使用Netty作为网络框架进行开发时,编解码框架是我们应该注意的一个重要部分。应用从网络层接收数据需要经过解码(Decode),将二进制的数据报转换从应用层的协议消息,这样才能被应用逻辑所识别。同样,客户端发送或服务端在返回消息时,是需要将消息编码(Encode)成二进制字节数组(在Netty4中就是ByteBuf)对能发送到网络对端。对于编解码Netty4本身提供了一些基本的编解码器,当然如果需要定义自己的私有协议时,需要开发者基于已有的编解码框架来实现自定义的编解码器。下面主要介绍Netty4中的编解码的实现原理及如何自定义一个编解码器。
public class NettyServer {
public static void main(String[] args) throws Exception {
new NettyServer().start("127.0.0.1", 8081);
}
public void start(String host, int port) throws Exception {
ExecutorService executorService = Executors.newCachedThreadPool();
EventLoopGroup bossGroup = new NioEventLoopGroup(0, executorService);//Boss I/O线程池,用于处理客户端连接,连接建立之后交给work I/O处理
EventLoopGroup workerGroup = new NioEventLoopGroup(0, executorService);//Work I/O线程池
EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(2);//业务线程池
ServerBootstrap server = new ServerBootstrap();//启动类
server.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast(businessGroup, new ServerHandler());
}
});
server.childOption(ChannelOption.TCP_NODELAY, true);
server.childOption(ChannelOption.SO_RCVBUF, 32 * 1024);
server.childOption(ChannelOption.SO_SNDBUF, 32 * 1024);
InetSocketAddress addr = new InetSocketAddress(host, port);
server.bind(addr).sync().channel();//启动服务
}
}
上面例子中在initChannel中分别放置了一个decoder跟一个encoder,其中StringDecoder/StringEncoder是Netty4编解码框架中的一成员。下面看看StringDecoder怎么实现的:
@Sharable
public class StringDecoder extends MessageToMessageDecoder<ByteBuf> {
// TODO Use CharsetDecoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringDecoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringDecoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
out.add(msg.toString(charset));
}
}
StringDecoder本身很简单,执行解码(decode)方法时候将二进制字节数组转成了String,其中StringDecoder继承了MessageToMessageDecoder,下面看看MessageToMessageDecoder是怎么做的。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter {
private final TypeParameterMatcher matcher;
/**
* Create a new instance which will try to detect the types to match out of the type parameter of the class.
*/
protected MessageToMessageDecoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageDecoder.class, "I");
}
/**
* Create a new instance
*
* @param inboundMessageType The type of messages to match and so decode
*/
protected MessageToMessageDecoder(Class<? extends I> inboundMessageType) {
matcher = TypeParameterMatcher.get(inboundMessageType);
}
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelInboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptInboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
decode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
} else {
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
int size = out.size();
for (int i = 0; i < size; i ++) {
ctx.fireChannelRead(out.getUnsafe(i));
}
out.recycle();
}
}
/**
* Decode from one message to an other. This method will be called for each written message that can be handled
* by this decoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageDecoder} belongs to
* @param msg the message to decode to an other one
* @param out the {@link List} to which decoded messages should be added
* @throws Exception is thrown if an error occurs
*/
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
}
从上面看出,MessageToMessageDecoder继承了ChannelInboundHandlerAdapter本身就是一个处理器Handler,只是定义了一个decode()方法,当上行消息到达时,channelRead()方法被调用,下面重点来了,看channelRead()方法都执行了那些逻辑。acceptInboundMessage(msg);这个方法表示先判断一下是否应该处理这个消息,如果不处理直接添加到解码的结果中;如果需要处理则执行decode()方法,这里的decode方法具体实现是在其子类也就是StringDecoder中实现的。当消息被解码之后,需要将原消息对象进行释放。最后调用 ctx.fireChannelRead(out.getUnsafe(i));这个方法的含义是将消息传递给下一个InBoundHandler处理器去处理,到这里解码就已经完成了。下面看看编码器StringEncoder:
@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {
// TODO Use CharsetEncoder instead.
private final Charset charset;
/**
* Creates a new instance with the current system character set.
*/
public StringEncoder() {
this(Charset.defaultCharset());
}
/**
* Creates a new instance with the specified character set.
*/
public StringEncoder(Charset charset) {
if (charset == null) {
throw new NullPointerException("charset");
}
this.charset = charset;
}
@Override
protected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {
if (msg.length() == 0) {
return;
}
out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));
}
}
编码器本身也很简单,主要是实现了其父类MessageToMessageEncoder的encode方法,将String 编码成ByteBuf。下面看看MessageToMessageEncoder的逻辑:
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
/**
* Create a new instance which will try to detect the types to match out of the type parameter of the class.
*/
protected MessageToMessageEncoder() {
matcher = TypeParameterMatcher.find(this, MessageToMessageEncoder.class, "I");
}
/**
* Create a new instance
*
* @param outboundMessageType The type of messages to match and so encode
*/
protected MessageToMessageEncoder(Class<? extends I> outboundMessageType) {
matcher = TypeParameterMatcher.get(outboundMessageType);
}
/**
* Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next
* {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.
*/
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
ReferenceCountUtil.release(cast);
}
if (out.isEmpty()) {
out.recycle();
out = null;
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
// See https://github.com/netty/netty/issues/2525
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
out.recycle();
}
}
}
private static void writeVoidPromise(ChannelHandlerContext ctx, CodecOutputList out) {
final ChannelPromise voidPromise = ctx.voidPromise();
for (int i = 0; i < out.size(); i++) {
ctx.write(out.getUnsafe(i), voidPromise);
}
}
private static void writePromiseCombiner(ChannelHandlerContext ctx, CodecOutputList out, ChannelPromise promise) {
final PromiseCombiner combiner = new PromiseCombiner(ctx.executor());
for (int i = 0; i < out.size(); i++) {
combiner.add(ctx.write(out.getUnsafe(i)));
}
combiner.finish(promise);
}
/**
* Encode from one message to an other. This method will be called for each written message that can be handled
* by this encoder.
*
* @param ctx the {@link ChannelHandlerContext} which this {@link MessageToMessageEncoder} belongs to
* @param msg the message to encode to an other one
* @param out the {@link List} into which the encoded msg should be added
* needs to do some kind of aggregation
* @throws Exception is thrown if an error occurs
*/
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
从上面看出,MessageToMessageEncoder继承了ChannelOutboundHandlerAdapter本身就是一个处理器Handler,只是定义了一个encode()方法,当上行消息到达时,wirte()方法被调用,下面重点来了,看write()方法都执行了那些逻辑。(acceptOutboundMessage(msg));这个方法表示先判断一下是否应该处理这个消息,如果不处理直接ctx.write(msg, promise)这个方法表示将结果传递到下一下outBound Handler中;如果需要处理则执行encode()方法,这里的encode方法具体实现是在其子类也就是StringEncoder中实现的。当消息被编码之后,需要将原消息对象进行释放。最后调用 ctx.write(out.getUnsafe(0), promise)或 writeVoidPromise(ctx, out);这个方法的含义是将消息传递给下一个处理器去处理,到这里解码就已经完成了。自此Netty4编解码流程已经完了。总结一下:
- 编解码器本质上就是一个处理器InBound或OutBound Handler,被放置在处理链中;
- 编解码器核心部分是其自定义的decode或encode方法,具体编解码逻辑都放在这两个方法中进行;
- 当编解码处理完成之后需要写到下一个处理器Handler,一般在channelRead或write中调用ctx.fireChannelRead()或ctx.write()方法。注意在自定义编解码器时在ChannelRead或write方法中释放相关的流,否则会造成内存泄漏;
- 一般编解码处理器Hander位于处理链中的最前面,当消息上行时,解码器最先处理;当消息下行时,编码器最后处理
下面看看如何自定义一个编解码器,此处直接实现一个String的编解码器,继承ByteToMessageCodec
public class StringCodec extends ByteToMessageCodec<String> {
//执行编码逻辑
@Override
protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
out.writeBytes(msg.getBytes(StandardCharsets.UTF_8));
}
//执行解码逻辑
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
out.add(in.toString(StandardCharsets.UTF_8));
}
}
从上面一段代码可以看出自定义一个编码器很简单。Netty4框架为开发者提供了很多默认的编解码器,处理器。让开发者能更加关注自已业务逻辑的开发。