Mina基础(五):编写自定义协议及编解码器

2023-05-16

为什么要制定协议呢?

        我们知道,底层传输的都是二进制数据,服务端和客户端建立连接后进行数据的交互,接受这对方发送来的消息,如何判定发送的请求或者响应的数据结束了呢?总不能一直傻等着,或者随意的就结束消息接收吧。这就需要一个规则!比如QQ聊天工具,当输入完一个消息后,点击发送按钮向对方发送时,此时系统就会在在你的消息后添加一个文本换行符,接收方看到这个文本换行符就认为这是一个完整的消息,解析成字符串显示出来。而这个规则,就称之为协议!

根据协议,把二进制数据转换成Java对象称为解码(也叫做拆包);把Java对象转换为二进制数据称为编码(也叫做打包);

常用的协议制定方法有哪些?

  • 定长消息法:这种方式是使用长度固定的数据发送,一般适用于指令发送。譬如:数据发送端规定发送的数据都是双字节,AA 表示启动、BB 表示关闭等等。
  • 字符定界法:这种方式是使用特殊字符作为数据的结束符,一般适用于简单数据的发送。譬如:在消息的结尾自动加上文本换行符(Windows使用\r\n,Linux使用\n),接收方见到文本换行符就认为是一个完整的消息,结束接收数据开始解析。注意:这个标识结束的特殊字符一定要简单,常常使用ASCII码中的特殊字符来标识(会出现粘包、半包情况)。
  • 定长报文头法:使用定长报文头,在报文头的某个域指明报文长度。该方法最灵活,使用最广。譬如:协议为– 协议编号(1字节)+数据长度(4个字节)+真实数据。请求到达后,解析协议编号和数据长度,根据数据长度来判断后面的真实数据是否接收完整。HTTP 协议的消息报头中的Content-Length 也是表示消息正文的长度,这样数据的接收端就知道到底读到多长的字节数就不用再读取数据了。

本文使用的是定长报文头法,也是实际中使用的最多的协议方法。

在定长报文头法中

  • 包头:数据包的版本号,以及整个数据包(包头+包体)的长度
  • 包体:实际数据

下面我们就来编写一个自定义协议

    介绍协议组成: 数据长度(4个字节) + 协议编号(1字节)+ 真实数据。

    创建协议的实体类,这里不在过多介绍,看注释即可。

/*
    自定义协议包
    协议为– 数据长度(4个字节)+ 协议编号(1字节)+ 真实数据。

*/
public class CustomPack {

    /**
     * 0x00表示请求
     */
    public static final byte REQUEST = 0x00;
    /**
     * 0x01表示回复
     */
    public static final byte RESPONSE = 0x01;

    // 总长度(编号字节 + 长度的字节 + 包体长度字节)
    private int len;
    // 版本号
    private byte flag;
    // 发送人,只是服务端-客户端,暂时无需发送人 接收人
    // private long sender;
    // 接收人
    // private long receiver;
    // 包体
    private String content;

    // 构造方法设置协议
    public CustomPack(byte flag, String content) {
        this.flag = flag;
        this.content = content;
        // 版本类型的长度1个字节, len的长度4个字节, 内容的字节数
        this.len = 1 + 4 + (content == null ? 0 : content.getBytes().length);
    }

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte getFlag() {
        return flag;
    }

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        return "CustomPack{" +
                "len=" + len +
                ", flag=" + flag +
                ", content='" + content + '\'' +
                '}';
    }
}

自定义编解码器及工厂类    

    有了我们自己定义的协议,那么怎么把我们的协添加到Mina的通讯机制中呢?

    我们查看ProtocolCodecFilter的构造方法,发现需要注入一个ProtocolCodecFactory编解码工厂:

    我们继续查看ProtocolCodecFactory接口,发现需要实现2个方法,该接口的两个方法需要返回ProtocolDecoder和ProtocolEncoder的实现类对象(自定义编解码器):

       接下来我们分别对编码器、解码器进行实现:

   自定义编码器

目标:将JAVA对象转换成二进制流

实现:继承ProtocolEncoderAdapter类或实现ProtocolEncoder接口

    编码器具体实现:

