我使用 rxjs 来处理 websocket 连接
var socket = Rx.Observable.webSocket('wss://echo.websocket.org')
socket.resultSelector = (e) => e.data
我想定期(5秒)发送ping
消息并等待 3 秒接收pong
如果没有收到响应,则响应并订阅流。
我尝试这样做但没有成功。我承认我有点迷失所有可以处理超时、反跳或节流的操作员。
// periodically send a ping message
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
.mergeMap(
ping$.throttle(2000).map(() => Observable.throw('pong timeout'))
)
pong$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
但不幸的是,没有发送 ping。
我也尝试使用但没有成功。
const ping$ = Rx.Observable.interval(2000)
.timeInterval()
.do(() => socket.next('ping'))
const pong$ = socket
.filter(m => /^ping$/.test(`${m}`))
const heartbeat$ = ping$
.debounceTime(5000)
.mergeMap(() => Rx.Observable.timer(5000).takeUntil(pong$))
heartbeat$.subscribe(
(msg) => console.log(`end ${msg}`),
(err) => console.log(`err ${err}`),
() => console.log(`complete`)
)
任何帮助表示赞赏。
您可以使用race()
运算符始终仅连接到首先发出的 Observable:
function sendMockPing() {
// random 0 - 5s delay
return Observable.of('pong').delay(Math.random() * 10000 / 2);
}
Observable.timer(0, 5000)
.map(i => 'ping')
.concatMap(val => {
return Observable.race(
Observable.of('timeout').delay(3000),
sendMockPing()
);
})
//.filter(response => response === 'timeout') // remove all successful responses
.subscribe(val => console.log(val));
观看现场演示:https://jsbin.com/lavinah/6/edit?js,控制台 https://jsbin.com/lavinah/6/edit?js,console
这随机模拟响应采取0 - 5s
。当响应时间超过3s时Observable.of('timeout').delay(3000)
首先完成并且timeout
字符串通过以下方式传递给其观察者concatMap()
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)