RxJava2 背压

2023-05-16

1 背压

在RxJava中,会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息,这就是典型的背压(Back Pressure)场景。

BackPressure经常被翻译为背压,背压的字面意思比较晦涩,难以理解。它是指在异步场景下,被观察者发送事件速度远快于观察者处理的速度,从而导致下游的buffer溢出,这种现象叫做背压。

产生条件:

  1. 异步,被观察者和观察者处于不同的线程中。
  2. 被观察者发送消息的速度远远快于观察者处理的的数据

在RxJava2.x中,只有Flowable类型是支持背压的,并且Flowable很多操作符内部都使用了背压策略,从而避免过多的数据填满内部的队列。

解决背压问题的方法:

1.1 过滤限流

通过使用限流操作符将被观察者产生的大部分事件过滤并抛弃,以达到限流的目的,间接降低事件发射的速度,例如使用以下操作符:

  • sample:在一段时间内,只处理最后一个数据。
  • throttleFirst:在一段时间内,只处理第一个数据。
  • debounce:发送一个数据,开始计时,到了规定时间,若没有再发送数据,则开始处理数据,反之重新开始计时。

1.2 打包缓存

在被观察者发射事件过快导致观察者来不及处理的情况下,可以使用缓存类操作符将其中一部分打包缓存起来,再一点一点的处理其中的事件。

  • buffer:将多个事件打包放入一个List中,再一起发射。
  • window:将多个事件打包放入一个Observable中,再一起发射。

1.3 使用背压操作符

通过一些操作符来转化成支持背压的Observable

2. RxJava2.x的背压策略

在RxJava2.x中,Observable不在支持背压,而是改用Flowable来专门支持背压。默认队列大小为128,并且要求所有的操作符强制支持背压。

Flowable一共有5中背压策略(BackpressureStategy中定义):MISSING,ERROR,BUFFER,DROP,LATEST

2.1 MISSING

通过Create方法创建Flowable没有指定背压策略,不会通过onNext发射的数据做缓存或丢弃处理,需要下游通过飞呀操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背压策略。

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i <1000 ; i++) {
                    emitter.onNext(i);
                }
            }
        },BackpressureStrategy.MISSING)
                .onBackpressureBuffer()
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

上面的代码执行的是buffer的背压策略。

2.2 ERROR

如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

在Android中运行以上代码,会立即引起App Crash,引起以下Exception:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:438)
        at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:406)

因为Flowable的默认队列是128,所以讲上述代码的129改成128,程序就可以正常运行了。

2.3 BUFFER

Flowable的异步缓存池同Observable的一样,没有固定大小,可以无限制添加数据,不会抛出MissingBackpressureException异常,但会导致OOM(Out of Memory).

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

上述代码不会导致崩溃,但会引起ANR。

2.4 DROR

如果Flowable的异步缓存池满了,则会丢掉将要放入缓存池中的数据。

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;i<129; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

在Android中运行这段代码,不会引起Crash,但只会打印出0~127,第128则被丢弃,因为Flowable的内部队列已经满了。

2.5 LATEST

如果缓存池满了,会丢掉将要放入缓存池中的数据。这一点与DROP策略一样,不同的是,不管缓存池的状态如何,LATEST策略会将最后一条数据强行放入缓存池中。

        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0;i<1000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.newThread())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });

执行结果:

0
1
...
127
516
519
521
523
526
528
530
...
734
737
740
743
745
748
999

3. 其他

Flowable不仅可以通过create创建时需要制定背压策略,还可以在通过其他创建操作符,例如just、fromArray等创建后通过背压操作符指定背压策略。例如,

onBackpressreBuffer()对应BackpressureStategy.BUFFER,

onBackpressreDrop()对应BackpressureStategy.DROP,

onBackpressreLatest()对应BackpressureStategy.LATEST.

示例:

Flowable.interval(1, TimeUnit.MILLISECONDS)
        .onBackpressureBuffer()
        .subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                System.out.println(aLong);
            }
        });
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava2 背压 的相关文章

