使用 RabbitMq 锁定和批量获取消息

2024-04-19

我正在尝试以一种更非常规的方式使用 RabbitMq(尽管此时我可以根据需要选择任何其他消息队列实现)。消费者不会将 Rabbit 推送消息留给我的消费者,而是连接到一个队列并获取一批 N 条消息(在此期间它会消费一些消息,并可能拒绝一些消息),然后跳转到另一个队列,依此类推。这样做是为了冗余。如果某些消费者崩溃,所有消息都保证被其他消费者消费。

问题是我有多个消费者,我不希望他们竞争同一个队列。有没有办法保证队列上的锁?如果不是,我至少可以确保如果两个消费者连接到同一个队列,他们不会读取相同的消息吗?事务可能在某种程度上对我有帮助,但我听说它们将从 RabbitMQ 中删除。

其他架构建议也受到欢迎。

Thanks!

EDIT:正如评论中指出的,我需要如何处理消息有一个特殊性。它们只有在分组时才有意义,并且相关消息很有可能聚集在队列中。例如,如果我提取一批 100 条消息,那么我很有可能能够对消息 1-3、4-5,6-10 等执行某些操作。如果我无法找到某些消息的组,我会会将它们重新提交到队列中。 WorkQueue 不起作用,因为它将消息从同一组传播到多个工人,而这些工人不知道如何处理它们。


您看过这本免费的在线书籍吗?企业集成模式 http://eaipatterns.com/?

听起来您确实需要一个工作流程,在消息到达您的工作人员之前,您需要一个批处理组件。使用 RabbitMQ 有两种方法可以做到这一点。要么使用一种可以为您进行批处理的交换类型(和消息格式),要么拥有一个队列,以及一个对批次进行排序并将每个批次放入其自己的队列中的工作人员。批处理程序可能还应该向控制队列发送“批处理就绪”消息,以便工作人员可以发现新批处理队列的存在。处理完批次后,工作人员可以删除批次队列。

如果您可以控制消息格式,则可以让 RabbitMQ 通过多种方式隐式执行批处理。通过主题交换,您可以确保每条消息上的路由密钥的格式为 work.batchid.something,然后获悉批次 xxyzz 存在的工作人员将使用像 #.xxyzz.# 这样的绑定密钥来仅消费这些消息。无需重新发布。

另一种方法是在标头中包含批次 ID 并使用较新的标头交换类型。当然,如果您愿意编写少量的 Erlang 代码,您也可以实现自己的自定义交换类型。

不过,我确实建议您查看这本书,因为它比大多数人开始使用的典型工作队列概念更好地概述了消息传递架构。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

