优化处1 —— 编码和解码
编码解码用的是JDK,对象与数组的转换。这种虽然简单,但是效率不高,现在需要支持更多的序列化算法,就需要改进。
抽取一个接口 Serializer,用以支持 序列化和 反序列化
package com.zhao.protocol;
/**
* 用于扩展序列化、反序列化算法
*/
public interface Serializer {
// 反序列化方法
<T> T deserialize(Class<T> clazz, byte[] bytes);
// 序列化方法
<T> byte[] serialize(T object);
}
实现该接口的枚举类
package com.zhao.protocol.impl;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zhao.message.Message;
import com.zhao.protocol.Serializer;
import jdk.nashorn.internal.runtime.JSONFunctions;
import java.io.*;
import java.nio.charset.StandardCharsets;
/**
* @Auther: HackerZhao
* @Date: 2021/11/24 15:11
* @Description: 枚举类,用来实现序列化算法
*/
@SuppressWarnings({"all"})
public enum Algorithm implements Serializer {
JAVA {
// 解码
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
try {
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
return (T) ois.readObject();
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException("反序列化失败", e);
}
}
@Override
public <T> byte[] serialize(T object) {
try {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(object);
return bos.toByteArray();
} catch (IOException e) {
throw new RuntimeException("序列化失败", e);
}
}
},
JSON {
// 解码
@Override
public <T> T deserialize(Class<T> clazz, byte[] bytes) {
// 把 byte 数组转成 JSON格式的字符串
String json = new String(bytes, StandardCharsets.UTF_8);
// 把JSON格式的字符串 转成对象
return new Gson().fromJson(json, clazz);
}
@Override
public <T> byte[] serialize(T object) {
// 使用 Gson 把对象转成 JSON格式的字符串
String json = new Gson().toJson(object);
// 把 JSON格式的字符串 转成数组
return json.getBytes(StandardCharsets.UTF_8);
}
};
}
修改原本的编解码代码
package com.zhao.protocol;
import com.zhao.message.Message;
import com.zhao.protocol.impl.Algorithm;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
public void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
// 原本的代码
// ByteArrayOutputStream bos = new ByteArrayOutputStream(); // 拿到最终的结果
// ObjectOutputStream oos = new ObjectOutputStream(bos); // 会把一个对象转换成字节数组
// oos.writeObject(msg); // 会把对象写入对象输出流
// byte[] bytes = bos.toByteArray();
// 修改后的代码
byte[] bytes = Algorithm.JAVA.serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> list) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerAlgorithm = in.readByte(); // 0 或 1
byte messageType = in.readByte(); // 0,1,2...
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 原本的解码代码
// ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
// Message message = (Message) ois.readObject();
// 修改后的代码
Message message = Algorithm.JAVA.deserialize(Message.class, bytes);
// 把解析的消息传给下一个 handler
list.add(message);
}
}
有两个改动点,把原来的编解码定成了 JAVA 序列化算法,显得不会太臃肿
encode 的改动
byte[] bytes = Algorithm.JAVA.serialize(msg)
decode 的改动
Message message = Algorithm.JAVA.deserialize(Message.class, bytes)
此时如果想修改成别的序列化算法,就需要更改枚举类中成员变量,不符合设计模式的思想
所以,在此添加一个配置类,得到配置文件中自定义的 序列化算法
package com.zhao.config;
import com.zhao.protocol.Serializer;
import com.zhao.protocol.impl.Algorithm;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
public abstract class Config {
static Properties properties;
// 静态代码块,随着类的加载而加载,只会加载一次
static {
// 得到资源文件的输入流
try (InputStream in = Config.class.getResourceAsStream("/application.properties")) {
// 创建 properties 集合类
properties = new Properties();
// 加载流中的数据到集合中
properties.load(in);
} catch (IOException e) {
throw new ExceptionInInitializerError(e);
}
}
public static Algorithm getSerializerAlgorithm() {
// 通过集合中的键 得到配置文件中的值
String value = properties.getProperty("algorithm");
// 如果获取到的值为 null,就选择默认的序列化算法
if(value == null) {
// 返回 JDK序列化算法
return Algorithm.JAVA;
} else {
return Algorithm.valueOf(value);
}
}
}
application.properties 配置文件
algorithm=JSON
重新修改 编解码器的代码,效果,当配置文件中的序列化算法发生改变,编解码算法也随之变化
package com.zhao.protocol;
import com.zhao.config.Config;
import com.zhao.message.Message;
import com.zhao.protocol.impl.Algorithm;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import javax.xml.bind.SchemaOutputResolver;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.List;
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
// ordinal() 方法可根据枚举顺序得到对应的 int 值
out.writeByte(Config.getSerializerAlgorithm().ordinal());
System.out.println(Config.getSerializerAlgorithm().ordinal());
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组,Config.getSerializerAlgorithm() 得到是哪种类型
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
// 解码
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerAlgorithm = in.readByte(); // 0 或 1
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
// 找到反序列化算法
Algorithm algorithm = Algorithm.values()[serializerAlgorithm];
// 确定具体消息类型
Class<?> messageClass = Message.getMessageClass(messageType);
// Algorithm.values() 得到所有的枚举类
Object message = algorithm.deserialize(messageClass, bytes);
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerAlgorithm, messageType, sequenceId, length);
// log.debug("{}", message);
out.add(message);
}
}
这里注意两个点,第一点:对象变 byte数组时,如何确定序列化算法类型,序列化方式是用一个字节表示的,传入的却是具体的算法,例如 JDK,JSON。
解决方式:在枚举类中,有一个 ordinal() 方法,这个方法可以根据枚举类中变量的顺序得到从 0开始的 int数值。在枚举类中 JAVA 代表 0,JSON 代表 1
第二点,在反序列化中需要传入具体的消息类型,不能再向上文那样只传入 Message 抽象类
解决方式:通过Message类中的Map集合,用 int做为键,消息类型做为值,序列化时传入int键,反序列化时根据键得到值。
测试
出站测试
package com.zhao.test;
import com.zhao.message.LoginRequestMessage;
import com.zhao.protocol.MessageCodecSharable;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
/**
* @Auther: HackerZhao
* @Date: 2021/11/24 19:14
* @Description: 测试不同协议的编解码
*/
public class TestSerializer {
public static void main(String[] args) {
MessageCodecSharable CODEC = new MessageCodecSharable();
LoggingHandler LOGGING = new LoggingHandler();
EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);
LoginRequestMessage message = new LoginRequestMessage("张三", "123");
// 模拟出站操作
channel.writeOutbound(message);
}
}
配置文件修改成JAVA,结果
配置文件修改成JSON,结果
入站测试
package com.zhao.test;
import com.zhao.config.Config;
import com.zhao.message.LoginRequestMessage;
import com.zhao.message.Message;
import com.zhao.protocol.MessageCodecSharable;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.handler.logging.LoggingHandler;
/**
* @Auther: HackerZhao
* @Date: 2021/11/24 19:14
* @Description: 测试不同协议的编解码
*/
public class TestSerializer {
public static void main(String[] args) {
MessageCodecSharable CODEC = new MessageCodecSharable();
LoggingHandler LOGGING = new LoggingHandler();
EmbeddedChannel channel = new EmbeddedChannel(LOGGING, CODEC, LOGGING);
LoginRequestMessage message = new LoginRequestMessage("张三", "123");
// 模拟出站操作
// channel.writeOutbound(message);
ByteBuf buf = messageToByteBuf(message);
channel.writeInbound(buf);
}
private static ByteBuf messageToByteBuf (Message msg){
// 得到消息类型
int ordinal = Config.getSerializerAlgorithm().ordinal();
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
// ordinal() 方法可根据枚举顺序得到对应的 int 值
out.writeByte(ordinal);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组,Config.getSerializerAlgorithm() 得到是哪种类型
byte[] bytes = Config.getSerializerAlgorithm().serialize(msg);
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
return out;
}
}
配置文件修改成JAVA,结果
配置文件修改成JSON,结果
参数调优
参数1:CONNECT_TIMEOUT_MILLIS
客户端连接超时:客户端建立连接时,如果在指定毫秒内无法连接,会抛出 timeout 异常
使用 option() 方法在客户端代码中的配置,具体实现
package com.zhao.source;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestConnectionTimeout {
public static void main(String[] args) {
// 1. 客户端通过 .option() 方法配置参数 给 SocketChannel 配置参数
// 2. 服务器端
// new ServerBootstrap().option() // 是给 ServerSocketChannel 配置参数
// new ServerBootstrap().childOption() // 给 SocketChannel 配置参数
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
// 参数1 这里为什么是 ChannelOption.CONNECT_TIMEOUT_MILLIS,是怕你写错
// 参数2 单位是毫秒
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 300)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler());
ChannelFuture future = bootstrap.connect("127.0.0.1", 8080);
future.sync().channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
log.debug("timeout");
} finally {
group.shutdownGracefully();
}
}
}
结果
当设置超时等待为 5秒时,结果
第一个结果和第二个是不一样的,超时等待设置为 300毫秒,打印的是 netty 的超时异常,300 毫秒确实太短了,刚想去连接,就显示连接超时。第二张打印的是 java异常,显示的是连接被拒绝。这是因为客户端想去连接,发现服务器并没有开启,就直接返回连接被拒绝,不会等到你设置的超时时间再显示异常。
CONNECT_TIMEOUT_MILLIS 源码
在图片位置加入断点,注意需要加入当前线程的断点
对感兴趣的对象进行标记,上图所示
源码部分
int connectTimeoutMillis = config().getConnectTimeoutMillis();
// 如果设置的超时等待参数大于0
if (connectTimeoutMillis > 0) {
// 触发一个定时任务
// 参数1,回调任务
// 参数2,多久后出发回调
// 参数3,触发回调的时间单位
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
运行后,回调任务中的 promise 和主线程中的 promise 是同一个对象
回调任务把任务结果放入 promise 容器中,main线程得到结果向下运行。成功就去获取 channel,失败直接进入 catch块中
常说的线程通信方式,在这里就会有所体现,main线程执行 sync方法陷入等待,当连接建立失败会调用 connectPromise.tryFailure(cause) 方法来唤醒 mian线程。main线程则会进入 catch 块中。
Netty中的定时任务,在此处也有所体现。时间到达了就会执行
参数2:SO_BACKLOG
TCP的三次握手
上图中的 client 代表客户端,server 代表服务器
syns queue,半连接队列,存放的是 还没有完成三次握手的连接,先放入半连接队列中
accept queue,全连接队列,存放的是 已经完成三次握手的连接。
三次握手的过程
-
第一次握手,client 发送 SYN 到 server,状态修改为 SYN_SEND,server 收到,状态改变为 SYN_REVD,并将该请求放入 sync queue 队列
-
第二次握手,server 回复 SYN + ACK 给 client,client 收到,状态改变为 ESTABLISHED,并发送 ACK 给 server
-
第三次握手,server 收到 ACK,状态改变为 ESTABLISHED,将该请求从 sync queue 放入 accept queue
其中
当连接数超过指定连接,连接被拒
服务器端
package com.zhao.source;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
public static void main(String[] args) throws IOException {
ServerSocket ss = new ServerSocket(8888, 2);
Socket accept = ss.accept();
System.out.println(accept);
System.in.read();
}
}
客户端
package com.zhao.source;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Date;
public class Client {
public static void main(String[] args) throws IOException {
try {
Socket s = new Socket();
System.out.println(new Date()+" connecting...");
s.connect(new InetSocketAddress("localhost", 8888),1000);
System.out.println(new Date()+" connected...");
s.getOutputStream().write(1);
System.in.read();
} catch (IOException e) {
System.out.println(new Date()+" connecting timeout...");
e.printStackTrace();
}
}
}
连接第四个客户端,控制台打印结果
参数3:ulimit -n
属于操作系统参数
参数4:TCP_NODELAY
属于 SocketChannal 参数,默认为 开启状态,建议设置 true,保证消息能够及时发送出去。
参数5:SO_SNDBUF & SO_RCVBUF
SO_SNDBUF 属于 SocketChannal 参数
SO_RCVBUF 既可用于 SocketChannal 参数,也可以用于 ServerSocketChannal 参数(建议设置到 ServerSocketChannal 上)
不建议改动这个参数,因为操作系统会自动调整缓冲区的大小
参数6:ALLOCATOR
-
属于 SocketChannal 参数
-
ByteBuf 的分配器,ctx.alloc()
服务器端测试类
package com.zhao.source;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestByteBuf {
public static void main(String[] args) {
boolean flag = true;
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = ctx.alloc().buffer();
log.debug("alloc buf {}", buf);
// log.debug("receive buf {}", msg);
System.out.println("");
}
});
}
}).bind(8080);
}
}
客户端测试类
package com.zhao.source;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestBacklogClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(ctx.alloc().buffer().writeBytes("hello!".getBytes()));
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}
控制台打印结果
这个 ByteBuf 用的是池化的直接内存
如何配置 ByteBuf 池化以及非池化,直接内存或堆内存?其实这些在源码中都可以得到答案
先找到 ChannelConfig抽象类,
找到它的实现类 DefaultChannelConfig,成员变量
private volatile ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
一直追踪 DEFAULT 变量,得到
public final class ByteBufUtil {
// 5、一直寻找的 ByteBuf,下面的 alloc变量为其赋值
static final ByteBufAllocator DEFAULT_ALLOCATOR;
static {
// 1、根据 "io.netty.allocator.type"得到值,如果没有配置会拿到默认值,参数2的值就是默认值
String allocType = SystemPropertyUtil.get(
"io.netty.allocator.type", PlatformDependent.isAndroid() ? "unpooled" : "pooled");
allocType = allocType.toLowerCase(Locale.US).trim();
// 3、匹配的类型赋值给 alloc 这个变量
ByteBufAllocator alloc;
// 2、如果获得到的值与下面的类型匹配,就创建这种类型
if ("unpooled".equals(allocType)) {
alloc = UnpooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else if ("pooled".equals(allocType)) {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: {}", allocType);
} else {
alloc = PooledByteBufAllocator.DEFAULT;
logger.debug("-Dio.netty.allocator.type: pooled (unknown: {})", allocType);
}
// 4、给 ByteBuf 进行赋值
DEFAULT_ALLOCATOR = alloc;
}
}
可尝试修改系统的环境变量,下图
启动服务器和客户端
已经变为非池化,后面的还是直接内存
修改堆内存或者直接内存,还需要继续追踪源码
点入创建 Buf 的DEFAULT 参数
一直追到 DIRECT_BUFFER_PREFERRED 赋值语句
// We should always prefer direct buffers by default if we can use a Cleaner to release direct buffers.
DIRECT_BUFFER_PREFERRED = CLEANER != NOOP
&&
// 当不首选直接内存,设置 false,取反后就是 首选,为 true
!SystemPropertyUtil.getBoolean("io.netty.noPreferDirect", false);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noPreferDirect: {}", !DIRECT_BUFFER_PREFERRED);
}
Vm参数中添加
-Dio.netty.noPreferDirect=true
打印结果
参数7:RCVBUF_ALLOCATOR
属于 SocketChannal 参数,用来控制 netty 接收缓冲区大小(并可动态调整),统一采用 direct 直接内存,具体池化还是非池化由 allocator 决定
测试代码
package com.zhao.source;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class TestByteBuf {
public static void main(String[] args) {
boolean flag = true;
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// ByteBuf buf = ctx.alloc().buffer();
// log.debug("alloc buf {}", buf);
log.debug("receive buf {}", msg);
System.out.println("");
}
});
}
}).bind(8080);
}
}
控制台打印结果
打印的其实就是代码中的 msg 对象,非池化的,直接内存,这是一个最原始的 ByteBuf,并没有做任何处理。
这里的 非池化很好理解,因为在 vm参数中已经配置其 非池化的参数,但是直接内存就让人感到不解,上面已经配置使用堆内存来创建 ByteBuf 对象了。
这是因为 直接内存的读取效率要比堆内存的高,Netty对这种 IO操作,就强制它使用直接内存,如果是自己创建的 Buf,是可以自己取决的。
源码中对传入的 ByteBuf 实现
找到创建这个 Buf的地方
控制台的对象和上图的对象,两个对象的地址值是相同的,所以它们就是同一个对象
ByteBuf 选用什么内存,初始容量大小是由 allocate 方法控制的,池化非池化是由 allocator 变量控制的。
追入 allocator可以看到这个参数是由 vm配置的环境变量决定的
现在查看源码,看直接内存 allocate 方法是怎么控制的
查看 初始容量大小如何分配,点进 recvBufAllocHandle() 方法
最小 64,初始 1024,最大 65536