RxJava 2.x 源码分析 之 FlatMap

2023-05-16

FlatMap

  • 官方定义:把被观察者发射出去的事件转化成新的子被观察者,然后把这些发射量展开平铺后统一放到一个被观察者中。官方文档

 

  • 简单来讲就是把被观察者每次发射的事件转化成一个子被观察者,然后通过合并(Merge)所有子被观察者的事件成总的一系列的事件并发射给观察者

  • 官方文档中提及到很多语言都拥有 Merge 和 Concat 的合并操作,他们的区别是前者会顺序交错,而后者是不会破坏顺序的。

  • 所以FlatMapConcatMap的区别是合并后的事件顺序有可能是无序的,但FlatMap真的不能做到有序事件吗?本文也会探讨这个问题。

RxJava使用了观察者模式,封装了很多ObservableObserver,针对不同的操作符的调用会用对应的ObservaEbleObserver实现。

根据源码Observable发射的事件都是有序的,使用FlatMap时由事件转换的被观察者也是有序地发射自己的事件,我们可以猜测:

  • FlatMap事件无序的关键是线程,当由事件转换成的多个被观察者在不同线程中发射事件时,会导致顶层观察者接收到的事件是无序的。
  • 反之所有被观察者都在同一个线程中发射时间的话 FlatMap 的效果跟ConcatMap是相同的。

为了证实我们的猜测,我们先具体简单的例子:

public static void main(String args[]) {
    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onComplete();
                }
            })
            .flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    return Observable.just(
                            "item " + integer + " sub-item " + 1 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 2 + " Observable Thread: " + Thread.currentThread().getName()
                            , "item " + integer + " sub-item " + 3 + " Observable Thread: " + Thread.currentThread().getName());
                }
            })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {

                }

                @Override
                public void onNext(String s) {
                    System.out.println(s + " Observer Thread: " + Thread.currentThread().getName());
                }

                @Override
                public void onError(Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
}

运行结果:

item 1 sub-item 1 Observable Thread: main Observer Thread: main
item 1 sub-item 2 Observable Thread: main Observer Thread: main
item 1 sub-item 3 Observable Thread: main Observer Thread: main
item 2 sub-item 1 Observable Thread: main Observer Thread: main
item 2 sub-item 2 Observable Thread: main Observer Thread: main
item 2 sub-item 3 Observable Thread: main Observer Thread: main
item 3 sub-item 1 Observable Thread: main Observer Thread: main
item 3 sub-item 2 Observable Thread: main Observer Thread: main
item 3 sub-item 3 Observable Thread: main Observer Thread: main

阅读源码之前我们需要知道,RxJava 内部包装了很多ObservableObserver,用FlatMap实现该例子的实现方法不止一种,使用的操作符也可以不一样,所以运行时调用到的ObservableObserver不一定相同,所以下面阅读的源码路径只是基于这个例子。

  • 首先看看Observable的创建过程:

//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

跟上一篇源码分析一样,内部创建的是ObservableCreate对象,ObservableOnSubscribe是一个提供subscribe()作为约定函数的接口。

  • 接下来看看flatMap()做了什么

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
}

mapper 代表的是我们的刚才创建的Function对象,这里有函数式编程的味道(如果用过Kotlin的同学就更加熟悉了),相当于把我们写的函数作为一个参数。

false 表示异常是否需要延迟到所有内部被观察者都结束后才抛出。

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}

maxConcurrency: 最大并发数

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}

bufferSize :缓存所有子被观察者的事件加起来总数大小

继续进下一层

//Observable.java
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
    // 检查参数是否合法
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    if (this instanceof ScalarCallable) { //false
        @SuppressWarnings("unchecked")
        T v = ((ScalarCallable<T>)this).call();
        if (v == null) {
            return empty();
        }
        return ObservableScalarXMap.scalarXMap(v, mapper);
    }
    // ObservableCreate 没有实现 ScalarCallable,所以走这里
    return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}

