Android RxJava:组合 / 合并操作符 详细教程

2023-11-02

前言

  • Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。

Github截图

如果还不了解 RxJava,请看文章:Android:这是一篇 清晰 & 易懂的Rxjava 入门教程

  • RxJava如此受欢迎的原因,在于其提供了丰富 & 功能强大的操作符,几乎能完成所有的功能需求
  • 今天,我将为大家详细介绍RxJava操作符中最常用的 组合 / 合并操作符,并附带 Retrofit 结合 RxJava的实例Demo教学,希望你们会喜欢。
  1. 本系列文章主要基于 Rxjava 2.0
  2. 接下来的时间,我将持续推出 AndroidRxjava 2.0 的一系列文章,包括原理、操作符、应用场景、背压等等 ,有兴趣可以继续关注Carson_Ho的安卓开发笔记!!

示意图


目录

示意图


1. 作用

组合 多个被观察者(Observable) & 合并需要发送的事件


2. 类型

  • RxJava 2 中,常见的组合 / 合并操作符 主要有:

    示意图

     

  • 下面,我将对每个操作符进行详细讲解


3. 应用场景 & 对应操作符 介绍

注:在使用RxJava 2操作符前,记得在项目的Gradle中添加依赖:

dependencies {
      compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
      compile 'io.reactivex.rxjava2:rxjava:2.0.7'
      // 注:RxJava2 与 RxJava1 不能共存,即依赖不能同时存在
}

3.1 组合多个被观察者

该类型的操作符的作用 = 组合多个被观察者

concat() / concatArray()

  • 作用
    组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个

  • 具体使用
// concat():组合多个被观察者(≤4个)一起发送数据
        // 注:串行执行
        Observable.concat(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

// concatArray():组合多个被观察者一起发送数据(可>4个)
        // 注:串行执行
        Observable.concatArray(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12),
                           Observable.just(13, 14, 15))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

  • 测试结果

concat()情况

concatArray()情况

merge() / mergeArray()

  • 作用
    组合多个被观察者一起发送数据,合并后 按时间线并行执行
  1. 二者区别:组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
  2. 区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
  • 具体使用
// merge():组合多个被观察者(<4个)一起发送数据
        // 注:合并后按照时间线并行执行
        Observable.merge(
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)) // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                  .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

// mergeArray() = 组合4个以上的被观察者一起发送数据,此处不作过多演示,类似concatArray()

  • 测试结果

两个被观察者发送事件并行执行,输出结果 = 0,2 -> 1,3 -> 2,4

33333.gif

concatDelayError() / mergeDelayError()

  • 作用

示意图

  • 具体使用

a. 无使用concatDelayError()的情况

Observable.concat(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

测试结果:第1个被观察者发送Error事件后,第2个被观察者则不会继续发送事件

示意图


<-- 使用了concatDelayError()的情况 -->
Observable.concatArrayDelayError(
                Observable.create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                        emitter.onNext(1);
                        emitter.onNext(2);
                        emitter.onNext(3);
                        emitter.onError(new NullPointerException()); // 发送Error事件,因为使用了concatDelayError,所以第2个Observable将会发送事件,等发送完毕后,再发送错误事件
                        emitter.onComplete();
                    }
                }),
                Observable.just(4, 5, 6))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }
                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

测试结果:第1个被观察者的Error事件将在第2个被观察者发送完事件后再继续发送

 

示意图

mergeDelayError()操作符同理,此处不作过多演示


3.2 合并多个事件

该类型的操作符主要是对多个被观察者中的事件进行合并处理。

Zip()

  • 作用
    合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送

  • 原理
    具体请看下图

示意图

  • 特别注意:
  1. 事件组合方式 = 严格按照原先事件序列 进行对位合并
  2. 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量

即如下图

示意图

  • 具体使用
<-- 创建第1个被观察者 -->
        Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "被观察者1发送了事件1");
                emitter.onNext(1);
                // 为了方便展示效果,所以在发送事件后加入2s的延迟
                Thread.sleep(1000);

                Log.d(TAG, "被观察者1发送了事件2");
                emitter.onNext(2);
                Thread.sleep(1000);

                Log.d(TAG, "被观察者1发送了事件3");
                emitter.onNext(3);
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作

