rxjava2源码解析(三)observeOn线程池原理分析

2023-05-16

observeOn

还是先说observeOn,直接看源码:

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
复制代码

这段代码我们上篇看到过,这里再重复一下。obsererOn是切换下游观察者线程,我们看ObserveOnObserver中的onNext方法是如何切换线程的。

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        
        void schedule() {
            if (getAndIncrement() == 0) {
                //ObserveOnObserver继承了runnable接口,意味着可以当做是线程任务来执行。这里代表着在新线程中执行run方法。
                worker.schedule(this);
            }
        }
        
        //ObserveOnObserver继承了runnable接口
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
        
        void drainNormal() {
            int missed = 1;
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = downstream;
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
                for (;;) {
                    ····//省略一些判断的代码
                    v = q.poll();
                    //这里就可以看到,将下游的onNext方法,切换到新线程执行。
                    a.onNext(v);
                }
                ···
            }
        }
        
    }
复制代码

这是上游的处理器执行onNext,传到这里,使用之前设置的线程执行下游的onNext方法。

Worker

这个worker到底是什么?我们先看scheduler的createWorker方法:

    public abstract Worker createWorker();
复制代码

Scheduler类中,createWorker只是一个接口,子类会重写这个方法,我们就以Schedulers.newThread()这个方法创建的Scheduler为例,来看看这里面的原理。

    //Schedulers类中的newThread静态方法,这里的hock我们暂且不理,直接返回NEW_THREAD
    public static Scheduler newThread() {
        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
    }
    
    //Schedulers类中定义了NEW_THREAD和其他THREAD
        static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }
    
    //NewThreadTask是Schedulers的静态内部类,继承自Callable接口,其中call方法返回一个Scheduler
    static final class NewThreadTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return NewThreadHolder.DEFAULT;
        }
    }
    
    //NewThreadHolder同样是一个静态内部类,里面只有一个静态参数DEFAULT,这里我们就找到了newThread方法返回的本尊NewThreadScheduler
    static final class NewThreadHolder {
        static final Scheduler DEFAULT = new NewThreadScheduler();
    }
复制代码

如上面代码和注释所示,我们直接看NewThreadScheduler的源码:

/**
 * Schedules work on a new thread.
 */
public final class NewThreadScheduler extends Scheduler {

    final ThreadFactory threadFactory;

    private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
    private static final RxThreadFactory THREAD_FACTORY;

    /** The name of the system property for setting the thread priority for this Scheduler. */
    private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";

    static {
        //这里Thread.MIN_PRIORITY为1,Thread.MAX_PRIORITY为10.Thread.NORM_PRIORITY为5.如果我们不做任何更改,这里的priority的值就为5.
        int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
                Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));

        THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
    }

    public NewThreadScheduler() {
        this(THREAD_FACTORY);
    }

    public NewThreadScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
    }

    @NonNull
    @Override
    public Worker createWorker() {
        return new NewThreadWorker(threadFactory);
    }
}
复制代码

这里createWorker方法返回的是一个NewThreadWorker对象。我们总算找到了worker的来源,需要注意这里的构造参数是threadFactory。来看看NewThreadWorker的源码。

public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    private final ScheduledExecutorService executor;

    volatile boolean disposed;

    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable run) {
        return schedule(run, 0, null);
    }

    @NonNull
    @Override
    public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }
        return scheduleActual(action, delayTime, unit, null);
    }
    
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //hock机制
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //用一个ScheduledRunnable把传入的runnable包装一下,本质上没区别。
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        ....//省略判断性代码
        Future<?> f;
        try {
            if (delayTime <= 0) {
                //executor由构造方法中创建
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
    ....
}
复制代码

这里我们就可以看到,前面调用worker.schedule(this),最终走到了executor.submit(sr)。这里的sr只是前面ObserveOnObserver的包装。executor在构造方法中创建。来看看executor是什么:

    //SchedulerPoolFactory类中的静态方法
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
        tryPutIntoPool(PURGE_ENABLED, exec);
        return exec;
    }
复制代码
    //Executors类的静态方法
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    
复制代码

OK,executor是一个ScheduledThreadPoolExecutor,标准的工作线程池。核心线程数为1,threadFactory是前面NewThreadWorker构造参数中的RxThreadFactory。他会给thread按照命名格式进行命名。

public final class RxThreadFactory extends AtomicLong implements ThreadFactory {

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }
    
    ...
}
复制代码

总结一下:

  • observeOnsubscribe方法中,新建一个worker对象。这个worker对象是根据设置的scheduler创建的。然后在新建一个ObserveOnObserver对象,将上游与之订阅。
  • ObserveOnObserveronNext方法中,会调用worker.schedule(this),将本身作为runnable传入到worker中。
  • newThreadScheduler为例,他创建的worker是一个NewThreadWorker实例。在实例构造方法中,会根据传入的threadFactory新建一个ScheduledThreadPool线程池。
  • NewThreadWorkershedule方法,就是将ObserveOnObserver作为一个runnable放在一个新的线程池中执行。
  • ObserveOnObserverrun方法,就是用来执行下游的onNext,将数据传输下去。从而达到了,切换下游onNext线程的目的。

subscribeOn

subscribeOn是用来切换上游发射器线程。切换原理上一篇有说过,其中线程池相关跟上面observeOn差不多,这里就不赘述了。

总结

上面就是rxjava2线程切换原理分析了,后面再有人面试问你rxjava2里面的线程池是哪一种,你就可以自信的说出:ScheduledThreadPool

