Mass Transit:当存在不同消息类型时确保消息处理顺序

2024-01-10

我是公共交通新手,我想了解它是否对我的情况有帮助。 我正在构建一个使用 CQRS 事件源架构实现的示例应用程序,并且我需要一个服务总线,以便将命令堆栈创建的事件分派到查询堆栈反规范化器。

假设我们的域中有一个聚合,我们称之为Photo,以及两个不同的域事件:照片上传 and 照片存档.

在这种情况下,我们有两种不同的消息类型,默认的 Mass Transit 行为是创建两个不同的 RabbitMq 交换:一个用于 PhotoUploaded 消息类型,另一个用于 PhotoArchived 消息类型。

让我们假设有一个名为照片去规格化器:此服务将是两种消息类型的使用者,因为每当上传或存档照片时,照片读取模型都必须更新。

给定默认的公共交通拓扑,将有两个不同的交换,因此无法保证不同类型的事件之间的消息处理顺序:我们唯一的保证是相同类型的所有事件都将按顺序处理,但我们无法保证不同类型事件之间的处理顺序(请注意,考虑到我的示例的事件语义,处理顺序很重要)。

我该如何处理这种情况?公共交通适合我的需求吗?我是否完全错过了域事件调度的要点?


免责声明:这不是对您问题的回答,而是一条预防性信息,为什么您不应该做你打算做的事。

虽然 RMQ 等消息代理和 MassTransit 等消息中间件库非常适合集成,但我强烈建议不要使用消息代理进行事件溯源。我可以参考我的旧答案事件溯源:何时(以及不)应该使用消息队列? https://stackoverflow.com/questions/41131609/event-sourcing-when-and-not-should-i-use-message-queue这解释了其背后的原因。

您发现自己的原因之一 - 活动顺序永远无法得到保证。

另一个明显的原因是,从通过消息代理发布的事件构建读取模型有效地消除了重放的可能性,并构建了需要从一开始就开始处理事件的新读取模型,但它们得到的只是以下事件:正在出版now.

聚合形成事务边界,因此每个命令都需要保证它在一个事务内完成。虽然 MT 支持交易中间件 http://masstransit-project.com/MassTransit/advanced/transactions.html,它只保证您获得支持它们的依赖项的事务,但不保证您获得支持它们的依赖项的事务context.Publish(@event)在消费者主体中,因为 RMQ 不支持交易。您很有可能提交更改并且不会在读取端获取事件。因此,事件存储的经验法则是您应该能够订阅更改流从商店,并且不要发布代码中的事件,除非这些事件是集成事件而不是域事件。

对于事件溯源,每个读取模型在其投影的事件流中保留自己的检查点至关重要。消息代理不会给你这种权力,因为“检查点”实际上是你的队列,一旦消息从队列中消失 - 它就永远消失了,不会再回来。

关于实际问题:

您可以使用消息拓扑配置 http://masstransit-project.com/MassTransit/advanced/topology/message.html为不同的消息设置相同的实体名称,然后将它们发布到同一个交易所,但这属于“滥用”类别,就像克里斯在该页面上写道的那样。我还没有尝试过,但你绝对可以尝试一下。消息 CLR 类型是元数据的一部分,因此不应存在反序列化问题。

但同样,将消息放入同一个交换中不会给您提供任何排序​​保证,除了所有消息都将进入消费服务的一个队列中这一事实之外。

您至少必须根据聚合 ID 设置分区过滤器,以防止并行处理同一聚合的多条消息。顺便说一句,这对于集成也很有用。我们就是这样做的:

void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
    => ep.Handler<T>(
        c => appService.Handle(c, aggregateStore), 
        hc => hc.UsePartitioner(8, partition));

AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Mass Transit:当存在不同消息类型时确保消息处理顺序 的相关文章

  • 如何在 celery task.apply_async 中使用优先级

    我有一个testcelery 中的队列 我为它定义了一个任务 celery app task queue test ignore result True def priority test priority print priority 它
  • 如何触发 IModel.BasicAcks?

    我第一次使用 RabbitMQ 的 NET API 我想出了一个对我来说似乎合理的用例 我想创建发布消息并在消息被确认后执行某些操作的发布者 IModel BasicAcks 事件似乎是了解这一点的好方法 所以 我给出版商写了一封信 pri
  • Azure WebJobs SDK 服务总线死信队列

    使用 WebJobs SDK 时 将 BrokeredMessage 移动到死信队列的正确方法是什么 通常我只会调用 msg DeadLetter 但是 SDK 负责管理代理消息的生命周期 如果方法返回成功 它将调用 msg Complet
  • 设置服务总线辅助角色的 OperationTimeOut 属性

    我正在使用服务总线辅助角色模板创建辅助角色 我处理每条消息都要花费一分多钟的时间 因此 我发现工作角色多次收到相同的消息 大约每分钟收到一条消息 我认为这是因为该值默认为 60 秒 http msdn microsoft com en us
  • 无法在Windows上启用rabbitmq管理插件

    所以 这就是我所做的 在我的 Windows x64 位机器上安装了 Erlang 安装 RabbitMQ 启动 RabbitMQ 服务 这一步我没有任何错误 但是 当我尝试启用rabbitmq management时 我在控制台中收到一些
  • RabbitMQ 中多个消费者如何订阅同一主题并获取同一消息

    首先 我知道类似问题已经有答案了here https stackoverflow com questions 10620976 rabbitmq amqp single queue multiple consumers for same m
  • Django、RabbitMQ 和 Celery - 为什么在我更新开发中的 Django 代码后,Celery 会运行旧版本的任务?

    所以我有一个 Django 应用程序 它偶尔会向 Celery 发送任务以进行异步执行 我发现 当我在开发中处理代码时 Django 开发服务器知道如何自动检测代码何时发生更改 然后重新启动服务器 以便我可以看到我的更改 然而 我的应用程序
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 使用 Windows 服务总线 (1.1) 配置向导配置时出错

    我正在尝试使用服务总线配置向导配置 Windows 服务总线 1 1 当我尝试配置它时 出现以下错误 谁能告诉我出了什么问题 错误 5 9 2014 9 32 40 AM System Management Automation Cmdle
  • springrabbitmq:无法将id设置为属性?

    我有一个属性文件 其中包含队列 其值为queue name 如果我在其他请使用该属性 那么它可以工作 但如果我在 id 上使用它 那么它会失败
  • RabbitMQ 上的 Nack 和拒绝

    我想处理消费者从队列中获取的不成功的消息并将它们重新排队 想象一下我有这样的情况 P gt foo bar baz gt C 其中 foo bar 和 baz 是消息 如果消费者读到baz但出了问题 我可以使用basic reject or
  • 定义具有多种消息类型的消息传递域

    到目前为止 我见过的大多数 F 消息传递示例都使用 2 4 种消息类型 并且能够利用模式匹配将每条消息定向到其正确的处理函数 对于我的应用程序 由于处理和所需参数的不同性质 我需要数百种独特的消息类型 到目前为止 每个消息类型都是其自己的记
  • 即使设置了 cookie,RabbitMQ 身份验证也会失败

    我最近在运行 lattePanda 的 Windows 10 上安装了带有 ErlanOTP 的rabbitmq 我运行rabbitmqctl status并收到以下错误 C Program Files RabbitMQ Server ra
  • 事件溯源:在重放事件并监听新传入事件时避免项目重复事件

    在需要构建新视图的场景中 我们可以重播来自活动商店 结果 我们将投影出新的视图 因此 我们的想法是部署一个新的投影 该投影可以投影所有旧事件 通过重播 并监听新传入的事件并投影它们 我认为在读取旧事件和收听新传入事件时可能会发生比赛条件 因
  • 事件源和 SQL Server 多个关系表

    我们使用 SQL Server 2016 的事件源 我们有完整的客户产品应用程序 每个应用程序都标记为CustomerId并在事件商店中获取单个指南行项目 这是写入事件存储指南的主要标识符 产品应用程序附带许多不同的关系事物 没有引导 但有
  • MassTransit 生成我想忽略的_skipped 队列

    任何人都可以猜出问题是什么 因为我不知道如何解决这个问题 大众运输产生 skipped队列 我不知道为什么它会生成这些队列 它是在执行发布请求响应时生成的 请求客户端是使用 MassTransit RequestClientExtensio
  • 使用流程管理器(又名 saga)在同一有界上下文中跨聚合根实现最终一致性

    假设您的有界上下文中有两个聚合 它们之间存在一些约束 使用 DDD 这些内部聚合约束不能在同一事务中强制执行 即聚合边界是事务边界 您是否会考虑使用 Microsoft CQRS 旅程中所谓的 流程管理器 来协调同一有界上下文中的两个聚合
  • AMQP如何克服直接使用TCP的困难?

    AMQP如何克服直接使用TCP发送消息时的困难 或者更具体地说 在发布 订阅场景中 在 AMQP 中 有一个代理 该代理接收消息 然后完成将消息路由到交换器和队列的困难部分 您还可以设置持久队列 即使客户端断开连接 也可以为客户端保存消息
  • 更改 RabbitMQ 队列中的参数

    我有一个 RabbitMQ 队列 最初声明如下 var result channel QueueDeclare NewQueue true false false null 我正在尝试添加死信交换 因此我将代码更改为 channel Exc

