MassTransit - 等待所有活动完成然后继续处理

2024-02-04

如果我有很多活动,是否会导致资源阻塞或请求超时?

这是我的场景:

我有一个 api 控制器,它向消费者发送订单请求;我使用请求/响应模式来接收错误信息来自消费者的属性并基于该属性响应返回,如果它为空我想返回OK()否则,返回BadRequest or Ok但有这样的消息:产品缺货通知客户.

在我的消费者中,我构建了一个路由表,其中包含 2 个活动:

  • 创建订单活动:创建包含订单详细信息的订单。
  • 预订产品活动:这减少了库存产品数量,if product quantity < 0我将发布一条消息错误信息返回给消费者并补偿之前的活动。

    public async Task Consume(ConsumeContext<ProcessOrder> context)
    {
        try
        {
            if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
            {
                await context.RespondAsync<OrderSubmitted>(new
                {
                    context.Message.OrderId,
                    context.Message.ErrorMessage
                });
    
                return;
            }
    
            RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
            // get configs
            var settings = new Settings(_configuration);
    
            // Add activities
            builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
            builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
    
            builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
            builder.SetVariables(new { context.Message.OrderDetails });
    
    
            await context.Execute(builder.Build());
    
            await context.RespondAsync<OrderSubmitted>(new
            {
                context.Message.OrderId
            });
    
        }
        catch (Exception ex)
        {
            _log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
            throw new Exception(ex.Message);
        }
    }
    

ReserveProductActivity 的代码:

    public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
    {
        var orderDetails = context.Arguments.OrderDetails;

        foreach (var orderDetail in orderDetails)
        {
            var product = await _productRepository.GetByProductId(orderDetail.ProductId);
            if (product == null) continue;

            var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);


            if (quantity < 0)
            {
                var errorMessage = "Out of stock.";
                await context.Publish<ProcessOrder>(new
                {
                    ErrorMessage = errorMessage
                });
                throw new RoutingSlipException(errorMessage);
            }

            await _productRepository.Update(product);
        }

        return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
    }

消费者方法中的这行代码等待上下文.执行(builder.Build())

起初我以为它会构建路由表并在进入下一行之前首先执行所有活动,但事实并非如此。相反,它会立即转到下一行代码(响应回控制器),然后在执行活动之后,这不是我想要的。我需要首先检查第二个活动中的产品数量,然后根据返回到控制器的数量。

(当前,它总是首先响应控制器 - 之后的行buider.Buid(),然后如果quantity < 0它仍然会转到 Consumer 方法的第一个 if 条件,但由于它已经响应,我无法再次触发该 if 语句内的响应)。

简而言之,如果产品在第二个活动中仍然可用,我可以像平常一样发回响应(在context.Execute(builder.Build()),但如果quantity < 0- 我将其发布回消费者方法错误信息,我希望它跳转到 Consume 方法的第一个 if 条件(if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...)并基于错误信息通知客户。

这种做法有什么问题吗?我怎样才能实现这样的目标?

Thanks


它没有记录,但可以使用代理来执行路由表,并使用路由表的结果响应请求。您可以在单元测试中查看详细信息:

https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20 https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20

您可以创建代理(构建路由单并执行它)和响应代理 - 然后将两者配置在接收端点上:.Instance消费者。

  class RequestProxy :
        RoutingSlipRequestProxy<Request>
    {
        protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
        {
        // get configs
        var settings = new Settings(_configuration);

        // Add activities
        builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
        builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });

        builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
        builder.SetVariables(new { context.Message.OrderDetails });
        }
    }


    class ResponseProxy :
        RoutingSlipResponseProxy<Request, Response>
    {
        protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
        {
            return new Response();
        }
    }

然后,您可以从使用者调用它,或者将排序逻辑放入代理中 - 无论哪种方式有意义,然后使用控制器中的请求客户端发送请求并等待响应。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

MassTransit - 等待所有活动完成然后继续处理 的相关文章

