rxjava2源码解析(二)线程切换分析

2023-05-16

使用方法

还是先从最基本的使用开始看:

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String s) {
                        Log.d(TAG, "s = " + s);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
复制代码

相较于前面的实例,多了两段代码,subscribeOnobserveOn。我们知道,subscribeOn是用来调整被观察者(发射源)的线程,而observeOn是调整观察者(处理器)的线程。

observeOn

我们先从observeOn的源码来看它是如何控制处理器的线程:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        //判空代码,和hock相关机制,我们可以忽略,直接看ObservableObserveOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
复制代码

首先我们知道,observeOnsubscribeOn都是observable这个装饰器的方法,他们的返回值也都是observable(前面讲过这是装饰器模式)。这里还是老样子,直接看ObservableObserveOn这个对象就行了。

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        //将上游source存在本地
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //如果是当前线程,就不做处理
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

重点还是subscribeActual这个方法。我们看到,初始化ObservableObserveOn的时候传入了我们设置的scheduler。所以在subscribeActual里,先判断scheduler是否是TrampolineSchedulerTrampolineScheduler是什么东西呢?我们看官方注释:

/**
 * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed
 * after the current unit of work is completed.
 *计划在当前线程上工作,但不会立即执行。 将工作放入队列并在当前工作单元完成后执行。
 */
复制代码

OK,一目了然,如果是当前线程,不做任何处理,直接用绑定起来。否则,新建一个ObserveOnObserver对象,将上游装饰器(这里的上游是代码流程上的上游,即调用observeOn的装饰器)先于这个对象绑定。这个看起来是不是很眼熟?我们回看上一篇里面说的ObservableCreate里的subscribeActual方法,对比一下有什么不同。

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

可以看到,两者都有在内部新建一个对象,并将上游装饰器(或者是发射源)与之绑定。区别在于,ObservableCreate内部是新建一个发射器CreateEmitter对象,而ObservableObserveOn内部是新建一个处理器ObserveOnObserver对象。ObservableCreate是将之前存储的上游发射源与发射器绑定,ObservableObserveOn是将上游装饰器与处理器绑定。
根据这一点,我们可以将装饰器分为两种样式:

  • 一种是跟ObservableCreate类似,属于每一条流水线的开端,本身是装饰器,上游是发射源,内部生成一个发射器,处理最开始的发射流程。我们称之为起始装饰器
  • 一种是跟ObservableObserveOn类似,属于流水线中间流程,本身是装饰器,上游是装饰器,内部新建一个处理器来处理上游事件,下游是处理器或者其他装饰器。我们称之为流程装饰器。

下面我们可以看看ObserveOnObserver的源码。

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //同步异步相关,暂时不管
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            //重点是这个
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver继承了runnable接口,意味着可以当做是线程任务来执行。这里代表着在新线程中执行run方法。
                worker.schedule(this);
            }
        }
        //ObserveOnObserver继承了runnable接口
        @Override
        public void run() {
        //同步异步相关,我们直接看drainNormal()
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    ····//省略一些判断的代码
                    v = q.poll();
                    //这里就可以看到,将下游的onNext方法,切换到新线程执行。
                    a.onNext(v);
                }
                ···
            }
        }
        
    }
复制代码

这里我们就不深入细究了(会在下一篇中详细来说),只需要知道,这是上游的处理器执行onNext,传到这里,使用之前设置的线程执行下游的onNext方法。所以这里就完成了线程切换功能。并且这个切换,延续到下游所有处理的onNext。如果在下游再次调用ObserverOn,就会将后面的处理器切换到另外一个线程。
所以,我们可以得到结论,ObserverOn可以多次调用,每次调用会作用于下游的所有处理器,直到遇到新的ObserverOn

subscribeOn

接下来可以看看,SubscribeOn的源码内容。

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //判空和hock机制代码,忽略。直接看ObservableSubscribeOn
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
复制代码

一样的配方,一样的味道,我们可以直接看ObservableSubscribeOn类的subscribeActual方法。

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        //创建SubscribeOnObserver内部类对象
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
        //这里调用了下游的onSubscribe
        observer.onSubscribe(parent);
        //scheduler.scheduleDirect方法返回一个disposable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

这里可以看到,ObservableSubscribeOn同样是一个流程装饰器,在调用subscribe的时候,内部新建一个处理器,这个处理器与下游处理器相互持有。这里与ObservableObserveOn有所不同,并没有立即执行subscribe将上游装饰器与内部处理器连接起来,而是执行了:

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
复制代码

