免责声明:这不是对您问题的回答,而是一条预防性信息,为什么您不应该做你打算做的事。
虽然 RMQ 等消息代理和 MassTransit 等消息中间件库非常适合集成,但我强烈建议不要使用消息代理进行事件溯源。我可以参考我的旧答案事件溯源:何时(以及不)应该使用消息队列? https://stackoverflow.com/questions/41131609/event-sourcing-when-and-not-should-i-use-message-queue这解释了其背后的原因。
您发现自己的原因之一 - 活动顺序永远无法得到保证。
另一个明显的原因是,从通过消息代理发布的事件构建读取模型有效地消除了重放的可能性,并构建了需要从一开始就开始处理事件的新读取模型,但它们得到的只是以下事件:正在出版now.
聚合形成事务边界,因此每个命令都需要保证它在一个事务内完成。虽然 MT 支持交易中间件 http://masstransit-project.com/MassTransit/advanced/transactions.html,它只保证您获得支持它们的依赖项的事务,但不保证您获得支持它们的依赖项的事务context.Publish(@event)
在消费者主体中,因为 RMQ 不支持交易。您很有可能提交更改并且不会在读取端获取事件。因此,事件存储的经验法则是您应该能够订阅更改流从商店,并且不要发布代码中的事件,除非这些事件是集成事件而不是域事件。
对于事件溯源,每个读取模型在其投影的事件流中保留自己的检查点至关重要。消息代理不会给你这种权力,因为“检查点”实际上是你的队列,一旦消息从队列中消失 - 它就永远消失了,不会再回来。
关于实际问题:
您可以使用消息拓扑配置 http://masstransit-project.com/MassTransit/advanced/topology/message.html为不同的消息设置相同的实体名称,然后将它们发布到同一个交易所,但这属于“滥用”类别,就像克里斯在该页面上写道的那样。我还没有尝试过,但你绝对可以尝试一下。消息 CLR 类型是元数据的一部分,因此不应存在反序列化问题。
但同样,将消息放入同一个交换中不会给您提供任何排序保证,除了所有消息都将进入消费服务的一个队列中这一事实之外。
您至少必须根据聚合 ID 设置分区过滤器,以防止并行处理同一聚合的多条消息。顺便说一句,这对于集成也很有用。我们就是这样做的:
void AddHandler<T>(Func<ConsumeContext<T>, string> partition) where T : class
=> ep.Handler<T>(
c => appService.Handle(c, aggregateStore),
hc => hc.UsePartitioner(8, partition));
AddHandler<InternalCommands.V1.Whatever>(c => c.Message.StreamGuid);