在 RxJS 5 中flatMap()
只是别名mergeMap()
:).
问题在于您使用回调的方式retryWhen()
操作员。它只被调用一次,然后每次错误信号到达时,它都会被推送到从此回调返回的 Observable 中。
在你的第二个例子中,你从返回 Observableattempts.flatMap
然后再次订阅该回调.zip(attempts, i => i)
。但是这个zip
运算符永远不会被调用,因为它是在该值已经被消耗后调用的attempts.flatMap
。这也是为什么Observable.range(1, 3)
总是从头开始。
我知道这看起来很混乱。请注意:
- 的回调
retryWhen()
仅被调用一次。
- 的回调
attempts.flatMap()
每次出现错误时都会调用。
所以你只需要重构你的代码,例如如下所示:
var source = Observable.create(obs => {
obs.next(1);
obs.next(2);
obs.error(new TechnicalError('error from source'));
})
.retryWhen(attempts => {
console.log('retryWhen callback');
let count = 0;
return attempts.flatMap(error => {
if (error instanceof TechnicalError) {
console.log(error);
return ++count >= 3 ? Observable.throw(error) : Observable.timer(count * 1000);
} else {
return Observable.throw(error);
}
});
})
.subscribe(
val => console.log(val),
err => console.log('subscribe error', err),
_ => console.log('complete')
);
这会打印到控制台:
1
2
retryWhen callback
TechnicalError { msg: 'error from source' }
1
2
TechnicalError { msg: 'error from source' }
1
2
TechnicalError { msg: 'error from source' }
subscribe error TechnicalError { msg: 'error from source' }
观看现场演示:https://jsbin.com/hobeda/3/edit?js,控制台 https://jsbin.com/hobeda/3/edit?js,console