RabbitMQ 消费者过载

2023-12-03

我一直在阅读有关 AMQP 消息传递确认的原理。 (https://www.rabbitmq.com/confirms.html)。确实很有帮助且写得很好的文章,但有关消费者致谢的一个特别的事情确实令人困惑,以下是引用:

使用自动确认模式时需要考虑的另一件事是消费者超载.

消费者超载?消息队列由代理处理并保存在 RAM 中(如果我理解正确的话)。这是关于什么过载?消费者是否有某种第二队列? 该文章的另一部分更令人困惑:

因此,消费者可能会对送货速度感到不知所措,在内存中积累积压并耗尽堆或让操作系统终止其进程。

什么积压?这一切是如何协同工作的?消费者完成哪部分工作(当然除了消费消息和处理消息)?我认为代理正在保持队列活动并转发消息,但现在我正在阅读一些神秘的积压和消费者过载。这真的很令人困惑,有人可以解释一下或者至少指出我的好来源吗?


我相信您所指的文档处理了我认为 AMQP 0-9-1 或 RabbitMQ 的实现中的设计缺陷。

考虑以下场景:

  • 队列中有数千条消息
  • 单个消费者订阅队列AutoAck=true并且没有设置预取计数

将会发生什么?

RabbitMQ 的实现是向没有预取计数的客户端传递任意数量的消息。此外,对于自动确认,预取计数是无关紧要的,因为消息在传递给消费者时得到确认。

内存缓冲区:消费者的默认客户端 API 实现有一个内存缓冲区(在 .NET 中,它是某种类型的阻塞集合(如果我没记错的话))。因此,在处理消息之前,但在从代理接收到消息之后,它进入这个内存中的保存区域。现在,设计缺陷就是这个保存区域。消费者别无选择,只能接受来自代理的消息,因为它是异步发布到客户端的。这是AMQP协议规范(参见第 53 页)。

因此,此时队列中的每条消息都将立即传递给消费者,并且消费者将被消息淹没。假设每条消息都很小,但需要 5 分钟来处理,那么这个消费者完全有可能在任何其他消费者附加到它之前耗尽整个队列。自从AutoAck打开后,代理将在传递后立即忘记这些消息。

显然,如果您想要处理这些消息,这不是一个好的场景,因为它们已经离开了代理的相对安全性,并且现在位于消费端点的 RAM 中。假设遇到了导致消费端点崩溃的异常 - 噗,所有消息都消失了。

如何解决这个问题?

您必须关闭 Auto-Ack,通常设置合理的预取计数也是一个好主意(通常 2-3 就足够了)。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RabbitMQ 消费者过载 的相关文章

  • 过期的消息不会从 RabbitMQ 中删除

    我通过生产者向 RabbitMQ 发送一条普通消息 然后发送第二条消息expiration属性分配给一个值 然后使用rabbitmqctl list queues命令我监视消息的状态 我发现如果我先发送一条普通消息 然后发送一条消息expi
  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • Spring AMQP + RabbitMQ 3.3.5 ACCESS_REFUSED - 使用身份验证机制 PLAIN 拒绝登录

    我遇到以下异常 org springframework amqp AmqpAuthenticationException com rabbitmq client AuthenticationFailureException ACCESS R
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 无法在Windows上启用rabbitmq管理插件

    所以 这就是我所做的 在我的 Windows x64 位机器上安装了 Erlang 安装 RabbitMQ 启动 RabbitMQ 服务 这一步我没有任何错误 但是 当我尝试启用rabbitmq management时 我在控制台中收到一些
  • 在rabbitmq配置spring boot中在AMQP中配置多个Vhost

    我正在实现一个项目 我必须在rabbitmq中的不同虚拟主机之间发送消息 使用 SimpleRoutingConnectionFactory 但得到 java lang IllegalStateException 无法确定查找键的目标 Co
  • 如何在运行时创建 celery 队列,以便工作人员拾取发送到该队列的任务?

    我正在使用 django 1 4 celery 3 0 rabbitmq 为了描述该问题 我的系统中有许多内容网络 并且我需要一个队列来处理与每个网络相关的任务 然而 内容是在系统运行时动态创建的 因此我需要动态创建队列并让现有工作人员开始
  • 在 docker-compose 文件中提供rabbitmq.conf会给出“sed:无法重命名/etc/rabbitmq/sedMaHqMa:设备或资源繁忙”

    我的 docker compose 看起来像这样 version 3 2 services mq hostname HOST NAME ports 5671 5671 5672 5672 15671 15671 15672 15672 en
  • 在 Windows 10 和 PHP 7.3 中安装 AMQP

    我想在 Windows 10 中使用 PHP 7 3 安装 AMQP 以便在 symfony 4 中使用 Windows 不使用任何 apache iis nginx 并直接由 symfony 运行 一切还好 直到 我决定在项目中使用rab
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • 在 Celery 工作线程中捕获 Heroku SIGTERM 以优雅地关闭工作线程

    我对此进行了大量研究 令我惊讶的是我还没有在任何地方找到一个好的答案 我正在 Heroku 上运行一个大型应用程序 并且我有某些运行很长时间处理的 celery 任务 并在任务结束时保存结果 每次我在 Heroku 上重新部署时 它都会发送
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass
  • RabbitMQ - 如何死信/处理过期队列中的消息?

    我有一个队列x expires放 我遇到的问题是我需要对队列中的消息进行进一步处理IF队列过期 我最初的想法是设置x dead letter exchange在队列中 但是 当队列过期时 消息就会消失而不会进入死信交换 如何处理死信或以其他
  • 微服务、amqp 和服务注册/发现

    我正在研究微服务架构 实际上我想知道一些事情 我非常同意使用 返回 服务发现来在基于 REST 的微服务上发出请求 我需要知道发出请求的服务 或至少是服务器集群的前端 在哪里 因此在这种情况下能够发现 ip port 是有意义的 但我想知道
  • 面向服务的架构 - AMQP 或 HTTP

    一点背景 非常大的整体 Django 应用程序 所有组件都使用相同的数据库 我们需要分离服务 以便我们可以独立升级系统的某些部分而不影响其余部分 我们使用 RabbitMQ 作为 Celery 的代理 现在我们有两个选择 使用 REST 接
  • 生产者/消费者的不同语言

    我想知道是否可以通过 AMQP 和 RabbitMQ 对生产者和消费者使用不同的语言 例如 Java 代表生产者 python php 代表消费者 或者反之亦然 是的 AMQP 与语言无关 这意味着只要您有可以连接到 AMQP 的客户端sa
  • RabbitMQ - 无法联系统计数据库。消息速率和队列长度将不会显示

    我已经设置了一个兔子经纪人集群 并且在管理门户插件中我收到以下消息 无法联系统计数据库 消息速率和队列长度将不会显示 我已经搜索过这个错误 但谷歌并不友善 任何人都可以阐明这一点吗 我最近在旧安装的RabbitMQ 2 8 7 上遇到了同样
  • rabbitmq 的 REST API

    有没有办法从 ajax 向 RabbitMQ 发送数据 我的应用程序由数千个 Web 客户端 用 js 编写 和 WCF REST 服务组成 现在我试图弄清楚如何为我的应用程序创建可扩展点 这个想法是有一个rabbitmq实例 它从放置在一
  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • Java如何避免在循环中使用Thread.sleep()

    从我的主线程开始 我启动了两个线程 称为生产者和消费者 两者都包含while true 环形 生产者循环是 UDP 服务器 因此不需要睡眠 我的问题出在消费者循环中 消费者循环从链接队 列中删除对象并将其传递给函数以进行进一步处理 根据研究

随机推荐

  • Python Pandas:为源列的每个不同值创建一个新列(布尔输出作为列值)

    我试图根据数据帧的内容将其源列拆分为几列 然后按以下方式用布尔值 1 或 0 填充新生成的列 原始数据框 ID source column A value 1 B NaN C value 2 D value 3 E value 2 生成以下
  • Access 2010 限制查询结果

    MS Access 2010 中用于将查询结果限制为前 1 000 个的语法是什么 我试过这个 SELECT tblGL Cost Centre Code FROM tblGL LIMIT 1000 但我收到错误 FROM 子句中的语法错误
  • 有什么方法可以控制 blockproc 输出的串联吗?

    这是问题的后续 使用 blockproc 或 im2col 在图像上重叠滑动窗口 所以通过使用代码 B blockproc A 1 1 block fun BorderSize 2 2 TrimBorder false PadPartial
  • 使用 geom_line 连接缺失值

    我试图弄清楚是否可以使用 geom line 连接缺失值 例如 在下面的链接中 构面 F 中的时间 3 处缺少值 在这种情况下 我想要一条线来连接时间 2 和 4 有办法实现这一点吗 https farm8 staticflickr com
  • 从自定义表格视图单元格发送重新加载数据?

    我将如何从自定义 tableViewCell 向 tableView 发送 reloadData 消息 实现这一点的最简单方法是使用委托 在 CustomTableCell h 中定义一个协议 如下所示 protocol CustomTab
  • 递归 - 数字按相反顺序排列

    我需要实现一个递归方法 printDigits 它将整数 num 作为参数 并以相反的顺序打印其数字 每行一位数字 这是我到目前为止所拥有的 public class PrintDigits public static void main
  • 启动层初始化时发生错误 FindException: Module not found

    使用 Java 9 执行简单的 Hello World 程序会导致以下错误消息 boot层初始化时出错java lang module FindException 找不到模块 com pantech myModule 我执行的命令行是 ja
  • 使用 VBA 宏在 CATIA V5R19 中实例化 PowerCopy

    我不知道如何使用 VBA 宏实例化 Power copy 我有一个 CATPart1 其 Power copy 名称为 MyPC 我想在当前部分实例化这个超级副本 仅举例来说 此超级复制输入为 Plane Start point 和 End
  • 查找“立体校正”两个摄像机之间的旋转矩阵

    所以我有一个深度图以及相机的外部和内部 我想取回 3D 点和表面法线 我正在使用该函数重新投影图像至 3D 在stereo rectify函数中找到Q如何得到旋转矩阵between第一和第二相机的坐标系 我有单独的旋转矩阵和平移向量 但如何
  • Android Studio:使用 Mongo Java 驱动程序连接到 MongoDB 服务器

    关于这个问题有很多帖子 但似乎没有人能解决问题 所以也许有些事情已经改变了 我正在尝试将我的 Android 应用程序连接到位于 mLab 上的 MongoDB 服务器 我正在使用 Mongo Java Drived 当然已经将库添加到了
  • Vera ++ TCL规则:列出所有局部变量[关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心以获得指导 我正在尝试为 vera 静
  • Datagrid (WPF) 以编程方式设置列样式(不是 xaml)

    我已经看过了 但还没有找到我正在寻找的确切答案 我有一个绑定到数据源的 DataGrid 视图 我想在带有数据网格的窗口可见后以编程方式设置列的样式 我也想根据一些行为时不时地改变它 我尝试使用 DataGridTemplateColumn
  • 如何在C#中删除注册表值

    我可以使用 Microsoft Win32 Registry 类获取 设置注册表值 例如 Microsoft Win32 Registry SetValue HKEY CURRENT USER Software Microsoft Wind
  • 为我的条形码阅读器读取和写入数据到缓冲区的最佳方法是什么?

    我需要用 C 语言为 Linux 的条形码阅读器编写一个驱动程序 条形码阅读器通过串行总线工作 当我向条形码阅读器发送一组命令时 条形码阅读器应该向我返回状态消息 我设法配置端口并创建信号处理程序 在信号处理程序中 我读取串行总线接收的数据
  • 如何搜索在 Json 中注册为数组的日期?

    如何搜索在 Json 中注册为数组的日期 PostgreSQL 是数据库 下面是相关代码 1 Model class Business db Model tablename business id db Column db Integer
  • GitHub api 获取最后 N 次提交

    是否可以使用 GitHub API 获取 GitHub 存储库中特定分支的最后 N 次提交 我刚刚发现了一些关于提交的 GitHub api 详细信息here 但他们都没有提供有关最后 N 次提交的详细信息 任何人都可以对此提供更好的想法吗
  • 具体详细说明了 NSUserDefaultsDidChangeNotification 上的默认值已更改的内容

    我开始进入 NSUserDefaults 的内部密室 现在我可以使用提供的选择器加上 NSNotification 对象作为参数成功拦截 NSUserDefaultsDidChangeNotification 通知 然而 返回的 NSNot
  • R-lang:如果等于引号,则删除第一个字符

    R新手 我正在尝试从数据框中的行的开头和结尾删除 如果引号不是第一个或最后一个字符 我不想删除 我不确定为什么以下内容不适用于我的数据的数据框 其中每一行都是文本的数据点 引号并不是字符串 而是文本的一部分 数据框的一行看起来像这样 x l
  • 在 bash 中获取当前日期而不生成子进程

    这个问题纯粹是好奇心 通过运行以下命令很容易获得日期date来自 bash 的命令 但它是一个外部可执行文件 需要生成一个子进程 我想知道是否可以在没有子进程的情况下获取当前时间 日期的格式 我只能在以下上下文中找到对日期 时间格式的引用P
  • RabbitMQ 消费者过载

    我一直在阅读有关 AMQP 消息传递确认的原理 https www rabbitmq com confirms html 确实很有帮助且写得很好的文章 但有关消费者致谢的一个特别的事情确实令人困惑 以下是引用 使用自动确认模式时需要考虑的另