rxjava - compose()操作符

2023-05-16

1. 问题背景:

想要给多个流重复应用"一系列"相同的操作符,该怎么办???,比如,我们使用Rx+Retrofit进行网络请求时,都有遇到这样场景:要在io线程中请求数据,在主线程订阅,更新UI,所以必须频繁使用下面这样的代码:

...
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(consumer)
...

,如果我们将"让"些操作符,连续的,可重复的 应用到所有流上,同时保持这rx的链式写法,岂不美滋滋...

这时候compose操作符就派上用处了.

2. compose()操作符:

2.1 compose() 源码:

    public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
        return wrap(((ObservableTransformer<T, R>) ObjectHelper.requireNonNull(composer, "composer is null")).apply(this));
    }

compose 区分于lift(lift是map,flatMap等操作符的根本),compose,是对ObservableSource本身进行操作的,上面的apply(this),里面this就是代表ObservableSource自己,而lift是对ObservableSource发送的数据进行操作的,

compose(transformer) 接收一个参数ObservableTransformer ,
transformer是一个接口,我们实现它,为了避免Object -> Observable的强转,我们在方法里定义了泛型,这个结合自己的返回数据和逻辑自行修改

public class RxSchedulers {

    static final ObservableTransformer schedulersTransformer = new ObservableTransformer() {
                @Override
                public ObservableSource apply(Observable upstream) {
                    return upstream
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread());
                }
    };

    public static <T> ObservableTransformer<T, T> applySchedulers() {
        return (ObservableTransformer<T, T>) schedulersTransformer;
    }
}

其中,apply方法里的签名(参数) Observable upstrem,即我们上面的this,即我们要将一系列变换应用在它上面,返回的Observable 就是应用一些列变换之后的Observable.

2.2: 应用:

在代码中,如下使用:

server.requestXXX()
      .homeBanners()
      //线程切换 封装
      .compose(RxSchedulers.<JavaBean>applySchedulers())

至此,我们就将网络请求过程的线程切换封装了.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

