我正在研究一个用例,要求如果可观察量在一定时间内没有发出值,那么我们应该做一些副作用。
给出一个实际用例:
- 打开网络套接字连接
- 如果在 X 时间内没有发送/接收消息,则关闭 Web 套接字连接并通知用户
这需要在每个发出的值上以及在初始订阅可观察值时启动计时器,然后在分配的时间后或直到发出计时器重置的值时运行某些函数。我正在努力以 Rx 方式做到这一点。任何帮助,将不胜感激 :)
debounceTime https://www.learnrxjs.io/operators/filtering/debouncetime.html是您正在寻找的运算符:如果在特定超时内没有其他运算符跟随,它只会发出一个值。监听第一条消息debounce
d Stream 会让你超时并清理你的 websocket 连接。如果您需要从流打开开始超时,您可以简单地startWith
。具体来说:
messages$.startWith(null)
.debounceTime(timeout)
.take(1)
.subscribe(() => { /* side effects */ });
Edit:相反,如果您希望在超时时完全结束消息流(例如,您在 onComplete 处理程序中进行清理),那么只需填充debounceTime
into a takeUntil
:
messages$.takeUntil(
messages$.startWith(null)
.debounceTime(timeout)
).subscribe(timeout_observer);
With a timeout_observable: Observer<TMessage>
其中包含您的清理 onComplete。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)