第7篇 rabbitmq 创建SocketFrameHandler

2023-11-16

本节主要是熟悉socketFrameHandlerFactory类, 真正涉及到socket流处理器

  • 展示如下类图(我们本文关心是SocketFrameHandlerFactory 和SocketFrameHandler),由类图可以知道SocketFrameHandlerFactory继承抽象类AbstractFrameHandlerFactory, 抽象类实现FrameHandlerFactory接口
  • image-20210607174608720

1、FrameHandlerFactory (顶级接口)

  • 你可以看到它只有一个方法创建FrameHandler, 入参是地址和连接名称

    • FrameHandler create(Address addr, String connectionName) throws IOException;
      
  • 处理socket,必须创建socket对象,这就需要地址信息了,connectionName(连接名称)不是必要

1.1、Address地址对象

  • 它只有两个属性一个是_host 主机, _port 端口号
  • 主要方法就是解析ipv6(例如:[2001:db8:85a3:8d3:1319:8a2e:370:7348]:5671)的 host和port

1.2、FrameHandler (数据帧处理接口)

  • 它继承了NetworkConnection接口(主要地址相关的,获取本地地址和本地端口,以及地址和端口)
  • 实现类必须是线程安全的
  • com.rabbitmq.client.impl.FrameHandler#setTimeout ( 设置超时时间, 单位为毫秒)
  • com.rabbitmq.client.impl.FrameHandler#getTimeout (获取超时时间,单位为毫秒)
  • com.rabbitmq.client.impl.FrameHandler#sendHeader ( 设置请求头)
  • initialize() (初始化连接)
  • com.rabbitmq.client.impl.FrameHandler#readFrame (读取数据帧)
  • com.rabbitmq.client.impl.FrameHandler#writeFrame (写入数据帧)
  • com.rabbitmq.client.impl.FrameHandler#flush (刷新)
  • com.rabbitmq.client.impl.FrameHandler#close 关闭底层数据连接(没有实现Closeable接口)

2、AbstractFrameHandlerFactory 抽象类

  • 主要作用就是定义关于FrameHandlerFactory公共字段,其实很多地方也是这么用的,这个也是可以学习一下
  • 首先定义根据功能定义接口,第二步定义一个抽象类,把一个公共字段,还有可能模板方法写好,最后有子类进行实现
  • 能设置成final尽量设置成final
  • 成员变量
    • connectionTimeout 连接超时时间
    • configurator Socket 设置函数
    • ssl 是否ssl连接

3、SocketFrameHandlerFactory extend AbstractFrameHandlerFactory

3.1、成员变量

  • socketFactory socket工厂类
  • shutdownExecutor 关闭执行器
  • sslContextFactory ssl上下文工厂类

3.2、关键方法

  • com.rabbitmq.client.impl.SocketFrameHandlerFactory#create (创建处理器)

  • 流程如下所示

  • image-20210607192840575

  • 本质就是为创建socket对象,并将shutdownExecutor 封装成SocketFrameHandler对象,我们在3.3熟悉一下SocketFrameHandler

3.3、SocketFrameHandler implement FrameHandler

3.3.1、成员变量

  • _socket : socket对象

  • _shutdownExecutor 关闭执行器

  • _inputStream DataInputStream((BufferedInputStream))

    •         _inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream()));
      
      
  • _outputStream DataOutputStream((BufferedOutputStream))

  • SOCKET_CLOSING_TIMEOUT=1指定强制关socket的逗留时间(单位为秒)

