如何自定义TCP通信协议

2023-05-16

       物联网行业智能硬件之间的通信、异构系统之间的对接、中间件的研发、以及各种即时聊天软件等,都会涉及自定义协议。

为了满足不同的业务场景的需要, 应用层之间通信需要实现各种各样的网络协议。以异构系统的对接为例。在早期,我们使用 Web Service 来解决异构系统的对接,后来我们逐渐使用 MQ、RPC 等方式来实现异构系统的通信和整合。

Web Service 是使用 SOAP 协议通过 HTTP 进行传输。MQ 有很多常用的消息队列协议,例如 AMQP、MQTT、STOMP 等,而新兴的消息队列,如 Kafka 和 ZeroMQ,它们并没有严格遵循 MQ 规范,而是基于TCP/IP 协议自行封装了一套协议,并通过 TCP 进行传输。另外,像 Dubbo 这样的 RPC 框架,本身支持多种协议。其自身的 Dubbo 协议也是阿里巴巴自己实现的应用层协议,并通过 TCP 进行传输。

因此,设计好一款合理的、可扩展的自定义协议,可以打通不同的异构系统,亦或者可以作为一款 RPC 框架的基石。今天我将手把手带你设计一个高效、可扩展、易维护的自定义通信协议,以及如何使用 Netty 实现该协议的 TCP 服务端。

为什么需要自定义通信协议?

我们在开发一款工业自动化的智能硬件时,通常需要一台上位机(一般是一款桌面端应用程序)来控制不同的硬件设备。上位机可以独立存在,也可以由 Web 后台发送指令到上位机,再通过上位机来控制智能硬件,以此来完成业务上的操作。

从上位机到 Web 后台之间的通信,可能是由一个 TCP 长连接(也可能是 WebSocket 长连接)来进行维护,而上位机到各个硬件设备之间也可能通过长连接来维护,当然也可以是串口、MQTT、CoAP 等协议,这主要取决于所连接的设备。从 Web 后台到上位机再到智能硬件,假如都使用了 TCP 长连接,那么后两者甚者可以使用 TCP 透传。

无论是 TCP 的长连接,还是 WebSocket 的长连接,本质都是基于 TCP 的连接,为此我们需要使用 Socket 编程,通俗地说可以认为它是对 TCP 协议的具体实现。此外,我们所熟知的中间件、网络游戏、智能硬件、金融等领域也都会涉及 Socket 相关的编程。在使用 Socket 编程时,我们经常会听到别人提起“自定义协议”。事实上,目前已经有了很多标准的协议,那我们为何还需要“自定义”呢?

我们先从下面这张 OSI 七层模型的图开始,快速回顾一下网络通信的面貌。

TCP/IP 协议将 OSI 七层模型进行了简化,变成四层模型。

在 TCP/IP 协议中从应用层到网络接口层,每一层传输的数据包都会包含两部分内容:一部分是协议所要用到的首部,另一部分是从上一层传过来的数据。下图展示了 TCP/IP 包的全貌。

我们所熟知的各种网络应用程序都是在应用层上使用的,TCP/IP 协议的应用层为我们提供了多种常见的应用层协议,例如 HTTP、SSH、Telnet、FTP 等。正是有了这些协议,各种网络应用程序才可以为我们服务。

另外,应用层也支持给我们的程序“量身”制定协议,也就是支持“自定义协议”。当常用的应用层协议不满足我们的应用开发时,例如扩展性不够、安全性不足、不能针对特定领域、无法追求极致的性能等,就需要“自定义协议”。

如何设计自定义通信协议?

TCP 是一种流模式的协议,在实现自定义协议时,我们会遇到诸如以下的问题:

1.应用程序如何知道业务数据是全部接收完毕的,如何解决拆包和粘包问题?

2.如何实现请求/响应机制?

3.如何解决超时问题和实际应用的通信需求?

4.如何定义消息指令或报文类型?

……

自定义通信协议

为了解决上述的问题,首先我们介绍一种比较通用的 TCP 通信协议,其协议结构如下:

+--------------+---------------+------------+---------------+-----------+-

| 魔数(4)| version(1)|序列化方式(1)|command(1)|SerialNo(2)|数据长度(4)|数据(n)   |

+--------------+---------------+------------+---------------+-----------+-

下面我们对这个协议中的内容展开介绍。

魔数:4 个字节,为了防止该 TCP 端口被意外调用。我们在收到报文后取前 4 个字节与魔数比对,如果不相同则直接拒绝并关闭连接。魔数可以随意定义,比如采用 20200803 作为魔数,它的 16 进制是 0x1343d63。