随机推荐

  • suPHP 有安全性吗?

    我不久前问过这个问题 尽管我提供了一些赏金 但我从未得到太多答案 请参阅here https stackoverflow com questions 8032140 implementing log in alongside suphp 更
  • App.config 似乎被忽略

    作为重构操作的结果 我有了这个类库 我添加了一个 App config 文件并添加了如下内容
  • Spring Websockets STOMP - 获取客户端IP地址

    有没有办法获取STOMP客户端IP地址 我正在拦截入站通道 但我看不到任何方法来检查 IP 地址 任何帮助表示赞赏 您可以在握手期间将客户端 IP 设置为 WebSocket 会话属性HandshakeInterceptor public
  • 检查是否启用了锁定

    我必须检查设置中是否启用了系统锁定 我使用下面的代码行 boolean b android provider Settings System getInt getContentResolver Settings System LOCK PA
  • 如何使用新的 Slick 2.0 HList 克服 22 列限制?

    我目前正在编写 Slick 代码来针对具有两个表 gt 22 列的旧模式 我如何使用新的 HList 代码 http slick typesafe com doc 2 0 0 M3 api scala slick collection he
  • 经过后台推送的一些测试/调试后,iPhone 上的“pushDisallowed”和“决定:绝对不能继续”

    我正在创建一个 Xamarin Forms 应用程序 它通过 Azure 通知中心使用后台推送 该应用程序针对 Android 和 iOS 经过一些初步原型设计和测试后 一切在两个平台上都运行良好 对于 iOS 我发送包含以下内容的推送 a
  • 如何阻止 UIBarButtonItem 文本被截断?

    我有一个UIBarButtonItem在导航栏中 文本标题为 保存 当我切换到全屏时UIPopoverController然后关闭它 我的 UIBarButtonItem 中的文本被截断为 S e 对于所有其他的 Segues 和视图 我返
  • VBA Excel - IFERROR 和 VLOOKUP 错误

    我正在尝试在 Excel VBA 中创建与 IFERROR VLOOKUP 1 公式等效的内容 其中该函数将在数据表中查找文本 如果文本位于表中 则返回第五列中的数字如果不是 则为 1 我已经在 Excel 中测试了上述公式 它给了我想要的
  • 无缝 HTML5 视频循环

    我进行了广泛的搜索以找到解决此问题的方法 但没有成功 我创建了一个 4 秒的视频剪辑 可以在编辑器中无缝循环 然而 当剪辑通过 Safari Chrome 或 Firefox 在页面中运行时 从结尾到开头的播放会出现一个小但明显的暂停 我尝
  • java.lang.NullPointerException Selenium 2 类

    当我的程序从本地计算机运行时运行良好 无需使用带有远程 Web 驱动程序的 selenium 网格 但是 当我使用带有远程 Web 驱动程序的 selenium 网格设置相同的测试用例时 在 eclipse 中收到消息说 java lang
  • 获取当前正在执行的线程的TThread对象?

    我想要一个像 GetCurrentThread 这样的函数 它返回当前执行线程的 TThread 对象 我知道有一个 Win32 API 调用 GetCurrentThread 但它返回线程 Id 如果有可能从该 ID 获取 TThread
  • Python错误:NameError:名称未定义[重复]

    这个问题在这里已经有答案了 import numpy as np from scipy import optimize as opt import time def grad d weight 0 0 learnrate 0 01 tol
  • 未启用延迟段创建功能 (ORA-00439)

    I have sql包含 60 多个表的 DDL 的脚本文件 我正在尝试将脚本复制粘贴到 SQL Developer 中 连接到数据库 Oracle Database 11g Express Edition Release 11 2 0 2
  • zurb 基金会中心网格中的内容

    我试图将图像水平居中到 zurb 列中 但这似乎不可能 我尝试了一切 到处搜索 但我无法让它工作 这是我的代码 div class row div class twelve columns br img src img 06 jpeg al
  • 如何删除 Rabbitmq 以便我可以重新安装

    我遇到了麻烦 所以我进入注册表并删除了rabbitmq的服务条目 现在 当我尝试重新安装时 它说它已经存在 但无法启动 因为我删除了它 我可以执行以下操作sc delete rabbitmq 如何完全删除它的所有痕迹并从头开始重新安装 我猜
  • 我想扩展 std::string,但不是出于您可能认为的原因

    我有一个有效地接受字符串的方法 但是 我想要使用的字符串子集非常有限 我正在考虑将 std string 作为某个类进行 typedef ing 并显式调用函数 不过 我不确定这是否有效 有想法吗 通常的规则仍然适用 该类不是设计为继承的
  • 检查 iOS 应用程序上的互联网连接,Cordova Phonegap 3.3.0 无法正常工作

    我尝试过以下this http cordova apache org docs en 3 3 0 cordova connection connection md html The 20Command Line 0AInterfaceCor
  • ASP.Net MVC 基于安全性隐藏/显示菜单项

    我正在开发一个 ASP Net MVC 3 网站 Layout 主视图包含一个菜单 我想根据您是否登录以及您所处的角色隐藏菜单中的一些项目 目前使用这样的代码可以工作 if HttpContext Current User Identity
  • MySQL默认值错误与ON DUPLICATE KEY UPDATE

    为什么我会收到此错误 MySQL 版本是否发生了某些变化 导致此操作 曾一度有效 现在失败 INSERT INTO 未指定 user id 值would如果插入已完成 则需要 但由于 id 1 已经存在 因此这应该成为更新并且有效 mysq
  • MassTransit - 等待所有活动完成然后继续处理

    如果我有很多活动 是否会导致资源阻塞或请求超时 这是我的场景 我有一个 api 控制器 它向消费者发送订单请求 我使用请求 响应模式来接收错误信息来自消费者的属性并基于该属性响应返回 如果它为空我想返回OK 否则 返回BadRequest