RabbitMQ教程-重要参数&&API解释

2023-11-19

RabbitMQ的工作原理

下图是RabbitMQ的基本结构:

生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复
 

其中,消费者接收消息的流程尤为重要。消息是broker主动推送给消费者的,并不是消费者去主动拉取的。 默认推送的方式是:轮询。

如果不自己设置basic.Qos的数量,默认是250个消息可以不应答给broker(源码里面写着:basic.Qos(prfetchCount), 而prefetchCount默认是250. )。两种解决:

1、消费者声明basic.Qos(1);

2、或者配置:prefetchCount=1;

可以理解broker始终是以【轮询】的方式,把消息推送给消费者。【公平分发】策略也可以认为是轮询的一种,只不过该把消息给consumer1的时候,发现consumer1有消息没有应答(具体可以有几条消息没有应答看qos的配置)所以又把消息给了consumer2.

broker是把消息自动推送给消费者(具体的是把消费放到消费者机器的网卡中,然后每个消费者都有一个线程死循环的去访问网卡是否有数据,如果有就把数据拿来)。

prfetchCount决定了消费者BlockQueue的size。当设置为’手动应答‘且设置qos数量的时候,这个BQ的size就有点鸡肋,比如: prfetchCount设置为250,说明我的阻塞队列最大可放250个数据;然而消费者设置了’手动应答‘且qos=1; 那么,如果消费者1拿到第一条数据的时候就卡住了(宕机、耗时)那么broker再次推送的时候发现你还没有给我ACK,那么broker就不会再把消息推给你,消费者剩余的249个位置空着。

当然,如果basicqos=250,那么就可以有250个消息不应答;

或者,设置为自动应答。

rabbitmq消息分发机制

可以理解broker始终是以【轮询】的方式,把消息推送给消费者。【公平分发】策略也可以认为是轮询的一种,只不过该把消息给consumer1的时候,发现consumer1有消息没有应答(具体可以有几条消息没有应答看qos的配置)所以又把消息给了consumer2.

消息是broker推送给消费者,推送之前会做个判断:该消费者设置的max应答数量(basicQos)。如果该消费者已经到达了max应答数,则不在给该消费者推送消息。

RabbitMQ 入门系列5 -- 消息分发机制_Geffin的博客-CSDN博客_rabbitmq消息分发机制1 问题引入在实际环境下,每个消息的“大小”是不同的,所需要的处理时间也是不同的。在这种情况下,我们应该如何分配资源来令效率最大化呢?这就需要我们来学习 RabbitMQ 的消息分发机制了。2 回顾分发机制以下分发机制的内容来自我的前几篇博客,这里借用一下。一般来说,一个队列有多个消费者同时消费数据,此时有两种分发数据的方式,一种是轮询分发,另一种是公平分发。2.1 轮询分发轮询分发,...https://blog.csdn.net/Geffin/article/details/102637022

一、消费端

ACK机制(重要)

mq的ack  主要是确认消息被消费者消费完成后通知服务器将队列里面的消息清除

而如果不配置Ack的话呢,我测试过他会自动的忽略,也就是说此时的服务是no_ack=true的模式,就是说只要我发现你是消费了这个数据,至于异常不异常的,我不管了。通知Ack机制就是这么来的,更加灵活的,我们需要Ack不自动,而是手动,这样做的好处,就是使得我们开发人员更加人性化或者灵活的来处理我们的业务罗杰代码,更加方便的处理异常的问题以及数据的返回处理等。
Debug方式讲解Rabbitmq的自动ACK和手动ACK_普通网友的博客-CSDN博客_手动ack 自动ack

消费端消息确认 - QoS

不仅仅是工作队列模式,direct、topic等模式也是可以有多个消费者的(比如,项目集群部署)。

QoS = quality-of-service, 顾名思义,服务的质量。通常我们设计系统的时候不能完全排除故障或保证说没有故障,而应该设计有完善的异常处理机制。在出现错误的时候知道在哪里出现什么样子的错误,原因是什么,怎么去恢复或者处理才是真正应该去做的。在接收消息出现故障的时候我们可以通过RabbitMQ重发机制来处理。重发就有重发次数的限制,有些时候你不可能不限次数的重发,这取决于消息的大小,重要程度和处理方式

QoS是在接收端设置的。发送端没有任何变化,接收端的代码也比较简单,只需要加如下代码:

channel.BasicQos(0, 1, false);


1. 代码第一个参数是可接收消息的大小的,但是似乎在客户端2.8.6版本中它必须为0,即使:不受限制。如果不输0,程序会在运行到这一行的时候报错,说还没有实现不为0的情况。