<-- 创建第2个被观察者 -->
        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                Log.d(TAG, "被观察者2发送了事件A");
                emitter.onNext("A");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件B");
                emitter.onNext("B");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件C");
                emitter.onNext("C");
                Thread.sleep(1000);

                Log.d(TAG, "被观察者2发送了事件D");
                emitter.onNext("D");
                Thread.sleep(1000);

                emitter.onComplete();
            }
        }).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
        // 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送

<-- 使用zip变换操作符进行事件合并 -->
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String string) throws Exception {
                return  integer + string;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "最终接收到的事件 =  " + value);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
  • 测试结果

示意图

  • 特别注意:
    1. 尽管被观察者2的事件D没有事件与其合并,但还是会继续发送
    2. 若在被观察者1 & 被观察者2的事件序列最后发送onComplete()事件,则被观察者2的事件D也不会发送,测试结果如下

示意图

  • 因为Zip()操作符较为复杂 & 难理解,此处将用1张图总结

示意图

关于Zip()结合RxJavaRxtrofit的实例讲解将在第4节中详细讲解


combineLatest()

  • 作用
    当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据

Zip()的区别:Zip() = 按个数合并,即1对1合并;CombineLatest() = 按时间合并,即在同一个时间点上合并

  • 具体使用
Observable.combineLatest(
                    Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
                    Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
                    new BiFunction<Long, Long, Long>() {
                @Override
                public Long apply(Long o1, Long o2) throws Exception {
                    // o1 = 第1个Observable发送的最新(最后)1个数据
                    // o2 = 第2个Observable发送的每1个数据
                    Log.e(TAG, "合并的数据是: "+ o1 + " "+ o2);
                    return o1 + o2;
                    // 合并的逻辑 = 相加
                    // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
                }
            }).subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long s) throws Exception {
                    Log.e(TAG, "合并的结果是: "+s);
                }
            });
  • 测试结果

示意图

combineLatestDelayError()

作用类似于concatDelayError() / mergeDelayError() ,即错误处理,此处不作过多描述

reduce()

  • 作用
    把被观察者需要发送的事件聚合成1个事件 & 发送

聚合的逻辑根据需求撰写,但本质都是前2个数据聚合,然后与后1个数据继续进行聚合,依次类推

  • 具体使用
Observable.just(1,2,3,4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在该复写方法中复写聚合的逻辑
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次计算的数据是: "+s1 +" 乘 "+ s2);
                        return s1 * s2;
                        // 本次聚合的逻辑是:全部数据相乘起来
                        // 原理:第1次取前2个数据相乘,之后每次获取到的数据 = 返回的数据x原始下1个数据每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最终计算的结果是: "+s);

            }
        });
  • 测试结果

示意图

collect()

  • 作用
    将被观察者Observable发送的数据事件收集到一个数据结构里

  • 具体使用

Observable.just(1, 2, 3 ,4, 5, 6)
                .collect(
                        // 1. 创建数据结构(容器),用于收集被观察者发送的数据
                        new Callable<ArrayList<Integer>>() {
                            @Override
                            public ArrayList<Integer> call() throws Exception {
                                return new ArrayList<>();
                            }
                            // 2. 对发送的数据进行收集
                        }, new BiConsumer<ArrayList<Integer>, Integer>() {
                            @Override
                            public void accept(ArrayList<Integer> list, Integer integer)
                                    throws Exception {
                                // 参数说明:list = 容器,integer = 后者数据
                                list.add(integer);
                                // 对发送的数据进行收集
                            }
                        }).subscribe(new Consumer<ArrayList<Integer>>() {
            @Override
            public void accept(@NonNull ArrayList<Integer> s) throws Exception {
                Log.e(TAG, "本次发送的数据是: "+s);

            }
        });
  • 测试结果

示意图

3.3 发送事件前追加发送事件

startWith() / startWithArray()

  • 作用
    在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者

  • 具体使用