3.3.2、核心方法

  • com.rabbitmq.client.impl.SocketFrameHandler#sendHeader(设置请求头)

    • synchronized (_outputStream) {
                  _outputStream.write("AMQP".getBytes("US-ASCII"));
                  _outputStream.write(0);
                  _outputStream.write(major);
                  _outputStream.write(minor);
                  _outputStream.write(revision);
                  try {
                      _outputStream.flush();
                  } catch (SSLHandshakeException e) {
                      LOGGER.error("TLS connection failed: {}", e.getMessage());
                      throw e;
                  }
              }
      
    • 同步方法写入_outputStream,防止并发

    • AMQP采用是US-ASCII字符编码

    • 主要协商交互mq协议为 ( AMQP091)

  • com.rabbitmq.client.impl.SocketFrameHandler#initialize ( 开启监控socket 读取输入流)

    • 主要流程如下

    • image-20210607204159191

    • A、B、C、D 在下篇在串一下

  • com.rabbitmq.client.impl.SocketFrameHandler#close

    • 设置socket逗留时间为1秒
    • 使用shutdownExecutor执行 flush()方法(如果shutdownExecutor为空则手动调用flush方法, 异步Future task完成此项工作,防止写阻塞
    • 如果任务执行失败将考虑取消任务执行。
    • 最后再关闭socket

4、总结

  • 主要了解创建创建SocketFrameHandler类的过程,以及一些核心方法。类之间继承和实现
  • 一般会使用抽象接口定义公共属性和模板代码
  • 增加一个包装类,无非增加一些新能力和属性,数据形式转换

结尾

  • 感谢大家的耐心阅读,如有建议请私信或评论留言。
  • 如有收获,劳烦支持,关注、点赞、评论、收藏均可,博主会经常更新,与大家共同进步
  • 下一篇研究一下A,B,C,D具体处理逻辑
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

第7篇 rabbitmq 创建SocketFrameHandler 的相关文章

  • Celery 3.0.1 中的框架错误

    我最近从 2 3 0 升级到 Celery 3 0 1 所有任务都运行良好 很遗憾 我经常收到 帧错误 异常 我还运行主管来重新启动线程 但由于这些线程从未真正被杀死 主管无法知道 celery 需要重新启动 有没有人见过这个 2012 0
  • 保持鼠兔 BlockingConnection 存活而不禁用心跳

    我正在使用 pika 0 10 0 和 python 2 7 版本开发 RabbitMQ 消费者 在我的消费者客户端中 我有一个根据输入消息运行一段时间的进程 时间可能从 3 到 40 分钟不等 我不想禁用心跳 相反 我正在寻找一些回滚机制
  • 使用Camel的spring-rabbitmq组件时如何自动声明交换?

    我正在尝试从 Camel 3 x 迁移到 Camel 4 x 版本 因此我需要从rabbitmq替换组件spring rabbitmq With rabbitmq我正在使用的组件declare https camel apache org
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • RabbitMQ C# API:如何检查绑定是否存在?

    使用 RabbitMQ C API 我如何检查给定队列到给定交换是否存在绑定 很多 RabbitMQ 调用都是幂等的 所以有些人可能会说在这些情况下检查是不必要的 但我认为它们在测试中很有用 您可以使用他们的 REST API 来调用并查看
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • 无法从 docker 将 RabbitMQ 连接到我的应用程序 [重复]

    这个问题在这里已经有答案了 我目前被这个问题困扰了大约一周 确实找不到合适的解决方案 问题是 当我尝试连接到 dockerized RabbitMQ 时 它每次都会给出相同的错误 wordofthedayapp wordofthedayap
  • Spring AMQP RabbitMQ 如何直接发送到Queue而不需要Exchange

    我正在使用 Spring AMQP 和 Rabbitmq 模板 如何直接将消息发送到队列而不使用Exchange 我该怎么做 我该怎么做 你不能 发布者不知道队列 只是交换和路由密钥 但是 所有队列都绑定到默认交换器 以队列名称作为其路由键
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 为什么需要消息队列来与 Web 套接字聊天?

    我在互联网上看到了很多使用 Web 套接字和 RabbitMQ 进行聊天的示例 https github com videlalvaro rabbitmq chat https github com videlalvaro rabbitmq
  • Celery 与rabbitmq 创建结果多个队列

    我已经用 RabbitMQ 安装了 Celery 问题是 对于返回的每个结果 Celery 都会在 Rabbit 中创建队列 并在交换 celeryresults 中使用任务 ID 我仍然想得到结果 但在一个队列上 我的芹菜配置 from
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • 从 Handler.obtainMessage() 获取什么参数

    我正在使用线程来执行一些 BT 任务 我正在尝试向 UI 线程发送消息 以便我可以基于我的 BT 线程执行 UI 工作 为此 我使用处理程序 但我不知道如何检索发送到处理程序的数据 要发送数据 我使用 handler obtainMessa
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt
  • 如何使用应用程序接口将蓝牙套接字传递给另一个活动

    因此 根据我收集的信息 套接字连接既不可序列化 也不可分割 但我需要将蓝牙连接传递给另一个活动 我不想作为中间人编写服务 所以请不要将此作为解决方案发布 我听说有一种方法可以使用自定义应用程序接口来传递这些类型的对象 但我一生都找不到这样的
  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • 当连接断开时,boost::asio::async_write 中的 WriteHandler 无法正常工作(防火墙/手动断开网络)

    我一直在尝试使用 boost asio 编写客户端 服务器应用程序 总的来说 该应用程序工作得很好 但是当连接 客户端 服务器 被防火墙或手动禁用新网络卡删除 这是代码片段 void write const CommunicatorMess

随机推荐

  • spring Data JPA 拾遗

    Preface JPA在国内的使用频率较小 但也是一个值得学习的极为优秀的ORM框架 DDD的思想在里面体现得淋漓尽致 结构图 配置 1 2 3 4 5 6 7 8 9 10 11 spring jpa generate ddl false
  • 搭建jboss

    jboss 是中间件comcat是框架 jboss 基于java需要安装jbk配置环境变量 配置环境变量 我的电脑 右键 属性 高级 环境变量 新建系统变量 变量名为 JAVA HOME 变量值 C Program Files Java j
  • SpringBoot系统列 5 - 接口版本控制、SpringBoot FreeMarker模板引擎

    接着上篇博客的代码继续写 1 接口版本控制 一个系统上线后会不断迭代更新 需求也会不断变化 有可能接口的参数也会发生变化 如果在原有的参数上直接修改 可能会影响线上系统的正常运行 这时我们就需要设置不同的版本 这样即使参数发生变化 由于老版
  • 数据结构(Python版):线性表

    2 线性表 线性数据结构 线性结构是一种有序数据项的集合 其中每个数据项都有唯一的前驱和后继 除了第一个没有前驱 最后一个没有后继 新的数据项加入到数据集中时 只会加入到原有某个数据项之前或之后 具有这种性质的数据集 就称为线性结构 顺序表
  • 学习常用模型及算法:3.评价和预测

    评价方法 1 加权平均法 最简单的方法 但不能忽视 2 层次分析法 该题可划分为三层 首先我们要求得准则层对目标层的权重 我们可以引进判断矩阵的概念 以两两比较的方式判断每两个指标中哪个更为重要 因为这里是4个判断标准 所以n 4 RI n
  • 5G QoS控制原理专题详解(7)-Default QoS Flow探秘

    相关文章会在公众号同步更新 最近工作忙 更新完公众号后 经常容易忘记再CSDN上再发 公众号上的文章更新的能快一些 各位同学有兴趣可以关注一下 公众号 5G通信大家学 持续更新的相关5G内容都是直接根据3GPP整理 保证更新内容的准确性 避
  • 【MybatisPlus逆向工程】代码生成器

    使用mybatis plus的逆向工程生成entity controller service mapper的初始代码 参考链接 https baomidou com pages d357af E6 B7 BB E5 8A A0 E4 BE
  • linux:ubuntu 查看ip

    解决方法 ifconfig a 或者sudo vi etc netplan 00 installer config yaml 进行查看
  • SpringSecurity基本使用

    文章目录 1 基本使用 2 自定义配置用户名与密码 3 自定义登录页面 4 基于权限访问控制 5 自定义403 6 使用注解 Secured PreAuthorize PostAuthorize PostFilter PreFilter 1
  • 芯片组x299是服务器主板吗,X299主板怎么样/值得买吗?新一代酷睿i9和发烧级X299主板全面深度评测...

    主板外观 配置 丰富的PCIe扩展插槽一向是Extreme至尊级平台的优势之处 5条长插槽中的两条有LED灯效设计和合金固化 需要组双路显卡的时候就优先使用这两条合金固化的插槽就好 CPU供电模块 CPU供电模块有9相合金数字供电 也有设计
  • pandas对文本数据进行长度统计

    在NLP的各项任务中 都需要对语料的长度有一个概括性的了解 例如平均长度 最大长度 大多数语料的长度范围等 以此来确定输入模型时的最大文本长度 pandas库有一种写法可以快速的统计文本语料的长度情况 pd pandas Series po
  • Ubuntu 挂载硬盘

    安装 nfs服务 三台服务器都需要安装 sudo apt get install nfs kernel server nfs common 数据盘操作 安装 完成后在数据盘服务器 修改etc exports文件增加共享盘 home Odoo
  • Arduino - PC817C光耦

    元件简介 PC817C是光隔离线性反馈控制器件 是PC817系列中常用的一个型号 左边 1 2脚 输入端 控制端 是一个发光二极管 右边 3 4脚 输出端 受控端 是一个三极管 CE极 控制端与受控端是隔离的 PC817C主要技术参数 控制
  • pandas 更改单元格的值,使用索引为pandas DataFrame中的特定单元格设置值

    I ve created a Pandas DataFrame df DataFrame index A B C columns x y and got this x y A NaN NaN B NaN NaN C NaN NaN Then
  • python collections.OrderedDict有序字典的使用

    detectron2中大量的使用OrderedDict 有序字典 有序字典可以按字典中元素的插入顺序来输出 python的基础数据类型中的字典类型分为 无序字典与有序字典两种类型 1 无序字典 普通字典 遍历一个普通字典 返回的数据和定义字
  • 关于api-ms-win-crt-runtime

    关于api ms win crt runtime 1 1 0 dll缺失的解决方案 问题原因 有时 我们在打开文件程序的时候经常出现一些关于以下的错误 无法启动此程序因为计算机中丢失api ms win crt runtime 1 1 0
  • C语言 输出 1000 对孪生素数

    孪生素数就是差值为 2 的成对素数 例如3和5 5和7 11和13 代码实现 include
  • 如何解决vue项目打包内存溢出问题

    在打包命令配置文件中加入图中代码即可 cross env NODE OPTIONS max old space size 4096
  • string的用法

    std string的初始化 std string 变量名称 字符串 要截取的长度 std string str 123456 3 str 123 注 中文时支持不佳 std string 变量名称 字符串 起始位置 截取长度 std st
  • 第7篇 rabbitmq 创建SocketFrameHandler

    本节主要是熟悉socketFrameHandlerFactory类 真正涉及到socket流处理器 展示如下类图 我们本文关心是SocketFrameHandlerFactory 和SocketFrameHandler 由类图可以知道Soc