如何在 MassTransit 3.0 中使用分散/聚集模式实现传奇

2024-03-29

吉米·博加德 (Jimmy Bogard) 描述麦当劳快餐连锁店here https://lostechies.com/jimmybogard/2013/03/11/saga-implementation-patterns-observer/将其与一个分散聚集模式。 http://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html

Workflow image stolen from above article: enter image description here

初步实施思路:

为所有食品站都会获得的所有类型的 FoodOrdered 事件提供一个通用接口,然后每个食品站将能够消费/创建其各自的项目并发布一个共同的完成事件。例如:薯条和汉堡站收到一条有关薯条订单的消息,薯条站消耗该订单并宣布传奇正在侦听的 ItemDoneEvent。

最初的担忧:

由于传奇并不关心完成的食物类型,只关心所有食物都已完成这一事实,这似乎是一个OK解决方案。然而after阅读警告here http://docs.masstransit-project.com/en/latest/configuration/gotchas.html#trying-to-share-a-queue关于共享队列并注意到MassTransit 3.0 中已删除 Consumer.Conditional 过滤 http://docs.masstransit-project.com/en/latest/migrating/index.html#consumes-t-to-iconsumer-t感觉框架好像在说“用这种方法会发生坏事(TM)”。但我不确定如果不创建消息请求和响应以及为厨房中的每个食品关联事件,您还会如何做到这一点。例如:FriesOrdered、BurgerOrdered FriesCooked、BurgerCooked。如果您必须对厨房中的每件物品都执行此操作,这会非常乏味吗?

考虑到上述问题 - 这种类型的工作流程的一个好的传奇示例是什么样的?


我遇到了类似的问题 - 需要发布几十条命令(所有相同的界面,IMyRequest)然后等待。

实际上我的命令启动了其他传奇,它发布了IMyRequestDone在处理结束时没有标记 saga 已完成。 (需要稍后完成它们。)因此,我不是在父 saga 中保存已完成的嵌套 saga 的数量,而是查询子 saga 实例的状态。

检查每一个MyRequestDone信息:

Schedule(() => FailSagaOnRequestsTimeout, x => x.CheckToken, x =>
{
    // timeout for all requests
    x.Delay = TimeSpan.FromMinutes(10);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});


During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow + FailSagaOnRequestsTimeout.Delay;
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(FailSagaOnRequestsTimeout, context => new FailSagaCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(MyRequestDone)
        .Then(context =>
        {
            if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow)
                throw new TimeoutException();
        })
        .If(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing)); // assume 3 states of request - Processing, Done and Failed
            return allDone;
        }, x => x
            .Unschedule(FailSagaOnRequestsTimeout)
            .TransitionTo(Active))
        )
        .Catch<TimeoutException>(x => x.TransitionTo(Failed))
);

During(WaitingMyResponses,
    When(FailSagaOnRequestsTimeout.Received)
        .TransitionTo(Failed)

定期检查所有请求是否已完成(通过“减少 NServiceBus Saga 负载”):

Schedule(() => CheckAllRequestsDone, x => x.CheckToken, x =>
{
    // check interval
    x.Delay = TimeSpan.FromSeconds(15);
    x.Received = e => e.CorrelateById(context => context.Message.CorrelationId);
});

During(Active,
    When(Xxx)
        .ThenAsync(async context =>
        {
            await context.Publish(context => new MyRequestCommand(context.Instance, "foo"));
            await context.Publish(context => new MyRequestCommand(context.Instance, "bar"));

            context.Instance.WaitingMyResponsesTimeoutedAt = DateTime.UtcNow.AddMinutes(10);
            context.Instance.WaitingMyResponsesCount = 2;
        })
        .TransitionTo(WaitingMyResponses)
        .Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance))
    );

During(WaitingMyResponses,
    When(CheckAllRequestsDone.Recieved)
        .Then(context =>
        {
            var db = serviceProvider.GetRequiredService<DbContext>();
            var requestsStates = db.MyRequestStates.Where(x => x.ParentSagaId == context.Instance.CorrelationId).Select(x => x.State).ToList();
            var allDone = requestsStates.Count == context.Instance.WaitingMyResponsesCount &&
                requestsStates.All(x => x != nameof(MyRequestStateMachine.Processing));
            if (!allDone)           
            {
                if (context.Instance.WaitingMyResponsesTimeoutedAt < DateTime.UtcNow + CheckAllRequestsDone.Delay)              
                    throw new TimeoutException();
                throw new NotAllDoneException();
            }
        })
        .TransitionTo(Active)
        .Catch<NotAllDoneException>(x => x.Schedule(CheckAllRequestsDone, context => new CheckAllRequestsDoneCommand(context.Instance)))
        .Catch<TimeoutException>(x => x.TransitionTo(Failed));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在 MassTransit 3.0 中使用分散/聚集模式实现传奇 的相关文章

随机推荐