我们看看SubscribeTask是啥:

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            //这里的source是上游装饰器,parent是内部处理器
            source.subscribe(parent);
        }
    }
复制代码

很简单,SubscribeTask继承了Runnable,其中run方法是执行上游装饰器的subscribe方法。我们再看scheduleDirect方法。

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
复制代码

这里createWorker方法,是每个不同的scheduler自行重写的。我们不深究这块的代码,只需要理解为,在subscribeActual中,根据前面设置的scheduler,新建一个线程,在线程中立即执行上游sourcesubscribe方法,与内部处理器SubscribeOnObserver绑定。SubscribeOnObserver处理器与一般处理器没什么区别,就不贴代码了。
那么上面就实现了,在另外一个线程执行subscribe方法。看过前面基本流程的都知道,这里基本上就确定了,上游所有subscribe方法执行的线程。所以我们知道了,subscribeOn()方法会切换上游所有的subscribe方法,至于发射源所在的线程,只跟离它最近的subscribeOn()方法中所切换的线程有关。所以说,subscribeOn()方法只需要执行一次,且只有第一次是生效的。

onSubscribe

我们看到,终端Observer这里有一个onSubscribe方法,我们一般在这里进行一些初始化的操作,而前面的源码中也有很多地方有onSubscribe方法。那这个方法到底是执行在哪个线程呢?我们从源码中寻找答案。
想要知道onSubscribe方法在哪个线程,只需要看,在离终端处理器最近的上游装饰器中,是在哪个线程调用onSubscribe()的。我们来看看subscribeOnobserveOn有没有更改它所运行的线程。 先看ObserveOn,毕竟这是作用于下游处理器的线程切换方法。

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

从上面的源码可以看到,ObservableObserveOn类中的subscribeActual并没有onSubscribe相关的内容。我们看看ObserveOnObserver的源码:


        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                this.upstream = d;
                ·····//不相干代码
                //这里的downstream是指下游处理器
                downstream.onSubscribe(this);
            }
        }
复制代码

我们看到,在ObserveOnObserver被调用onSubscribe的时候,会调用下游的onSubscribe,参数是本身。也就是说,ObserveOn并没有切换onSubscribe方法的线程。
再看subscribeOn方法,很明显是直接在subscribeActual中执行下游处理器的onSubscribe方法。

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
复制代码

这两个线程切换的方法,都没有更改onSubscribe()方法的线程。所以我们能确定,在终端的处理器Observer里面的onSubscribe()方法,是跟外部在同一个线程上。

