RxJava之PublishSubject、BehaviorSubject、ReplaySubject和AsyncSubject

2023-11-14

 
public class T2 {

    /**
     * subject 是一个神奇的对象,它可以是一个Observable同时也可以是一个Observer:它作为连接这两个世界的一座桥梁。
     * 一个主题可以订阅一个Observable,就像一个观察者,并且它可以发射新的数据,或者传递它接受到的数据,就像一个Observable。
     * 很明显,作为一个Observable,观察者们或者其它主题都可以订阅它。
     * 串行化如果你把 Subject 当作一个 Subscriber 使用,注意不要从多个线程中调用它的onNext方法(包括其它的on系列方法),
     * 这可能导致同时(非顺序)调用,这会违反Observable协议,给Subject的结果增加了不确定性。
     * 要避免此类问题,你可以将 Subject 转换为一个 SerializedSubject ,类似于这样:
     * mySafeSubject = new SerializedSubject( myUnsafeSubject );
     */
    public static void main(String[] args) {
        T2 t2 = new T2();
        System.out.println("===================testPublishSubject==========================");
        t2.testPublishSubject();
        System.out.println("===================testBehaviorSubject==========================");
        t2.testBehaviorSubject();
        System.out.println("===================testReplaySubject==========================");
        t2.testReplaySubject();
        System.out.println("===================testAsyncSubject==========================");
        t2.testAsyncSubject();

    }

    /*
    PublishSubject的观察者接收到的是后续的消息
    输出为:===================testPublishSubject==========================
    observer1 - A	observer1 - B	observer1 - C	observer2 - C	observer1 - D	observer2 - D	onCompleted
    onCompleted
    * */
    private void testPublishSubject() {
        Observer<String> observer1 = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print("observer1 - " + t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

        };

        Observer<String> observer2 = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print("observer2 - " + t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

        };
        PublishSubject<String> publishSubject = PublishSubject.create();
        publishSubject.subscribe(observer1);
        publishSubject.onNext("A");
        publishSubject.onNext("B");
        publishSubject.subscribe(observer2);
        publishSubject.onNext("C");
        publishSubject.onNext("D");
        publishSubject.onCompleted();
        System.out.println();
    }

    /** BehaviorSubject的观察者接收到的永远是最近的消息 和后续的消息
     * 输出为===================testBehaviorSubject==========================
     * default	A	B	C
     * B	C	D
     * onCompleted
     * error
     * */
    private void testBehaviorSubject() {
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print(t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

        };

        //收到所有消息
        BehaviorSubject<String> subject1 = BehaviorSubject.create("default");
        subject1.subscribe(observer);
        subject1.onNext("A");
        subject1.onNext("B");
        subject1.onNext("C");
        System.out.println();

        //不能收到default、A
        BehaviorSubject<String> subject2 = BehaviorSubject.create("default");
        subject2.onNext("A");
        subject2.onNext("B");
        subject2.subscribe(observer);
        subject2.onNext("C");
        subject2.onNext("D");
        System.out.println();

        //只能收到onCompleted
        BehaviorSubject<String> subject3 = BehaviorSubject.create("default");
        subject3.onNext("A");
        subject3.onNext("B");
        subject3.onCompleted();
        subject3.subscribe(observer);
        System.out.println();

        // 只能收到error
        BehaviorSubject<String> subject4 = BehaviorSubject.create("default");
        subject4.onNext("A");
        subject3.onNext("B");
        subject4.onError(new RuntimeException("error"));
        subject4.subscribe(observer);

        System.out.println();
    }

    /** ReplaySubject会缓存所有消息,所以观察者都会收到所有消息
     * 输出:===================testReplaySubject==========================
     * A	B	A	B	C	C	D	D	onCompleted
     * onCompleted
     * */
    private void testReplaySubject() {
        Observer<String> observer1 = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print(t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

        };

        Observer<String> observer2 = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print(t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }

        };
        ReplaySubject<String> publishSubject = ReplaySubject.create();
        publishSubject.subscribe(observer1);
        publishSubject.onNext("A");
        publishSubject.onNext("B");
        publishSubject.subscribe(observer2);
        publishSubject.onNext("C");
        publishSubject.onNext("D");
        publishSubject.onCompleted();
        System.out.println();
    }

    /**当Observable完成时AsyncSubject只会发布最后一条消息给已经订阅的每一个观察者,
     * 如果没有调用onCompleted则被观察者不会发送任何消息给观察者
     * 输出===================testAsyncSubject==========================
     * C	onCompleted
     * */
    private void testAsyncSubject() {
        Observer<String> observer = new Observer<String>() {

            @Override
            public void onNext(String t) {
                System.out.print(t + "\t");
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println(e.getMessage());
            }
        };

        AsyncSubject<String> publishSubject1 = AsyncSubject.create();
        publishSubject1.subscribe(observer);
        publishSubject1.onNext("A");
        publishSubject1.onNext("B");
        publishSubject1.onNext("C");

        AsyncSubject<String> publishSubject2 = AsyncSubject.create();
        publishSubject2.subscribe(observer);
        publishSubject2.onNext("A");
        publishSubject2.onNext("B");
        publishSubject2.onNext("C");
        publishSubject2.onCompleted();
        System.out.println();
    }
}


本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava之PublishSubject、BehaviorSubject、ReplaySubject和AsyncSubject 的相关文章