/*
 *
 * describe 自定义编码器
 * @author xmc
 * @param  * @param null
 * @return
 */
public class CustomProtocolEncoder implements ProtocolEncoder {

    private final Charset charset;

    public CustomProtocolEncoder() {
        this.charset = Charset.defaultCharset();
    }

    // 构造方法注入编码格式
    public CustomProtocolEncoder(Charset charset) {
        this.charset = charset;
    }

    @Override
    public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
        // 转为自定义协议包
        CustomPack customPack = (CustomPack) message;
        // 初始化缓冲区
        IoBuffer buffer = IoBuffer.allocate(customPack.getLen())
                                  .setAutoExpand(true);
        // 设置长度、报头、内容
        buffer.putInt(customPack.getLen());
        buffer.put(customPack.getFlag());
        if (customPack.getContent() != null) {
            buffer.put(customPack.getContent().getBytes());
        }
        // 重置mask,发送buffer
        buffer.flip();
        out.write(buffer);
    }

    @Override
    public void dispose(IoSession session) throws Exception {

    }
}

    自定义解码器

目标:将二进制流转换成JAVA对象

实现:实现ProtocolDecoder接口或继承ProtocolDecoderAdapter类(难以解决半包、粘包问题)

           继承CumulativeProtocolDecoder类,重写doDecode方法(推荐使用此方法,完美解决半包、粘包问题)

    具体实现:

/*
 *
 * describe 自定义解码器
 * @author xmc
 * @param  * @param null
 * @return
 */
public class CustomProtocolDecoder extends CumulativeProtocolDecoder {

    private final Charset charset;

    public CustomProtocolDecoder() {
        this.charset = Charset.defaultCharset();
    }

    // 构造方法注入编码格式
    public CustomProtocolDecoder(Charset charset) {
        this.charset = charset;
    }

    @Override
    protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        // 包头的长度
        final int PACK_HEAD_LEN = 5;
        // 拆包时,如果可读数据的长度小于包头的长度,就不进行读取
        if (in.remaining() < PACK_HEAD_LEN) {
            return false;
        }
        if (in.remaining() > 1) {
            // 标记设为当前
            in.mark();
            // 获取总长度
            int length = in.getInt(in.position());
            // 如果可读取数据的长度 小于 总长度 - 包头的长度 ,则结束拆包,等待下一次
            if (in.remaining() < (length - PACK_HEAD_LEN)) {
                in.reset();
                return false;
            } else {
                // 重置,并读取一条完整记录
                in.reset();
                byte[] bytes = new byte[length];
                // 获取长度4个字节、版本1个字节、内容
                in.get(bytes, 0, length);
                byte flag = bytes[4];
                String content = new String(bytes, PACK_HEAD_LEN, length - PACK_HEAD_LEN, charset);
                // 封装为自定义的java对象
                CustomPack pack = new CustomPack(flag, content);
                out.write(pack);
                // 如果读取一条记录后,还存在数据(粘包),则再次进行调用
                return in.remaining() > 0;
            }
        }
        return false;
    }
}

doDecode方法说明:

  • 你的doDecode()方法返回true 时,CumulativeProtocolDecoder 的decode()方法会首先判断你是否在doDecode()方法中从内部的IoBuffer 缓冲区读取了数据,如果没有,则会抛出非法的状态异常,也就是你的doDecode()方法返回true 就表示你已经消费了本次数据(相当于聊天室中一个完整的消息已经读取完毕),进一步说,也就是此时你必须已经消费过内部的IoBuffer 缓冲区的数据(哪怕是消费了一个字节的数据)。如果验证过通过,那么CumulativeProtocolDecoder 会检查缓冲区内是否还有数据未读取,如果有就继续调用doDecode()方法,没有就停止对doDecode()方法的调用,直到有新的数据被缓冲。
  • 当你的doDecode()方法返回false 时,CumulativeProtocolDecoder 会停止对doDecode()方法的调用,但此时如果本次数据还有未读取完的,就将含有剩余数据的IoBuffer 缓冲区保存到IoSession 中,以便下一次数据到来时可以从IoSession 中提取合并。如果发现本次数据全都读取完毕,则清空IoBuffer 缓冲区。

        简而言之,当你认为读取到的数据已经够解码了,那么就返回true,否则就返回false。这个CumulativeProtocolDecoder 其实最重要的工作就是帮你完成了数据的累积,因为这个工作是很烦琐的。也就是说返回true,那么CumulativeProtocolDecoder会再次调用decoder,并把剩余的数据发下来返回false就不处理剩余的,当有新数据包来的时候把剩余的和新的拼接在一起然后再调用decoder。

