Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0)

2023-10-26

Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0)
Android RxJava第二弹之RxJava封装库 RxJava+Animation RxJava+Glide
Android RxJava第三弹之RxJava2.0尝鲜

本人参考以下文章

给 Android 开发者的 RxJava 详解—扔物线

Rxjava 2.0 与 Rxjava 1.0有什么不同

注:原理讲解可能会用到rx1.0的概念,但是代码示例部分用rx2.0 来展示

引言

很多做android开发朋友对rxjava都有熟悉,github上也出现了很多的基于rxjava的开源库,比如 RxBus RxBinding RxPermission,如果我们了解了RxJava的原理,那么我们也可以很轻松的通过RxJava来封装我们自己的库。后面会有简单的例子来用RxJava来封装Animation。

好了,开始我们的正文

RxJava介绍和原理简析

RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

RxJava是一个实现反应性扩展框架的Java虚拟机:用于通过使用观察序列构成异步和基于事件的程序库。

扩展了观察者模式,以支持数据/事件序列,并增加了操作符,他可以将将序列清晰的组合在一起的。这些序列组合可以是抽象出来的某些数据/事件,如低级别的线程,同步,线程安全和并发数据结构。

概括的的文字刚开始一般是看不懂的,简单来说RxJava就是一个实现异步操作的库。

扩展的观察者模式

官方的概括中提到了扩展的观察者模式,那么我们先从此入手

观察者模式

Java_观察者模式(Observable和Observer)

在Java中通过Observable类和Observer接口实现了观察者模式。一个Observer对象监视着一个Observable对象的变化,当Observable对象发生变化时,Observer得到通知,就可以进行相应的工作。
这里Observable(被观察者)对象的变化是采用注册(Register)或者称为订阅(Subscribe)的方式告诉Observer(观察者)。

RxJava的观察者模式

RxJava 有四个基本概念:Observable (可观察者,即被观察者)、Observer (观察者)、 subscribe (订阅)、事件。ObservableObserver 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer

与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted()onError()

onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。
onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted()onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted()onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。

备:RxJava2.0还添加了一个新的回调方法:onSubscribe(),这是为了解决RxJava1.0backpressure问题,后面会讲到

RxJava观察者模式的图如下

这里写图片描述

RxJava的基本实现

因为RxJava2.0引入了很多新的接口,我们在讲原理的时候,直接拿着2.0的代码来做示例,但是有些得放用2.0不太好理解,我们就用1.0的代码来理解原理吧

创建Subscriber(2.0)/Observer(2.0)

        Subscriber<String> subscriber = new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                Logger.i("hello  onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Logger.i("hello  onNext-->" + s);
            }

            @Override
            public void onError(Throwable t) {
                Logger.i("hello  onError");
            }

            @Override
            public void onComplete() {
                Logger.i("hello  onComplete");
            }
        };
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
                Logger.i("hello  onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Logger.i("hello  onNext-->" + value);
            }

            @Override
            public void onError(Throwable e) {
                Logger.i("hello  onError");
            }

            @Override
            public void onComplete() {
                Logger.i("hello  onComplete");
            }
        };

Subscriber 和 Observer的接口是分别独立的,Obsesrver用于订阅Observable,而Subscriber用于订阅Flowable

创建Flowable(2.0)/Observable(2.0)

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 RxJava 使用 create()方法来创建一个Observable,并为它定义事件触发规则

        Flowable<String> stringFlowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                Logger.i("---rxHelloFlowable---");
            }
        }, FlowableEmitter.BackpressureMode.BUFFER);
        Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("Hello");
                e.onNext("Inke");
                e.onComplete();
            }
        });

可以看到,这里传入了一个 ObservableOnSubscribe对象作为参数,它的作用相当于一个计划表,当 Observable被订阅的时候,ObservableOnSubscribesubscribe()方法会自动被调用,事件序列就会依照设定依次触发(对于上面的代码,就是观察者Subscriber 将会被调用两次 onNext()和一次 onCompleted())。这样,由被观察者调用了观察者的回调方法,就实现了由被观察者向观察者的事件传递,即观察者模式。