随机推荐

  • 编写程序判断程序、数据大小端存储方式

    大小端这个问题在面试过程中偶尔会被问到 这里笔者总结了一 int CheckSystemDataFormat void int i 0x0a12780b char c i return c 0 0x0a c 1 0x12 c 2 0x78
  • 复变函数与积分变换(猴博士精华版)

    复数及其运算 复数形式的方程 映射 常见的四种初等函数 若 则 去掉2k i 解析 调和 复数的积分 级数 留数及留数定理 傅式变换 拉氏变换
  • 嵌入式C语言基础(4)———数据类型

    一 基本数据类型 1 整型 int 1 int signed int 有符号整型 占4个字节 2 unsigned int 无符号整型 占4个字节 3 short unsigned short 短整型 占2存储单元 2个字节 4 long
  • 红外遥控器实验

    红外遥控器实验 1 红外遥控的编码目前广泛使用的是 NEC Protocol 的PWM 脉冲宽度调制 和Philips RC 5 Protocol 的PPM 脉冲位置调制 2 NEC协议特征 3 NEC码位定义 4 硬件连接 5 程序设计思
  • hexdump记录,方便调试用

    define HEXDUMP COLS 16 void hexdump void mem unsigned int len unsigned int i j for i 0 i lt len len HEXDUMP COLS HEXDUMP
  • QT从入门到实战x篇_15_登录窗口布局(代码实现布局总结、手动布局实例、widget、行列形式的用栅格、弹簧调整控件与widget距离)

    本篇主要介绍利用代码和手动的方式创建和调整一个UI界面并进行布局调整 关于代码实现UI界面的一些总结 1 设置window dialog对象的最小尺寸 title setMinimumSize 800 600 setWindowTitle
  • android蓝牙连接133问题的解决办法---(连接篇)

    上一篇文章介绍了 蓝牙扫描 今天来说一下android蓝牙连接过程中133的问题 我们经常在网上看到一些答案说需要释放gatt资源 这种方式可以在一定程度上减少出现133的概率 个人发现的一个规律是 一般出现蓝牙连接133的问题 大多是an
  • win10系统无法开启远程服务器配置,win10系统无法连接远程服务器的方案介绍...

    作为大多数用户都使用的win10系统 如果发生对win10系统无法连接远程服务器进行设置情况 会让人束手无策 那么win10系统无法连接远程服务器是怎么设置的呢 如果有朋友想对win10系统无法连接远程服务器进行设置的话 按照1 在桌面上找
  • 「自动化」聊起来简单,做起来难

    第4期 自动化 聊起来简单 做起来难 在上一期 如何找到现有研发体系的 内耗问题 中 我们聊了评估现有研发体系 正确的找到 体系内耗问题 是改变研发体系的第一步 本期我们继续聊下一个关键点就是研发体系中引入自动化 看看下面两位嘉宾朋友是如何
  • 如何理解Zookeeper的顺序一致性

    2017饿了么做异地多活 我的团队承担Zookeeper的异地多活改造 在此期间我听到2种不同的关于一致性的说法 一种说法是Zookeeper是最终一致性 因为由于多副本 以及保证大多数成功的Zab协议 当一个客户端进程写入一个新值 另外一
  • OpenCV代码提取:erode函数的实现

    Morphological Operations A set of operations that process images based on shapes Morphological operations apply a struct
  • 备战电赛,该如何学习

    距2023年电赛 全称 全国大学生电子设计竞赛 只有二个月的时间了 今年是国赛通道 作为A类级竞赛 很多电子类专业的学生 以及非电子类专业的学生都想去参加 但是它的门槛有很高 很多大一的学生参加电赛只是为了获得体验感 当然你也可以抱学长的大
  • leetcode刷题(10.15总结)

    1 2的幂 题目描述 https leetcode cn problems power of two class Solution def isPowerOfTwo self n int gt bool return n gt 0 and
  • 部署elasticsearch集群

    创建es集群 编写一个docker compose yaml文件 内容如下 version 2 2 services es01 image elasticsearch 7 12 1 container name es01 environme
  • Android Studio中BitmapDrawable的使用2-1

    1 Drawable Android Studio中的Drawable叫做可绘制资源 指的是可以在屏幕上绘制的图形 Drawable资源分为BitmapDrawable 可绘制的位图文件 NinePatchDrawable 可绘制的九宫格文
  • 牛客网mysql刷题记录

    牛客网mysql刷题记录 SQL26 计算25岁以上和以下的用户数量 if expression A B select if age lt 25 or age is null 25岁以下 25岁及以上 as age cut count u
  • 云计算基础——期末大作业

    1 例举出目前云服务提供商最常用的云计算安全架 并说明其安全策略和安全机制 1 云计算安全框架中的模块组成及功能架构必须有图例说明 配以文字说明 2 云计算安全策略的一般应用的原则和针对性的问题 举例一个案例进行具体论述 3 云计算的安全机
  • 使用Python,OpenCV进行图像平移转换

    使用Python OpenCV进行图像平移转换 1 效果图 2 原理 3 源码 参考 这篇博客将介绍如何使用Python OpenCV对图像进行平移转换 平移是图像沿x轴和y轴的移动 使用平移 可以将图像上下左右移动 以及上述任意组合 要使
  • ESP8266引脚参考详解

    ESP8266引脚参考详解 ESP8266 ESP12 E芯片自带17个GPIO管脚 并不是所有的gpio在所有的ESP8266开发板中都是公开的 有些gpio不推荐使用 有些gpio有非常具体的功能 通过本指南 您将学习如何正确使用ESP
  • RxJava之PublishSubject、BehaviorSubject、ReplaySubject和AsyncSubject

    public class T2 subject 是一个神奇的对象 它可以是一个Observable同时也可以是一个Observer 它作为连接这两个世界的一座桥梁 一个主题可以订阅一个Observable 就像一个观察者 并且它可以发射新的