已发布并等待 RabbitMQ / EasyNetQ 主题的回复 - 如何仅获取您的主题?

2024-01-08

当发布者期望得到消息的答案时,如何确保在扩展时它只能得到(与其自己的消息相关的)答案?

我们有一个客户端进程,它发布一条消息以供服务器进程应答。此外,我们有一个“监听器”进程,只需要消耗问题和答案,而不需要发布任何内容。此外,服务器进程将来可能会被分成多个进程,从而创建消息级联。我们不能使用请求/响应,因为我们需要监听器,然后,当我们需要级联时...此外,我们将有几个问题/答案类别,并且 EasyNetQ 中的请求/响应不支持主题。

我们使用 EasyNetQ 的解决方案是简单的基于主题的发布/订阅:客户端发布到“question“主题,订阅”answer”,服务器订阅“question”并发布到“answer”,听众只需订阅两者即可。

问题是当你扩展客户端时。它的两个实例现在发布问题,但由于它们都订阅了一个“answer”主题,一个人可能会得到另一个实例发布的问题的答案,而不是得到自己的答案。

我们找到的解决方案是让客户端在订阅“时使用唯一命名的队列”answer“ - 这样每个客户端都会得到所有的答案,只需要忽略不属于他的答案。但是,这个解决方案有一些性能缺陷,并且还会导致每次客户端崩溃(或只是在开发过程中重新启动等)。

客户端,发送对象消息:

string corrId = Guid.NewGuid().ToString();

// Register the corrId in a dictionary
//...

var myMessage = new MyMessage {correlationId =corrId, realMessage = msg};
easyNetQBus.Subscribe<MyMessage>("mqClient"+uniqueSuffix, HandleMsg, x => x.WithTopic("answer"));
easyNetQBus.Publish(myMessage, "question");

// In HandleMsg, we see if we have issued questions with the correlation id that came with the answer (lookup in the dictionary) and if not, ignore it

Server:

easyNetQBus.Subscribe<MyMessage>("mqServer", HandleMsg, x => x.WithTopic("question"));

// In HandleMsg, we publish the answer back to "answer" with the correlation id from the question

我们还应该使用另一种模式吗?我们可以在每条消息中放入一个唯一的主题/队列来发送答案,但这使听众的生活变得复杂,也使我提到的级联中未来参与者的灵活性变得复杂......


解决你的问题的一种方法是使用远程过程调用模式 https://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html。这提供了一种简单、可扩展的方式来为每个客户端提供唯一的队列。

从例子来看:

  • Client启动时,会创建一个匿名独占回调 队列。对于 RPC 请求,客户端发送一条消息,其中包含两个 属性:ReplyTo,设置为回调队列和 CorrelationId,为每个请求设置一个唯一值。
  • 请求被发送到 rpc_queue 队列。 RPC 工作线程(又名: 服务器)正在等待该队列上的请求。当一个请求 出现后,它会完成工作并将结果发送回至 客户端,使用 ReplyTo 属性中的队列。
  • 客户端等待回调队列上的数据。当一条消息 出现时,它会检查 CorrelationId 属性。如果匹配的话 请求中的值,它返回响应 应用。

