rxjava2源码解析(一)基本流程分析

2023-05-16

从基本使用入手

首先随便写一个rxjava2的基本用法,我们根据这个简单的示例来看看rxjava2整个流程是什么样的。

        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("1");
                emitter.onNext("2");
                emitter.onNext("3");
                emitter.onNext("4");
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG,"onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG,"s = "+ s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
复制代码

上面的部分,看起来太长,我们可以先将其简化。

Observable.create(ObservableOnSubscribe).subscribe(Observer);
复制代码

废话不多说,直接划重点:

  • 1.可以看到这里出现了三个类名非常相像的类:ObservableObservableOnSubscribeObserver。也就是我们日常说的,被观察者,观察者。
  • 2.为了更好的区分,我们将其形象化一点。Observable我们称其为装饰器,ObservableOnSubscribe我们也称其为发射源,Observer我们称其为处理器。为什么这么称呼,我们可以边看源码边讲。
  • 3.我们可以把上面的内容形象化为:装饰器Observable通过一个create方法和一个subscribe方法,将发射源和处理器连接起来。

接下来我们看看这个连接在源码中是如何实现的。

装饰器Observable

首先从Observablecreate入手。

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判空作用
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
复制代码

划重点:

  • 1.create方法,需要传入一个发射源ObservableOnSubscribe<T>对象,返回一个Observable<T>对象。
  • 2.忽略掉判空的代码,onAssembly方法我们也暂时放在一边,只需要知道是返回传入参数就行了。那create方法就是返回一个ObservableCreate对象。

那我们来看看ObservableCreate这个类。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    ....
}
复制代码

划重点:

  • 1.ObservableCreate这个类继承自Observable
  • 2.ObservableCreate的构造方法中直接将参数中的发射源ObservableOnSubscribe作为source存在本地。

OK,create方法看完了。很简单,一句话总结,创建了一个装饰器对象,将发射源存在本地备用。(有没有一种看王刚炒菜的感觉?)

为什么我们称Observable为装饰器?因为rxjava在这里用到了装饰器模式,而Observable是装饰器模式下的基类。装饰器模式这里看还不明显,看到后面就知道了。

发射源ObservableOnSubscribe

上面create方法需要传入一个发射源ObservableOnSubscribe参数,我们来看看源码:

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param emitter the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
复制代码

划重点:

  • 1.发射源ObservableOnSubscribe是一个接口,我们在使用它时会重写subscribe方法。
  • 2.我们会在subscribe方法中定义接下来要进行的一系列事件,所以我们称ObservableOnSubscribe为事件发射源。
  • 3.subscribe方法有一个参数就是发射器ObservableEmitter(后面会详细说明)。

订阅(连接)

接下来说说下一步:subscribe
前面说到,Observablecreate方法返回的是ObservableCreate对象,ObservableCreatesubscribe方法并没有进行重写,我们直接看Observable里的subscribe方法。

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //判空和hock机制,暂时忽略
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //重点是这个
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
复制代码

让我们抛开那些不重要的代码,直入主题。将其中的关键代码简化之后可以变为:

public final void subscribe(Observer<? super T> observer) {
    observer = RxJavaPlugins.onSubscribe(this, observer);
    subscribeActual(observer);
}
复制代码

RxJavaPlugins这个同样先甩在一边放着不管,跟前面的onAssembly一样,我们只需要知道这是返回传入的observer就行了。
那么只有subscribeActual(observer)这一句关键代码了。ObservablesubscribeActual是一个抽象方法,具体实现在子类中。

其实,在这里我们就可以看出来,这是一个装饰器模式。Observable是装饰器模式的基类,实际上所有操作都是它的子类完成的。所以我们称其为装饰器。不只是create方法,其他一些操作符,例如mapflatMap也是这样的。这个后面讲到操作符和线程切换的时候,你们应该会更有体会。

所以后面我们分析Observablesubscribe方法时,直接看子类中的subscribeActual(observer)就行。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
复制代码

划重点:

  • 1.先创建一个CreateEmitter对象parent,然后调用处理器observeronSubscribe方法持有它。
  • 2.再调用source.subscribe(parent)将其传入到source当中。这个source就是前面我们说到备用的发射源ObservableOnSubscribe,其中的subscribe方法正好需要一个发射器CreateEmitter