RxJava提供快捷创建事件队列的方法

  • just()将传入的参数依次发送出来
  • fromIterable() 将传入的Iterable 拆分成具体对象后,依次发送出来
  • fromArray() 还没研究明白

心细的朋友可以看到Flowablecreate()的时候多了一个参数 BackpressureMode,是用来处理backpressure的发射器
一共有以下几种模式

 enum BackpressureMode {
        /** 
         * OnNext events are written without any buffering or dropping. 
         * Downstream has to deal with any overflow.
         * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
         */
        NONE,
        /**
         * Signals a MissingBackpressureException in case the downstream can't keep up.
         */
        ERROR,
        /**
         * Buffers <em>all</em> onNext values until the downstream consumes it.
         */
        BUFFER,
        /**
         * Drops the most recent onNext value if the downstream can't keep up.
         */
        DROP,
        /**
         * Keeps only the latest onNext value, overwriting any previous value if the 
         * downstream can't keep up.
         */
        LATEST
    }

个人感觉BUFFER较为安全,api解释为缓冲器存有onNext值,直到下游消费它。

因为Observer不支持 backpressure,所以后面的代码我们默认使用RxJava2.0的FlowableSubscriber,但是为了便于理解,某些原理可能还会用RxJava1.0。

Subscribe (订阅)

创建了 Flowable和 Subscriber 之后,再用 subscribe()方法将它们联结起来,整条链子就可以工作了。代码形式很简单:

stringFlowable.subscribe(subscriber);

有人可能会注意到, subscribe()这个方法有点怪:它看起来是『observalbe订阅了 observer/ subscriber』而不是『observer /subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭,不过如果把 API 设计成 observer.subscribe(observable) / subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,比较起来明显是得不偿失的。

    @Override
    public final void subscribe(Subscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        s = RxJavaPlugins.onSubscribe(this, s);
        ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
        subscribeActual(s);

    }
/**注意:这不是 subscribe()的源码,而是将源码中与性能、兼容性、扩性有关的代码剔除后的核心代码。
 *如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
 */
public Disposable onSubscribe(Subscriber subscriber) {
    subscriber.onSubscribe();
    flowableOnSubscribe.subscribe();
    return subscriber;
}

订阅过程做了三件事

  • 调用 Subscriber.onSubscribe()。是Rx2.0新添加的方法,第一个执行
  • 调用 FlowableOnSubscribe中的subscribe() 。在这里,事件发送的逻辑开始运行。从这也可以看出,在 RxJava 中,Flowable并不是在创建的时候就立即开始发送事件,而是在它被订阅的时候,即当 subscribe()方法执行的时候。

上面我们可以看到,通过subscriber来订阅返回的是void
在RxJava2.0 如果是直接订阅或传入消费者那么会产生一个新的类
那就是Disposable

/**
* Represents a disposable resource.
*/
代表一个一次性的资源。
代码如下

        Disposable subscribe = stringFlowable.subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {

            }
        });