<-- 在一个被观察者发送事件前,追加发送一些数据 -->
        // 注:追加数据顺序 = 后调用先追加
        Observable.just(4, 5, 6)
                  .startWith(0)  // 追加单个数据 = startWith()
                  .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });


<-- 在一个被观察者发送事件前,追加发送被观察者 & 发送数据 -->
        // 注:追加数据顺序 = 后调用先追加
        Observable.just(4, 5, 6)
                .startWith(Observable.just(1, 2, 3))
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

  • 测试结果

示意图

示意图

3.4 统计发送事件数量

count()

  • 作用
    统计被观察者发送事件的数量

  • 具体使用

// 注:返回结果 = Long类型
        Observable.just(1, 2, 3, 4)
                  .count()
                  .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "发送的事件数量 =  "+aLong);

                    }
                });
  • 测试结果

示意图

至此,RxJava 2中的组合 / 合并操作符讲解完毕。


4. 实际开发需求案例

下面,我将讲解组合 / 合并操作符的常见实际需求:

  1. 从缓存(磁盘、内存)中获取缓存数据
  2. 合并数据源
  3. 联合判断
  • 下面,我将对每个应用场景进行实例Demo演示讲解。

4.1 获取缓存数据

4.2 合并数据源 & 同时展示

4.3 联合判断

  • 即,同时对多个事件进行联合判断

如,填写表单时,需要表单里所有信息(姓名、年龄、职业等)都被填写后,才允许点击 "提交" 按钮


5. Demo地址

上述所有的Demo源代码都存放在:Carson_Ho的Github地址:RxJava2_组合 / 合并操作符


6. 总结

  • 下面,我将用一张图总结 RxJava2 中常用的组合 / 合并操作符

示意图

示意图

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Android RxJava:组合 / 合并操作符 详细教程 的相关文章