那整条订阅线就很清晰了:

  • 1.Observable调用create方法,参数是一个发射源ObservableOnSubscribe(我们对其subscribe方法进行重写),生成一个ObservableCreate对象。
  • 2.ObservableCreate调用subscribe方法,参数是一个处理器Observer
  • 3.在subscribe方法中我们以Observer为参数生成了一个发射器CreateEmitter,并且将这个发射器作为参数,调用了发射源ObservableOnSubscribesubscribe方法。

这个CreateEmitter是什么?我们来看看它的源码。

发射器CreateEmitter

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
        
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        
        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        .....
    }
复制代码

划重点:

  • 1.CreateEmitterObservableCreate中的一个静态内部类,继承自AtomicReference<Disposable>,ObservableEmitter<T>, Disposable,我们称其为发射器。
  • 2.我们从onNext方法中可以看出,这个发射器是直接与外部处理器对接的。
  • 3.发射器继承自Disposable接口,这个接口只有dispose()和isDisposed()两个方法,作用是切断发射过程。
  • 4.在上面的subscribeActual方法中我们可以看到,Observer有调用onSubscribe方法持有这个CreateEmitter发射器对象。所以我们可以在处理器中通过dispose()接口随时中断发射流程。
  • 5.同时我们可以在代码中看到,onErroronComplete两个是互斥的。只会执行一个,因为一旦执行其中一个,会立即切断发射过程。

总结

总结一下出现的几个类:

  • Observable -> 装饰器模式的基类,我们称其为装饰器。有一个create方法,参数是一个ObservableOnSubscribe发射源,会返回一个ObservableCreate对象。
  • ObservableCreate -> 装饰器实现类。有一个subscribe方法,参数是Observer处理器。在subscribe方法内部,我们以Observer为参数生成了一个CreateEmitter发射器,并且将这个发射器作为参数,调用了发射源的subscribe方法。
  • ObservableOnSubscribe -> 发射源,本身只是一个接口,我们重写了subscribe方法,定义了接下来要处理的事件,所以称其为发射源。
  • CreateEmitter -> 发射器,构造方法中包含一个处理器。处理器持有这个发射器对象,可以随时中断发射过程。发射器中的onErroronComplete两个是互斥的,只会执行一个。
  • Observer -> 处理器。用于处理发射器发送的数据。

再总结一下整个运行流程如下:

  • 1.Observable调用create方法,参数是一个发射源ObservableOnSubscribe(我们对其subscribe方法进行重写),生成一个ObservableCreate对象。
  • 2.ObservableCreate调用subscribe方法,参数是一个处理器Observer
  • 3.在subscribe方法中我们以Observer为参数生成了一个CreateEmitter发射器,并且将这个发射器作为参数,调用了发射源ObservableOnSubscribesubscribe方法。
  • 4.发射源ObservableOnSubscribesubscribe方法中定义了我们要处理的事件,并将结果传递给发射器CreateEmitterCreateEmitter先判断事件流是否断开,不断开则将结果传递给处理器Observer
  • 5.处理器Observer处理结果。

拓展