订阅源码如下

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, 
            Action onComplete, Consumer<? super Subscription> onSubscribe) {
        LambdaSubscriber<T> ls = new LambdaSubscriber<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

不过最终走的还是上面的逻辑,不过多返回了一个Disposable,
用于dispose();

线程控制

Scheduler

以下API来自RxJava1.0, 与RxJava2.0用法无区别

在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    -Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 new Thread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比new Thread()更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn()observeOn()两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe()所发生的线程,即 Observable.OnSubscribe被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

下面是一个获取本地资源并显示在控件上的例子

    private void rxSchedulerMap() {
        Flowable<Bitmap> bitmapFlowable = Flowable.just(R.drawable.effect_icon001)
                .subscribeOn(Schedulers.io())
                .map(new Function<Integer, Bitmap>() {
                    @Override
                    public Bitmap apply(Integer integer) throws Exception {
                        Logger.i("这是在io线程做的bitmap绘制圆形");
                        return BitmapUtils.createCircleImage(BitmapFactory.decodeResource(MainActivity.this.getResources(), integer));
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Bitmap>() {
                    @Override
                    public void accept(Bitmap bitmap) throws Exception {
                        Logger.i("这是在main线程做的UI操作");
                        imageView.setImageBitmap(bitmap);
                    }
                });
        bitmapFlowable.subscribe();
    }

想必大家已经看得很清楚了

获取drawable资源我用的io线程
通过 subscribeOn(Schedulers.io())控制
转变成bitmap并绘制成圆形也是在io线程,可以通过observeOn(Schedulers.io())也可以顺着之前的流继续执行
最后显示在UI上是通过observeOn(AndroidSchedulers.mainThread())

subscribeOn(Scheduler.io())observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常见,它适用于多数的 『后台线程取数据,主线程显示』的程序策略。

转换和Scheduler的原理

大家参考扔物线大神的文章吧,我没必要再赘述一遍
变换 & 变换的原理:lift()& compose: 对 Observable 整体的变换

像一种代理机制,通过事件拦截和处理实现事件序列的变换
在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

这里写图片描述

Scheduler 的 API & Scheduler 的原理 & 延伸:doOnSubscribe()

这里写图片描述

这里写图片描述

这里写图片描述

由图中可以看出,①和②两处受第一个 subscribeOn() 影响,运行在红色线程;③和④处受第一个observeOn() 的影响,运行在绿色线程;⑤处受第二个 onserveOn()影响,运行在紫色线程;而第二个 subscribeOn(),由于在通知过程中线程就被第一个 subscribeOn()截断,因此对整个流程并没有任何影响。这里也就回答了前面的问题:当使用了多个subscribeOn()的时候,只有第一个 subscribeOn() 起作用。

常用操作符

在上一个章节
我很还通过just直接快捷的生成了Flowable
我们还通过将drwable通过map操作符转换成了 bitmap进以下一流的操作

这些操作符使整个逻辑流程很美 很漂亮 很sexy~~~
比那些蜜汁缩进美了不是一点半点, 我们下面来个比较复杂的例子,大家对比一下(用的RxJava1.0,意思一下)

    //-----------------------------------蜜汁缩进--嵌套循环--回调地狱 -----------------------------------------------------------

    /**
     * 实现的功能:获取assets文件夹下所有文件夹中的jpg图片,并且将所有的图片画到一个ImageView上,没有实际的用处,只是为了说明问题--- 谜之缩进--嵌套循环--回调地狱
     * 不使用RxJava的写法-- 谜之缩进--回调地狱
     */
    //思路:需要以下6个步骤完成
    //1:遍历获取assets文件夹下所有的文件夹的名称
    //2:遍历获取获取assets文件夹下某个文件夹中所有图片路径的集合
    //3:过滤掉非JPG格式的图片
    //4:获取某个路径下图片的bitmap
    //5:将Bitmap绘制到画布上
    //6:循环结束后更新UI,给ImageView设置最后绘制完成后的Bitmap,隐藏ProgressBar
    private void miZhiSuoJinAndNestedLoopAndCallbackHell() {
        new Thread(new Runnable() {
            @Override
            public void run() {

                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        mProgressBar.setVisibility(View.VISIBLE);
                    }
                });
                //1:遍历获取assets文件夹下所有的文件夹的名称
                ArrayList<String> assetsFolderNameList = ImageNameFactory.getAssetImageFolderName();

                for (String folderName : assetsFolderNameList) {

                    //2:遍历获取获取assets文件夹下某个文件夹中所有图片路径的集合
                    ArrayList<String> imagePathList = ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName);

                    for (final String imagePathName : imagePathList) {
                        //3:过滤掉非JPG格式的图片
                        if (imagePathName.endsWith(JPG)) {

                            //4:获取某个路径下图片的bitmap
                            final Bitmap bitmap = ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
                            runOnUiThread(new Runnable() {
                                @Override
                                public void run() {
                                    //Logger.d(mCounter + ":" + imagePathName);

                                    //5:将Bitmap绘制到画布上
                                    createSingleImageFromMultipleImages(bitmap, mCounter);
                                    mCounter++;

                                }
                            });
                        }
                    }
                }


                //6:循环结束后更新UI,给ImageView设置最后绘制完成后的Bitmap,隐藏ProgressBar
                runOnUiThread(new Runnable() {
                    @Override
                    public void run() {
                        mImageView.setImageBitmap(mManyBitmapSuperposition);
                        mProgressBar.setVisibility(View.GONE);
                    }
                });

            }
        }).start();
    }
 //-----------------------------------RxJava的实现--链式调用--十分简洁 -----------------------------------------------------------


    private void rxJavaSolveMiZhiSuoJinAndNestedLoopAndCallbackHell() {
        //1:被观察者:

        //2:数据转换

        //3:设置事件的产生发生在IO线程

        //4:设置事件的消费发生在主线程

        //5:观察者

        //6:订阅:被观察者被观察者订阅
        mGoToRecycleImageView = false;
        Observable.from(ImageNameFactory.getAssetImageFolderName())
                //assets下一个文件夹的名称,assets下一个文件夹中一张图片的路径
                .flatMap(new Func1<String, Observable<String>>() {
                    @Override
                    public Observable<String> call(String folderName) {
                        return Observable.from(ImageUtils.getAssetsImageNamePathList(getApplicationContext(), folderName));
                    }
                })
                //过滤,筛选出jpg图片
                .filter(new Func1<String, Boolean>() {
                    @Override
                    public Boolean call(String imagePathNameAll) {
                        return imagePathNameAll.endsWith(JPG);
                    }
                })
                //将图片路径转换为对应图片的Bitmap
                .map(new Func1<String, Bitmap>() {
                    @Override
                    public Bitmap call(String imagePathName) {
                        return ImageUtils.getImageBitmapFromAssetsFolderThroughImagePathName(getApplicationContext(), imagePathName, Constant.IMAGE_WITH, Constant.IMAGE_HEIGHT);
                    }
                })
                .map(new Func1<Bitmap, Void>() {
                    @Override
                    public Void call(Bitmap bitmap) {
                        createSingleImageFromMultipleImages(bitmap, mCounter);
                        mCounter++;
                        return null;
                    }
                })
                .subscribeOn(Schedulers.io())//设置事件的产生发生在IO线程
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        mProgressBar.setVisibility(View.VISIBLE);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())//设置事件的消费发生在主线程
                .subscribe(new Subscriber<Void>() {
                    @Override
                    public void onCompleted() {
                        mImageView.setImageBitmap(mManyBitmapSuperposition);
                        mProgressBar.setVisibility(View.GONE);
                    }

                    @Override
                    public void onError(Throwable e) {
                        //Toast.makeText(MainActivity.this, ""+e.getMessage(), Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onNext(Void aVoid) {

                    }
                });
    }