完成了自定义的编解码器,下面实现编解码工厂

自定义编解码工厂

        目标:为filter中注入编解码工厂,通过工厂类获取编解码器

        实现:实现ProtocolCodecFactory接口,获取编解码器

   实现很简单,看代码:

/*
    自定义编解码工厂类
*/
public class CustomProtocolCodecFactory implements ProtocolCodecFactory {

    private final ProtocolEncoder encoder;
    private final ProtocolDecoder decoder;

    public CustomProtocolCodecFactory() {
        this(Charset.forName("UTF-8"));
    }

    // 构造方法注入编解码器
    public CustomProtocolCodecFactory(Charset charset) {
        this.encoder = new CustomProtocolEncoder(charset);
        this.decoder = new CustomProtocolDecoder(charset);
    }

    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }

    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }
}

现在已经完成了我们的自定义编解码工厂,此时在服务端及客户端的filter中注入工厂类即可。

    测试服务端

/*
    使用自定义协议的服务端
 */
public class MinaServerCustom {

    private static final Logger logger = LogManager.getLogger(MinaServerCustom.class);

    // 端口
    private static final int MINA_PORT = 7080;

    public static void main(String[] args) {
        IoAcceptor acceptor;
        try {
            // 创建一个非阻塞的服务端server
            acceptor = new NioSocketAcceptor();
            // 设置编码过滤器(自定义)
            acceptor.getFilterChain().addLast("mycoder", new ProtocolCodecFilter(new CustomProtocolCodecFactory(Charset.forName("UTF-8"))));
            // 设置缓冲区大小
            acceptor.getSessionConfig().setReadBufferSize(1024);
            // 设置读写空闲时间
            acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
            // 绑定handler
            acceptor.setHandler(new MyServerHandler());
            // 绑定端口
            acceptor.bind(new InetSocketAddress(MINA_PORT));
            logger.info("创建Mina服务端成功,端口:" + MINA_PORT);
        } catch (IOException e) {
            logger.error("创建Mina服务端出错:" + e.getMessage());
        }
    }
}
// 类继承handler
class MyServerHandler extends IoHandlerAdapter {
    private static final Logger logger = LogManager.getLogger(MyServerHandler.class);

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyCustomPack pack = (MyCustomPack) message;
        logger.info("服务端接收消息成功:" + pack);
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        MyCustomPack pack = (MyCustomPack) message;
        logger.info("服务端发送消息成功:" + pack);
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
        logger.error("服务端处理消息异常:" + cause);
    }

}

 

    测试客户端

/*
    使用自定义协议的客户端
 */
public class MinaClientCustom {
    private static final Logger logger = LogManager.getLogger(MinaClientCustom.class);

    private static final String MINA_HOST = "127.0.0.1";
    private static final int MINA_PORT = 7080;