版本号:1 个字节,仅表示协议的版本号,便于协议升级时使用。

序列化方式:1 个字节,表示如何将 Java 对象转化为二进制数据,以及如何反序列化。

指令:1 个字节,也可以叫报文类型,表示该消息的意图,如登录、心跳、升级,以及不同的业务指令等。最多可支持 256 种指令(-127 到 127)。

SerialNo:2 个字节,表示整个任务的 id 或者任务的流水号,便于进行追踪。最多支持 2^16 位(-32,768 到 32,767)。

数据长度:4 个字节,表示该字段后数据部分的长度。类似于 HTTP 协议的报文头中的 Content-Length  这个字段。最多支持 2^32 位。

数据:具体的数据内容。

根据上述设计的通信协议,定义一个报文类 Message,它代表通信协议的报文,如下所示:


public abstract class Message<T extends MessageBody> {
    private MessageHeader messageHeader;
    private T messageBody;
    public T getMessageBody() {
        return messageBody;
  

}

}

Message 参考 TCP 协议,将其抽象成由 Header 和 Payload 组成(即首部和数据块)。其中,报文的 Header 部分共 9 个字节,包含魔数、版本号、序列化方式、指令、SerialNo,结构如下:

+--------------+---------------+------------+---------------+-----------+

| 魔数(4)       | version(1)    |序列化方式(1)      | command(1)           |SerialNo(2)|

+--------------+---------------+------------+---------------+-----------+

因此可以定义一个如下的 Header 类:


  

public class MessageHeader {

    private int magicNumber; // 魔数

    private int version = 1; // 版本号,当前协议的版本号为 1

private int serializeMethod; // 序列化方式,默认使用 json

    private int command;      // 消息的指令


  

    private long serialNo;    // 任务的流水号

}

每个 Payload 都是报文的具体内容,即协议体。它可以是一个字符串也可以是一个复杂的对象,因此我们定义一个空接口用于表示 Payload,所有的 Payload 都需要实现该接口:


public abstract class MessageBody {
}  

考虑到需要预留和扩展性,以避免在将来报文经常性地被修改,可以给 Payload 增加一个预留的属性 extra ,它是一个 Map 类型。因此,再定义一个基类的 BasePayload,我们也可以在 Header 中额外定义一个字段作为一个预留字段。

按照上述的设计,该协议的报文头/首部只有 9 个字节,相比于 HTTP 协议的报文头还是少了很多,极大地精简了传输内容。这也是为什么后端的 RPC 框架通常会采用自定义 TCP 协议进行通信。

Packet 的一次完整旅行

介绍完自定义通信协议后,我们来看看 Packet 在一个 TCP 服务中是怎样经历一次完整的旅行的。

(1)定义指令集

在业务系统中,我们通常需要定义很多个指令,一个指令对应一个 Packet。Header 的 command 字段用来区分不同的指令。

在 Packet 的 Header 中,command 定义了 1 个字节,表示它支持  256 种指令。所以,我们可以定义一个最多包含 256 个指令的指令集 Commands,其定义方式如下:


 /*
 * 指令集
 */
public interface Command {
    /**
     * 心跳包
     */
    final Byte HEART_BEAT = 0;
    /**
     * 登录请求
     */
    final Byte LOGIN_REQUEST = 1;
    /**
     * 登录响应
     */
    final Byte LOGIN_RESPONSE = 2;
    /**
     * 消息请求
     */
    final Byte MESSAGE_REQUEST = 3;
    /**
     * 消息响应
     */
  

final Byte MESSAGE_RESPONSE = 4;

}

当然,如果觉得 256 个指令不够,修改协议 Header 中 command 的字节数即可。

下面以心跳的 Packet 为例,首先定义一个 HeartBeatPacket:


public class HeartBeatPacket extends Packet {
    private String msg = "ping-pong";
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public Byte getCommand() {
        return HEART_BEAT;
    }
}  

心跳包一般是由 TCP 客户端发起,经由 TCP 服务端接收后,进行响应并返回给客户端。它像心跳一样每隔固定时间发送一次,以此来告诉服务端,这个客户端还活着。

(2) 定义序列化方式

心跳包的内容很小,可以使用 JSON 进行解析。但是对于图片、视频、日志文件等比较大的内容,可能需要使用 Java 自带的序列化方式,或由 Kryo、Hessian、FST、Protobuf 等框架实现对象的序列化和反序列化。

因此,我们定义一个序列化方式的常量列表,代码如下:


* 定义序列化算法
 */
public interface SerializeAlgorithm {
    /**
     * json序列化标识
     */
  

byte json = 1;

    