这里返回一个 ObservableFlatMap 对象,实际上ObservableFlatMap包装了ObservableCreate并把ObservableCreate对象作为常量source,那么flatMap()到这里结束了。

  • 下面看看subscribe()的源码:

//Observable.java
public final void subscribe(Observer<? super T> observer) {
    // 检查观察者是否为空
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        // 调用 hook 方法
        observer = RxJavaPlugins.onSubscribe(this, observer);
        // 检查调用 hook 的观察者是否为空
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // 实际订阅操作在这个方法里
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
     ...
    }
}

下面看看 subscribeActual:

// Observable.java
protected abstract void subscribeActual(Observer<? super T> observer);

居然是一个抽象方法,在哪里被实现了?机智的我们反应过来了,具体实现在flatMap()返回的ObservableFlatMap中:

// ObservableFlatMap.java
public ObservableFlatMap(ObservableSource<T> source,
        Function<? super T, ? extends ObservableSource<? extends U>> mapper,
        boolean delayErrors, int maxConcurrency, int bufferSize) {
    super(source);
    this.mapper = mapper;
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency;
    this.bufferSize = bufferSize;
}
    
@Override
public void subscribeActual(Observer<? super U> t) {
    // source : ObservableCreate对象
    // t : 观察者
    // mapper : 之前我们写的 mapper 函数
    if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) { // false
        return;
    }
    // 本次运行走这里,调用 ObservableCreate 的 subscribe() 并用    
    // MergeObserver 包装了我们写的 observer
    source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}

可以说ObservableFlatMapFlatMap来说是比较重要的类,里面包含了许多重要逻辑。

调用 ObservableCreatesubscribe() 之前我们先看看 MergeObserver 的构造方法:

// ObservableFlatMap.java
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                boolean delayErrors, int maxConcurrency, int bufferSize) {
    this.actual = actual; // 我们写的 observer
    this.mapper = mapper; 
    this.delayErrors = delayErrors;
    this.maxConcurrency = maxConcurrency; // Integer.MAX_VALUE
    this.bufferSize = bufferSize;
    if (maxConcurrency != Integer.MAX_VALUE) {
        sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
    }
    // 创建一个原子性的内部观察者对象数组
    this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}

由于ObservableCreate没有覆写 subscribe(),所以实际上调用的是父类Observablesubscribe()且源码上面已经贴过,可以直接跳过进入ObservableCreatesubscribeActual()中:

// ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer) {
    // 创建发射器,这里先忽略发射器内部实现,只需要知道发射器主要用来回调观察者的
    // onNext onComplete onError 方法
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    // 回调 MergeObserver 的 onSubscribe() 
    // 再由其回调我们写的 observer 的 onSubscribe()
    observer.onSubscribe(parent);

    try {
        // source : 我们创建的 ObservableOnSubscribe 对象
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

下面走到了我们写的subcribe()逻辑里,我们调用了发射器的 onNext(),看看发射器的源码:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    // 检测是否切断
    // 如果切断了被观察者就接收不到后续的事件了
    if (!isDisposed()) {
        // observer : MergeObserver 对象
        // 回调观察者的 onNext()
        observer.onNext(t);
    }
}

还记得我们写的 observer 对象被包装成了MergeObserver,那么进入MergeObserveronNext():