    public static void main(String[] args) {
        // 获取当前系统时间戳
        start = System.currentTimeMillis();
        // 创建一个非阻塞的客户端
        IoConnector connector = new NioSocketConnector();
        // 设置编码过滤器
        connector.getFilterChain().addLast("mycoder", new ProtocolCodecFilter(new CustomProtocolCodecFactory(Charset.forName("UTF-8"))));
        // 设置缓冲区大小
        connector.getSessionConfig().setReadBufferSize(1024);
        // 设置空闲时间
        connector.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 10);
        // 绑定逻辑处理Handler
        connector.setHandler(new MyClientHandler());
        // 创建连接
        ConnectFuture future = connector.connect(new InetSocketAddress(MINA_HOST, MINA_PORT));
        // 这里采用监听方式获取session
        future.addListener(new IoFutureListener<IoFuture>() {
            // 当连接创建完成
            public void operationComplete(IoFuture future) {
                IoSession session = future.getSession();
                sendData(session);
            }
        });
    }

    // 发送数据的方法
    public static void sendData(IoSession session) {
        logger.info("----------------------------测试数据准备发送-----------------------------");
        // 模拟发送100次数据
        for (int i = 0; i < 3; i++) {
            String content = "测试数据:" + i;
            MyCustomPack pack = new MyCustomPack((byte) i, content);
            session.write(pack);
        }
        logger.info("----------------------------测试数据发送完毕-----------------------------");
    }
}

class MyClientHandler extends IoHandlerAdapter {

    private static final Logger logger = LogManager.getLogger(MyClientHandler.class);

    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        MyCustomPack pack = (MyCustomPack) message;
        logger.info("客户端接收消息成功:" + pack);
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        MyCustomPack pack = (MyCustomPack) message;
        logger.info("客户端发送消息成功:" + pack);
    }

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        logger.error("客户端处理消息异常:" + cause.getMessage());
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        // 空闲时,关闭session
        if (status == IdleStatus.BOTH_IDLE) {
            logger.info("session进入空闲,准备关闭session");
            session.closeNow();
        }
    }
}

分别启动服务端、客户端可以发现控制台中打印出测试数据:

自定义协议、编解码、编解码工厂及测试搭建完毕。

参考:

NIO通讯框架之阿堂教程:Mina学习笔记

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Mina基础(五):编写自定义协议及编解码器 的相关文章

  • sh_脚本语法

    介绍 xff1a 1 开头 程序必须以下面的行开始 xff08 必须方在文件的第一行 xff09 xff1a bin sh 符号 用来告诉系统它后面的参数是用来执行该文件的程序 在这个例子中我们使用 bin sh来执行程序 当编写脚本完成时
  • 【深度学习系列(三)】:基于CNN+seq2seq公式识别系统实现 (1)

    这段时间一直在做公式识别相关的项目 xff0c 尝试了传统的方法 xff0c 效果不怎么好 想到能不能使用深度学习的方法进行相关方法 然后在github找到了相关代码 xff0c 这里做下分析 具体github地址 xff1a GitHub
  • 困惑多年,为什么printf可以重定向?

    很多人在用printf函数进行串口打印的时候 xff0c 都会被告知需要重定向fputc函数 xff08 别的平台可能不是这个函数 xff09 xff0c 让字符串数据输出到指定串口 xff0c 按照网上的教程也能很快解决 但是却没人告诉你
  • 多线程并发编程

    文章目录 多线程并发编程一 多线程带来的问题相关概念 二 互斥1 互斥与互斥量2 申请互斥量I 静态方法申请互斥量 xff1a II 动态方法申请互斥量 xff1a 3 利用互斥量加锁与解锁4 销毁互斥量5 互斥量综合应用 模拟抢票6 互斥
  • 【嵌入式】---- 串口UART波形分析

    串口参数的配置 波特率 xff08 bit s xff09 xff1a 大多数使用115200 但有些芯片特殊 xff0c 具体要看数据手册中波特率的容错率 比如中微的CMS32L051就不支持115200bps 停止位 xff1a 一般选
  • 手把手教你用JAVA实现“语音合成”功能(文字转声音)标贝科技

    手把手教你用JAVA实现 语音合成 功能 xff08 文字转声音 xff09 标贝科技 前言 什么是语音合成 xff1f 将文本转换成自然流畅的语音 xff0c 本篇文章将介绍 实时在线合成 xff08 文本长度不得超过1024字节 xff
  • cv::imread(cv::String const&, int)’未定义的引用

    在 Makefile文件的195 行 LIBRARIES 43 61 opencv core opencv highgui opencv imgproc 后面添加 xff1a opencv imgcodecs opencv videoio修
  • 【C/C++】C++ 网络多线程编程

    关键词 xff1a C C 43 43 网络编程 多线程 套接字 UDP 前言 学习C 43 43 网络编程多线程编程的目的 xff1a 巩固C 43 43 xff1b 由于C 43 43 大多用于服务器 xff0c 因此网络和多线程是进入
  • 在ubuntu20.04上配置VINS_Fusion(亲测有效,一应俱全)

    最近在做科研训练的时候配置了HKUST Aerial Robotics实验室的VINS Fusion代码项目 xff0c 经历了一些编译报错的问题 xff0c 在网上查找的时候博客内容良莠不齐 xff0c 且实质针对性意见不多 xff0c
  • 无人机项目跟踪记录二十五--无线接收模块的输入输出

    无线接收模块的功能是接收无线遥控器的命令 xff08 应该对应的是无人机上面的无线接收芯片 xff09 xff0c 无人机根据接收的指令进行不同的处理 用同样方法 xff0c 无线接收模块包含的函数是 xff1a Nrf Irq void
  • UDP校验和及代码

    UDP校验和采用反码求和 xff1a 两数相加 xff0c 把超出16位加入到第0位 校验和算法 unsigned short UDPCheck unsigned short data int len int carryBit 61 0 i
  • ROS Moveit:rviz和gazebo仿真出现rviz规划后gazebo没有反应

    在用rviz规划后 xff0c 警告 WARN 1649654675 728414350 42 937000000 Failed to validate trajectory couldn 39 t receive full current
  • Libcurl实现HTTP/HTTPS客户端(支持get、post、保持session)

    前面的文章 Libcurl编译指南 Android和Windows系统 已经就libcurl在Windows和Android系统编做了详细的说明 本文档用C C 43 43 实现简单的HTTP HTTPS客户端 xff0c 支持get和po
  • 基于uart的RS232和RS485总线

    我们之前讲uart的时候就已经提过一个问题 xff0c 就是它并不是直接连接到SOC里面的 xff0c 而是经过了一个芯片的转换 这个芯片的转换就是和我们要说的rs232 485总线有关的 RS232和RS485总线其实本质就是uart 只
  • c语言

    一 c基础 1 1 一个函数遇到return语句就终止了 1 2 system系统调用 xff1a 用命令打开计算器 记事本等 xff0c windows和linux下命令不同 xff0c 需要头文件 xff08 stdlib h xff0
  • [PTA]7-115 祝贺你成年了! (5 分)

    到了18岁 xff0c 你就成年了 xff01 请输入你的年龄 xff0c 如果大于等于18岁 xff0c 则输出 34 Congratulations 34 输入格式 输入一个不超过200的整数A 输出格式 按照要求输出 输入样例 18
  • 教你如何用ffmpeg处理音频格式转换(标贝科技)

    文章目录 前言ffmpeg介绍 xff1f 一 下载与安装1 下载安装包2 设置环境变量3 验证是否安装成功 二 命令讲解1 参数解释2 音频格式转换命令 三 标贝开放平台介绍 前言 ffmpeg介绍 xff1f FFmpeg是一套可以用来
  • cmake(十六)Cmake条件判断指令

    一 基础语法 基本框架 优先级 条件的类型 二 实践 项目初始化 CMakeLIsts txt 文件 字符串比较默认值 测试 补充 43 43 43 43 43 43 43 43 43 43 43 43 43 39 CMake 基本语法 3
  • 南通移动打造“5G+无人机”为乡村振兴插上“智慧翅膀”

    近年来 xff0c 南通通州移动按下5G 加速键 xff0c 探索智慧服务 xff0c 以信息化手段助力提升乡村治理现代化水平 xff0c 为打造数字乡村贡献数智力量 如今 xff0c 在南通通州区的二甲镇 xff0c 南通通州移动在二甲镇
  • 免费的在线影视网站汇总(包括图片,小说等)

    免费最新在线影视站 xff0c 仅用于测试用途 日常请使用 正版视频 软件 xff0c 支持正版人人有责 影视大全展开目录 萝莉岛 app少女美腿令你无限遐想大米星球蓝光福利免费在线站hdmoli免费高清 1080 影视站剧迷 tv免费无广

随机推荐