操作符部分一览 (基于Rxjava1.0)

Combining Obsercables(Observable的组合操作符)

操作符 功能
combineLatest 两个Observable产生的结果合并成新Observable,任意Observable产生的结果和另一个Observable最后产生的结果按规则合并
join like combineLatest 能控制每个Observable产生结果的生命周期,在每个结果的生命周期内,与另一个Observable产生的结果按规则合并
groupJoin like join 暂不知其他区别
==merge== ==按照两个Observable的提交结果的时间顺序,对Observable合并。时间按某Observable完成的最小时间==
mergeDelayError 合并的某一个Observable中出现错误,把错误放到所有结果都合并完成之后,订阅者回调执行onError。而merge会马上停止合并
startWith 源Observable提交结果之前,插入指定数据
switchOnNext 把一组Observable转换成Observable。这组Observable中取最后一个Observable提交的结果给订阅者。
==zip== ==把两个Observable提交的结果按照顺序进行合并。==

Error Handing Operators(Observable的错误处理操作符)

操作符 功能
onErrorReturn 在Observable 发生错误或异常(即将回调onError)时,拦截错误并执行指定的逻辑,返回一个跟源Observable相同类型的结果,最后回调订阅者的onComplete方法
onErrorResumeNext like onErrorReturn 不同的是返回一个Observable 例:return Observable.just(5,2,0);
onExceptionResumeNext like onErrorResumeNext 不同的是只有在exception的时候触发
==retry== ==当Observable发生错误或异常,重新执行Observable的逻辑,如果经过n次重新执行后仍然出现错误或异常,则最后回调onError方法,若无错误或异常则按正常流程执行 例:observable.retry(2).subscribe();==
retryWhen like retry 不同在于retryWhen是在源Observable出现错误或异常时,通过回调第二个Observable来判断是否重新尝试执行源Observable的逻辑;若第二个Observable没错误或异常,则会重新尝试执行源Observable的逻辑,否则就会直接回调执行订阅者的onError();

