泄漏:ByteBuf.release() 在被垃圾收集之前没有被调用。 Spring Reactor TcpServer

2024-01-25

我正在使用reactor-core [1.1.0.RELEASE],reactor-net [1.1.0.RELEASE]正在使用netty-all [4.0.18.Final],reactor-spring-context [1.1.0.RELEASE] & Spring Reactor TcpServer [Spring 4.0.3.RELEASE]。

我在 netty 中创建了简单的 REST API 来进行健康检查:/health。我已关注gs-reactor-thumbnailer https://github.com/spring-guides/gs-reactor-thumbnailer code

请看代码如下:

import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;

import org.springframework.stereotype.Service;

import reactor.function.Consumer;
import reactor.net.NetChannel;

@Service
public class HealthCheckNettyRestApi{

    public Consumer<FullHttpRequest> getResponse(NetChannel<FullHttpRequest, FullHttpResponse> channel,int portNumber){
        return req -> {
            if (req.getMethod() != HttpMethod.GET) {
                channel.send(badRequest(req.getMethod()
                        + " not supported for this URI"));
            } else {
                DefaultFullHttpResponse resp = new DefaultFullHttpResponse( HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                resp.content().writeBytes("Hello World".getBytes());
                resp.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
                resp.headers().set(HttpHeaders.Names.CONTENT_LENGTH,resp.content().readableBytes());
                //resp.release();
                channel.send((resp));
            }
        };
    }
}

在 Spring Boot 应用程序中,我将其连接为:

 @Bean
    public ServerSocketOptions serverSocketOptions(){
        return new NettyServerSocketOptions().
                pipelineConfigurer(pipeline -> pipeline.addLast(new HttpServerCodec()).
                        addLast(new HttpObjectAggregator(16*1024*1024)));
    }

 @Autowired
    private HealthCheckNettyRestApi healthCheck;

    @Value("${netty.port:5555}")
    private Integer nettyPort;

    @Bean
    public NetServer<FullHttpRequest, FullHttpResponse> restApi(Environment env,ServerSocketOptions opts) throws InterruptedException{
        NetServer<FullHttpRequest, FullHttpResponse> server = 
                new TcpServerSpec<FullHttpRequest, FullHttpResponse>(NettyTcpServer.class)
                .env(env)
                .dispatcher("sync")
                .options(opts)
                .listen(nettyPort)
                .consume(ch -> { 

                    Stream<FullHttpRequest> in = ch.in();
                    log.info("netty server is humming.....");

                    in.filter( (FullHttpRequest req) -> (req.getUri().matches(NettyRestConstants.HEALTH_CHECK)))
                    .when(Throwable.class, NettyHttpSupport.errorHandler(ch))
                    .consume(healthCheck.getResponse(ch, nettyPort));

                }).get();

        server.start().await(); //this is working as Tomcat is also deployed due to Spring JPA & web dependencies

        return server;

    }

当我使用 wrk 运行基准测试时:

wrk -t6 -c100 -d30s --latency 'http://localhost:8087/health'

然后我得到以下堆栈跟踪:

2015-04-22 17:23:21.072] - 16497 ERROR [reactor-tcp-io-22] --- i.n.u.ResourceLeakDetector: LEAK: ByteBuf.release() was not called before it's garbage-collected. Enable advanced leak reporting to find out where the leak occurred. To enable advanced leak reporting, specify the JVM option '-Dio.netty.leakDetectionLevel=advanced' or call ResourceLeakDetector.setLevel()


2015-04-22 23:09:26.354 ERROR 4308 --- [actor-tcp-io-13] io.netty.util.ResourceLeakDetector       : LEAK: ByteBuf.release() was not called before it's garbage-collected.
Recent access records: 0
Created at:
    io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:59)
    io.netty.buffer.Unpooled.compositeBuffer(Unpooled.java:355)
    io.netty.handler.codec.http.HttpObjectAggregator.decode(HttpObjectAggregator.java:144)
    io.netty.handler.codec.http.HttpObjectAggregator.decode(HttpObjectAggregator.java:49)
    io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:89)
    io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:341)
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:327)
    io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:155)
    io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:148)
    io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:341)
    io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:327)
    io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
    io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:116)
    io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:494)
    io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:461)
    io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
    io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
    io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    java.lang.Thread.run(Thread.java:745)

2015-04-22 23:09:55.217  INFO 4308 --- [actor-tcp-io-13] r.n.netty.NettyNetChannelInboundHandler  : [id: 0x260faf6d, /127.0.0.1:50275 => /127.0.0.1:8087] Connection reset by peer
2015-04-22 23:09:55.219 ERROR 4308 --- [actor-tcp-io-13] reactor.core.Reactor                     : Connection reset by peer

java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:446)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:871)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:224)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:108)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:494)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:461)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:378)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:350)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
    at java.lang.Thread.run(Thread.java:745)

我的分析:我想既然我转发了DefaultFullHttpResponse对于 Spring 实现,Spring API 应该注意调用release()方法。顺便说一句,我也尝试打电话release()方法来自我的实现,但我仍然遇到相同的错误。

有人可以告诉我实施有什么问题吗?


None

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

泄漏:ByteBuf.release() 在被垃圾收集之前没有被调用。 Spring Reactor TcpServer 的相关文章

随机推荐