最后贴出我做的一张类图:

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rxjava2源码解析(三)observeOn线程池原理分析 的相关文章

  • Activity的启动流程

    总的流程图 xff1a 1 进程A与AMS的交互过程 此处以跨进程启动Activity分析一下源码流程 xff1a A调用startActivity时 xff0c 需要与AMS交互 xff0c 此时需要需要获取到AMS的代理对象Binder
  • Handler同步屏障

    一 消息机制之同步屏障 消息机制的同步屏障 xff0c 其实就是阻碍同步消息 xff0c 只让异步消息通过 而开启同步屏障的方法就是调用下面的方法 xff1a MessageQueue postSyncBarrier 源码如下 xff1a
  • view的绘制流程

    一 view树的绘制流程 measure gt layout gt draw measure 1 ViewGroup LayoutParams 指定部件的长宽 2 MeasureSpec 32位的int值 前两位代表模式 后30位测量规格的
  • Thread.join()

    3 1 用法 下源码里对这个方法的描述 Thread java Waits for this thread to die lt p gt An invocation of this method behaves in exactly the
  • android 源码编译 模拟器emulator启动

    Android源代码编译成功之后 xff0c 我们就可以运行它了 为了方便起见 xff0c 我们使用Android模拟器emulator来运行编译出来的Android源代码 执行以下命令来启动Android模拟器 xff1a USER 64
  • Android 源码 (AOSP) - 编译 ( 模拟器 )

    启动模拟器 emulator command not found 错误 你肯定是在没有导入环境的窗口执行 emulator 了 bash emulator command not found 这里要先导入环境 如下 build envset
  • android模拟器

    模拟器运行需要四个文件 xff0c 分别是 xff1a Linux Kernelsystem imguserdata imgramdisk img 上面我 lunch 命令时选择的是 aosp x86 eng xff0c 因此 linux
  • mac 7z命令

    压缩文件 xff1a 7z a Mina3 7z Mina3 mp4 查看压缩后的文件 xff1a 7z t Mina3 7z 解压 7z文件 xff1a 7z x Mina3 7z 解压时换个目录
  • nRF51822:Keil下载程序报错(Erro;Flash Download failed - "Cortex-M0")

    问题描述 今天在用keil软件下载程序时遇到这个问题 比较奇怪的是我下载之前的其他工程都没有问题 xff0c 可以正常 xff0c 重新开的以前的另一个样例工程下载测试就出现了这个问题 原因分析与解决办法 1 找攻略 在查看了J Link配
  • Linux远程桌面连接,Xmanager 5实现远程调用CentOS7图形化界面

    先说效果 xff0c 可以实现 在 Centos7 6 上的远程桌面操作 流畅度和Windows远程桌面差不多 背景 xff1a 大家都知道Centos的图形化比较鸡肋 xff0c 为什么要弄图形化呢 xff1f 在虚拟机管控里 xff0c
  • Mac电脑上没有允许任何来源选项的解决方法

    Mac电脑的安全设置没有允许任何来源的选项怎么办 xff1f 1 在终端输入下方的命令 xff0c 按回车 xff1a sudo spctl master disable 2 输入系统密码 输入密码时候 xff08 输入过程中看不到输入的密
  • MAC编译Android源码 prebuilts/misc/darwin-x86/bison/bison出错

    错误提示 xff1a 0 438 72411 external one true awk awk yacc awkgram y FAILED out soong intermediates external one true awk awk
  • VMware Tools安装方法及共享文件夹设置方法

    正确安装好VMware Tools后 xff0c 可以实现主机与虚拟机之间的文件共享 xff0c 可以设置共享文件夹 xff0c 以及在主机与虚拟机之间直接进行复制黏贴的操作 安装方法 xff1a 选择 34 虚拟机 34 gt 34 重新
  • repo init 失败

    root 64 68b3ebd4a7ab aosp python3 bin repo init u https aosp tuna tsinghua edu cn platform manifest b android 9 0 0 r40
  • 修改ubuntu的sources.list源

    1 首先备份源列表 首先备份源列表 sudo cp etc apt sources list etc apt sources list backup 2 而后打开sources list文件修改 sudo vim etc apt sourc
  • Ubuntu16.04安装Python3.7并设置为默认版本

    1 安装编译环境 sudo apt get install zlib1g dev libbz2 dev libssl dev libncurses5 dev libsqlite3 dev libreadline dev tk dev lib
  • python 3.5 3.7-ubuntu16.04升级Python3.5到Python3.7的方法步骤

    ubuntu16 04自带python有两个版本 xff0c 一个2版本 xff0c 使用的是python xff1b 另一个是3版本 xff0c 使用的是python3 简易安装python后得到的3版本的版本号是python3 5 可以
  • Ubuntu无法使用终端解决方法

    最近在尝试使用Ubuntu xff0c 并遇到了一个坑 我安装的是Ubuntu16 04LTS xff0c 其内置了Python2 7 xff0c 但我想要使用Python3 所以就安装了自带的Python3软件包 xff0c 结果发现版本
  • 安装python3.8后,Ubuntu无法打开终端gnome-terminal的解决方法整理版(importerror _gi)

    在更新了python3 8后 发现Ubuntu上的gnome terminal无法用快捷ctrl 43 alt 43 T打开了 右键点击也是一直转圈啥反应都没有 只剩下桌面右键点击 选择终端这一个途径 到了发现原因竟然是在将python3重
  • 无法找到软件包libffi-dev

    Addthis entry in your etc apt sources list if it 39 snot deb http archive ubuntu com ubuntu vivid main restricted univer

随机推荐