其他常用

操作符 功能
map 对源Observable数据的加工处理,返回其他类型 例:return 520+”string data”;
flatMap like map 不同的是返回一个Observable 扩展:使用了merge操作符 例:return Observable.from(…);
concatMap like concatMap 不同的是concatMap操作遵循元素的顺序 扩展:使用了concat操作符
compose 唯一一个能从流中获取原生Observable的方法,因此,影响整个流的操作符(subscribeOn()和observeOn())需要用compose()。当你创建一个Observable流并且内联了一堆操作符以后,compose()会立即执行,所以compose转换的是整个流
compose与flagMap的区别 flatMap()一定是低效率的,因为他每次调用onNext()之后都需要创建一个新的Observable,compose()是操作在整个流上的
concat 按顺序依次连接两个或更多的 Observable
first 从序列中取第一个先完成的项
takeFirst like first 区别是first()如果没有释放有效的数据源,那么会throw NoSuchElementException;而takeFirst会complete没有 exception

常用场景

我们前面已经大致理解RxJava和他的基本使用了,虽然是冰山一角,但够我们入门了,现在我们来通过实际项目中常用的场景来进阶学习。

因为RxJava2.0 是16年八九月份刚更新的,没有时间来将1.0的代码替换过来,但是主要的使用方法还是没变的,所以下面的代码大部分是基于RxJava1.0,看客请见谅

RxJava实现三级缓存(RxJava 1.0)

参考文章和开源库
Loading data from multiple sources with RxJava
RxImageloader
使用Rxjava实现三级缓存(

创建三个缓存的Observable对象

Observable<Data> memory = ...;  
Observable<Data> disk = ...;  
Observable<Data> network = ...;

获取第一个源的数据

Observable<Data> source = Observable  
  .concat(memory, disk, network)
  .first(new Func1<Data, Boolean>() {//如果对象为空、说明没有数据从下一层找
                    public Boolean call(Data data) {
                        return data!=null;
                    }
                });

concat()订阅了所有需要的Observable。
通过first()会因为取到数据后会停止序列
也就是说,如果memory返回了一个结果,那么我们不会打扰disk 和 network

我们从网络获取到数据,记得存起来。

Observable<Data> networkWithSave = network.doOnNext(data -> {  
  saveToDisk(data);
  cacheInMemory(data);
});

Observable<Data> diskWithCache = disk.doOnNext(data -> {  
  cacheInMemory(data);
});

RxJava实现心跳(RxJava 2.0)

    private Disposable intervalInterval;//心跳
    private void rxInterval() {
        intervalInterval = Flowable.interval(1, TimeUnit.SECONDS)
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Logger.i("rxInterval---" + aLong);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Logger.i("rxInterval---txt.setText---" + aLong);
                        txt.setText("----心跳---" + aLong);
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {

                    }
                });
    }
    /**
     * 停止心跳
     * @param v
     */
    @Override
    public void onClick(View v) {
        switch (v.getId()) {
            case R.id.btn:
                if (intervalInterval != null) {
                    intervalInterval.dispose();
                }
                break;
        }
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        if (intervalInterval != null) {
            intervalInterval.dispose();
        }
    }

