我有一个事件流,我想调用一个函数,为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件。
这个卵石图可能是错误的,但这就是我想要的:
---x--x--xxxxxxx-------------x-------------> //Events
---p--p--pppp------p-p-p-----p-------------> //In Progress
-------d--d--------d-d-dd------dddd--------> //Promise Done
---1--21-2-34-----------3----4-3210-------- //QUEUE SIZE CHANGES
这是我到目前为止的代码:
var n = 4;
var inProgressCount = 0;
var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
.map((ev) => new Date().getTime());
var inProgress$ = events$.controlled();
var done$ = inProgress$
.tap(() => inProgressCount++)
.flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));
done$.subscribeOnNext((timestamp) => {
inProgressCount--;
inProgress$.request(Math.max(1, n - inProgressCount));
});
inProgress$.request(n);
这段代码有两个问题:
- 它正在使用
inProgressCount
var 随 side 更新
效应函数。
The done$ subscription is only called once when I request more than 1 item from the controlled stream. This is making the inProgressCount
var to update incorrectly, this eventually limits the queue to one at a time.
你可以在这里看到它的工作原理:http://jsbin.com/wivehonifi/1/edit?js,控制台,输出 http://jsbin.com/wivehonifi/1/edit?js,console,output
问题:
- 有更好的方法吗?
- 我怎样才能摆脱
inProgressCount
多变的?
Why is the done$ subscription only getting called once when requesting multiple items?
Update:
问题#3 的答案:switchMap 与 flatMapLatest 相同,所以这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。
实际上你根本不需要使用背压。有一个运算符叫flatMapWithMaxConcurrent
这会为你做这件事。它本质上是调用的别名.map().merge(concurrency)
并且它只允许一次传输最大数量的流。
我在这里更新了你的 jsbin:http://jsbin.com/weheyuceke/1/edit?js,输出 http://jsbin.com/weheyuceke/1/edit?js,output
但我在下面注释了重要的一点:
const concurrency = 4;
var done$ = events$
//Only allows a maximum number of items to be subscribed to at a time
.flatMapWithMaxConcurrent(concurrency,
({timestamp}) =>
//This overload of `fromPromise` defers the execution of the lambda
//until subscription
Rx.Observable.fromPromise(() => {
//Notify the ui that this task is in progress
updatePanelAppend(inProgress, timestamp);
removeFromPanel(pending, timestamp);
//return the task
return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
}));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)