使用 RabbitMq 锁定和批量获取消息 的相关文章

  • 当我为rabbitmq-management创建用户时,发生了错误

    当我为rabbitmq创建用户时 root localhost rabbitmqctl add user admin admin 发生错误 消息 Creating user admin Error undef crypto hash sha
  • 在 Kafka 中设计生产者和消费者组件

    我在用Kafka and Zookeeper作为我的数据管道的主要组件 该管道每秒处理数千个请求 我在用Samza作为我需要对数据进行小型转换的实时数据处理工具 我的问题是我的一位消费者 比方说ConsumerA 消耗了几个主题Kafka并
  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • 谁能告诉我 python 中的 pika 和 kombu 消息传递库有什么区别?

    我想在我的应用程序中使用消息传递库与rabbitmq交互 谁能解释一下 pika 和 kombu 库之间的区别吗 Kombu 和 pika 是两个不同的 Python 库 它们从根本上服务于相同的目的 向消息代理发布消息和使用消息代理发送消
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 如何在nodejs中验证rabbitmq?

    错误 握手被服务器终止 403 ACCESS REFUSED 消息 ACCESS REFUSED 使用身份验证拒绝登录 旋转机制平原 有关详细信息 请参阅代理日志文件 我单独尝试了 authMechanism PLAIN AMQPLAIN
  • 消息队列 makefile 错误:未定义对“mq_open”的引用

    虽然我已经链接了 lrt在我的 Makefile 中 正如你在下面看到的 我仍然得到undefined reference to mq open 请帮忙 all get1 iserv1 get get1 c gcc Wall o get1
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • RabbitMQ - 升级到新版本并收到很多“PRECONDITION_FAILED Unknown Delivery Tag 1”

    刚刚升级到新版本的 RabbitMQ 2 3 1 现在出现以下错误 PRECONDITION FAILED unknown delivery tag 1 随后通道关闭 这适用于较旧的 RabbitMQ 无需客户端更改 在应用程序行为方面 当
  • 无法从 docker 将 RabbitMQ 连接到我的应用程序 [重复]

    这个问题在这里已经有答案了 我目前被这个问题困扰了大约一周 确实找不到合适的解决方案 问题是 当我尝试连接到 dockerized RabbitMQ 时 它每次都会给出相同的错误 wordofthedayapp wordofthedayap
  • MongoDB 架构设计 - 实时聊天

    我正在启动一个项目 我认为该项目特别适合 MongoDB 因为它提供的速度和可扩展性 我目前感兴趣的模块是与实时聊天有关的 如果我要在传统的 RDBMS 中执行此操作 我会将其分为 频道 一个频道有很多用户 用户 一个用户有一个频道但有多条
  • 可以从 http(javascript 客户端)直接向 Amazon SQS 发送请求吗?

    是否可以直接从 JavaScript 向 Amazon 的 SQS 发送消息请求 我正在尝试创建一个日志系统 并且希望绕过将请求发送到中间人服务器 另外 有人知道我可以利用这个解决方案的任何替代方案吗 SQS 事实上所有 aws 服务 都公
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • RabbitMQ 失败,错误:无法连接到节点rabbit@TPAJ05421843:nodedown

    在 Windows 7 Enterprise 计算机上 我全新安装了 Erlang 17 4 和 RabbitMQ 3 4 3 x64 安装成功且顺利 我还没有尝试创建我的第一个队列或交换器 但我已经看到了麻烦 这个问题类似于另一个SO帖子
  • Linux 消息队列 - 多个接收者

    我最近一直在研究和研究 Linux 消息队列 并遇到了一些我不太明白为什么会发生的事情 如果我们运行两个程序 它们都在无限 for 循环中使用 msgrcv 来检查消息 然后发送两条消息 那么第一个运行的程序将收到第一条消息 第二个程序将收
  • 在 PHP 中使用消息队列与普通 Cron 作业之间的区别

    我们有一个基于 PHP 构建的大型 Web 应用程序 该应用程序允许安排推文和墙贴 并且有从服务器发出的预定电子邮件 我所说的 计划 是指这些 PHP 脚本计划在特定时间运行cron 大约有 7 个 PHP 文件可以完成上述工作 我一直听说
  • Azure 服务总线中的死信队列中的消息是否会过期?

    Azure 服务总线中的死信队列中的消息是否会过期 一些解释 我有这些队列设置 var queueDescription new QueueDescription MyTestQueue RequiresSession false Defa
  • AMQPRuntimeException:读取数据时出错。收到 0 而不是预期的 7 字节

    它曾经有效 但现在不再有效了 我正在使用 php amqplib 和 RabbitMQ 当我尝试创建新的 AMQP 连接时 connection new AMQPConnection localhost 5672 username pass