    byte binary = 2;


  

    byte fst = 3;

}

上面的代码表示目前只支持这些序列化方式,后续可以不断添加新的序列化方式。

再定义一个序列化接口,每种序列化方式需要一个相应的实现,代码如下:


*
 * Serializer,用来指定序列化算法,用于序列化对象
 */
public interface Serializer {
    /**
     * @return 序列化算法
     */
    byte getSerializerAlgorithm();
    /**
     * 将对象序列化成二进制
     *
     */
    byte[] serialize(Object object);
    /**
     * 将二进制反序列化为对象
     */
    <T> T deSerialize(Class<T> clazz, byte[] bytes);
}  

由于,存在多个序列化方式,可以考虑设计一个序列化的工厂类SerializerMap,通过工厂类来获取指令所需要的序列化实现。

private static final Map<Byte, Serializer> serializerMap;


serializerMap = new HashMap<Byte, Serializer>();
Serializer serializer = new JsonSerializer();
serializerMap.put(serializer.getSerializerAlgorithm(), serializer);  

(3)定义 Packet 的工厂类

最初,我们将 Packet 抽象成 Header 和 Payload 两部分,因此 Packet 的生成也包含了两部分的生成。

前面,我们定义了一些客户端、服务端的指令,也知道不同的指令对应不同的 Packet。因此,可以通过指令来生成对应的 Payload。Header 中本身就包含了 command,唯一需要注意的就是序列化方式 serializeMethod,不同的 command 对应唯一的 serializeMethod。

下面是 Packet 的工厂类,用于生成 Payload 和 Header:


*
 * 编解码对象
 */
public class PacketCodeC {
    /**
     * 魔数
     */
    public static final int MAGIC_NUMBER = 0x88888888;
    public static PacketCodeC instance = new PacketCodeC();
    /**
     * 采用单例模式
     */
    public static PacketCodeC getInstance(){
        return instance;
    }
    private static final Map<Byte, Class<? extends Packet>> packetTypeMap;
static {
        packetTypeMap = new HashMap<Byte,Class<? extends Packet>>();
        packetTypeMap.put(HEART_BEAT, HeartBeatPacket.class);
        packetTypeMap.put(LOGIN_REQUEST, LoginRequestPacket.class);
        packetTypeMap.put(LOGIN_RESPONSE, LoginResponsePacket.class);
        packetTypeMap.put(MESSAGE_REQUEST, MessageRequestPacket.class);
        packetTypeMap.put(MESSAGE_RESPONSE, MessageResponsePacket.class);
    }
    private PacketCodeC(){
  

}

}

(4)实现报文的 encode、decode

到了这里,我们的工作还差了报文的 encode、decode。我们可以定义一个报文的管理类 PacketManager,用于对报文进行 encode、decode。


/**
 * 编码
 *
 * 魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节)
 */
public ByteBuf encode(ByteBufAllocator alloc, Packet packet){
    //创建ByteBuf对象
    ByteBuf buf = alloc.ioBuffer();
   return encode(buf,packet);
  

}

public ByteBuf encode(ByteBuf buf,Packet packet){
//序列化java对象
byte[] objBytes = serializer.serialize(packet);
//实际编码过程,即组装通信包
//魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节) + 数据长度(4字节) + 数据(N字节)
buf.writeInt(MAGIC_NUMBER);
buf.writeByte(packet.getVersion());
buf.writeByte(serializer.getSerializerAlgorithm());

    buf.writeByte(packet.getCommand());


  

    buf.writeShort(serialNo);

    buf.writeInt(objBytes.length);
buf.writeBytes(objBytes);
return buf;
}
public ByteBuf encode(Packet packet){
return encode(ByteBufAllocator.DEFAULT, packet);
}
/**
* 解码
*

* 魔数(4字节) + 版本号(1字节) + 序列化算法(1字节) + 指令(1字节)+序列 2

+ 数据长度(4字节) + 数据(N字节)

*/
public Packet decode(ByteBuf buf){
//魔数校验(handler单独处理)
buf.skipBytes(4);
//版本号校验(暂不做)
buf.skipBytes(1);
//序列化算法
byte serializeAlgorithm = buf.readByte();
//指令

    byte command = buf.readByte();

    //序列号

    buf.readShort(serialNo);

    //数据长度
int length = buf.readInt();
//数据
byte[] dataBytes = new byte[length];
buf.readBytes(dataBytes);
Class<? extends Packet> packetType = getRequestType(command);
Serializer serializer = getSerializer(serializeAlgorithm);
if(packetType != null && serializer != null){
return serializer.deSerialize(packetType,dataBytes);
}
return null;
}