// ObservableFlatMap.java
@Override
public void onNext(T t) {
    // safeguard against misbehaving sources
    if (done) {
        return;
    }
    ObservableSource<? extends U> p;
    try {
        // 把我们的 mapper 方法里面返回的子被观察者提取出来,
        // 由于我们当初用的是 .just() 创建子被观察者,
        // 所以子被观察者是 ObservableFromArray 对象,
        // 这里先忽略 just() 内部实现
        p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }

    if (maxConcurrency != Integer.MAX_VALUE) { // false
        synchronized (this) {
            if (wip == maxConcurrency) {
                sources.offer(p);
                return;
            }
            wip++;
        }
    }
    // 进入下面的函数
    subscribeInner(p);
}
    
    
void subscribeInner(ObservableSource<? extends U> p) {
    for (;;) {
        // p : ObservableFromArray 对象
        if (p instanceof Callable) { //false
            tryEmitScalar(((Callable<? extends U>)p));

            if (maxConcurrency != Integer.MAX_VALUE) {
                synchronized (this) {
                    p = sources.poll();
                    if (p == null) {
                        wip--;
                        break;
                    }
                }
            } else {
                break;
            }
        } else {
            // ObservableFromArray 没有实现 Callable 接口,所以走这里
            InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
            if (addInner(inner)) {
                p.subscribe(inner); // 飙车入口
            }
            break;
        }
    }
}
    
// 这里主要做内部观察者对象数组的增加
// 通过创建size为原数组长度+1的新数组并作为新的内部观察者对象数组来实现
boolean addInner(InnerObserver<T, U> inner) {
    for (;;) {
        // 获取之前 MergeObserver 创建的内部观察者对象数组
        InnerObserver<?, ?>[] a = observers.get();
        if (a == CANCELLED) {
            inner.dispose();
            return false;
        }
        int n = a.length; // 0
        InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
        System.arraycopy(a, 0, b, 0, n);
        b[n] = inner;
        if (observers.compareAndSet(a, b)) {
            return true;
        }
    }
}
        

看到这里我们应该到了 p.subscribe(inner) ,又调用Observablesubscribe()?到现在我们好像已经调用了好几次了,证明这两个东西都被包装好几层了,前面我提到过RxJava包装了许多ObservableObserver,配合观察者模式一层一层地地传递事件下去,这是 RxJava 的其中一个奥妙之处。

下面整个过程我们会一直在 ObservableFromArray InnerObserver ObservableFlatMap MergeObserver的方法调用中飙车,可能会感到不适。

我们直接跳过进入从ObservableFromArraysubscribeActual()开始看,这里会有很多的跳转不方便一步步展示,所有相关代码调用顺序和注释都在下面:

// ObservableFromArray.java
public final class ObservableFromArray<T> extends Observable<T> {
    final T[] array;
    public ObservableFromArray(T[] array) {
        this.array = array;
    }
    @Override
    public void subscribeActual(Observer<? super T> s) {
        // [1]
        FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
        // s : InnerObserver 对象
        // array : 我们创建子被观察者时调用 .just() 方法生成的String数组
        // [4]
        s.onSubscribe(d);
    
        // [9]
        if (d.fusionMode) { // true 看到这里结束了 onNext() 所有操作
            return;
        }

        d.run();
    }

    // [2]
    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
        
        final Observer<? super T> actual; // InnerObserver 对象
        
        final T[] array;
        
        int index;
        
        boolean fusionMode;
        
        volatile boolean disposed;
        // [3]
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
        this.actual = actual;
        this.array = array;
        }
...
        // [7]
        @Override
        public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) { //true
                fusionMode = true;
                return SYNC;
            }
                return NONE;
        }
    }
}

InnerObserver:

// ObservableFlatMap.java
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
    implements Observer<U> {

    private static final long serialVersionUID = -4606175640614850599L;
    final long id;
    final MergeObserver<T, U> parent;

    volatile boolean done;
    volatile SimpleQueue<U> queue;

    int fusionMode;

    InnerObserver(MergeObserver<T, U> parent, long id) {
        this.id = id;
        this.parent = parent;
    }
    
    // [5]
    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) { // true
            if (s instanceof QueueDisposable) {  // true
                @SuppressWarnings("unchecked")
                // s : FromArrayDisposable 对象
                QueueDisposable<U> qd = (QueueDisposable<U>) s;
                // 获取合并的标记 这里返回同步标记 SYNC
                // [6]
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                if (m == QueueDisposable.SYNC) { // true
                    fusionMode = m;
                    queue = qd;
                    done = true;
                    // parent: MergeObserver
                    // 把 MergeObserver 的所有事件都发送完毕
                    // [8]
                    parent.drain();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    fusionMode = m;
                    queue = qd;
                }
            }
        }
    }        
    ...
}

