RxJava 之Consumer和Action的使用

2023-05-16

在之前的RxJava中已经讲到创建观察者的代码如下:

//创建观察者
Observer<String> observer = new Observer<String>(){

  @Override
  public void onSubscribe(Disposable d) {
      System.out.println("开始采用subscribe连接!");
  }

  @Override
  public void onNext(String value) {
      System.out.println("对Next事件作出响应:"+value);
  }

  @Override
  public void onError(Throwable e) {
      System.out.println("对Error事件作出响应!");
  }

  @Override
  public void onComplete() {
      System.out.println("事件执行完毕!");
  }
};
复制代码

代码中有四个回调方法,但是我们不一定会用到所有的回调方法,那么怎么才能让代码更加简洁呢?

这样我们就会用到Consumer和Action了。

以下是subscribe中可用参数的截图:

  • subscribe()

不带任何参数,也就是说观察者没有任何回调。

  • subscribe(Observer<? super T> observer)

将Observer作为参数,它有四个回调方法,文章开头就说明了

  • subscribe(Consumer<? super T> onNext)

将Consumer作为参数,Consumer中有个回调方法accept,accept带有一个参数,接受被观察者发射过来的数据

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    });
复制代码

当被观察者发射onNext时,accept将被执行

  • subscribe(Consume<? super T> onNext, Consumer<? super Throwable> onError)

带有两个Consumer参数,分别负责onNextonError的回到

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    });
复制代码

执行结果

hello word A
hello word B
hello word C
this is nullpointException复制代码
  • subscribe(Consume<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete)

带有三个参数,分别负责onNext、onError和onComplete的回调

    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            //emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("onComplete");
        }
    });
复制代码

执行效果

hello word A
hello word B
hello word C
onComplete复制代码
  • subscribe(Consume<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe)
    Observable.create(new ObservableOnSubscribe<String>() {

        @Override
        public void subscribe(ObservableEmitter<String> emitter) {
            emitter.onNext("hello word A");
            emitter.onNext("hello word B");
            emitter.onNext("hello word C");
            //emitter.onError(new Throwable("this is nullpointException"));
            emitter.onComplete();
        }
    }).subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) throws Exception {
            System.out.println(s);
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            System.out.println(throwable.getMessage());
        }
    }, new Action() {
        @Override
        public void run() throws Exception {
            System.out.println("onComplete");
        }
    }, new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
            System.out.println("观察者和被观察者已被连接,disposable可以立即中断连接!");
        }
    });
复制代码

执行效果:

System out: 观察者和被观察者已被连接,disposable可以立即中断连接!
hello word A
hello word B
hello word C
onComplete
复制代码

想要搞清楚Consumer和Action的用法,必须了解他们是什么东西?他们是干什么的

(1)他们是什么?

他们都是借口,源码如下:

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}
复制代码
public interface Action {
    /**
     * Runs the action and optionally throws a checked exception.
     * @throws Exception if the implementation wishes to throw a checked exception
     */
    void run() throws Exception;
}
复制代码

(2)他们是干什么的?

他们都是为了触发回调的,Consumer自带一个参数,Action不带餐宿。

当被观察者发射 onNext时,由于onNext带有参数,所以使用Consumer