encode() 方法是将 Packet 对象组装成 Netty 的 ByteBuf 对象,组装的方式完全是按照自定义的 TCP 协议来,顺序千万不能错,否则 decode() 无法解析。

需要注意的是,不同的报文可能会采用不同的序列化方式。需要从 Packet 的 Header 中读取 serializeMethod ,然后从工厂类 SerializerMap 中获取对应的序列化实现 serializer。

这样,客户端和服务端就可以进行交互了,TCP 的报文也可以在我们的 TCP 服务中完成一次完整的旅行。

TCP 服务端的设计

服务端采用 Netty 框架,我们使用的是 Netty 的主从多线程 Reactor 模型。Reactor 模型是 Netty 实现高性能的基础,Netty 的 Reactor 模型分为三种:

1.单线程模型、2.多线程模型、3.主从多线程模型。

主从多线程模型由多个 Reactor 线程组成,MainReactor 负责处理客户端连接的 Accept 事件,连接建立成功后将新创建的连接对象注册至 SubReactor。SubReactor 分配线程池中的 I/O 线程与其连接绑定,负责连接生命周期内所有的 I/O 事件。主从多线程模型可以利用 CPU 的多核来提升系统的吞吐量,因此这也是 Netty 推荐使用的模型。

我们需要在服务端定义 boss 和 worker 这两个 Reactor。其中,boss 是主 Reactor,worker 是从 Reactor。它们分别使用不同的 NioEventLoopGroup,主 Reactor 负责处理 Accept 然后把 Channel 注册到从 Reactor 上,从 Reactor 主要负责 Channel 生命周期内的所有 I/O 事件。


@ChannelHandler.Sharable
public class Server {
    private static Logger logger = LoggerFactory.getLogger(Server.class);
    private static int port = 8888;
    public static void main(String[] strings){
        port = StringUtil.isNullOrEmpty(System.getProperty("port")) ? port : Integer.parseInt(System.getProperty("port"));
        NioEventLoopGroup boss = new NioEventLoopGroup();
        NioEventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boss,worker).channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        nioSocketChannel.pipeline().addLast(new ServerIdleHandler());
                        nioSocketChannel.pipeline().addLast(new MagicNumValidator());
                        nioSocketChannel.pipeline().addLast(PacketCodecHandler.getInstance());
                        nioSocketChannel.pipeline().addLast(LoginRequestHandler.getInstance());
                        nioSocketChannel.pipeline().addLast(HeartBeatHandler.getInstance());
                        nioSocketChannel.pipeline().addLast(AuthHandler.getInstance());
                        nioSocketChannel.pipeline().addLast(ServerHandler.getInstance());
                    }
                });
        ChannelFuture future = bootstrap.bind(port);
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (channelFuture.isSuccess()){
                    logger.info("server started! using port {} " , port);
                }else {
                    logger.info("server start failed! using port {} " , port);
                    channelFuture.cause().printStackTrace();
                    System.exit(0);
                }
            }
        });
    }
}
  

ChannelHandler 的使用

从上述代码中,可以看到 worker 处理了各种各样的 Handler。其中,ServerIdleHandler 继承 Netty 自带的 IdleStateHandler 类,用于检测连接的有效性。如果 150秒内没有收到心跳,则断开连接。


* 心跳检测,150s没收到心跳包的话,断开连接
 */
public class ServerIdleHandler extends IdleStateHandler {
    private static Logger logger = LoggerFactory.getLogger(ServerIdleHandler.class);
    private static int HERT_BEAT_TIME = 150;
    public ServerIdleHandler() {
        super(0, 0, HERT_BEAT_TIME);
    }
    @Override
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        logger.info("{}内没有收到心跳,关闭连接...",HERT_BEAT_TIME);
        ctx.channel().close();
    }
}  

MagicNumValidator:用于 TCP 报文的魔数校验。


public class MagicNumValidator extends LengthFieldBasedFrameDecoder {
    private static final int LENGTH_FIELD_OFFSET = 7;
    private static final int LENGTH_FIELD_LENGTH = 4;
    public MagicNumValidator() {
        super(Integer.MAX_VALUE, LENGTH_FIELD_OFFSET, LENGTH_FIELD_LENGTH);
    }
    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        //魔数校验不通过
        if(in.getInt(in.readerIndex()) != MAGIC_NUMBER){
            ctx.channel().close();
            return null;
        }
        return super.decode(ctx, in);
    }
}
  

PacketCodecHandler:解析报文的 Handler。PacketCodecHandler 继承自 ByteToMessageCodec ,它是用来处理 byte-to-message 和 message-to-byte,便于解码字节消息成 POJO 或编码 POJO 消息成字节。

