我有以下代码:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull final ObservableEmitter<String> s) throws Exception {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
s.onNext("1");
s.onComplete();
}
});
thread.setName("background-thread-1");
thread.start();
}
}).map(new Function<String, String>() {
@Override
public String apply(@NonNull String s) throws Exception {
String threadName = Thread.currentThread().getName();
logger.logDebug("map: thread=" + threadName);
return "map-" + s;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String s) {
String threadName = Thread.currentThread().getName();
logger.logDebug("onNext: thread=" + threadName + ", value=" + s);
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
String threadName = Thread.currentThread().getName();
logger.logDebug("onComplete: thread=" + threadName);
}
});
这是输出:
map: thread=background-thread-1
onNext: thread=background-thread-1, value=map-1
onComplete: thread=background-thread-1
重要细节:我正在打电话subscribe
来自另一个线程的方法(main
Android 中的线程)。
所以看起来像Observable
类是同步的,默认情况下它执行所有操作(像这样的运算符map
+ 通知订阅者)在发出事件的同一线程上(s.onNext
), 正确的?我想知道......这是有意的行为还是我只是误解了什么?其实我至少是期待的onNext
and onComplete
回调将在调用者的线程上调用,而不是在发出事件的线程上调用。我是否正确理解,在这种特殊情况下,实际调用者的线程并不重要?至少当事件异步生成时是这样。
另一个问题 - 如果我从某些外部源接收一些 Observable 作为参数(即我自己不生成它)怎么办...作为其用户,我无法检查它是同步还是异步,并且我只需要明确指定我想通过以下方式接收回调subscribeOn
and observeOn
方法,对吗?
Thanks!
RxJava 对于并发性没有任何看法。如果您不使用任何其他机制(例如observeOn/subscribeOn),它将在订阅线程上生成值。请不要在运算符中使用诸如 Thread 之类的低级构造,否则可能会违反约定。
由于使用Thread,onNext将从调用Thread('background-thread-1')调用。订阅发生在调用(UI 线程)上。链下的每个运算符都将从“background-thread-1”调用线程调用。 onNext 订阅也将从“background-thread-1”调用。
如果您想不在调用线程上生成值,请使用:subscribeOn。如果您想将线程切换回主线程,请在链中的某个位置使用observeOn。最有可能在订阅之前。
Example:
Observable.just(1,2,3) // creation of observable happens on Computational-Threads
.subscribeOn(Schedulers.computation()) // subscribeOn happens only once in chain. Nearest to source wins
.map(integer -> integer) // map happens on Computational-Threads
.observeOn(AndroidSchedulers.mainThread()) // Will switch every onNext to Main-Thread
.subscribe(integer -> {
// called from mainThread
});
这是一个很好的解释。http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html http://tomstechnicalblog.blogspot.de/2016/02/rxjava-understanding-observeon-and.html
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)