情况
我有一个系统,其中一个请求会产生两个响应。请求和响应有相应的可观察量:
IObservable<RequestSent> _requests;
IObservable<MainResponseReceived> _mainResponses;
IObservable<SecondResponseReceived> _secondaryResponses;
可以保证RequestSent
事件发生早于MainResponseReceived
and SecondaryResponseReceived
但响应的顺序是随机的。
我拥有的
最初我想要处理这两个响应的处理程序,所以我压缩了可观察量:
_requests
.SelectMany(async request =>
{
var main = _mainResponses.FirstAsync(m => m.Id == request.Id);
var secondary = _secondaryResponses.FirstAsync(s => s.Id == request.Id);
var zippedResponse = main.Zip(secondary, (m, s) => new MainAndSecondaryResponseReceived {
Request = request,
Main = m,
Secondary = s
});
return await zippedResponse.FirstAsync(); ;
})
.Subscribe(OnMainAndSecondaryResponseReceived);
我需要的
现在我还需要处理MainResponseReceived
无需等待 secondaryResponseRecieved,并且必须保证 OnMainResponseRecieved 在之前完成OnMainAndSecondaryResponseReceived
叫做
请问这两个订阅如何定义?
测试用例1:
-
RequestSent
occurs
-
MainResponseReceived
发生 -> OnMainResponseReceived 被调用
-
SecondaryResponseReceive
d 发生 -> OnMainAndSecondaryResponseReceived 被调用
测试案例2:
-
RequestSent
occurs
-
SecondaryResponseReceived
occurs
-
MainResponseReceived occurs
-> OnMai ResponseReceived 被调用 -> OnMainAnd secondary ResponseReceived 被调用
我认为你几乎走在正确的轨道上。我不会再摆弄所有异步的东西 - 这只会让事情变得复杂。
尝试这个查询:
var query =
_requests
.SelectMany(request =>
_mainResponses.Where(m => m.Id == request.Id).Take(1)
.Do(m => OnMainResponseReceived(m))
.Zip(
_secondaryResponses.Where(s => s.Id == request.Id).Take(1),
(m, s) => new MainAndSecondaryResponseReceived()
{
Request = request,
Main = m,
Secondary = s
}));
var subscription =
query.Subscribe(x => OnMainAndSecondaryResponseReceived(x));
The .Do(...)
是代码中重要的缺失部分。它确保OnMainResponseReceived
之前被调用OnMainAndSecondaryResponseReceived
无论主要响应还是次要响应先出现。
我对此进行了测试:
Subject<RequestSent> _requestsSubject = new Subject<RequestSent>();
Subject<MainResponseReceived> _mainResponsesSubject = new Subject<MainResponseReceived>();
Subject<SecondResponseReceived> _secondaryResponsesSubject = new Subject<SecondResponseReceived>();
IObservable<RequestSent> _requests = _requestsSubject.AsObservable();
IObservable<MainResponseReceived> _mainResponses = _mainResponsesSubject.AsObservable();
IObservable<SecondResponseReceived> _secondaryResponses = _secondaryResponsesSubject.AsObservable();
_requestsSubject.OnNext(new RequestSent() { Id = 42 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 42 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 42 });
_requestsSubject.OnNext(new RequestSent() { Id = 99 });
_mainResponsesSubject.OnNext(new MainResponseReceived() { Id = 99 });
_secondaryResponsesSubject.OnNext(new SecondResponseReceived() { Id = 99 });
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)