当被观察者发送onComplete时,由于onComplete不带参数,所以使用Action

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava 之Consumer和Action的使用 的相关文章

  • RxJava 2.x 中带有背压的 PublishSubject

    我目前正在为我当前的项目选择 RxJava 1 x 或 2 x 我基本上需要一个PublishSubject采用背压策略onBackpressureLatest 我想选择 RxJava 2 x 但我不太清楚如何将反压策略应用于Publish
  • Rx:组合 ThrottleFirst 和 Sample 运算符

    给定一个源可观察 S 我如何要求 RxJava Rx 生成可观察 D 即 立即从 S 发出第一项 在发射每个项目之后和发射下一个项目 L 之前等待至少 T 秒 其中 L 是 S 在等待期间发射的最后一个项目 如果 S 在等待时间 T 内没有
  • 如何在 RxJava 中计算移动平均线

    在金融领域 我们通常需要从时间序列数据流中计算移动窗口总值 以移动平均线为例 假设我们有以下数据流 T是时间戳 V是实际值 T0 V0 T1 V1 T2 V2 T3 V3 T4 V4 T5 V5 T6 V6 T7 V7 T8 V8 T9 V
  • 在后台运行 void 方法

    我想使用 rxjava 在后台运行一个方法 我不在乎结果 void myHeavyMethod 到目前为止 我唯一的解决方案是将返回类型修改为例如boolean boolean myHeavyMethod return true 之后我运行
  • RxJava 并行获取 Observables

    我需要一些帮助来在 RxJava 中实现并行异步调用 我选择了一个简单的用例 其中第一个调用获取 而不是搜索 要显示的产品列表 平铺 随后的调用将获取 A 评论和 B 产品图像 经过几次尝试我到达了这个地方 1 Observable
  • 使用 Kafka 和 NodeJS 进行实时通知

    在我的项目中 我必须设计一个实时通知系统 我就是这样做的 如下图所示 你可以看到我使用 Kafka 作为队列消息系统 并使用 NodeJS 来构建 Websocket Server 和 Kafka Consumers 生产者将收集通知数据并
  • 使用 Retrofit observable 处理网络错误

    当将 Observables 与 Retrofit 结合使用时 如何处理网络故障 鉴于此代码 Observable
  • 使用 WindowManager.LayoutParams.FLAG_WATCH_OUTSIDE_TOUCH 获取所有 MotionEvent

    我的问题直接涉及到这个question https stackoverflow com q 4481226 394933 这个问题的答案表明了如何创建一个ViewGroup 将其嵌入到WindowManager 并允许WindowManag
  • RxJava + 改造,获取列表并为每个项目添加额外信息

    我正在玩 RXJava 在 Android 中进行改造 我正在努力完成以下任务 我需要定期轮询一个给我 Observable gt 的调用 从这里我可以做到 一旦我得到这个列表 我想在每个交付中迭代并调用另一个方法来给我预计到达时间 所以只
  • 如何使用文本操作

    使用目的是什么文本动作 http docs oracle com javase 6 docs api javax swing text TextAction html from 抽象动作 http docs oracle com javas
  • RxJava 作为事件总线?

    我开始学习 RxJava 到目前为止我很喜欢它 我有一个片段与单击按钮时的活动进行通信 用新片段替换当前片段 谷歌推荐界面 http developer android com training basics fragments commu
  • RxJava 中的 n 元笛卡尔积

    现在我持有一个Observable
  • Rxjava - 链接可观察量时如何获取其他类型的流(返回值)而不是当前的流?

    我执行了一个 Retrofit2 observable 调用 在完成后它立即链接到另一个 observable 以将结果存储到数据库中 它看起来很简单 如下所示 protected Observable
  • 基于内容的 RxJava Observable 缓冲区

    我使用 vertX 和 RxJava 启动了一个项目 但遇到了一个问题 但没有找到解决方案 我有一个 Observable 它为传入通信发出 WebSocketFrame 每个 WebSocketFrame 由有效负载 ByteBuffer
  • 如何在BehaviorSubject中设置默认值

    可能是一个菜鸟问题 如何为BehaviorSubject 设置默认值 我有一个具有 2 个不同值的枚举 enum class WidgetState HIDDEN VISIBLE 以及发出状态的行为主体 val widgetStateEmi
  • 具有自定义计数标准的 RxJava 缓冲区/窗口

    我有一个 Observable 它发出许多对象 我想使用以下方法对这些对象进行分组 window or buffer运营 但是 不是指定count用于确定窗口中应有多少对象的参数我希望能够使用自定义标准 例如 假设可观察对象正在发出 a 的
  • Single.zip - 如何捕获失败的呼叫并继续其余的网络呼叫?

    我正在进行 5 个并行网络调用 模拟其中 4 个成功 其中 1 个失败 失败的调用使整个Single zip 失败 即使其他 4 个网络调用成功 我也无法获得它们的结果 如何处理单个失败的网络调用的错误Single zip 并获得成功者的结
  • 重构 google 的 NetworkBoundResource 类以使用 RxJava 而不是 LiveData

    谷歌的android架构组件教程here https developer android com topic libraries architecture guide html有一部分解释了如何抽象通过网络获取数据的逻辑 在其中 他们使用
  • RxJava、Proguard 和 sun.misc.Unsafe

    我有以下问题RxJava 1 1 0 使用时Proguard 我没有更改 RxJava 版本或其 pro文件 但更新后OkHttp我无法编译使用Proguard因为我有关于sun misc Unsafe不在场 rxJava pro keep
  • 如何实现Tabbar中每个选项卡的搜索动作

    我有一个页面 TabBar 中有 2 个选项卡 如下所示 class SearchByCityOrPerson extends StatefulWidget SearchByCityOrPerson Key key this title s