2. 第二个参数是处理消息最大的数量。举个例子,如果输入1,那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞。如果输入3,那么可以最多有3个消息不应答,如果到达了3个,则发送端发给这个接收方得消息只会在队列中,而接收方不会有接收到消息的事件产生。总结说,就是在下一次发送应答消息前,客户端可以收到的消息最大数量。

3. 第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。

这种数量的设置,也为我们在多个客户端监控同一个queue的这种负载均衡环境下提供了更多的选择。

prefetch,concurrency,qos

1、prefetch指定了单个消费者的BlockQueue大小,也可以认为最大存储消息数量。

但是注意一点:

prefetch不代表一次性从queue拉取的数量哦,而是最终可以存储的数量。 数据只能是broker推送,消费者是不能主动拉取的

2、concurrency指定了消费者实例个数,每个消费者实例只有一个线程在消费。当设置为2时,来到管理界面可以看到:

3、qos指定了消费者最大不应答数量。比如channel.basicQos(1),说明消费者最大不应答数量是1,可以理解是最大阻塞1条数据;  比如channel.basicQos(3),说明消费者最大不应答数据是3,当第一条消息开始阻塞的时候,还是可以给该消费者推送2条消息的. 

场景:

生产者发送10条数据; 消费者1处理一条消息时间是100s,消费者2处理一条消息是0.1s,

API解释

queueDeclare - 声明(创建)队列

生产者、消费者都有。

        //声明(创建)队列
        //参数:String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
        /**
         * 参数明细
         * 1、queue 队列名称
         * 2、durable 是否持久化,如果持久化,mq重启后队列还在
         * 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
         * 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
         * 5、arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间
         */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

basicPublish - 向指定的队列中发送消息

生产者。

        //向指定的队列中发送消息
        //参数:String exchange, String routingKey, BasicProperties props, byte[] body
        /**
         * 参数明细:
         * 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
         * 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
         * 3、props,消息的属性
         * 4、body,消息内容
         */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

basicConsume - 监听队列

消费者。

        //监听队列,第二个参数:是否自动进行消息确认。
        //参数:String queue, boolean autoAck, Consumer callback
        /**
         * 参数明细:
         * 1、queue 队列名称
         * 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复
         * 3、callback,消费方法,当消费者接收到消息要执行的方法
         */
        channel.basicConsume(QUEUE_NAME, false, consumer);

basicAck - 手动签收消息

消费者。

/**
* 手动签收消息
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag, false);

 basicNack&basicReject - 返回消息到Broke

消费者。

如果出现异常,则调用channel.basicNack()或者basicReject()方法,让其自动重新发送消息

       try{
            //......

       } catch (Exception ex) {
            /**
             * 手动拒绝签收
             * 参数1:当前消息的投递标签
             * 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
             * 参数3:是否重回队列,true为重回队列,false为不重回
             */
            channel.basicNack(deliveryTag, false, true);
            System.out.println("拒绝签收,重回队列:{}" + ex);
        }

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ教程-重要参数&&API解释 的相关文章

