我花了相当多的时间研究这个(出于学习目的),并将尝试尽可能彻底地解释这段代码的工作原理。
首先,这是原始代码,带注释:
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) { // (1)
return pipe( // (2)
retryWhen(attempts => range(1, maxTries) // (3)
.pipe(
zip(attempts, (i) => i), // (4)
map(i => i * i), // (5)
mergeMap(i => timer(i * ms)) // (6)
)
)
); // (7)
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}
- 很简单,我们正在创建自定义
backoff
运算符出retryWhen
操作员。我们稍后可以应用它pipe
功能。
- 在此背景下,
pipe
方法返回一个自定义运算符。
-
我们的自定义操作符将被修改retryWhen
操作员。它需要一个函数参数。这个函数将被调用once-具体来说,当这个retryWhen
第一次遇到/调用。顺便一提,retryWhen
开始发挥作用only当源可观察量产生错误时。然后,它会防止错误进一步传播并重新订阅源。如果源产生非错误结果(无论是在第一次订阅还是在重试时),retryWhen
被忽略并且不涉及。
几句话attempts
。这是一个可观察的。它不是可观察到的源。它是专门为retryWhen
。它只有一种用途:每当订阅(或重新订阅)源可观察结果导致错误时,attempts
发射一个next
。我们被给予attempts
并可以自由地使用它,以便以某种方式对源可观察的每个失败的订阅尝试做出反应。
这就是我们要做的。
首先我们创建range(1, maxTries)
,一个可观察量,对于我们愿意执行的每次重试都有一个整数。range
已准备好立即发射所有号码,但我们必须按兵不动:只有在另一次重试发生时,我们才需要一个新号码。所以,这就是为什么我们...
-
... 用拉链将其压缩attempts
。意思是,将每个发出的值结合起来attempts
具有单一值range
.
请记住,我们当前所在的函数只会被调用一次,那时,attempts
只会被解雇next
一次——对于最初失败的订阅。因此,此时,我们的两个压缩可观察量仅产生一个值。
顺便说一句,压缩成一个的两个可观察量的值是多少?该函数决定:(i) => i
。为了清楚起见,可以写成(itemFromRange, itemFromAttempts) => itemFromRange
。第二个参数没有使用,所以它被删除,第一个被重命名为i
.
这里发生的事情是我们简单地忽略了values被解雇attempts
,我们只对fact他们被解雇了。每当这种情况发生时,我们都会从中提取下一个值range
可观察到的...
-
...并将其平方。这是为了指数指数退避的一部分。
因此,现在每当(重新)订阅源失败时,我们手上就会有一个不断增加的整数(1、4、9、16...)。我们如何将该整数转换为下次重新订阅之前的时间延迟?
请记住,我们当前所在的这个函数,它必须返回一个可观察的,使用attempts
作为输入。由此产生的可观察对象仅构建一次。retryWhen
然后订阅生成的 observable,并且:每当生成的 observable 触发时,重试订阅源 observablenext
; calls complete
or error
每当结果 observable 触发那些相应的事件时,就在源 observable 上。
-
长话短说,我们需要做retryWhen
稍等一会。delay也许可以使用运算符,但是设置延迟的指数增长可能会很痛苦。反而,mergeMap
运算符开始发挥作用。
mergeMap
是两个运算符组合的快捷方式:map
and mergeAll
. map
只需将每个递增整数 (1, 4, 9, 16...) 转换为timer
可观察到的触发next
经过毫秒数后。mergeAll
forces retryWhen
实际订阅timer
。如果最后一点没有发生,我们得到的 observable 就会被触发next
立即与timer
可观察实例作为值。
此时,我们已经构建了自定义可观察对象,它将被使用retryWhen
决定何时尝试重新订阅源可观察对象。
就目前情况而言,我发现此实现有两个问题:
-
一旦我们产生的可观察到的火焰最后一次next
(导致最后一次尝试重新订阅),它也会立即触发complete
。除非源 observable 返回结果very很快(假设最后一次重试将成功),该结果将被忽略。
这是因为一旦retryWhen
hears complete
从我们的观察来看,它调用complete
在源上,它可能仍在发出 AJAX 请求的过程中。
如果所有重试均不成功,则源实际上会调用complete
而不是更符合逻辑error
.
为了解决这两个问题,我认为我们生成的可观察值应该触发error
最后,在给最后一次重试一些合理的时间来尝试完成其工作之后。
这是我对上述修复的实现,其中还考虑了弃用zip
最新的运算符rxjs v6
:
import { delay, dematerialize, map, materialize, retryWhen, switchMap } from "rxjs/operators";
import { concat, pipe, range, throwError, timer, zip } from "rxjs";
function backoffImproved(maxTries, ms) {
return pipe(
retryWhen(attempts => {
const observableForRetries =
zip(range(1, maxTries), attempts)
.pipe(
map(([elemFromRange, elemFromAttempts]) => elemFromRange),
map(i => i * i),
switchMap(i => timer(i * ms))
);
const observableForFailure =
throwError(new Error('Could not complete AJAX request'))
.pipe(
materialize(),
delay(1000),
dematerialize()
);
return concat(observableForRetries, observableForFailure);
})
);
}
我测试了这段代码,它似乎在所有情况下都能正常工作。我现在懒得详细解释;我怀疑有人会读上面的文字墙。
不管怎样,非常感谢@BenjaminGruenbaum 和@cartant 让我走上正确的道路,让我的头脑明白这一切。