随机推荐

  • 前端页面间数据传递常用的几种方式

    1 常用方式 url页面路径携带参数传递 localStorage方式传递 sessionStorage方式传递 cookie的方式传递 2 方式对比 url字节限制可以参考这一篇文章 HTTP中的URL长度限制 其中cookie的setC
  • 开关电源的时钟倍频 辐射发射超标RE+ 噪声源+干扰原因

    1 收藏 史上最全开关电源传导与辐射超标整改方案 医疗设备低频30 50Mhz超标 2 https bbs elecfans com m jishu 941580 1 1 html 3 辐射噪声的产生机理 知乎 1 电流源 噪声源 2 天线
  • 【华为OD机试】叠积木(C++ Python Java)2023 B卷

    时间限制 C C 1秒 其他语言 2秒 空间限制 C C 262144K 其他语言524288K 64bit IO Format lld 语言限定 C clang11 C clang 11 Pascal fpc 3 0 2 Java jav
  • 傅里叶变换快速入门

    网上关于傅里叶变换的解释特别多 但大部分都比较偏理论 导致我看来N多教程也还是懵懵懂懂 在某本书 信号完整性分析 中看到一句震耳发聩的话 每个工程师都应该亲自动手计算一遍傅里叶变换 我知道很多工具可以直接给出傅里叶变换结果 但不清不楚一直是
  • 修改UGF官方的starForce为自己所用

    第一步 修改Launcher的名字 比如我这里是修改成SpaceShoot 第二步修改命名空间名字 重新命名为SpaceShoot 第三步 重新设置Launcher场景中丢失的脚本 Builtin下JsonLite Localization
  • 设计模式-工厂方法模式

    文章目录 前言 工厂方法模式概述 使用场景 工厂方法模式优缺点 Java代码示例 前言 当我们面临需要创建不同类型对象的需求时 通常会使用工厂方法模式 工厂方法模式是一种创建型设计模式 它提供了一种将对象的创建与使用分离的方法 允许我们在不
  • VMware Workstation安装

    VMware Workstation安装 1 安装步骤 双击运行安装包程序 接受许可证协议 关键不接受不让安装啊 选择安装位置 建议非中文无空格 增强型键盘驱动程序可选 按照自身使用习惯勾选产品更新和客户体验提升计划 快捷方式 开始安装 稍
  • MD5加密

    1 md5是什么 md信息摘要算法 一种被广泛使用的密码散列函数 2 md5的特征 一 长度固定 任意长度的数据都会输出长度相等的md5值 二 不可逆 三 对原密码进行改动改变成一个字节输出数据 四 很少碰到两个不同的数据产生相同的md5值
  • 算法该不该刷?如何高效刷算法?

    一 算法该不该刷 最近有小伙伴向我咨询一个问题 就是算法该不该刷 该如何刷算法呢 这个问题可谓太大众化了 只要你去某乎 某度搜索一下相关的解答 会有无数种回答 可见这个问题困扰了多少学习计算机的同学们 但不管回答有多少种 总结一句话就是 算
  • 科大奥锐密立根油滴实验数据_密立根油滴实验数据表格

    静态法 平衡法 第1粒油滴数据 序数 U V t g s v g m s 1 q i C n i 个 e C 10 19 u e e 0 1 235 9 98 1 50E 04 1 12E 18 7 1 61 0 62 2 235 9 88
  • chatglm-6b模型在windows的详细安装教程

    1 先是看了github的文章 如果打不开这篇文章 可能需要科学上网 即访问外网的VPN https github com THUDM ChatGLM 6B 2 准备 台式机 GPU是8G 关于是否可以在笔记本运行 我后面测试下 等我下一篇
  • 什么是频谱仪的RBW带宽和VBW带宽

    1 RBW Resolution Bandwidth 代表两个不同频率的信号能够被清楚的分辨出来的最低频宽差异 两个不同频率的信号频宽如低于频谱分析仪的RBW 此时该两信号将重叠 难以分辨 RBW 分辨率带宽 有人也叫参考带宽 表示测试的是
  • 在laravel中合并路由_一些实用的 Laravel 小技巧

    Laravel 中一些常用的小技巧 说不定你就用上了 1 侧栏 1 网站一般都有侧栏 用来显示分类 标签 热门文章 热门评论啥的 但是这些侧栏都是相对独立的模块 如果在每一个引入侧栏的视图中都单独导入与视图有关的数据的话 未免太冗余了 所以
  • 算法——回溯法(子集、全排列、皇后问题)

    参考 http www cnblogs com wuyuegb2312 p 3273337 html intro 参考 算法竞赛入门经典 P120 1 定义 回溯算法也叫试探法 它是一种系统地搜索问题的解的方法 回溯算法的基本思想是 从一条
  • IDA宏定义

    This file contains definitions used by the Hex Rays decompiler output It has type definitions and convenience macros to
  • 机器学习中的 K-均值聚类算法及其优缺点。

    K 均值聚类算法是一种常见的无监督学习算法 它可以将数据集分成 K 个簇 每个簇内部的数据点尽可能相似 而不同簇之间的数据点应尽可能不同 下面详细讲解 K 均值聚类算法的优缺点 优点 简单易用 K 均值聚类算法是一种简单易懂的算法 容易理解
  • String index out of range错误与解决方法

    在做算法题时遇到了报错 原因是字符串的索引越界 查看自己的代码 原来int的类型范围越界 int的范围 2147483648 2147483647 long的范围 9223372036854775808 922337203685477580
  • golang-nil切片和空切片

    package main import fmt func main var a int b make int 0 if a nil fmt Println a is nil else fmt Println a is not nil if
  • Spring Boot中使用token:jwt

    token由3部分组成 Header Payload Signature 其中Header记录了签名算法和token 的类型 Payload是以明文存储的一些信息 包括用户自定义信息 Signature是使用签名算法 对Payload结合服
  • Android RxJava:组合 / 合并操作符 详细教程

    前言 Rxjava 由于其基于事件流的链式调用 逻辑简洁 使用简单的特点 深受各大 Android开发者的欢迎 Github截图 如果还不了解 RxJava 请看文章 Android 这是一篇 清晰 易懂的Rxjava 入门教程 RxJav