这时候我们再回头看我们前面扔掉的东西,RxJavaPlugins.onAssemblyRxJavaPlugins.onSubscribe。我们直接看源码。

    /**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
复制代码

方法介绍中有描述:Calls the associated hook function。
了解hook的应该就知道了,这里相当于是利用Java反射机制,对source进行了一层包装拦截。rxjava给我们提供了一个注入hook的方法,我们可以通过hook来实现在调用source之前,需要先调用我们设置的拦截函数。我们现在只需要知道有这个东西就行了,后面有这个需要再用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rxjava2源码解析(一)基本流程分析 的相关文章

  • RxJava 之Consumer和Action的使用

    在之前的RxJava中已经讲到创建观察者的代码如下 xff1a 创建观察者 Observer lt String gt observer 61 new Observer lt String gt 64 Override public voi
  • JAVA中枚举如何保证线程安全

    枚举类型到底是什么类呢 xff1f 是enum吗 xff1f 明显不是 xff0c enum就和class一样 xff0c 只是一个关键字 xff0c 他并不是一个类 xff0c 那么枚举是由什么类维护的呢 xff0c 首先写一个简单的枚举
  • Activity的启动流程

    总的流程图 xff1a 1 进程A与AMS的交互过程 此处以跨进程启动Activity分析一下源码流程 xff1a A调用startActivity时 xff0c 需要与AMS交互 xff0c 此时需要需要获取到AMS的代理对象Binder
  • Handler同步屏障

    一 消息机制之同步屏障 消息机制的同步屏障 xff0c 其实就是阻碍同步消息 xff0c 只让异步消息通过 而开启同步屏障的方法就是调用下面的方法 xff1a MessageQueue postSyncBarrier 源码如下 xff1a
  • view的绘制流程

    一 view树的绘制流程 measure gt layout gt draw measure 1 ViewGroup LayoutParams 指定部件的长宽 2 MeasureSpec 32位的int值 前两位代表模式 后30位测量规格的
  • Thread.join()

    3 1 用法 下源码里对这个方法的描述 Thread java Waits for this thread to die lt p gt An invocation of this method behaves in exactly the
  • android 源码编译 模拟器emulator启动

    Android源代码编译成功之后 xff0c 我们就可以运行它了 为了方便起见 xff0c 我们使用Android模拟器emulator来运行编译出来的Android源代码 执行以下命令来启动Android模拟器 xff1a USER 64
  • Android 源码 (AOSP) - 编译 ( 模拟器 )

    启动模拟器 emulator command not found 错误 你肯定是在没有导入环境的窗口执行 emulator 了 bash emulator command not found 这里要先导入环境 如下 build envset
  • android模拟器

    模拟器运行需要四个文件 xff0c 分别是 xff1a Linux Kernelsystem imguserdata imgramdisk img 上面我 lunch 命令时选择的是 aosp x86 eng xff0c 因此 linux
  • mac 7z命令

    压缩文件 xff1a 7z a Mina3 7z Mina3 mp4 查看压缩后的文件 xff1a 7z t Mina3 7z 解压 7z文件 xff1a 7z x Mina3 7z 解压时换个目录
  • nRF51822:Keil下载程序报错(Erro;Flash Download failed - "Cortex-M0")

    问题描述 今天在用keil软件下载程序时遇到这个问题 比较奇怪的是我下载之前的其他工程都没有问题 xff0c 可以正常 xff0c 重新开的以前的另一个样例工程下载测试就出现了这个问题 原因分析与解决办法 1 找攻略 在查看了J Link配
  • Linux远程桌面连接,Xmanager 5实现远程调用CentOS7图形化界面

    先说效果 xff0c 可以实现 在 Centos7 6 上的远程桌面操作 流畅度和Windows远程桌面差不多 背景 xff1a 大家都知道Centos的图形化比较鸡肋 xff0c 为什么要弄图形化呢 xff1f 在虚拟机管控里 xff0c
  • Mac电脑上没有允许任何来源选项的解决方法

    Mac电脑的安全设置没有允许任何来源的选项怎么办 xff1f 1 在终端输入下方的命令 xff0c 按回车 xff1a sudo spctl master disable 2 输入系统密码 输入密码时候 xff08 输入过程中看不到输入的密
  • MAC编译Android源码 prebuilts/misc/darwin-x86/bison/bison出错

    错误提示 xff1a 0 438 72411 external one true awk awk yacc awkgram y FAILED out soong intermediates external one true awk awk
  • VMware Tools安装方法及共享文件夹设置方法

    正确安装好VMware Tools后 xff0c 可以实现主机与虚拟机之间的文件共享 xff0c 可以设置共享文件夹 xff0c 以及在主机与虚拟机之间直接进行复制黏贴的操作 安装方法 xff1a 选择 34 虚拟机 34 gt 34 重新
  • repo init 失败

    root 64 68b3ebd4a7ab aosp python3 bin repo init u https aosp tuna tsinghua edu cn platform manifest b android 9 0 0 r40
  • 修改ubuntu的sources.list源

    1 首先备份源列表 首先备份源列表 sudo cp etc apt sources list etc apt sources list backup 2 而后打开sources list文件修改 sudo vim etc apt sourc
  • Ubuntu16.04安装Python3.7并设置为默认版本

    1 安装编译环境 sudo apt get install zlib1g dev libbz2 dev libssl dev libncurses5 dev libsqlite3 dev libreadline dev tk dev lib
  • python 3.5 3.7-ubuntu16.04升级Python3.5到Python3.7的方法步骤

    ubuntu16 04自带python有两个版本 xff0c 一个2版本 xff0c 使用的是python xff1b 另一个是3版本 xff0c 使用的是python3 简易安装python后得到的3版本的版本号是python3 5 可以
  • Ubuntu无法使用终端解决方法

    最近在尝试使用Ubuntu xff0c 并遇到了一个坑 我安装的是Ubuntu16 04LTS xff0c 其内置了Python2 7 xff0c 但我想要使用Python3 所以就安装了自带的Python3软件包 xff0c 结果发现版本

随机推荐