遍历集合

        Flowable.just(new ArrayList<StringEntity>())
                .doOnNext(new Consumer<ArrayList<StringEntity>>() {
                    @Override
                    public void accept(ArrayList<StringEntity> stringEntities) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            stringEntities.add(new StringEntity("rxFromFilter--" + i, i));
                        }
                    }
                })
                .flatMap(new Function<ArrayList<StringEntity>, Publisher<?>>() {
                    @Override
                    public Publisher<?> apply(ArrayList<StringEntity> stringEntities) throws Exception {
                        return handleList(stringEntities);
                    }
                })
                .subscribe(new Subscriber<Object>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                    }

                    @Override
                    public void onNext(Object o) {

                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

    /**
     * 将list转换成Flowable
     * @param list
     * @return
     */
    public Flowable<StringEntity> handleList(ArrayList<StringEntity> list) {
        return Flowable.fromIterable(list)
                .filter(new Predicate<StringEntity>() {
                    @Override
                    public boolean test(StringEntity entity) throws Exception {
                        return entity.position != 0;
                    }
                })
                .doOnNext(new Consumer<StringEntity>() {
                    @Override
                    public void accept(StringEntity entity) throws Exception {
                        Logger.i(entity.getItem());
                    }
                });
    }

并发任务(RxJava 1.0)

/**
 * 两个耗时任务一起执行
 */
    private static Observable<Intent> createLivePlayerRoomPageOrDonePageObservable(final Context context, final int roomId, final String url) {
        //获取网络资源的Observable
        Observable<RspLiveInfo> rspLiveInfoObservable = LiveNetManager.liveInfo(roomId, null, false);
        //获取图片高斯模糊的Observable
        Observable<GlideBitmapDrawable> glideBitmapDrawableObservable = RxGlide.afterGlideRequestListener(Global.getContext(), ImageWorker.buildBlurBitmapRequest(context, url));
        return Observable.zip(rspLiveInfoObservable, glideBitmapDrawableObservable,
                new Func2<RspLiveInfo, GlideBitmapDrawable, Intent>() {
                    @Override
                    public Intent call(RspLiveInfo rspLiveInfo, GlideBitmapDrawable glideBitmapDrawable) {
                })
                .observeOn(AndroidSchedulers.mainThread());
    }
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0) 的相关文章

随机推荐

  • 期货开户供求平衡周而复始

    1 供求关系和价值规律的演变方程式 人们常说 价格围绕价值波动 傅海棠认为更准确的说法应是 价格围绕成本波动 大部分时间 价格在成本之上 小部分时间 价格在成本之下 价格低了 生产积极性受到抑制 供应减少 而低价还刺激需求 一个行业如果商品
  • 如何将你接入微信自动回答别人的问题

    要将我接入微信自动回答别人的问题 您需要使用微信的开放平台 API 使用开放平台 API 您可以创建一个小程序 该小程序可以接收用户发送的消息 并使用我这个语言模型来生成回复消息 具体来说 您需要以下步骤 在微信公众平台上注册一个小程序 并
  • Python每日一记42>>>机器学习中特征重要性feature_importances_

    在进行机器学习算法中 我们常用的算法就像下面的代码形式类型 经历导入数据 预处理 建模 得分 预测 但是总觉得少了点什么 虽然我们建模的目的是进行预测 但是我们想要知道的另一个信息是变量的重要性 在线性模型中 我们有截距和斜率参数 但是其他
  • LDO和DCDC电路的概述和区别

    一 什么是DCDC DCDC的意思是直流变 到 直流 不同直流电源值的转换 只要符合这个定义都可以叫DCDC转换器 常见的DCDC电路有buck boost buck boost分别是降压 升压 降压升压电路 二 什么是LDO LDO 是一
  • Linux驱动——设备树

    在对总线设备驱动进行详细说明时可以看出 虽然总线设备驱动可以实现驱动和设备分离 但是总线设备驱动引发另外的一个问题就是在相同的芯片不同的开发板上 当外设资源不同时需要在不同的设备文件中去定义引脚 这样就导致开发板中保留大量设备文件 为了解决
  • 多元任务,高额奖金!首届“开放原子开源大赛”等你参与!

    人类有各种交流方式 包括语言 文字 音乐 影像等 有的贴近生活 有的充满艺术感 然而 在人工智能时代 代码作为一种特殊的交流形式愈发重要 它使得人与人 人与机器之间能够高效便捷地沟通 从而为科技发展注入活力 开源 则是让这种交流变得更加丰富
  • Android ADB命令大全(通过ADB命令查看wifi密码、MAC地址、设备信息、操作文件、查看文件、日志信息、卸载、启动和安装APK等)

    ADB很强大 记住一些ADB命令有助于提高工作效率 获取序列号 adb get serialno 查看连接计算机的设备 adb devices 重启机器 adb reboot 重启到bootloader 即刷机模式 adb reboot b
  • URL、URI和URN之间的区别

  • 程序员应该掌握的 10 个搜索技巧

    在今天 用户可以通过搜索引擎轻松找出自己想要的信息 但还是难以避免结果不尽如人意的情况 实际上 用户仅需掌握几个常用技巧即可轻松化解这种尴尬 下面介绍 10 个在进行 Google 搜索时可以使用的便捷技巧 其他搜索引擎也支持这 10 种技
  • C++外观模式

    外观模式 1 外观模式简介及应用场景 外观者模式其实就是相当于对一组子系统功能的组合 对外提供统一的简单接口的模式 当我们在实际开发中 一般情况下是一个单独的子系统对应的是一个独立的功能模块 但是随着业务功能的不断增加 对应子系统的迭代必然
  • CentOS8 服务篇4:FTP文件传输服务搭建与配置

    FTP 文件传输服务三种配置模式 匿名模式 本地用户模式 虚拟用户模式 安装ftp服务 安装完后再根据不同模式进行配置 root localhost yum repos d yum install y vsftpd ftp vsftpd是搭
  • Qt中qss样式表

    qss样式表是用于设置QT程序UI界面中控件的背景图片 大小 字体颜色 字体类型 按钮状态变化等属性 美化UI界面 实现界面和程序的分离 可以快速切换皮肤 1 基本语法 selector attribute value 说明 selecto
  • Java生成exe执行文件

    一 准备工作 下载可将jar包转换的工具EXE4J工具 下载地址为 https www ej technologies com download exe4j files 下载完成 直接点击下一步安装 直到安装完成 导出项目jar包 按以下步
  • javaFile类知识点总结

    1 File类 Java io File类是文件和目录路径名的抽象表示 主要用于文件和目录的创建 查找 删除等操作 File中的静态成员变量 pathSeparator与系统有关的路径分隔符 File pathSeparator 代表路径分
  • android系统删除apk的广播,研究androidapk安装卸载等产生的系统广播

    想更加清楚的了解 android 系统在安装 卸载时产生的系统广播 于是写了一个 demo 来做监听 BroadReceiver 配置如下 html 这里有一点要注意 需配置 否则收不到广播 1 当你第一次安装某个应用的时候 java 10
  • 干货

    SpringCloud的从整体架构上看 相对来说是完整的 庞大的 它不仅仅是一个基础性架构工具 它为微服务架构提供了一个 全家桶 的套餐 每一个模块关注各自的职能 并且能够很好地配合与协作 能够帮助入门者快速搭建起一套微服务架构的服务 内容
  • MyBatis之使用JSONObject代替JavaBean优雅返回多表查询结果

    项目中需要返回多个表的查询结果 比如user表中的用户信息和user个人的所在班的班级信息 目前我们有user实体类和class实体类 一般情况下如果是单表查询 比如查询user信息 那么查询的返回值就是一个user对象或一个user对象列
  • Qt_Qt报错multiple target patterns

    去看看pro文件中的路径是否有问题
  • ARM7的三级流水线过程

    看到汇编中很多关于程序返回与中断返回时处理地址都很特别 仔细想想原来是流水线作用的效果 所以 决定总结学习下ARM流水线 ARM7处理器采用3级流水线来增加处理器指令流的速度 能提供0 9MIPS MHz的指令处理速度 PS MIPS Mi
  • Android RxJava第一弹之原理详解、使用详解、常用场景(基于Rxjava2.0)

    Android RxJava第一弹之原理详解 使用详解 常用场景 基于Rxjava2 0 Android RxJava第二弹之RxJava封装库 RxJava Animation RxJava Glide Android RxJava第三弹