我有以下问题:
我有一个可观察量正在做一些工作,但其他可观察量需要该可观察量的输出才能工作。我曾尝试多次订阅同一个可观察量,但在日志中我看到原始可观察量已启动多次。
这就是我的观察结果,即创建对象:
Observable.create((Observable.OnSubscribe<Api>) subscriber -> {
if (mApi == null) {
//do some work
}
subscriber.onNext(mApi);
subscriber.unsubscribe();
})
这就是我需要该对象的可观察对象
loadApi().flatMap(api -> api....()));
我在用着
.subscribeOn(Schedulers.io()) observable.observeOn(AndroidSchedulers.mainThread())
.unsubscribeOn(Schedulers.io()
在所有可观察到的情况下。
我不确定我是否正确理解了您的问题,但我认为您正在寻找一种在多个订阅者之间共享可观察量的排放的方法。有几种方法可以做到这一点。首先,您可以使用可连接的可观察的 http://reactivex.io/RxJava/javadoc/rx/observables/ConnectableObservable.html像这样:
ConnectableObservable<Integer> obs = Observable.range(1,3).publish();
obs.subscribe(item -> System.out.println("Sub A: " + item));
obs.subscribe(item -> System.out.println("Sub B: " + item));
obs.connect(); //Now the source observable starts emitting items
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
或者,您可以使用发布主题 http://reactivex.io/RxJava/javadoc/rx/subjects/PublishSubject.html:
PublishSubject<Integer> subject = PublishSubject.create(); //Create a publish subject
subject.subscribe(item -> System.out.println("Sub A: " + item)); //Subscribe both subscribers on the publish subject
subject.subscribe(item -> System.out.println("Sub B: " + item));
Observable.range(1,3).subscribe(subject); //Subscribe the subject on the source observable
Output:
Sub A: 1
Sub B: 1
Sub A: 2
Sub B: 2
Sub A: 3
Sub B: 3
这两个示例都是单线程的,但您可以轻松添加observeOn或subscirbeOn调用以使它们异步。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)