随机推荐

  • Java 设计模式之策略模式

    一 了解策略模式 1 1 什么是策略模式 策略模式 Strategy Pattern 是指对一系列的算法定义 xff0c 并将每一个算法封装起来 xff0c 而且使它们还可以相互替换 此模式让算法的变化独立于使用算法的客户 1 2 策略模式
  • Java 设计模式之适配器模式

    一 了解适配器模式 1 1 什么是适配器模式 适配器模式将一个类的接口 xff0c 转换成客户期望的另一个接口 适配器让原来接口不兼容的类可以合作无间 适配器模式有两种 xff1a 对象 适配器和 类 适配器 这个模式可以通过创建适配器进行
  • 责任链模式

    责任链模式的定义与特点 责任链模式的定义 xff1a 使多个对象都有机会处理请求 xff0c 从而避免请求的发送者和接受者之间的耦合关系 xff0c 将这个对象连成一条链 xff0c 并沿着这条链传递该请求 xff0c 直到有一个对象处理他
  • java设计模式-桥接模式

    桥接模式定义 桥接模式 xff08 Bridge Pattern xff09 xff0c 将抽象部分与它的实现部分分离 xff0c 使它们都可以独立地变化 更容易理解的表述是 xff1a 实现系统可从多种维度分类 xff0c 桥接模式将各维
  • java设计模式-状态模式

    1 状态模式的定义和特点 状态 xff08 State xff09 模式的定义 xff1a 对有状态的对象 xff0c 把复杂的 判断逻辑 提取到不同的状态对象中 xff0c 允许状态对象在其内部状态发生改变时改变其行为 状态模式是一种对象
  • java设计模式-命令模式

    18 xff0c 命令模式 18 1 命令模式的定义和特点 命令 xff08 Command xff09 模式的定义如下 xff1a 将一个请求封装为一个对象 xff0c 使发出请求的责任和执行请求的责任分割开 这样两者之间通过命令对象进行
  • java设计模式-代理模式

    17 xff0c 代理模式 17 1 代理模式的定义和特点 代理模式的定义 xff1a 由于某些原因需要给某对象提供一个代理以控制对该对象的访问 这时 xff0c 访问对象不适合或者不能直接引用目标对象 xff0c 代理对象作为访问对象和目
  • 工厂方法模式

    概念定义 工厂方法 Factory Method 模式 xff0c 又称多态工厂 Polymorphic Factory 模式或虚拟构造器 Virtual Constructor 模式 工厂方法模式通过定义工厂抽象父类 或接口 负责定义创建
  • TextFuseNet: Scene Text Detection with Richer Fused Features论文阅读

    TextFuseNet Scene Text Detection with Richer Fused Features 利用更丰富的特征融合进行场景文本检测 代码 xff1a https github com ying09 TextFuse
  • JUC原子类: CAS, Unsafe和原子类详解

    CAS 线程安全的实现方法包含 互斥同步 synchronized 和 ReentrantLock非阻塞同步 CAS AtomicXXXX无同步方案 栈封闭 xff0c Thread Local xff0c 可重入代码 什么是CAS CAS
  • OKHttp中的责任链模式

    一 什么是责任链模式 责任链 xff0c 顾名思义是将多个节点通过链条的方式连接起来 xff0c 每一个节点相当于一个对象 xff0c 而每一个对象层层相关 xff0c 直接或者间接引用下一个对象 xff08 节点 xff09 xff1b
  • android bugly关于混淆后如何知道正确代码

    bugly xff1a 腾讯自制 xff0c 是个4 xff0c 5句代码就能简单加入在线更新 捕获异常的好功能 xff0c 后台也是使用腾讯的 Android混淆 xff1a 启用一个配置 xff0c 把所有变量 类名改成 34 a 34
  • 大康Dacom Athlete+蓝牙耳机与手机配对上的原因及解决办法:

    1 原因 xff1a 蓝牙耳机没有进入配对模式 解决办法 xff1a 蓝牙耳机都有一个功能键 xff0c 长按听到开机提示音后不要松手 xff0c 继续长按 xff0c 直至听到进入配对模式提示音或者 滴 的提示音 xff0c 此时蓝红等交
  • android查看编译后的class文件

    其查看目录如下 然后在硬盘文件中打开 xff0c 可以看到详细的class文件列表
  • socket的shutdownInput和shutdownOutput

    虽然在大多数的时候可以直接使用Socket类或输入输出流的close方法关闭网络连接 xff0c 但有时我们只希望关闭OutputStream或InputStream xff0c 而在关闭输入输出流的同时 xff0c 并不关闭网络连接 这就
  • 使用广播接收器时,onReceive 会多次执行

    原因一 xff1a 没有在onDestory中调用解注册 unregisterReceiver 原因二 xff1a BroadcastReceiver变量所在的Activity或者Fragment被创建的多次 xff0c 形成多个对象
  • Android Studio自动生成单例代码

    AS中有可以自己设置代码模板 xff0c 使用起来简单方便 同样的 xff0c 单例类的代码样式统一 xff0c 除了类名外全部一致 所以使用模板更加方便 在设置中的Editor Live Template中新建模板 xff0c 然后把单例
  • android:excludeFromRecents 属性需要注意的小地方

    在 Android 系统中 xff0c 如果我们不想某个 Activity 出现在 Recent screens 中 xff0c 可以设置 lt activity gt 属性 android excludeFromRecents 为 tru
  • G.711编码原理

    目录 参考概述G 711原理总结 1 参考 1 wikipedia A law algorithm 2 github com quatanium foscam ios sdk 3 charybdis G711算法学习 2 概述 本文目的 x
  • RxJava 之Consumer和Action的使用

    在之前的RxJava中已经讲到创建观察者的代码如下 xff1a 创建观察者 Observer lt String gt observer 61 new Observer lt String gt 64 Override public voi