MergeObserver:

// [9]
void drain() {
    if (getAndIncrement() == 0) {
        drainLoop();
    }
}
// [10]
// 这里利用之前创建的具有原子性的内部观察者数组发射子被观察者的所有事件
void drainLoop() {
 ...
}

从上面的源码主要过程是被观察者的单次调用onNext()发射的事件变成一个子被观察者且将其事件都发射给观察者,然后执行下一个onNext()重新走一遍上述代码或者进入其他回调方法,所以整个过程都在同一个线程中且同步执行的,事件的顺序是有序的。

  • 到此我们见证了FlatMap发射有序事件的全过程

无序事件

修改下flatMap()代码,实现无序事件:

  • 在创建子被观察者的时候调用subscribeOn()指定发射事件在新的子线程中进行,或者使用delay()也可以

.flatMap(new Function<Integer, ObservableSource<String>>() {
    @Override
    public ObservableSource<String> apply(Integer integer) throws Exception {
        final Integer i = integer;
        Observable observable_create = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext(i + "-" + 1 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 2 + " Observable: " + Thread.currentThread().getName());
                e.onNext(i + "-" + 3 + " Observable: " + Thread.currentThread().getName());
                e.onComplete();
            }
        });
        Observable observable_just = Observable.just(
                integer + "-" + 1 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 2 + " Observable: " + Thread.currentThread().getName(),
                integer + "-" + 3 + " Observable: " + Thread.currentThread().getName());

        return observable_create
                .subscribeOn(Schedulers.newThread());
                // .delay((int(Math.random()*1000),TimeUnit.MILLISECONDS);
    }
}) 

运行结果:

03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-1 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-2 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.255 12781-12799/com.example.myapplication E/RxJava: 1-3 Observable: RxCachedThreadScheduler-1 Observer :RxCachedThreadScheduler-1
03-16 10:56:09.275 12781-12800/com.example.myapplication E/RxJava: 2-1 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-2 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-1 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-2 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 3-3 Observable: RxCachedThreadScheduler-3 Observer :RxCachedThreadScheduler-2
03-16 10:56:09.305 12781-12800/com.example.myapplication E/RxJava: 2-3 Observable: RxCachedThreadScheduler-2 Observer :RxCachedThreadScheduler-2

无序事件的子被观察者不再由.just()方法创建,而是.create()代替,原因是由于ObservableFromArray实现逻辑不能在日志中直接明确的显示子被观察者发送事件是在子线程进行的。

observable_just 作为子被观察者的运行结果:

03-16 11:10:55.381 13418-13437/? E/RxJava: 1-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.381 13418-13437/? E/RxJava: 1-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-1 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-2 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.391 13418-13437/? E/RxJava: 3-3 Observable: main Observer :RxCachedThreadScheduler-1
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-1 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-2 Observable: main Observer :RxCachedThreadScheduler-2
03-16 11:10:55.401 13418-13438/? E/RxJava: 2-3 Observable: main Observer :RxCachedThreadScheduler-2

由于调用just()的时候已经在当前线程(默认主线程)把事件都准备好了,再在子线程中发射出去,所以日志上打印的是主线程


总结:

  • FlatMap把每个发射的事件都包装成新的子被观察者,然后这些子被观察者再把子事件发送出去

  • 无序事件每个子被观察者发射的所有事件都运行在同一个线程内发射且顺序按照代码的调用顺序

  • 子被观察者都不指定子线程而是在当前线程的时候,flatMap作用跟concatMap相同

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava 2.x 源码分析 之 FlatMap 的相关文章