总结一下

  • observeOn作用于下游的所有处理器,可以多次调用。每一个处理器所运行的线程,决定于它最近的上游observeOn方法中指定的线程。
  • subscribeOn作用于上游的发射源,主要是用来指定subscribe方法所在的线程。针对于发射源,只有离它最近的下游subscribeOn方法中所指定的线程才生效。所以subscribeOn方法多次调用并没有效果。
  • onSubscribe()方法并不会跟随内部线程切换而切换线程。运行在哪个线程,只跟外部创建这一整套观察者模式的线程一致。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rxjava2源码解析(二)线程切换分析 的相关文章

  • JAVA中枚举如何保证线程安全

    枚举类型到底是什么类呢 xff1f 是enum吗 xff1f 明显不是 xff0c enum就和class一样 xff0c 只是一个关键字 xff0c 他并不是一个类 xff0c 那么枚举是由什么类维护的呢 xff0c 首先写一个简单的枚举
  • Activity的启动流程

    总的流程图 xff1a 1 进程A与AMS的交互过程 此处以跨进程启动Activity分析一下源码流程 xff1a A调用startActivity时 xff0c 需要与AMS交互 xff0c 此时需要需要获取到AMS的代理对象Binder
  • Handler同步屏障

    一 消息机制之同步屏障 消息机制的同步屏障 xff0c 其实就是阻碍同步消息 xff0c 只让异步消息通过 而开启同步屏障的方法就是调用下面的方法 xff1a MessageQueue postSyncBarrier 源码如下 xff1a
  • view的绘制流程

    一 view树的绘制流程 measure gt layout gt draw measure 1 ViewGroup LayoutParams 指定部件的长宽 2 MeasureSpec 32位的int值 前两位代表模式 后30位测量规格的
  • Thread.join()

    3 1 用法 下源码里对这个方法的描述 Thread java Waits for this thread to die lt p gt An invocation of this method behaves in exactly the
  • android 源码编译 模拟器emulator启动

    Android源代码编译成功之后 xff0c 我们就可以运行它了 为了方便起见 xff0c 我们使用Android模拟器emulator来运行编译出来的Android源代码 执行以下命令来启动Android模拟器 xff1a USER 64
  • Android 源码 (AOSP) - 编译 ( 模拟器 )

    启动模拟器 emulator command not found 错误 你肯定是在没有导入环境的窗口执行 emulator 了 bash emulator command not found 这里要先导入环境 如下 build envset
  • android模拟器

    模拟器运行需要四个文件 xff0c 分别是 xff1a Linux Kernelsystem imguserdata imgramdisk img 上面我 lunch 命令时选择的是 aosp x86 eng xff0c 因此 linux
  • mac 7z命令

    压缩文件 xff1a 7z a Mina3 7z Mina3 mp4 查看压缩后的文件 xff1a 7z t Mina3 7z 解压 7z文件 xff1a 7z x Mina3 7z 解压时换个目录
  • nRF51822:Keil下载程序报错(Erro;Flash Download failed - "Cortex-M0")

    问题描述 今天在用keil软件下载程序时遇到这个问题 比较奇怪的是我下载之前的其他工程都没有问题 xff0c 可以正常 xff0c 重新开的以前的另一个样例工程下载测试就出现了这个问题 原因分析与解决办法 1 找攻略 在查看了J Link配
  • Linux远程桌面连接,Xmanager 5实现远程调用CentOS7图形化界面

    先说效果 xff0c 可以实现 在 Centos7 6 上的远程桌面操作 流畅度和Windows远程桌面差不多 背景 xff1a 大家都知道Centos的图形化比较鸡肋 xff0c 为什么要弄图形化呢 xff1f 在虚拟机管控里 xff0c
  • Mac电脑上没有允许任何来源选项的解决方法

    Mac电脑的安全设置没有允许任何来源的选项怎么办 xff1f 1 在终端输入下方的命令 xff0c 按回车 xff1a sudo spctl master disable 2 输入系统密码 输入密码时候 xff08 输入过程中看不到输入的密
  • MAC编译Android源码 prebuilts/misc/darwin-x86/bison/bison出错

    错误提示 xff1a 0 438 72411 external one true awk awk yacc awkgram y FAILED out soong intermediates external one true awk awk
  • VMware Tools安装方法及共享文件夹设置方法

    正确安装好VMware Tools后 xff0c 可以实现主机与虚拟机之间的文件共享 xff0c 可以设置共享文件夹 xff0c 以及在主机与虚拟机之间直接进行复制黏贴的操作 安装方法 xff1a 选择 34 虚拟机 34 gt 34 重新
  • repo init 失败

    root 64 68b3ebd4a7ab aosp python3 bin repo init u https aosp tuna tsinghua edu cn platform manifest b android 9 0 0 r40
  • 修改ubuntu的sources.list源

    1 首先备份源列表 首先备份源列表 sudo cp etc apt sources list etc apt sources list backup 2 而后打开sources list文件修改 sudo vim etc apt sourc
  • Ubuntu16.04安装Python3.7并设置为默认版本

    1 安装编译环境 sudo apt get install zlib1g dev libbz2 dev libssl dev libncurses5 dev libsqlite3 dev libreadline dev tk dev lib
  • python 3.5 3.7-ubuntu16.04升级Python3.5到Python3.7的方法步骤

    ubuntu16 04自带python有两个版本 xff0c 一个2版本 xff0c 使用的是python xff1b 另一个是3版本 xff0c 使用的是python3 简易安装python后得到的3版本的版本号是python3 5 可以
  • Ubuntu无法使用终端解决方法

    最近在尝试使用Ubuntu xff0c 并遇到了一个坑 我安装的是Ubuntu16 04LTS xff0c 其内置了Python2 7 xff0c 但我想要使用Python3 所以就安装了自带的Python3软件包 xff0c 结果发现版本
  • 安装python3.8后,Ubuntu无法打开终端gnome-terminal的解决方法整理版(importerror _gi)

    在更新了python3 8后 发现Ubuntu上的gnome terminal无法用快捷ctrl 43 alt 43 T打开了 右键点击也是一直转圈啥反应都没有 只剩下桌面右键点击 选择终端这一个途径 到了发现原因竟然是在将python3重

随机推荐