如果您的回调队列是自动删除的,则独占队列(RabbitMQ 队列文档 https://www.rabbitmq.com/queues.html),那么当客户端或服务器重新启动时,它们不应该堆积起来。

对于某些工作负载,队列应该是短暂的。尽管 客户端可以删除他们在断开连接之前声明的队列,这 并不总是方便。最重要的是,客户端连接可能会失败, 可能会留下未使用的资源(队列)。

有3种方法可以让队列自动删除:

  • 独占队列(如下所述)
  • TTL(下面也有介绍)
  • 自动删除队列

当最后一个消费者被删除时,自动删除队列将被删除 取消(例如使用basic.cancel在 AMQP 0-9-1 中)或消失 (关闭通道或连接,或丢失与 服务器)。

如果一个队列从来没有任何消费者,例如,当所有消费 发生使用basic.get方法(“拉”API),它不会 自动删除。对于这种情况,请使用独占队列或队列 TTL。 (RabbitMQ 队列文档 https://www.rabbitmq.com/queues.html)

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

已发布并等待 RabbitMQ / EasyNetQ 主题的回复 - 如何仅获取您的主题? 的相关文章

  • 如何使用 Java 在 RabbitMQ 中实现标头交换?

    我是一个新手 试图在java客户端中实现标头交换 我知道这就是 x match 绑定参数的用途 当 x match 参数设置为 any 时 只需一个匹配的标头值就足够了 或者 将 x match 设置为 all 强制所有值必须匹配 但任何人
  • ECONNREFUSED:无法连接到集群内默认端口上的 RabbitMQ pod

    我的本地集群中有一个运行 RabbitMQ 的 pod 我已经将其配置为 apiVersion v1 kind Service metadata name service rabbitmq spec selector app service
  • 从 RabbitMQ 迁移到 Amazon SQS [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我们的初创公司目前正在使用RabbitMQ with Python Django 对于消息队列 现在我们计划转移到Amazon SQS其高可用性
  • 有人还在使用客户端服务器架构吗[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我编写软件已有几十年了 现在一切都是网络 在网络出现之前 我们拥有的客户端服务器应用程序基本上是直接与数据库对话的胖客户端应用程序 它
  • Google BigQuery 底层架构

    所以我大约 10 分钟前才开始摆弄 Google BigQuery 我想知道是否有人知道他们用来存储数据的底层架构 例如 这只是他们自己的下一代 BigTable 基础设施吗 另外 他们在索引 索引重建等方面使用什么样的策略是否清楚 我只是
  • Celery 广播 vs RabbitMQ 扇出

    我最近一直在使用 Celery 但我不喜欢它 它的配置很混乱 过于复杂并且文档记录很少 我想用 Celery 从单个生产者向多个消费者发送广播消息 让我困惑的是 Celery 术语和底层传输 RabbitMQ 术语之间的差异 在 Rabbi
  • PouchDB可以在客户端代理一个大数据库吗?

    在单页应用程序上工作时 我必须编写大量样板代码才能与服务器端数据同步 PouchDB http pouchdb com 为这个问题提供了一个优雅的解决方案 允许在客户端本地访问数据 我不明白的是 当数据库太大而无法完全适应浏览器内存时 Po
  • Django Celery 和多个数据库(Celery、Django 和 RabbitMQ)

    是否可以设置与 Django Celery 一起使用的不同数据库 我有一个配置了多个数据库的项目 并且不希望 Django Celery 使用默认数据库 如果我仍然可以使用 django celery 管理页面并读取存储在这个不同数据库中的
  • SOA、客户端-服务器、Web 服务 - 有什么区别? [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 在阅读了一些有关 SOA Web 服务和客户端 服务器架构主题的文献之后 我真的对这些术语感到困惑 因为看不出它们之间的真正区别 有人
  • 使用 Spring Boot 的多个 Rabbitmq 队列

    来自 Spring Boot 教程 https spring io guides gs messaging rabbitmq https spring io guides gs messaging rabbitmq 他们给出了创建 1 个队
  • 为什么我们需要 RESTful Web 服务?

    我将学习 RESTful Web 服务 最好说我必须这样做 因为它是 CS 硕士学位课程的一部分 我在 Wikipedia 上阅读了一些信息 还在 Sun Developer Network 上阅读了一篇有关 REST 的文章 我发现这不是
  • 在 Perl 中优雅地确定系统架构

    我正在寻找一种简单的方法来确定 Perl 5 中的系统是 32 位还是 64 位 我已阅读perlvar来回翻阅手册页 并没有发现包含系统 CPU 架构的变量 编译 Perl 的 CPU 架构将足够接近 这是我最接近的 chomp my a
  • 如何停止rabbitmq服务器

    我正在尝试启动一个节点应用程序 但我认为rabbitmq 妨碍了我 与此线程类似 名为 rabbit 的节点已经在运行 但也 无法连接到节点 rabbit https stackoverflow com questions 8737754
  • SQL Server 中数据库数量的实际限制?

    在 stackoverflow 播客之一 我认为是 18 中 Jeff 和 Joel 正在讨论多租户数据库与单租户数据库 Joel 提到 FogBugz on Demand 使用了每个客户架构的数据库 我想知道是否有一个点超过这个点 您将需
  • 内存缓存 VS。分布式系统中的集中式缓存

    我们目前正在寻找最合适的解决方案来访问分布式系统上的关键数据 并且我们正在考虑是否使用内存缓存而不是集中式缓存 有关我们希望存储 访问的数据的一些信息 数据量非常小 数据很冷 这意味着它几乎不会改变 并且只有当人们改变我们后台系统中的某些内
  • 使用 Celery 时出现错误消息“无法找到记录器“多处理”的处理程序”

    RabbitMQ http en wikipedia org wiki RabbitMQ现在似乎工作正常 然而 当我尝试 python m celery bin celeryd loglevel INFO 常规的celeryd不起作用 我收
  • RabbitMq 和“致命错误:握手失败 -handshake_decode_error”

    我正在使用 Windows Server 2012 Erlang 19 2 和 RabbitMq 3 6 6 我在使用 TLS 配置端点之间的连接时遇到问题 我已经尝试了所有关于 SO 的答案 以及所有 RabbitMq 文档here ht
  • Celery 设计帮助:如何防止并发执行任务

    我对 Celery AMQP 相当陌生 正在尝试提出一个任务 队列 工作人员设计来满足以下要求 我有多种类型的 每用户 任务 例如 TaskA TaskB TaskC 这些 每用户 任务中的每一个都为系统中的一个特定用户读取 写入数据 因此
  • django npm 和 Node 包架构

    在我加入的项目中 这是以下架构node packages Django project app1 app2 node modules foundation sites grunt static css images js urls py s
  • 如何阻止或拦截对 Directory.Delete(path, true) 的调用

    最近我正在调试一个 poof all customer data is gone 问题 没花多少时间就发现错误的分支导致了Directory Delete customerRoot true 行代码 这条灾难性的代码是由一名普通的 GUI

随机推荐

  • Hibernate 带注释的类中使用的“目录”是什么? [复制]

    这个问题在这里已经有答案了 在 Java 类中我看到了以下内容 Table name user catalog users 它的用途是什么 From 维基百科 http en wikipedia org wiki Database cata
  • 反转 ClientDataSet 索引的顺序

    我想反转 TClientDataSet 中索引的顺序 下面的代码看起来应该可以解决问题 但什么也没做 有没有一种好方法来反转索引的顺序 procedure TForm8 Button1Click Sender TObject var ind
  • VB.net,如何按值对集合项进行排序

    如何在 VB NET 中按值对集合项进行排序 我想对此进行排序 Dim col as Collection New Collection col Add b b1 col Add a a1 col Add d d1 就像 Krishnadd
  • Google Maps JS v3:地图显示:无;地图初始化后导致地图损坏

    This当然涉及到之前有关初始化期间地图显示的问题 然而 这里的问题是在地图应该已经初始化之后 地图显示被设置为无 我的 widow onload 的最后一行设置要显示的地图 none 那时地图初始化应该已经完成 但事实仍然是 最终的调用导
  • 使用 JSON 将嵌套对象发布到 Spring MVC 控制器

    我有一个控制器 其 POST 处理程序定义如下 RequestMapping value ajax saveVendor do method RequestMethod POST public ResponseBody AjaxRespon
  • Dask 分布式工作线程在运行许多任务时总是会泄漏内存

    有哪些策略可以解决或调试这个问题 distribution worker 警告 内存使用率很高 但工作线程没有数据可存储到磁盘 也许其他进程正在泄漏内存 进程内存 26 17 GB 工作内存限制 32 66 GB 基本上 我只是在一台机器上
  • Ember.Object 实例中必需的属性(构造函数参数)

    在 Ember 中 假设我有一个名为FoodStuff它有几个属性 export default Ember Object extend name null REQUIRED Slice of Apple Pie calories null
  • 使用 jQuery 使用 WCF 服务

    到目前为止 我已经使用了 Web 服务 并且运行良好 我添加了一个新的 WCF 服务 我正在使用 jQuery 调用服务 这就是我使用 jQuery 来使用 Web 服务的方式 ajax dataType json processData
  • 如何在reducer中处理redux-form/CHANGE

    推荐的处理方式是什么redux form CHANGE动作由 redux form 调度 我有自己的减速器来管理此表单的状态 但我不确定是否最好执行以下操作 export default reducer state initialState
  • WHOIS 命令未返回有用信息?

    如今 每当您使用 WHOIS 命令时 都不会返回任何有用的信息 我通常必须去 Godaddy Dnsstuff 或其他服务来获取数据 据我了解 原因主要是由于垃圾邮件 我只是想知道其他服务如何获取这些数据 他们使用不同类型的 WHOIS 命
  • 插入 SQL 数据库之前检查重复项

    所以我一直在做一些研究 我需要写一份INSERT语句将唯一的客户端名称插入到我的服务器上的表中 然而 数据库的默认标准已经有数千个客户端 当插入新客户端时 我们需要在尝试将其添加到系统之前检查它们是否已经存在 我的问题是最好 最快的方法是什
  • 在 JavaFX 中右键单击?

    如何检测 处理 JavaFX 中的右键单击 这是一种方法 import javafx stage Stage import javafx scene Scene import javafx scene shape Rectangle imp
  • 尝试导入sql文件时出现表已存在错误

    我正在尝试通过 phpMyAdmin 上传备份 sql 文件 创建与 phpMyAdmin 中的导入文件具有相同数据库名称的空数据库 然后使用从此空数据库中选择的导入函数 我收到以下错误消息 1050 Table db t already
  • JPA Native 查询获取单个对象

    如何使用 JPA Native 查询获取单个对象 我做了一些研究 但所有给出的答案都是使用 getSingleResult 但它没有返回我想要得到的东西 例如 如果我想获取数据库中表的计数并将其提取到整数中 我该怎么办 下面的代码显示了我如
  • Nginx 使用 CORS 和凭据

    我正在构建一个通过 Nginx 服务器与 Laravel API 进行通信的 Web 应用程序 我尝试按照 Nginx 网站上的说明进行操作完全开放的科尔斯 https enable cors org server nginx html 但
  • 无法解析符号 WebSecurityConfigurerAdapter

    我尝试在我的 java 应用程序中创建基本身份验证 对于他们 我在 gradle 文件中使用了这个依赖项 dependencies compile group org springframework boot name spring boo
  • Netlogo HPC CPU 使用百分比增加

    我通过以下方式使用无头 NetLogo 将作业提交到 HPC 服务器code bin bash N r20p q all q pe mpi 24 home abhishekb netlogo netlogo 5 1 0 netlogo he
  • RuntimeException 未安装 Zip PHP 扩展

    我是 Linux 新手 我刚刚安装了 Composer 和 Laravel 但是当我运行laravel new project我收到以下错误 RuntimeException The Zip PHP extension is not ins
  • 如何将 Jupyter 中的数据框导出为 csv 文件

    我有一个数据框另存为df在 jupyter 笔记本中 我想将其导出到桌面上的 csv 文件中 对于 pandas 数据框 您可以使用df to csv data csv https pandas pydata org pandas docs
  • 已发布并等待 RabbitMQ / EasyNetQ 主题的回复 - 如何仅获取您的主题?

    当发布者期望得到消息的答案时 如何确保在扩展时它只能得到 与其自己的消息相关的 答案 我们有一个客户端进程 它发布一条消息以供服务器进程应答 此外 我们有一个 监听器 进程 只需要消耗问题和答案 而不需要发布任何内容 此外 服务器进程将来可