本节主要是熟悉socketFrameHandlerFactory类, 真正涉及到socket流处理器
- 展示如下类图(我们本文关心是SocketFrameHandlerFactory 和SocketFrameHandler),由类图可以知道SocketFrameHandlerFactory继承抽象类AbstractFrameHandlerFactory, 抽象类实现FrameHandlerFactory接口
1、FrameHandlerFactory (顶级接口)
1.1、Address地址对象
- 它只有两个属性一个是_host 主机, _port 端口号
- 主要方法就是解析ipv6(例如:[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671)的 host和port
1.2、FrameHandler (数据帧处理接口)
- 它继承了NetworkConnection接口(主要地址相关的,获取本地地址和本地端口,以及地址和端口)
- 实现类必须是线程安全的
- com.rabbitmq.client.impl.FrameHandler#setTimeout ( 设置超时时间, 单位为毫秒)
- com.rabbitmq.client.impl.FrameHandler#getTimeout (获取超时时间,单位为毫秒)
- com.rabbitmq.client.impl.FrameHandler#sendHeader ( 设置请求头)
- initialize() (初始化连接)
- com.rabbitmq.client.impl.FrameHandler#readFrame (读取数据帧)
- com.rabbitmq.client.impl.FrameHandler#writeFrame (写入数据帧)
- com.rabbitmq.client.impl.FrameHandler#flush (刷新)
- com.rabbitmq.client.impl.FrameHandler#close 关闭底层数据连接(没有实现Closeable接口)
2、AbstractFrameHandlerFactory 抽象类
- 主要作用就是定义关于FrameHandlerFactory公共字段,其实很多地方也是这么用的,这个也是可以学习一下
- 首先定义根据功能定义接口,第二步定义一个抽象类,把一个公共字段,还有可能模板方法写好,最后有子类进行实现
- 能设置成final尽量设置成final
- 成员变量
- connectionTimeout 连接超时时间
- configurator Socket 设置函数
- ssl 是否ssl连接
3、SocketFrameHandlerFactory extend AbstractFrameHandlerFactory
3.1、成员变量
- socketFactory socket工厂类
- shutdownExecutor 关闭执行器
- sslContextFactory ssl上下文工厂类
3.2、关键方法
3.3、SocketFrameHandler implement FrameHandler
3.3.1、成员变量
-
_socket : socket对象
-
_shutdownExecutor 关闭执行器
-
_inputStream DataInputStream((BufferedInputStream))
-
_outputStream DataOutputStream((BufferedOutputStream))
-
SOCKET_CLOSING_TIMEOUT=1指定强制关socket的逗留时间(单位为秒)
3.3.2、核心方法
-
com.rabbitmq.client.impl.SocketFrameHandler#sendHeader(设置请求头)
-
synchronized (_outputStream) {
_outputStream.write("AMQP".getBytes("US-ASCII"));
_outputStream.write(0);
_outputStream.write(major);
_outputStream.write(minor);
_outputStream.write(revision);
try {
_outputStream.flush();
} catch (SSLHandshakeException e) {
LOGGER.error("TLS connection failed: {}", e.getMessage());
throw e;
}
}
-
同步方法写入_outputStream,防止并发
-
AMQP采用是US-ASCII字符编码
-
主要协商交互mq协议为 ( AMQP091)
-
com.rabbitmq.client.impl.SocketFrameHandler#initialize ( 开启监控socket 读取输入流)
-
主要流程如下
-
-
A、B、C、D 在下篇在串一下
-
com.rabbitmq.client.impl.SocketFrameHandler#close
- 设置socket逗留时间为1秒
- 使用shutdownExecutor执行 flush()方法(如果shutdownExecutor为空则手动调用flush方法, 异步Future task完成此项工作,防止写阻塞
- 如果任务执行失败将考虑取消任务执行。
- 最后再关闭socket
4、总结
- 主要了解创建创建SocketFrameHandler类的过程,以及一些核心方法。类之间继承和实现
- 一般会使用抽象接口定义公共属性和模板代码
- 增加一个包装类,无非增加一些新能力和属性,数据形式转换
结尾
- 感谢大家的耐心阅读,如有建议请私信或评论留言。
- 如有收获,劳烦支持,关注、点赞、评论、收藏均可,博主会经常更新,与大家共同进步
- 下一篇研究一下A,B,C,D具体处理逻辑
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)