我有一个非常简单的 timeInterval 可观察对象,我想在不断开订阅者连接的情况下启动/停止传输(无论可观察状态如何,都应该坐下来等待)。有可能吗?如果可以的话怎么办?
var source = Rx.Observable
.interval(500)
.timeInterval()
.map(function (x) { return x.value + ':' + x.interval; })
.take(10);
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + x + ' ');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
一般评论:我看到的大多数示例都展示了如何定义可观察量和订阅者。我如何影响现有对象的行为?
取决于停止/恢复信号的来源。我能想到的最简单的方法是pausable操作员 https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/pausable.md,正如文档所说,它更适合热可观察量。所以在下面的示例代码中,我删除了take(10)
(您的暂停信号现在通过pauser
主题),并添加share
将您的可观测值变成热门观测值。
- 关于热与冷,请查看图示了各自的数据流 https://stackoverflow.com/questions/32190445/hot-and-cold-observables-are-there-hot-and-cold-operators/34669444#34669444.
- 关于主题,您还可以查看相应的语义 https://stackoverflow.com/questions/34849873/what-are-the-semantics-of-different-rxjs-subjects/34860777#34860777
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(500)
.timeInterval()
.map(function (x) { return x.value + ':' + x.interval; })
.share()
.pausable(pauser);
var subscription = source.subscribe(
function (x) {
$("#result").append('Next: ' + x + ' ');
},
function (err) {
$("#result").append('Error: ' + err);
},
function () {
$("#result").append('Completed');
});
// To begin the flow
pauser.onNext(true); // or source.resume();
// To pause the flow at any point
pauser.onNext(false); // or source.pause();
这里有一个更复杂的例子 http://jsfiddle.net/jsy2kva5/1/这将每 10 项暂停您的源:
// Helper functions
function emits ( who, who_ ) {return function ( x ) {
who.innerHTML = [who.innerHTML, who_ + " emits " + JSON.stringify(x)].join("\n");
};}
var pauser = new Rx.Subject();
var source = Rx.Observable
.interval(500)
.timeInterval()
.map(function (x) { return x.value + ':' + x.interval; })
.share();
var pausableSource = source
.pausable(pauser);
source
.scan(function (acc, _){return acc+1}, 0)
.map(function(counter){return !!(parseInt(counter/10) % 2)})
.do(emits(ta_validation, 'scan'))
.subscribe(pauser);
var subscription = pausableSource.subscribe(
function (x) {
$("#ta_result").append('Next: ' + x + ' ');
},
function (err) {
$("#ta_result").append('Error: ' + err);
},
function () {
$("#ta_result").append('Completed');
});
现在你应该知道第二个问题的答案了。将您获得的可观察量与相关的 RxJS 运算符相结合以实现您的用例。这就是我在这里所做的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)