修复服务器端
在Netty世界中,有多个“事件”:
- channelActive https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#channelActive-io.netty.channel.ChannelHandlerContext-
-
channelRead https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#channelRead-io.netty.channel.ChannelHandlerContext-java.lang.Object-
-
channelReadComplete https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#channelReadComplete-io.netty.channel.ChannelHandlerContext-
- channelInactive https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#channelInactive-io.netty.channel.ChannelHandlerContext-
- exceptionCaught https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#exceptionCaught-io.netty.channel.ChannelHandlerContext-java.lang.Throwable-
- more... https://netty.io/4.1/api/io/netty/channel/ChannelInboundHandler.html#method.summary
在这些“事件”中,你可能已经知道什么channelRead
确实(自从你使用它以来),但你似乎需要的另一个是channelInactive
。当另一个端点关闭连接时会调用此函数,您可以像这样使用它:
@Override
public void channelInactive(ctx) {
System.out.println("I want to close fileoutputstream!");
fos.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
// if (buf.isReadable()) { // a buf should always be readable here
buf.readBytes(fos, buf.readableBytes());
// fos.flush(); // flushing is always done when closing
//} else {
// System.out.println("I want to close fileoutputstream!");
// buf.release(); // Should be placed in the finally block
// fos.flush();
// fos.close();
//}
} catch (Exception e) {
e.printStackTrace();
} finally {
buf.release(); // Should always be done, even if writing to the file fails
}
}
但是,服务器如何知道连接已关闭?目前,客户端不会关闭服务器,而是继续在后台运行,永远保持连接处于活动状态。
修复客户端
要正确关闭客户端的连接,我们需要调用channel.close()
但是,我们不能直接将其插入到返回行之前,因为这会导致发送数据和关闭网络层连接之间的竞争条件,从而可能会丢失数据。
为了正确处理这些情况,Netty 使用了Future https://netty.io/4.1/api/io/netty/util/concurrent/Future.html允许代码在异步操作发生后处理事件的系统。
我们很幸运,Netty 已经有一个内置的解决方案 https://netty.io/4.0/api/io/netty/channel/ChannelFutureListener.html#CLOSE为此,我们只需将其连接起来。为了将此解决方案连接到我们的代码,我们必须跟踪最新的ChannelFuture https://netty.io/4.0/api/io/netty/channel/ChannelFuture.html由 Netty 的 write 方法发出。
为了正确实施这个解决方案,我们改变sendToServer
返回 write 方法的结果:
private ChannelFuture sendToServer(byte[] bytes, Channel channel, int length)
throws IOException {
return channel.writeAndFlush(Unpooled.copiedBuffer(bytes, 0, length));
}
然后我们跟踪这个返回值,并在我们想要关闭连接时添加一个包含 Netty 内置解决方案的监听器:
ChannelFuture lastFuture = null;
for (;;) {
byte[] bytes = new byte[readLength];
int readNum = bis.read(bytes, 0, readLength);
// System.out.println(readNum);
if (readNum == -1) {
bis.close();
fis.close();
if(lastFuture == null) { // When our file is 0 bytes long, this is true
channel.close();
} else {
lastFuture.addListener(ChannelFutureListener.CLOSE);
}
return;
}
lastFuture = sendToServer(bytes, channel, readNum);
}