这一步非常关键。因为 TCP 作为传输层的协议,无法理解上层业务数据的具体含义,它根据 TCP 缓冲区的实际情况进行数据包的划分。在业务上认为是一个完整的包,很可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包进行发送,这就是所谓的 TCP 粘包和拆包问题。在这一步,我们通过自定义的编解码器解决了粘包和拆包问题。

在这里,我们看到 PacketCodecHandler 使用上面提到的报文管理类 PacketManager 的 encode()、decode() 方法来完成编解码的过程。


@ChannelHandler.Sharable
public class PacketCodecHandler extends MessageToMessageCodec<ByteBuf,Packet> {
    private PacketCodecHandler(){}
    private static PacketCodecHandler instance = new PacketCodecHandler();
    public static PacketCodecHandler getInstance(){
        return instance;
    }
    protected void encode(ChannelHandlerContext ctx, Packet packet, List<Object> list) throws Exception {
        ByteBuf byteBuf = ctx.channel().alloc().ioBuffer();
        PacketCodeC.getInstance().encode(byteBuf,packet);
        list.add(byteBuf);
    }
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> list) throws Exception {
        list.add(PacketCodeC.getInstance().decode(buf));
    }
}  

HeartBeatHandler:心跳的 Handler,接收 TCP 客户端发来的"ping",然后给客户端返回"pong"。


public class HeartBeatPacket extends Packet {
    private String msg = "ping-pong";
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    @Override
    public Byte getCommand() {
        return HEART_BEAT;
    }
}  

ResponseHandler:通用的处理接收 TCP 客户端发来业务指令的 Handler,可以根据对应的指令去查询对应的 Handler,并对这些命令进行响应。

最后,我们在 ResponseHandler 中,看到还有一个 ThreadPool,它是一个业务线程池。但是在我们所定义的 TCPServer 中, worker 本身使用了一个线程池,为何还需要一个业务线程池呢?

业务线程池的使用

Netty 的 Reactor 线程模型适合处理耗时短的任务场景,对于耗时较长的 ChannelHandler 来说,维护一个业务线程池是一个比较好的做法。将编解码后的数据封装成任务放入线程池中,避免 ChannelHandler 阻塞而造成 EventLoop 不可用。

如果有复杂且耗时的业务逻辑,我推荐的做法是在 ChannelHandler 处理器中自定义新的业务线程池,从而将这些耗时的操作提交到业务线程池中执行。

例如定义一个业务线程池,代码如下:


  

@Slf4j

public final class ThreadPoolFactoryUtils {
 

/**
* 通过 threadNamePrefix 来区分不同线程池(我们可以把相同 threadNamePrefix 的线程池看作是为同一业务场景服务)。
* key: threadNamePrefix
* value: threadPool
*/
private static final Map<String, ExecutorService> THREAD_POOLS = new ConcurrentHashMap<>();
private ThreadPoolFactoryUtils() {
}
public static ExecutorService createCustomThreadPoolIfAbsent(String threadNamePrefix) {
CustomThreadPoolConfig customThreadPoolConfig = new CustomThreadPoolConfig();
return createCustomThreadPoolIfAbsent(customThreadPoolConfig, threadNamePrefix, false);
}
public static ExecutorService createCustomThreadPoolIfAbsent(String threadNamePrefix, CustomThreadPoolConfig customThreadPoolConfig) {
return createCustomThreadPoolIfAbsent(customThreadPoolConfig, threadNamePrefix, false);
}
public static ExecutorService createCustomThreadPoolIfAbsent(CustomThreadPoolConfig customThreadPoolConfig, String threadNamePrefix, Boolean daemon) {
// 存在则取出,不存新建一个
ExecutorService threadPool = THREAD_POOLS.computeIfAbsent(threadNamePrefix, k -> createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon));
// 如果 threadPool 被 shutdown 的话就重新创建一个
if (threadPool.isShutdown() || threadPool.isTerminated()) {
THREAD_POOLS.remove(threadNamePrefix);
threadPool = createThreadPool(customThreadPoolConfig, threadNamePrefix, daemon);
THREAD_POOLS.put(threadNamePrefix, threadPool);
}
return threadPool;
}
/**
* shutDown 所有线程池
*/
public static void shutDownAllThreadPool() {
log.info("call shutDownAllThreadPool method");
THREAD_POOLS.entrySet().parallelStream().forEach(entry -> {
ExecutorService executorService = entry.getValue();
executorService.shutdown();
log.info("shut down thread pool [{}] [{}]", entry.getKey(), executorService.isTerminated());
try {
executorService.awaitTermination(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Thread pool never terminated");
executorService.shutdownNow();
}
});
}
private static ExecutorService createThreadPool(CustomThreadPoolConfig customThreadPoolConfig, String threadNamePrefix, Boolean daemon) {
ThreadFactory threadFactory = createThreadFactory(threadNamePrefix, daemon);
return new ThreadPoolExecutor(customThreadPoolConfig.getCorePoolSize(), customThreadPoolConfig.getMaximumPoolSize(),
customThreadPoolConfig.getKeepAliveTime(), customThreadPoolConfig.getUnit(), customThreadPoolConfig.getWorkQueue(),
threadFactory);
}
/**
* 创建 ThreadFactory 。如果threadNamePrefix不为空则使用自建ThreadFactory,否则使用defaultThreadFactory
*
* @param threadNamePrefix 作为创建的线程名字的前缀
* @param daemon 指定是否为 Daemon Thread(守护线程)
* @return ThreadFactory
*/
public static ThreadFactory createThreadFactory(String threadNamePrefix, Boolean daemon) {
if (threadNamePrefix != null) {
if (daemon != null) {
return new ThreadFactoryBuilder()
.setNameFormat(threadNamePrefix + "-%d")
.setDaemon(daemon).build();
} else {
return new ThreadFactoryBuilder().setNameFormat(threadNamePrefix + "-%d").build();
}
}
return Executors.defaultThreadFactory();
}


}
  

