1、RxJava 如何实现线程切换?
subscribeOn
是通过新建 Observable
的方式,使用 OnSubscribe
类的方式去做到线程切换的。
observeOn
是通过 operator
操作符的形式去完成线程切换的,所以他的作用域和其他操作符一样,是调用 observeOn
之后的链路。
-
Schedulers.io()
代表 io 操作的线程, 通常用于网络,读写文件等 io 密集型的操作
-
Schedulers.computation()
代表 CPU 计算密集型的操作, 例如需要大量计算的操作
-
Schedulers.newThread()
代表一个常规的新线程
-
AndroidSchedulers.mainThread()
代表 Android 的主线程
生产者线程调度流程概括
-
Schedulers.io()
等价于 new IoScheduler()
。
-
new IoScheduler()
Rxjava 创建了线程池,为后续创建线程做准备,同时创建并运行了一个
清理线程 RxCachedWorkerPoolEvictor
,定期执行清理任务。
-
subscribeOn()
返回一个 ObservableSubscribeOn
对象,它是 Observable
的一个装饰类,
增加了 scheduler
。
- 调用
subscribe()
方法,在这个方法调用后,subscribeActual()
被调用,才真正执行了
IoSchduler
中的 createWorker()
创建线程并运行,最终将上游 Observable
的 subscribe()
方
法调度到新创建的线程中运行。
消费者线程调度流程概括
-
AndroidSchedulers.mainThread()
先创建一个包含 handler
的 Scheduler
, 这个 handler
是主线程的 handler
。
-
observeOn
方法创建 ObservableObserveOn
,它是上游 Observable
的一个装饰类,其中包含前面创建的 Scheduler
和 bufferSize
等.
- 当订阅方法
subscribe
被调用后,ObservableObserveOn
的 subscribeActual
方法创建
Scheduler.Worker
并调用上游的 subscribe
方法,同时将自身接收的参数’observer’用装饰
类 ObserveOnObserver
装饰后传递给上游。
- 当上游调用被
ObserveOnObserver
的 onNext
、onError
和 onComplete
方法时,
ObserveOnObserver
将上游发送的事件通通加入到队列 queue
中,然后再调用 scheduler
将处理事件的方法调度到对应的线程中(本例会调度到 main thread)。 处理事件的方法将
queue
中保存的事件取出来,调用下游原始的 observer
再发射出去。
- 经过以上流程,下游处理事件的消费者线程就运行在了
observeOn
调度后的 thread
中。
总结
Schedulers
内部封装了各种 Scheduler
。每一个 Scheduler
中都封装的有线程池,用于执行后台任务。
Scheduler
是所有调度器实现的抽象父类,子类可以通过复写其 scheduleDirect()
来自行决定如何调度被分配到的任务;同时通过复写其 createWorker()
返回的 Scheduler.Worker
实例来执行具体的某个任务。此处的任务指的是通过 Runnable
封装的可执行代码块。
-
子线程切换主线程:给主线程所在的Handler发消息,然后就把逻辑切换过去了。
-
主线程切换子线程:把任务放到线程池中执行就能把执行逻辑切换到子线程
-
子线程切换子线程:把任务分别扔进两个线程就行了。
Rxjava
的 subscribe
方法是由下游一步步向上游进行传递的。会调用上游的 subscribe
,直到调用到事件源。
2、RxJava 有哪些操作符?
-
创建操作符
-
转换操作符
-
过滤操作符
-
条件操作符
-
延时操作符
-
其他操作符
- map 转换事件,返回普通事件
- flatMap 转换事件,返回` Observable
- conactMap concatMap 与 FlatMap 的唯一区别就是 concatMap 保证了顺序
- subscribeOn 规定被观察者所在的线程
- observeOn 规定下面要执行的消费者所在的线程
- take 接受一个 long 型参数 count ,代表至多接收 count 个数据
- debounce 去除发送频率过快的项,常用在重复点击解决上,配合 RxBinging 使用效果很好
- timer 定时任务,多少时间以后发送事件
- interval 每隔一定时间执行一些任务
- skip 跳过前多少个事件
- distinct 去重
- takeUntil 直到到一定条件的是停下,也可以接受另外一个被观察者,当这个被观察者结束之后则停止第一个被观察者
- Zip 专用于合并事件,该合并不是连接(连接操作符后面会说),而是两两配对,也就意味着,最终配对出的 Observable 发射事件数目只和少的那个相同。不影响Observable的发射,Observable 被观察者会一直发射,不会停,只是Observer 接收不到
- merge 多个 Observable 发射的数据随机发射,不保证先后顺序
- Concat 多个 Observable 组合以后按照顺序发射,保证了先后顺序,不过最多能组合4个 Observable ,多的可以使用 contactArray
- onErrorReturn 遇到错误是发射指定的数据到 onNext,并正常终止
- onErrorResumeReturn 遇到错误时,发射设置好的一个 Observable ,用来发送数据到 onNext,并正常终止
- onExceptionResumeReturn 和onErrorResumeReturn 类似,不同之处在于会判断是否是 Exception。如果是和 onErrorResumeReturn 一样,不是则会调用 onError。不会调用onNext
3、操作符 map 和 flatmap 的区别?
-
map
:【数据类型转换】将被观察者发送的事件转换为另一种类型的事件。
-
flatMap
:【化解循环嵌套和接口嵌套】将被观察者发送的事件序列进行拆分 & 转换 后合并成一个新的事件序列,最后再进行发送。
-
concatMap
:【有序】与 flatMap 的 区别在于,拆分 & 重新合并生成的事件序列 的顺序与被观察者旧序列生产的顺序一致。
共同点
-
都是依赖 Function 函数进行转换(将一个类型依据程序逻辑转换成另一种类型,根据入参和返回值)
-
都能在转换后直接被 subscribe
区别
-
返回结果不同
map 返回的是结果集,flatmap 返回的是包含结果集的 Observable 对象(返回结果不同)
-
执行顺序不同
map 被订阅时每传递一个事件执行一次 onNext 方法,flatmap 多用于多对多,一对多,再被转化为多个时,一般利用 from/just 进行一一分发,被订阅时将所有数据传递完毕汇总到一个 Observable 然后一一执行 onNext 方法。(如单纯用于一对一转换则和 map 相同)
-
转换对象的能力不同
map 只能单一转换,单一指的是只能一对一进行转换,指一个对象可以转化为另一个对象但是不能转换成对象数组。
flatmap 既可以单一转换也可以一对多/多对多转换,flatmap 要求返回 Observable,因此可以再内部进行事件分发,逐个取出单一对象。
4、RxJava 如何解决内存泄漏?
-
订阅的时候拿到 Disposable
,主动调用 dispose
-
使用 RxLifeCycle
-
使用 AutoDispose
5、RxJava 中 Observable、Flowable、Single、Maybe、Completable 使用时如何选择?
在 RxJava2
里面,Observable
、Flowable
、Single
、Maybe
、Completable
这几个在使用起来区别不大,因为他们都可以用一个或多个函数式接口作为参数进行订阅(subscribe
),需要几个传几个就可以了。但是从各个的设计初衷来讲,个人感觉最适用于网络请求这种情况的是 Single
和 Completable
。
- 网络请求是一个
Request
对应一个 Response
,不会出现背压情况,所以不考虑 Flowable
;
- 网络请求是一个
Request
对应一个 Response
,不是一个连续的事件流,所以在 onNext
被调用之后,onComplete
就会被马上调用,所以只需要 onNext
和 onComplete
其中一个就够了,不考虑 Observable
、Maybe
;
- 对于关心
ResponseBody
的情况,Single
适用;
- 对于不关心
ResponseBody
的情况,Completable
适用。
6、为什么 subscribeOn() 只有第一次切换有效
因为 RxJava 最终能影响 ObservableOnSubscribe
这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn()
,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn()
就是最后运行的,这就造成了写多个 subscribeOn()
并没有什么乱用的现象。