随机推荐

  • 调试osgEarth(33)分页瓦片卸载器子节点的作用-(3)渲染遍历的帧号和时间设置-_terrain使用TerrainCuller

    继续调试 这个是一堆NULL 省事了 然而 真的有这么简单么 想想地球加载起来时 视点变化时
  • 关于命令或语法中的方括号,尖括号等符号的区别

    关于方括号和尖括号的区别 命令说明或者语法说明中包含一些方括号 lt gt 等符号 方括号 表示可选参数 尖括号 表示必填参数 1 基本符号命令 内的内容意思是 可写可不写 那就必须要在 内给出的选择里选一个 lt gt 表示必选 注 不能
  • PKI体系及密码算法

    HTTPS 的诞生 可先参考网络协议 HTTPS协议等文章 明文传输 对称加密 加密 和 解密 使用 相同的 密钥 如果密钥可以安全的传输 那么消息也应该可以安全的传输 非对称加密 上述非对称加密与对称加密效果基本一样 如果公钥可以安全的传
  • squid使用NCSA验证

    如果要在squid中加入用户名和密码的验证 使用NCSA是最方便的 生成用户名 密码文件 用命令 htpasswd 来生成 使用非常简单 生成一个叫passwdfile的密码文件 包含username和password账户 htpasswd
  • 【电子电路】RS485收发器两种典型电路

    1 基本RS 485 电路 图1为一个经常被应用到的SP485R芯片的示范电路 可以被直接嵌入实际的RS 485应用电路中 微处理器的标准串行口通过RXD 直接连接SP485R 芯片的 RO 引脚 通过TXD直接连接SP485R 芯片的 D
  • Markdown基础语法介绍

    何为Markdown Markdown是一种轻量级标记语言 它允许用户以纯文本格式编写文档 然后转换成有效的XHTML或HTML文档 Markdown具备轻量化 跨平台 易读易写等特性 且支持文本 图片 图表等多种展示样式 Markdown
  • 使用Mac的十大最好用神器

    资深 Mac 用户 提升效率的 10 大神器 谢志鹏 大家好 我是曹将的徒弟 Pem 最近刚结束悉尼大学研究生的学习 是一枚正在求职的交互设计师 我是从 2015 年开始使用 Mac 的 如果用一句话来形容 Mac 的使用感受 那就是 令人
  • 下载Freescale CodeWarrior 11.0解决Freescale CodeWarrior 代码限制(不需要license)

    一些NXP的项目需要软件Freescale CodeWarrior的最新版本10 7 但是新安装了软件之后 10 7版本的license只能使用一个月 一个月到期后 就会有代码大小的限制 这个时候 需要花钱购买license 费用几千块钱
  • android.accessibilityservice包介绍

    android accessibilityservice 英文原文 http developer android com reference android accessibilityservice package summary html
  • Ant Design Pro 修改主题设置

    Ant Design Pro 修改主题设置 主题是在项目根目录下的config defaultSettings js文件下内所定义的其中导出的Settings对象中即为默认的主题等配置 如下图 const Settings navTheme
  • Vue2项目使用高德地图

    目录 一 账号准备 1 注册账号 2 获取key 二 快速上手 1 安装 2 创建地图 3 点标记 4 海量点标记 5 简易行政区图 6 GeoJSON 三 绑定事件 总结 一 账号准备 1 注册账号 首先 注册开发者账号 成为高德开放平台
  • 高德地图加渐变色3D线段

    想用高德地图实现渐变色的边界效果 查看了很多资料 测试了很多方法 终于实现啦 记录一下 1 按照高德官方示例创建地图 var map new AMap Map container pitch 75 地图俯仰角度 有效范围 0 度 83 度
  • python基础练习--《人力资源管理员工管理》

    python新手入门练习 运用python的基础数据结构编写 人力资源管理员工管理 初学python 入门练习 留些记录 方便以后查看 如有错误 请诸位大神指点 谢谢 需求分析 要求使用python的最基础的数据结构 字典 元组 列表 字符
  • [系统安全] 四十六.恶意软件分析 (2)静态分析Capa经典工具批量提取静态特征和ATT&CK技战术

    终于忙完初稿 开心地写一篇博客 您可能之前看到过我写的类似文章 为什么还要重复撰写呢 只是想更好地帮助初学者了解病毒逆向分析和系统安全 更加成体系且不破坏之前的系列 因此 我重新开设了这个专栏 准备系统整理和深入学习系统安全 逆向分析和恶意
  • JS 实现一键复制(复制DIV)

    话不多说 直接上代码 JS部分 function copyDivContent divId 获取标签内容 const div document getElementById divId 创建文档区域 const range document
  • 计算机硬件系统结构主要分为什么五大组成,硬件系统的五大组成部分

    大家好 我是时间财富网智能客服时间君 上述问题将由我为大家进行解答 以计算机为例 硬件系统的五大组成部分别为 储存器 控制器 运算器 输入设备 输出设备 计算机硬件 Computer hardware 是指计算机系统中由电子 机械和光电元件
  • 面试题流散汇总

    1 n位数全排列 大字符串相加 SQL HTTPS 根据简历来问 2 MapReduce和Spark的主要区别在于 MapReduce使用持久存储 而Spark使用弹性分布式数据集 RDDS Spark之所以如此快速 原因在于它在内存中处理
  • python爬取链家二手房信息并存储到数据库

    爬取链家的二手房信息 存储到数据库方便以后查看 文章目录 页面分析 引入库 方法编写 主函数编写 运行结果 页面分析 分析页面后发现是前后端未分离的状态 所以需要使用xpath分析界面元素 在li中存放着对应的div 有相关的信息 分析请求
  • Android 接入穿山甲SDK之开屏广告

    大家可以先参考我的上一篇博客介绍了如何集成SDK以及一些工具类传送门 首先创建一个脚本写入如下内容 package com unity3d player chuanshanjia import android app Activity im
  • RabbitMQ教程-重要参数&&API解释

    RabbitMQ的工作原理 下图是RabbitMQ的基本结构 生产者发送消息流程 1 生产者和Broker建立TCP连接 2 生产者和Broker建立通道 3 生产者通过通道消息发送给Broker 由Exchange将消息进行转发 4 Ex