业务线程池的使用很简单,在 businessAction() 方法中, block 参数是一个 Lambda 表达式,用于执行耗时的业务逻辑,通过 Header 中的 command 来查找服务端所对应使用的 ChannelHandler ,并作出响应。

针对特别复杂的业务,还可以根据业务的特点拆分出多个业务线程池。这样做的好处是:即使某个业务逻辑出现异常造成线程池资源耗尽,也不会影响到其他业务逻辑,从而提高应用程序整体的可用性。也做到了线程池的隔离。

总结,只有充分理解各个硬件、各个软件系统可实现的功能,才能设计出合理的自定义协议。反之,理解了一些常用的中间件相关协议,也可以帮助我们深入理解这些中间件,甚至还可以实现各个中间件的代理功能。

实际上对于其他的高级语言实现自定义协议也是类似的。当你真正理解了自定义 TCP 协议,以后再遇到新的协议,例如自定义的串口协议,会更容易理解。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何自定义TCP通信协议 的相关文章

  • mybaits之动态sql

    mybaits除了提供连接数据库 xff0c 使java和数据库语句分离之外 xff0c 还有一个显著的特点就是使用动态sql语句 这些sql语句均写在map映射文件中 xff0c 并通过一系列标记来完成 1 if标记 常用形式 xff1a
  • 平衡二叉树-的四种旋转调整(代码,图解)

    平衡二叉树 的四种旋转调整 xff08 代码 xff0c 图解 xff09 1 右单旋 xff1a 新插入节点插入在较高左子树的左侧 xff08 左左右 xff09 xff0c 插入新节点二十 xff1a 1 修改parent和curLR的
  • 头文件里一般会加入的宏定义,为了避免一个头文件被重复调用

    华清视频里讲的 xff0c 写代码的时候 xff0c 头文件一般地都会加上一个类似这样的宏 xff0c 希望你从此以后再写头文件 xff0c 加上一个宏 加了这个有一个好处 xff0c 当你第一次包含tree h的时候 xff0c 如果没定
  • 自己组装Pixhawk F450无人机的一些细节

    首先参考文档为 xff1a 1 https mp weixin qq com s VXKU kIB v i0AX3zgtLig 2 https mp weixin qq com s Qzzl dQ6Tz2pXNp7Oj0lTg 3 http
  • 电机和桨叶要搭配选择

    拍自 四旋翼无人机的制作与飞行
  • 接收机PPM与SBUS

    最开始是自己弄ACfly的飞控时发现插接收机有两个位置 xff0c 一个PPM一个SBUS xff0c 我想直接移植Pixhawk的接收机或者无名的接收机到ACfly模块上 我最后发现Pixhawk和无名的也是留了两个给接收机 xff08
  • 现在发现如果无人机的电机不同,浆可能是不能混用的。

    现在发现如果无人机的电机不同 xff0c 浆可能是不能混用的 孔位不同 xff0c 我之前在无名那里买了很多浆 xff0c 觉得这次F330的浆没了可以用那个替 xff0c 我刚刚试了下发现插不进去 xff0c 不能通用 包括我刚才在店家那
  • 无人机电池似乎可以并联,串联组合

    之前总是见到这种奇怪的线 xff0c 一直不知道作什么用 xff0c 现在大概清楚了 是不是这样可以实现更长时间的续航 xff0c 我之前再ACfly的群里看到一个人的六轴是上面放了两个电池的 xff0c 他这可能也是并联的
  • 任务的三要素是任务主体函数,任务栈和任务控制块

    任务的三要素是任务主体函数 xff0c 任务栈和任务控制块 由xTaskCreateStatic 函数来把三者联合起立 下面拍自野火的 FreeRTOS内核实现与应用开发实战指南
  • 如何用Realsense D435i运行VINS-Mono等VIO算法 获取IMU同步数据

    摘自 xff1a https blog csdn net qq 41839222 article details 86552367 如何用Realsense D435i运行VINS Mono等VIO算法 获取IMU同步数据 Manii 20
  • Opencv安装与环境配置

    转载自 xff1a https blog csdn net sm16111 article details 81238324 Opencv安装与环境配置 代码敌敌畏 2018 07 27 15 46 24 50411 收藏 94 分类专栏
  • 串口参数详解:波特率,数据位,停止位,奇偶校验位

    转载自 xff1a https blog csdn net sinat 35705952 article details 89034455 串口参数详解 xff1a 波特率 xff0c 数据位 xff0c 停止位 xff0c 奇偶校验位 W
  • cpp-httplib库简单原理,听说你还不会开源库?

    cpp httplib库的原理 听说你还不会开源库 xff1f 介绍httplib h头文件的处理流程httplib h头文件的组成httplib h头文件搭建服务端与客户端的原理Get接口listen 0 0 0 0 8989 接口 介绍
  • UART串口调试

    转载自 xff1a https www secpulse com archives 157847 html UART串口调试 脉搏文库 TideSec 2021 04 23 4 356 0x00前言 前段时间陆陆续续的对光猫 路由器 摄像头
  • visca协议及其实现的简单认识

    转载自 xff1a https latelee blog csdn net article details 35811777 visca协议及其实现的简单认识 李迟 2014 06 30 14 09 01 7064 收藏 12 分类专栏 x
  • C语言实现的一个简单的HTTP程序

    转载自 xff1a https www cnblogs com xuwenmin888 archive 2013 05 04 3059282 html C语言实现的一个简单的HTTP程序 以下是参考 lt winsock网络编程经络 gt
  • ideavim使用

    IdeaVim 常用操作 IdeaVim简介 IdeaVim是IntelliJ IDEA的一款插件 xff0c 他提高了我们写代码的速度 xff0c 对代码的跳转 xff0c 查找也很友好 安装之后它在 Tools gt Vim Emula
  • CAN总线——数据传输故障处理

    最近遇到CAN总线通讯的问题 上位机为arm板 xff0c 核心板为Cortex A9处理器 Linux内核 下位机为5块 STM32板 现象为 xff1a 如果上位机只接收数据 xff0c 一切通讯正常 当上位机下发命令 xff0c 那么
  • 升级构建工具,从Makefile到CMake

    更多博文 xff0c 请看音视频系统学习的浪漫马车之总目录 C C 43 43 编译 浅析C C 43 43 编译本质 一篇文章入门C C 43 43 自动构建利器之Makefile 升级构建工具 xff0c 从Makefile到CMake
  • RTKLIB简介

    RTKLIB是全球导航卫星系统GNSS global navigation satellite system 的标准 amp 精密定位开源程序包 xff0c RTKLIB由日本东京海洋大学 xff08 Tokyo University of

