如果我有很多活动,是否会导致资源阻塞或请求超时?
这是我的场景:
我有一个 api 控制器,它向消费者发送订单请求;我使用请求/响应模式来接收错误信息来自消费者的属性并基于该属性响应返回,如果它为空我想返回OK()
否则,返回BadRequest
or Ok
但有这样的消息:产品缺货通知客户.
在我的消费者中,我构建了一个路由表,其中包含 2 个活动:
-
创建订单活动:创建包含订单详细信息的订单。
-
预订产品活动:这减少了库存产品数量,if product quantity < 0
我将发布一条消息错误信息返回给消费者并补偿之前的活动。
public async Task Consume(ConsumeContext<ProcessOrder> context)
{
try
{
if (!string.IsNullOrEmpty(context.Message.ErrorMessage))
{
await context.RespondAsync<OrderSubmitted>(new
{
context.Message.OrderId,
context.Message.ErrorMessage
});
return;
}
RoutingSlipBuilder builder = new RoutingSlipBuilder(context.Message.OrderId);
// get configs
var settings = new Settings(_configuration);
// Add activities
builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
builder.SetVariables(new { context.Message.OrderDetails });
await context.Execute(builder.Build());
await context.RespondAsync<OrderSubmitted>(new
{
context.Message.OrderId
});
}
catch (Exception ex)
{
_log.LogError("Can not create Order {OrderId}", context.Message.OrderId);
throw new Exception(ex.Message);
}
}
ReserveProductActivity 的代码:
public async Task<ExecutionResult> Execute(ExecuteContext<ReserveProductArguments> context)
{
var orderDetails = context.Arguments.OrderDetails;
foreach (var orderDetail in orderDetails)
{
var product = await _productRepository.GetByProductId(orderDetail.ProductId);
if (product == null) continue;
var quantity = product.SetQuantity(product.QuantityInStock - orderDetail.Quantity);
if (quantity < 0)
{
var errorMessage = "Out of stock.";
await context.Publish<ProcessOrder>(new
{
ErrorMessage = errorMessage
});
throw new RoutingSlipException(errorMessage);
}
await _productRepository.Update(product);
}
return context.Completed(new Log(orderDetails.Select(x => x.ProductId).ToList()));
}
消费者方法中的这行代码等待上下文.执行(builder.Build())
起初我以为它会构建路由表并在进入下一行之前首先执行所有活动,但事实并非如此。相反,它会立即转到下一行代码(响应回控制器),然后在执行活动之后,这不是我想要的。我需要首先检查第二个活动中的产品数量,然后根据返回到控制器的数量。
(当前,它总是首先响应控制器 - 之后的行buider.Buid()
,然后如果quantity < 0
它仍然会转到 Consumer 方法的第一个 if 条件,但由于它已经响应,我无法再次触发该 if 语句内的响应)。
简而言之,如果产品在第二个活动中仍然可用,我可以像平常一样发回响应(在context.Execute(builder.Build())
,但如果quantity < 0
- 我将其发布回消费者方法错误信息,我希望它跳转到 Consume 方法的第一个 if 条件(if(!string.IsNullOrEmpty(context.Message.ErrorMessage)) ...
)并基于错误信息通知客户。
这种做法有什么问题吗?我怎样才能实现这样的目标?
Thanks
它没有记录,但可以使用代理来执行路由表,并使用路由表的结果响应请求。您可以在单元测试中查看详细信息:
https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20 https://github.com/MassTransit/MassTransit/blob/master/tests/MassTransit.Tests/Courier/RequestRoutingSlip_Specs.cs#L20
您可以创建代理(构建路由单并执行它)和响应代理 - 然后将两者配置在接收端点上:.Instance
消费者。
class RequestProxy :
RoutingSlipRequestProxy<Request>
{
protected override void BuildRoutingSlip(RoutingSlipBuilder builder, ConsumeContext<Request> request)
{
// get configs
var settings = new Settings(_configuration);
// Add activities
builder.AddActivity(settings.CreateOrderActivityName, settings.CreateOrderExecuteAddress);
builder.SetVariables(new { context.Message.OrderId, context.Message.Address, context.Message.CreatedDate, context.Message.OrderDetails });
builder.AddActivity(settings.ReserveProductActivityName, settings.ReserveProductExecuteAddress);
builder.SetVariables(new { context.Message.OrderDetails });
}
}
class ResponseProxy :
RoutingSlipResponseProxy<Request, Response>
{
protected override Response CreateResponseMessage(ConsumeContext<RoutingSlipCompleted> context, Request request)
{
return new Response();
}
}
然后,您可以从使用者调用它,或者将排序逻辑放入代理中 - 无论哪种方式有意义,然后使用控制器中的请求客户端发送请求并等待响应。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)