随机推荐

  • 如何从xamarin表单应用程序将图像上传到服务器

    我正在尝试使用 post 请求将图像从我的 xamarin 表单应用程序发送到 asp net core 服务器 我需要将图像保存在某个服务器文件夹中 但我做不到 这是我在 mediaFile 中选择图像后发送图像的方法 private a
  • 如何使用 Identity Server 4 颁发基于 Windows 身份验证的访问令牌

    我的目标是保护 Web API 以便客户端只能使用 IS 基于 Windows 身份验证颁发的访问令牌来访问它 我完成了这个基本示例 http docs identityserver io en release quickstarts 1
  • 全局运算符和成员运算符的区别

    定义一个接受类的两个引用的全局运算符和定义一个仅接受正确操作数的成员运算符之间有区别吗 Global class X public int value bool operator X left X right return left val
  • Tensorflow - 保存模型

    我有以下代码 在尝试保存模型时出现错误 我可能做错了什么 我该如何解决这个问题 import tensorflow as tf data labels cifar tools read data C Users abc Desktop Te
  • 如何从 JObject 获取第一个键?

    我在用Newtonsoft Json在我的项目中 我有JObject像这样 4781 Name 1 1577 Name 2 9973 Name 3 我成功解析它JObject Parse 我需要从此 JObject 获取第一个密钥 4781
  • Javascript CRC16 示例代码或实现

    有人可以分享一个链接或示例代码来实现 JavaScript 中字符串的校验和吗 预先非常感谢 你想要什么 你需要更具体 CRC16 算法数量众多 每种算法都有自己的多项式并用于特定用途 一些 CRC16 算法非常适合创建哈希 例如 对于 R
  • 如何在ggplot2中实现手绘铅笔填充? [关闭]

    Closed 此问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我发现了这篇关于可视化的精彩博客文章 http www darkhorseanalytics com b
  • 进程的异步生成:设计问题 - Celery 或 Twisted

    全部 我正在寻求意见 指导 和设计理念 我的目标是找到一种精简但可靠的方法来从 HTTP POST 获取 XML 有效负载 这部分没有问题 对其进行解析 并异步生成一个相对寿命较长的进程 生成的进程是 CPU 密集型进程 将持续大约三分钟
  • Excel:如果在另一列中发现重复的单元格值,则突出显示绿色

    有人可以帮助我 我不知道该使用什么公式 我突出显示了图片中的单元格以展示我的意思的示例 What I want to do is highlight the cell in column A where the value matches
  • Python OrderedDict 不保持元素顺序[重复]

    这个问题在这里已经有答案了 我正在尝试创建一个 OrderedDict 对象 但我一创建它 元素就变得混乱了 这就是我所做的 from collections import OrderedDict od OrderedDict 0 0 2
  • Sublime Text 默认保存选项

    为什么当我在 Sublime Text 3 中保存文件时 默认保存位置是 Sublime 安装目录 为什么默认文件类型是什么 我想将默认保存位置设置为桌面并将默认文件类型设置为 txt 我该如何执行此操作 这是我的设置 font size
  • iOS 上的背景图像随着用户交互而闪烁 [Ionic 5]

    我正在尝试让背景图像在我正在从 Ionic 3 更新到 5 的多页 Ionic 应用程序上工作 除了加载的第一页之外 我在 iOS 上的任何页面上都遇到了闪烁的背景图像问题 我尝试实施这个解决方案 如何在 Ionic 中将图像同时放入 和
  • 应用程序在 Play 商店中上线后 Android 应用程序链接不起作用

    我已经根据以下链接实现了 Android 应用程序链接 https developer android com studio write app link indexing html https developer android com
  • 嵌套类模板特化

    A class template
  • 流星合并同一集合的光标

    在我的社交应用程序 如 FB 中 我有一个奇怪的需要 将同一集合用户的两个光标合并到一个发布中 Meteor 服务器打印此错误 发布函数为集合用户返回了多个游标 也许这在 Meteor 0 7 2 中无法完成 也许我的方法是错误的 但我发现
  • 如何在Python中获取Linux控制台窗口宽度

    python 有没有办法以编程方式确定控制台的宽度 我的意思是一行中不换行的字符数 而不是窗口的像素宽度 Edit 寻找适用于 Linux 的解决方案 不确定为什么它在模块中shutil 但它在 Python 3 3 中出现了 看 查询输出
  • 如何在 java 类方法或构造函数中插入前提条件?

    这是我正在上的 Java 课程 本书提到了前置条件和后置条件 但没有给出任何如何对其进行编码的示例 它继续讨论断言 我已经把它记下来了 但是我正在做的作业特别指出插入前提条件并用断言测试前提条件 任何帮助都会很棒 像 Eiffel 这样的语
  • 使用独立对齐和附加的 Listview 元素反应本机 Listview 网格布局

    我有一个关于 Listview 元素对齐的问题 这些元素应该以比行样式更盒装的样式显示 在图片中 您可以看到当前状态 这是通过使用 Listview 的 contentContainerStyle prop 中使用的样式表代码生成的 Lis
  • 带有默认参数的 Swift 选择器

    我在这里编写简单的代码 self navigationItem leftBarButtonItem UIBarButtonItem barButtonSystemItem UIBarButtonSystemItem Cancel targe
  • 使用 RabbitMq 锁定和批量获取消息

    我正在尝试以一种更非常规的方式使用 RabbitMq 尽管此时我可以根据需要选择任何其他消息队列实现 消费者不会将 Rabbit 推送消息留给我的消费者 而是连接到一个队列并获取一批 N 条消息 在此期间它会消费一些消息 并可能拒绝一些消息