随机推荐

  • zzuli OJ 1038: 绝对值最大

    Description 输入3个整数 xff0c 输出绝对值最大的那个数 Input 输入包含3个int范围内的整数 xff0c 用空格隔开 Output 输出三个数中绝对值最大的数 xff0c 单独占一行 若绝对值最大的数不唯一 xff0
  • md5sum

    ERROR 1550456422 414780061 Client Lidar cipv 213 wants topic rs percept result to have datatype md5sum autodrive msgs Pe
  • libcurl实现HTTP

    关于libcurl的相关函数介绍以及参数详见官方说明 https curl haxx se libcurl c example html HTTP Request 一个http请求包含方法 路径 http版本 请求包头 请求方法 GET H
  • 深夜没事,抓个ARP包吧!

    深夜没事 xff0c 抓个ARP包吧 xff01 ipconfig查看网卡信息 选择en33这个网卡 xff0c 发送两次 xff0c 询问192 168 21 1的mac地址 xff0c 注意 xff1a ARP请求只能在同一子网内部进行
  • linux基础篇(一)——GCC和Makefile编译过程

    linux系列目录 xff1a linux基础篇 xff08 一 xff09 GCC和Makefile编译过程 linux基础篇 xff08 二 xff09 静态和动态链接 ARM裸机篇 xff08 一 xff09 i MX6ULL介绍 A
  • jni/ndk问题 :引用so库报错: java.lang.UnsatisfiedLinkError: No implementation found for

    问题 xff1a 引用so库报错 xff1a java span class token punctuation span lang span class token punctuation span UnsatisfiedLinkErro
  • 《python+opencv实践》一、基于颜色的物体追踪(上)

    点击打开链接 本文主要参考国外一大牛博客 xff0c 然后自己修改得来 相关知识点在这里 实现功能 xff1a 追踪红颜色瓶盖 xff0c 并画出瓶盖轮廓和运动轨迹 from collections import deque import
  • C++的sort函数实现字符串排序

    一 背景 sort函数用于C 43 43 中 xff0c 对给定区间所有元素进行排序 头文件是 include lt algorithm gt 实现原理 xff1a sort并不是简单的快速排序 xff0c 它对普通的快速排序进行了优化 x
  • C# 中的Dispose()用法

    一 对Dispose方法的理解是什么呢 xff1f 使用Dispose方法的对象 xff0c 应释放它拥有的所有资源 它还应该通过调用其父类型的Dispose方法释放其基类型拥有的所有资源 net的对象使用一般分为三种情况 1 创建对象 2
  • C++的 remove函数

    一 介绍 remove函数原型如下 xff1a template lt class ForwardIt class T gt ForwardIt remove ForwardIt first ForwardIt last const T a
  • 主板上的南桥与北桥

    一 历史 曾经 xff0c 北桥芯片和南桥芯片都是主板芯片组中最重要的组成部分 传统来说 xff0c 靠上方的叫北桥 xff0c 靠下方的叫南桥 北桥负责与CPU通信 xff0c 并且连接高速设备 xff08 内存 显卡 xff09 xff
  • CMake的add_library与target_link_libraries

    一 add library介绍 使用该命令可以在Linux下生成 xff08 静态 动态 xff09 库so或者 a文件 xff0c Windows下就是dll与lib文件 xff0c 它有两种命令格式 1 1 第一种格式 xff1a No
  • Linux下终止正在执行的shell脚本

    一 问题 Linux系统Shell中提交了一个脚本 xff0c 但是需要停止这个进程 xff0c 如何处理 xff1f 二 方案1 killall fileName 说明 xff1a killall是一个命令 xff0c 不是kill al
  • Qt对象树的销毁

    一 问题 在C 43 43 中中 xff0c 我们都知道 xff1a delete 和 new 必须配对使用 一 一对应 xff1a delete少了 xff0c 则内存泄露 为什么Qt使用new来创建一个控件 xff0c 但是却没有使用d
  • DNS域名解析之递归与非递归查询

    DNS域名解析之递归与非递归查询 递归查询迭代查询实例 递归查询 主机向本地域名服务器的查询一般是递归查询 xff1a 如果本地域名服务器不知道查询的IP地址 xff0c 那么本地域名服务器就会以DNS客户的身份向根域名服务器继续发生请求
  • spi,iic,uart,pcie区别

    一 spi SPI 是英语Serial Peripheral interface的缩写 xff0c 顾名思义就是串行外围设备接口 xff0c 是同步传输协议 xff0c 特征是 xff1a 设备有主机 xff08 master xff09
  • 决策树的介绍

    一 介绍 决策树 decision tree 是一类常见的机器学习方法 它是一种树形结构 xff0c 其中每个内部节点表示一个属性上的判断 xff0c 每个分支代表一个判断结果的输出 xff0c 最后每个叶节点代表一种分类结果 例如 xff
  • 支持向量机

    一 是否线性可分的问题 考虑图6 1中 xff0c A D共4个方框中的数据点分布 xff0c 一个问题就是 xff0c 能否画出一条直线 xff0c 将圆形点和方形点分开呢 xff1f 比如图6 2中 xff0c 方框A中的两组数据 xf
  • cmake 链接库名称扩展

    多个文件 macro span class token punctuation span configure lib by types OUTLIBS DebugSuffix span class token punctuation spa
  • 如何自定义TCP通信协议

    物联网行业智能硬件之间的通信 异构系统之间的对接 中间件的研发 以及各种即时聊天软件等 xff0c 都会涉及自定义协议 为了满足不同的业务场景的需要 xff0c 应用层之间通信需要实现各种各样的网络协议 以异构系统的对接为例 在早期 xff