rxjava - compose()操作符 的相关文章

  • 使用 RxJava、Retrofit 上传进度

    在我的项目中 我使用 MVP 设计模式 RxJava RxAndroid 和 Retrofit 来使用 API 调用 目前 我正在寻找在发送图片时尝试显示上传进度的解决方案 我已经看到了几种可能的实现 但恐怕它们不适合我的实现 以下是我如何
  • Rxjava tolist() 未完成

    我的 RxJava 调用链有问题 toList 无法正常工作 我猜想 toList 需要一些东西来完成 这就是它被卡住的原因 但我不知道如何解决这个问题 The code mModel getLocations flatMapIterabl
  • RxJava Observable 最短执行时间

    我有一个 Observable 它从网络获取数据 问题是 observable 可能会快或慢 具体取决于网络条件 当 observable 执行时 我显示进度小部件 并在 observable 完成时隐藏它 当网络速度很快时 进度会闪烁 出
  • RxJava。顺序执行

    在我的 Android 应用程序中 我有一个演示者 它处理用户交互 包含某种请求管理器 如果需要 可以通过请求管理器将用户输入发送到请求管理器 请求管理器本身包含服务器 API 并使用此 RxJava 处理服务器请求 我有一个代码 每次用户
  • 我什么时候应该使用blockingGet?

    我在工作中经常使用 RxJava 并且看到了一些调用返回 Observable 或 Single 的方法的示例 然后在其上调用blockingGet 以在不同的 我认为这可能是对图书馆和概念的滥用 但我可能是错的 我举一个小例子 publi
  • Schedulers.io() 上的并行数据库搜索

    我想知道当我并行访问数据库表时是否应该使用 Schedulers io 还是 Schedulers newThread 例如 如果我使用 Schedulers io 并行地从数千个表中选择记录 则任务完成后会在线程池中创建很多新创建的线程
  • 如何使用 RxJava 将双精度值的嵌套列表转换为 Java 类?

    在我的 Android 客户端中 我从后端收到以下 JSON 数据 1427378400000 553 1427382000000 553 这是实际加载数据的例程 我在用接收Android https github com Reactive
  • RxJava:如何制作一次获取并重用的 Observable?

    在每次应用程序启动时 我都有一个 Retrofit Observable 从服务器获取用户的用户名 我想为每个后续订阅者使用这个值 但似乎每次我调用 subscribe 时 都会从网络重新获取该值 由于用户名在应用程序的生命周期中不太可能更
  • 调试未收到消息的 RxJava 问题的最佳方法是什么

    我有一个 Android 应用程序 其中包含多个A型观察者订阅多个B 类可观测量 订阅是在IO Scheduler中完成的 观察是在Android主线程上完成的 我遇到的问题是随机地经过一些工作后 A 从未收到 B 发出的一条消息 并且经过
  • 使用 RxJava 实现存储库模式

    我正在尝试找出一种更好的方法来实现 Android 中 RxJava 中的存储库模式之类的功能 这是我到目前为止所拥有的 从here https gist github com pieces029 5e92f9003fa1a4ebc59b
  • subject.asObservable 有什么用?

    为什么需要RxJavaasObservable 从技术上讲 每个主题都已经是一个可观察的 它比仅仅铸造它有什么优势 Observable obs subject 如果你只是投Subject to an Observable那么你仍然可以使用
  • RxJava:尝试将错误传播到 Observer.onError 时发生错误

    我在 Rx 库中收到 IllegalStateException 错误 并且不知道问题的根源在哪里 无论是 RxJava 还是我可能做错的事情 当证书固定 发生在所有服务器请求上 但似乎指向会话超时或注销并返回时 会发生致命崩溃 重现步骤
  • RxJava,一个可观察多个订阅者:publish().autoConnect()

    我正在使用 rxJava rxAndroid 但有一些非常基本的东西没有按照我的预期运行 我有一个可观察对象和两个订阅者 Observable
  • 如何在 RxJava 中延迟地从列表中发出项目?

    我正在使用 Retrofit 从 REST API 获取书签 public interface BookmarkService GET bookmarks Observable
  • 使用 RxJava 进行电子邮件登录验证,可观察对象发出两次

    我正在制作一个简单的登录表单 电子邮件和密码 来尝试增强我的反应式编程技能 我在让电子邮件字段验证按照我想要的方式工作时遇到一些问题 这是我的代码 final Observable
  • 为什么我的 RxJava Flowable 在使用observeOn 时不考虑背压?

    我正在尝试创建一个Flowable它会发出有关背压的事件以避免内存问题 同时并行运行转换的每个阶段以提高效率 我创建了一个简单的测试程序来推理程序不同步骤的行为以及何时发出事件与在不同阶段等待事件 我的程序如下 public static
  • 如何过滤 RXJava 中 observable 发出的重复值?

    我有一个对象集合 我想在其中抑制重复的项目 我知道关于Distinct http reactivex io documentation operators distinct html运算符 但如果我没有记错的话 它会通过正确覆盖的哈希码方法
  • 如何使用一个可观察量的状态来跳过另一个可观察量的值?

    通过一个简短的例子可以最好地解释这一点 假设这是我想要过滤的可观察源 Observable interval 1 TimeUnit SECONDS 我使用复选框来处理过滤器状态 当未选中该框时 我想跳过所有值 我使用 RxAndroid 来
  • Java反应式框架的比较[关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我看到许多框架 库声称它们可以帮助用 Java 构建响应式应用程序 例如 Akka Vert x RxJava Reactor QBit 等 他
  • 在 RxJava 中,如何在错误时重试/恢复,而不是完成可观察的

    我想要实现的是 监控特定变化的偏好 当检测到更改时 使用新值启动新的网络调用 变换结果 在 UI 中显示结果 我知道更改何时发生 现在我认为我需要对某个主题调用 onNext 然后这应该会触发 Rx 链 最后我可以更新 UI mViewPe

随机推荐

  • Android源码刷机步骤

    打开OEM开关 xff1a 先点击设置 关于手机 版本号七次 开发者选项 打开OEM解锁 xff08 这步必须可以上网 xff0c 否则打不开 xff09 进入bootloader页面 使用方法1必须安装adb platform tools
  • Android Studio导入和调试Android8.0源码

    生成IDE相关文件 idegen专门为IDE环境调试源码而设计的工具 xff0c 依次执行如下命令 xff1a source build envsetup sh mmm development tools idegen developmen
  • make snod注意事项-刷机后启动异常

    1 正确执行顺序 需要执行 source build envsetup sh lunch 2 单独编译 xff0c 刷机后运行异常 全编andorid后 xff0c 单独修改编译一个framwork模块 xff0c make snod会有如
  • adb remount 系统提示只读文件系统Read-only file system,解决用adb disable-verity

    在Android6 0 xff08 Android M xff09 userdebug版本上 eng版本不存在该问题 xff0c 发现使用adb remount 系统之后 xff0c 还是不能对system分区进行操作 xff0c 提示没有
  • 枚举 switchcase 标签必须为枚举常量的非限定名称

    enum switch case label must be the unqualified name of an enumeration constant 或 错误 枚举 switchcase 标签必须为枚举常量的非限定名称case Co
  • VMware为什么会越用占用的内存越大?该如何清理?

    现象描述 xff1a VMware用了一段时间后发现原来刚开始只占5G左右的内存 xff0c 慢慢的会占用几十个G xff0c 甚至更多 xff0c 磁盘空间占用越来越大 解决办法 xff1a 虚拟机内部执行cat dev zero gt
  • H264中的时间戳(DTS和PTS)

    xff08 1 xff09 Ffmpeg中的DTS 和 PTS H264里有两种时间戳 xff1a DTS xff08 Decoding Time Stamp xff09 和PTS xff08 Presentation Time Stamp
  • UEFI/Legacy两种启动模式下安装Win10/Ubuntu双系统

    文章目录 更多操作细节请移步到 UEFI Legacy两种启动模式下安装Win10 Ubuntu双系统 http www aigrantli com archives uefilegacy E4 B8 A4 E7 A7 8D E5 90 A
  • H264视频编码原理

    一 为什么要对视频编码 视频是由一帧帧的图像组成 xff0c 就像gif图片一样 一般视频为了不会让人感觉到卡顿 xff0c 一秒钟至少需要16帧画面 一般30帧 加入该视频是一个1280x720的分辨率 xff0c 那么不经过编码一秒钟传
  • H.264基础知识总结

    H264是视频编解码格式 xff1b 学习H264之前首先要搞明白一个问题 xff0c 视频为什么要编码 xff0c 编码传输不行吗 xff1f 视频就是一堆图片按时间顺序播放 xff0c 在编码标准出现之前 xff0c 不经过编码的原始码
  • linux文件分割(将大的日志文件分割成小的)

    linux文件分割 xff08 将大的日志文件分割成小的 xff09 linux下文件分割可以通过split命令来实现 xff0c 可以指定按行数分割和安大小分割两种模式 Linux下文件合并可以通过cat命令来实现 xff0c 非常简单
  • 华为AGC性能管理功能sdk集成

    集成SDK 1 xff09 在AGC网站的我的项目中选择需要启用性能管理的应用 xff0c 点击质量 gt 性能管理 xff0c 进入性能管理服务页面 xff0c 立即开通服务 2 xff09 添加AGC插件 xff0c 在Android
  • Android平台集成华为AGC性能管理服务问题处理指南

    最近尝试集成了华为AGC的性能管理服务 xff0c 集成过程中也遇到一些问题 本文就对我在集成性能管理服务的踩坑记录进行总结 xff0c 希望能帮到大家 问题一 xff1a 刚集成性能管理服务 xff0c 报错miss client id
  • Android ANR全解析&华为AGC性能管理解决ANR案例集

    1 ANR介绍 1 1 ANR是什么 ANR xff0c 全称为Application Not Responding xff0c 也就是应用程序无响应 如果 Android 应用的界面线程处于阻塞状态的时间过长 xff0c 就会触发 应用无
  • JAVA包装类

    什么是包装类 虽然 Java 语言是典型的面向对象编程语言 xff0c 但其中的八种基本数据类型并不支持面向对象编程 xff0c 基本类型的数据不具备 对象 的特性 不携带属性 没有方法可调用 沿用它们只是为了迎合人类根深蒂固的习惯 xff
  • Rxjava理论(一)

    大家都知道RxJava上手是非常难的一个框架 xff0c 为什么说是难呢 xff0c 因为它的功能非常强大 xff0c 各种操作符让人很难上手 xff0c 搭配使用带生命周期的框架有RxLife等 以至于后面出了很多类似Rxjava的框架
  • rxjava理论(二)

    doOnSubscribe的监听 在上一节我们介绍过subscribeOn是控制上游的observable在哪个线程执行 xff0c 关于怎么控制上游的observable可以看我上篇文章RxJava面经一 xff0c 拿去 xff0c 不
  • RxJava Hook(钩子)方法

    Hook技术又叫钩子函数 xff0c 在系统没有调用函数之前 xff0c 钩子就先捕获该消息 xff0c 得到控制权 这时候钩子程序既可以改变该程序的执行 xff0c 插入我们要执行的代码片段 xff0c 还可以强制结束消息的传递 RxJa
  • android底层之什么是Zram?

    ZRAM的理解 ZRAM xff08 压缩内存 xff09 的意思是说在内存中开辟一块区域压缩数据 就是说假设原来150MB的可用内存现在可以放下180MB的东西 本身不会提高内存容量和运行速度 只是让后台程序更少被系统砍掉罢了 xff0c
  • rxjava - compose()操作符

    1 问题背景 想要给多个流重复应用 34 一系列 34 相同的操作符 该怎么办 比如 我们使用Rx 43 Retrofit进行网络请求时 都有遇到这样场景 要在io线程中请求数据 在主线程订阅 更新UI 所以必须频繁使用下面这样的代码 su