随机推荐

  • 为什么 eb 部署在第一次部署时失败?

    我创建了 2 个 NodeJs 环境 效果很好 后来创建了一个新的环境并eb deploy env name给我以下错误 Creating application version archive app ceb7 200713 223016
  • Android:将compileSdkVersion设置为比最新api更低的版本有优势吗?

    将清单中的compileSdkVersion设置为小于最新的api版本号有什么好处 还是应该始终将其设置为最新的api版本 android compileSdkVersion 22 当我说优势时 我指的是应用程序的性能 应用程序的编译时间
  • 使用谷歌应用程序引擎部署 Bottle 应用程序时出现问题

    这里是新手 我一直在尝试使用谷歌应用程序引擎在瓶子中创建一个 Hello World 我显示了 hello world 部分 但即使在索引页面上 我也得到以下输出 Hello world 状态 500 如果我尝试添加新路由 例如 page
  • 关闭服务器后尝试运行服务器时,Django python已停止工作

    python manage py runserver 第一次工作正常 但是用 ctrl c 关闭它后 我无法再次启动它 我收到错误消息 Python 已停止工作 通过重新启动计算机可以轻松解决此问题 但非常不方便 我也使用 pycharm
  • 编译 Rust 静态库并在 C++ 中使用它:未定义的引用

    我正在尝试编译一个staticRust 中的库 然后在我的 C 代码中使用它 注意这是从 C 调用 Rust 而不是相反 我浏览了我可以在网上找到的所有教程 并回答了类似的问题 我显然做错了什么 尽管我看不出是什么 我为我的问题创建了一个最
  • 无法在 Ubuntu Oracle Java 8 上运行 IntelliJ IDEA CE 12

    我使用的是 Ubuntu 13 04 和 Oracle Java 8 我已经设置了JAVA HOME and PATH变量 以下是尝试运行 IntelliJ 的终端输出 darren ubuntu opt intellij idea ce
  • 如何在 Mac OS X 中检测 SSD?

    是否有一种可靠 快速 确定性的方法 即not基准 来检查 Mac OS X 所在的系统驱动器是否是固态驱动器 还有其他指标可以表明磁盘处理并行访问的能力如何吗 我正在尝试调整我的程序将用于磁盘绑定操作的线程数 我对原始速度或寻道时间不感兴趣
  • Android FileProvider 删除文件

    我正在使用照片库应用程序 由于最近的牛轧糖更新 我无法从图库中删除文件 我发现我必须使用Fileprovider对于文件访问 我尝试了下面的代码 但它说 04 25 12 52 03 031 3204 4133 com zo tns1 E
  • 改造解析JSON动态键

    我是改造的新手 如何使用retrofit解析下面的Json data Aatrox id 266 title a Espada Darkin name Aatrox key Aatrox Thresh id 412 title o Guar
  • MongoDB $graphLookup 尝试获取树结构

    我正在尝试使用新的 MongoDB v3 4 graphLookup 聚合管道 我有这个简单的树集合 其中包含一些节点和父 DBRef id ObjectId 59380657bbdbfb36c18a80f2 name Root node
  • 如何在 Web 视图中单击链接时禁用橙色圆圈效果

    我试图捕捉鼠标点击位置 所以我写了一个onClick in the body标签 但每次我点击页面时 整个页面都会变成橙色一段时间 有什么设置可以禁用这个效果吗 根据this http groups google com group pho
  • 1000以上怎么取?

    如何从数据存储中获取超过 1000 条记录并将所有记录放入一个列表中以传递给 django 从版本 1 3 6 2010 年 8 月 17 日发布 开始 您CAN 从变更日志 http code google com p googleapp
  • 为什么我无法保存我的地址?

    我正在尝试使用 Django 开发一个电子商务网站 我想使用 ajax 来处理我的结账表单 当我添加 Ajax 时 填写表单并单击提交按钮后 我发现我的表单和数据没有通过进入我的管理员来保存 它也没有被重定向到return HttpResp
  • C# 中的不透明字典键模式

    我遇到过很多情况 其中访问键控集合 如字典 中的项目的模式因键的类型不是简单类型 字符串 整数 双精度等 而受到阻碍 并且不是您想要提升为实际命名类的东西 C 3 0 引入了编译器自动生成的匿名类型的概念 不像struct的 这些动态生成的
  • 使用正则表达式从字符串中提取数字

    我找到了这个 C 代码 然后根据我的需要进行了改进 但现在我想让它适用于所有数字数据类型 public static int intRemover string input string inputArray Regex Split inp
  • 删除 R 中 ggplot2 中的单个 x 轴刻度线?

    我正在 ggplot2 中制作条形图 出于演示原因 我需要在一些条形之间留有空格 我正在使用限制scale x discrete插入空条 这给了我需要的间距 群体之间的差距b and c在我的模拟数据中看起来很完美 但是之间的差距a and
  • 丢失代理类的类自定义注释

    我正在使用 Seam 使用 In 注释将 bean 注入到我的控制器中 注入的类有一个自定义注释 当调用injectedClass getClass getAnnotation annotationClass 时 它返回null 调试时 我
  • R:rm和remove有什么区别?

    有什么区别rm and remove http stat ethz ch R manual R patched library base html rm html 没有区别 remove是一个别名 定义为 remove lt rm 查看源代
  • 如何从 XSLT 输出与号 (&)

    我正在转换所有 into amp 在我的 XML 中 以便 XSLT 能够编译 我正在将 XML 样式转换为 HTML 但是 当 XSLT 填充文本框时 我需要 amp 显示为 例如 它显示 you amp me 在文本框中 但我需要查看
  • Mass Transit:当存在不同消息类型时确保消息处理顺序

    我是公共交通新手 我想了解它是否对我的情况有帮助 我正在构建一个使用 CQRS 事件源架构实现的示例应用程序 并且我需要一个服务总线 以便将命令堆栈创建的事件分派到查询堆栈反规范化器 假设我们的域中有一个聚合 我们称之为Photo 以及两个