假设我有两个可能无限的流:
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
我想要merge流,然后使用缓慢的异步操作映射合并流(例如,在 Bacon 中)fromPromise
and flatMapConcat
).
我可以将它们与merge
:
me = a12b3.c45d6.7e...
然后是地图
s1 = a..b..c..d..e...
s2 = 1.2.3.4.5.6.7...
me = a12b3.c45d6.7e...
mm = a..1..2..b..3..c..4..5..
如你所见greedier s2
从长远来看,流具有优势。这是不受欢迎的行为.
The merge行为不好,因为我想要某种背压来进行更多交错、“公平”、“循环”合并。几个例子desired行为:
s1 = a.....b..............c...
s2 = ..1.2.3..................
mm = a...1...b...2...3....c...
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
一种思考方式是s1
and s2
将任务发送给同时只能处理一项任务的工作人员。和merge
and flatMapConcat
我会得到一个贪婪的任务管理器,但我想要一个更公平的。
我想找到一个简单而优雅的解决方案。如果它可以轻松推广到任意数量的流,那就太好了:
// roundRobinPromiseMap(streams: [Stream a], f: a -> Promise b): Stream b
var mm = roundRobinPromiseMap([s1, s2], slowAsyncFunc);
使用 RxJS 或其他 Rx 库的解决方案也很好。
澄清
不是 zipAsArray
我不想要:
function roundRobinPromiseMap(streams, f) {
return Bacon.zipAsArray.apply(null, streams)
.flatMap(Bacon.fromArray)
.flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
比较示例大理石图:
s1 = a.....b..............c.......
s2 = ..1.2.3......................
mm = a...1...b...2...3....c....... // wanted
zip = a...1...b...2........c...3... // zipAsArray based
是的,我会遇到缓冲问题
...但我也会直接unfair one:
function greedyPromiseMap(streams, f) {
Bacon.mergeAll(streams).flatMapConcat(function (x) {
return Bacon.fromPromise(f(x));
});
}
大理石图
s1 = a.........b..........c...
s2 = ..1.2.3..................
mm = a...1...2...b...3....c...
merge = a...1...2...3...b....c...