在处理程序中组装 Netty 消息

2024-01-09

我正在为我的项目制作 Netty 原型。我正在尝试在 Netty 之上实现一个简单的面向文本/字符串的协议。在我的管道中,我使用以下内容:

public class TextProtocolPipelineFactory implements ChannelPipelineFactory
{
@Override
public ChannelPipeline getPipeline() throws Exception 
{
    // Create a default pipeline implementation.
    ChannelPipeline pipeline = pipeline();

    // Add the text line codec combination first,
    pipeline.addLast("framer", new DelimiterBasedFrameDecoder(2000000, Delimiters.lineDelimiter()));
    pipeline.addLast("decoder", new StringDecoder());
    pipeline.addLast("encoder", new StringEncoder());

    // and then business logic.
    pipeline.addLast("handler", new TextProtocolHandler());

    return pipeline;
}
}

我的管道中有一个 DelimiterBasedFrameDecoder、一个字符串解码器和一个字符串编码器。

由于此设置,我收到的消息被分成多个字符串。这会导致多次调用我的处理程序的“messageReceived”方法。这可以。但是,这需要我在内存中累积这些消息,并在收到消息的最后一个字符串数据包时重新构造消息。

我的问题是,“累积字符串”然后“将它们重新构造成最终消息”的最有效的内存方法是什么。到目前为止我有3个选择。他们是:

  • 使用 StringBuilder 进行累积并使用 toString 进行构造。 (这会带来最差的内存性能。事实上,对于具有大量并发用户的大型有效负载,这会带来不可接受的性能)

  • 通过 ByteArrayOutputStream 累积到 ByteArray 中,然后使用字节数组进行构造(这比选项 1 具有更好的性能,但它仍然占用相当多的内存)

  • 累加到一个Dymamic Channel Buffer中并使用toString(charset)来构造。我还没有分析过这个设置,但我很好奇它与上述两个选项相比如何。有人使用动态通道缓冲区解决了这个问题吗?

我是 Netty 新手,我可能在架构上做错了什么。我们将非常感谢您的意见。

提前致谢 索希尔

添加我的自定义 FrameDecoder 实现以供 Norman 审核

public final class TextProtocolFrameDecoder extends FrameDecoder 
{
public static ChannelBuffer messageDelimiter() 
{
      return ChannelBuffers.wrappedBuffer(new byte[] {'E','O','F'});
    }

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,ChannelBuffer buffer) 
throws Exception 
{
    int eofIndex = find(buffer, messageDelimiter());

    if(eofIndex != -1)
    {
        ChannelBuffer frame = buffer.readBytes(buffer.readableBytes());
        return frame;
    }

    return null;
}

private static int find(ChannelBuffer haystack, ChannelBuffer needle) {
    for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i ++) {
        int haystackIndex = i;
        int needleIndex;
        for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex ++) {
            if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {
                break;
            } else {
                haystackIndex ++;
                if (haystackIndex == haystack.writerIndex() &&
                    needleIndex != needle.capacity() - 1) {
                    return -1;
                }
            }
        }

        if (needleIndex == needle.capacity()) {
            // Found the needle from the haystack!
            return i - haystack.readerIndex();
        }
    }
    return -1;
   }
  }

我认为如果您实现自己的 FrameDecoder,您将获得最佳性能。这将允许您缓冲所有数据,直到您确实需要将其分派到链中的下一个处理程序。请参阅帧解码器 http://static.netty.io/3.5/api/org/jboss/netty/handler/codec/frame/FrameDecoder.htmlapidocs。

如果您不想自己处理 CRLF 的检测,也可以保留 DelimiterBasedFrameDecoder 并在其后面添加一个自定义 FrameDecoder 来组装表示一行文本的 ChannelBuffer。

在这两种情况下,FrameDecoder 都会尝试“包装”缓冲区而不是每次都复制它们,从而尽可能减少内存复制。

也就是说,如果您想获得最佳性能,请使用第一种方法,如果您想要轻松,请使用第二种方法;)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

在处理程序中组装 Netty 消息 的相关文章

随机推荐