随机推荐

  • Android Studio导入和调试Android8.0源码

    生成IDE相关文件 idegen专门为IDE环境调试源码而设计的工具 xff0c 依次执行如下命令 xff1a source build envsetup sh mmm development tools idegen developmen
  • make snod注意事项-刷机后启动异常

    1 正确执行顺序 需要执行 source build envsetup sh lunch 2 单独编译 xff0c 刷机后运行异常 全编andorid后 xff0c 单独修改编译一个framwork模块 xff0c make snod会有如
  • adb remount 系统提示只读文件系统Read-only file system,解决用adb disable-verity

    在Android6 0 xff08 Android M xff09 userdebug版本上 eng版本不存在该问题 xff0c 发现使用adb remount 系统之后 xff0c 还是不能对system分区进行操作 xff0c 提示没有
  • 枚举 switchcase 标签必须为枚举常量的非限定名称

    enum switch case label must be the unqualified name of an enumeration constant 或 错误 枚举 switchcase 标签必须为枚举常量的非限定名称case Co
  • VMware为什么会越用占用的内存越大?该如何清理?

    现象描述 xff1a VMware用了一段时间后发现原来刚开始只占5G左右的内存 xff0c 慢慢的会占用几十个G xff0c 甚至更多 xff0c 磁盘空间占用越来越大 解决办法 xff1a 虚拟机内部执行cat dev zero gt
  • H264中的时间戳(DTS和PTS)

    xff08 1 xff09 Ffmpeg中的DTS 和 PTS H264里有两种时间戳 xff1a DTS xff08 Decoding Time Stamp xff09 和PTS xff08 Presentation Time Stamp
  • UEFI/Legacy两种启动模式下安装Win10/Ubuntu双系统

    文章目录 更多操作细节请移步到 UEFI Legacy两种启动模式下安装Win10 Ubuntu双系统 http www aigrantli com archives uefilegacy E4 B8 A4 E7 A7 8D E5 90 A
  • H264视频编码原理

    一 为什么要对视频编码 视频是由一帧帧的图像组成 xff0c 就像gif图片一样 一般视频为了不会让人感觉到卡顿 xff0c 一秒钟至少需要16帧画面 一般30帧 加入该视频是一个1280x720的分辨率 xff0c 那么不经过编码一秒钟传
  • H.264基础知识总结

    H264是视频编解码格式 xff1b 学习H264之前首先要搞明白一个问题 xff0c 视频为什么要编码 xff0c 编码传输不行吗 xff1f 视频就是一堆图片按时间顺序播放 xff0c 在编码标准出现之前 xff0c 不经过编码的原始码
  • linux文件分割(将大的日志文件分割成小的)

    linux文件分割 xff08 将大的日志文件分割成小的 xff09 linux下文件分割可以通过split命令来实现 xff0c 可以指定按行数分割和安大小分割两种模式 Linux下文件合并可以通过cat命令来实现 xff0c 非常简单
  • 华为AGC性能管理功能sdk集成

    集成SDK 1 xff09 在AGC网站的我的项目中选择需要启用性能管理的应用 xff0c 点击质量 gt 性能管理 xff0c 进入性能管理服务页面 xff0c 立即开通服务 2 xff09 添加AGC插件 xff0c 在Android
  • Android平台集成华为AGC性能管理服务问题处理指南

    最近尝试集成了华为AGC的性能管理服务 xff0c 集成过程中也遇到一些问题 本文就对我在集成性能管理服务的踩坑记录进行总结 xff0c 希望能帮到大家 问题一 xff1a 刚集成性能管理服务 xff0c 报错miss client id
  • Android ANR全解析&华为AGC性能管理解决ANR案例集

    1 ANR介绍 1 1 ANR是什么 ANR xff0c 全称为Application Not Responding xff0c 也就是应用程序无响应 如果 Android 应用的界面线程处于阻塞状态的时间过长 xff0c 就会触发 应用无
  • JAVA包装类

    什么是包装类 虽然 Java 语言是典型的面向对象编程语言 xff0c 但其中的八种基本数据类型并不支持面向对象编程 xff0c 基本类型的数据不具备 对象 的特性 不携带属性 没有方法可调用 沿用它们只是为了迎合人类根深蒂固的习惯 xff
  • Rxjava理论(一)

    大家都知道RxJava上手是非常难的一个框架 xff0c 为什么说是难呢 xff0c 因为它的功能非常强大 xff0c 各种操作符让人很难上手 xff0c 搭配使用带生命周期的框架有RxLife等 以至于后面出了很多类似Rxjava的框架
  • rxjava理论(二)

    doOnSubscribe的监听 在上一节我们介绍过subscribeOn是控制上游的observable在哪个线程执行 xff0c 关于怎么控制上游的observable可以看我上篇文章RxJava面经一 xff0c 拿去 xff0c 不
  • RxJava Hook(钩子)方法

    Hook技术又叫钩子函数 xff0c 在系统没有调用函数之前 xff0c 钩子就先捕获该消息 xff0c 得到控制权 这时候钩子程序既可以改变该程序的执行 xff0c 插入我们要执行的代码片段 xff0c 还可以强制结束消息的传递 RxJa
  • android底层之什么是Zram?

    ZRAM的理解 ZRAM xff08 压缩内存 xff09 的意思是说在内存中开辟一块区域压缩数据 就是说假设原来150MB的可用内存现在可以放下180MB的东西 本身不会提高内存容量和运行速度 只是让后台程序更少被系统砍掉罢了 xff0c
  • rxjava - compose()操作符

    1 问题背景 想要给多个流重复应用 34 一系列 34 相同的操作符 该怎么办 比如 我们使用Rx 43 Retrofit进行网络请求时 都有遇到这样场景 要在io线程中请求数据 在主线程订阅 更新UI 所以必须频繁使用下面这样的代码 su
  • RxJava2 背压

    1 背压 在RxJava中 xff0c 会遇到被观察者发送消息太快以至于它的操作符或者订阅者不能及时处理相关的消息 xff0c 这就是典型的背压 Back Pressure 场景 BackPressure经常被翻译为背压 xff0c 背压的