RXJava - 制作一个可暂停的可观察对象(例如带有缓冲区和窗口)

2024-03-20

我想创建执行以下操作的可观察对象:

  • 在暂停时缓冲所有项目
  • 立即发出项目,同时它们不会暂停
  • 暂停/恢复触发器必须来自另一个可观察的
  • 必须保存它才能供不在主线程上运行的可观察对象使用,并且必须保存更改主线程的暂停/恢复状态

我想用一个BehaviorSubject<Boolean>作为触发器并将该触发器绑定到活动的onResume and onPause事件。 (附代码示例)

Question

我已经设置了一些东西,但它没有按预期工作。我像下面这样使用它:

Observable o = ...;
// Variant 1
o = o.lift(new RxValve(getPauser(), 1000, getPauser().getValue())
// Variant 2
// o = o.compose(RXPauser.applyPauser(getPauser()));
o
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();

目前的问题是,变体 1 应该可以正常工作,但有时,事件只是没有发出 - 阀门没有发出,直到阀门一切正常(可能是线程问题......)!解决方案2更简单并且似乎可行,但我不确定它是否真的更好,我不这么认为。我实际上不确定,为什么解决方案一有时会失败,所以我不确定解决方案2是否解决了(目前对我来说未知)问题......

有人可以告诉我可能是什么问题或者简单的解决方案是否应该可靠地工作?或者告诉我一个可靠的解决方案?

Code

RxValue

https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08 https://gist.github.com/akarnokd/1c54e5a4f64f9b1e46bdcf62b4222f08

RXPauser 函数

public static <T> Observable.Transformer<T, T> applyPauser(Observable<Boolean> pauser)
{
    return observable -> pauser(observable, pauser);
}

private static <T> Observable<T> pauser(Observable<T> source, Observable<Boolean> pauser)
{
    // this observable buffers all items that are emitted while emission is paused
    Observable<T> sharedSource = source.publish().refCount();
    Observable<T> queue = sharedSource
            .buffer(pauser.distinctUntilChanged().filter(isResumed -> !isResumed), aBoolean -> pauser.distinctUntilChanged().filter(isResumed -> isResumed))
            .flatMap(l -> Observable.from(l))
            .doOnNext(t -> L.d(RXPauser.class, "Pauser QUEUED: " + t));

    // this observable emits all items that are emitted while emission is not paused
    Observable<T> window = sharedSource.window(pauser.distinctUntilChanged().filter(isResumed -> isResumed), aBoolean ->  pauser.distinctUntilChanged().filter(isResumed -> !isResumed))
            .switchMap(tObservable -> tObservable)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser NOT QUEUED: " + t));

    // combine both observables
    return queue.mergeWith(window)
            .doOnNext(t -> L.d(RXPauser.class, "Pauser DELIVERED: " + t));
}

Activity

public class BaseActivity extends AppCompatActivity {

    private final BehaviorSubject<Boolean> pauser = BehaviorSubject.create(false);

    public BaseActivity(Bundle savedInstanceState)
    {
        super(args);
        final Class<?> clazz = this.getClass();
        pauser
                .doOnUnsubscribe(() -> {
                    L.d(clazz, "Pauser unsubscribed!");
                })
                .subscribe(aBoolean -> {
                    L.d(clazz, "Pauser - " + (aBoolean ? "RESUMED" : "PAUSED"));
                });
    }

    public PublishSubject<Boolean> getPauser()
    {
        return pauser;
    }

    @Override
    protected void onResume()
    {
        super.onResume();
        pauser.onNext(true);
    }

    @Override
    protected void onPause()
    {
        pauser.onNext(false);
        super.onPause();
    }
}

你实际上可以使用.buffer()运算符将其传递为可观察的,定义何时停止缓冲,来自书中的示例:

Observable.interval(100, TimeUnit.MILLISECONDS).take(10)
    .buffer(Observable.interval(250, TimeUnit.MILLISECONDS))
    .subscribe(System.out::println);

来自第 5 章“驯服序列”:https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20 序列/5.%20 时移%20sequences.md https://github.com/Froussios/Intro-To-RxJava/blob/master/Part%203%20-%20Taming%20the%20sequence/5.%20Time-shifted%20sequences.md

您可以使用PublishSubject as Observable在您的自定义运算符中为其提供元素。每次需要开始缓冲时,通过以下方式创建实例Observable.defer(() -> createBufferingValve())

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RXJava - 制作一个可暂停的可观察对象(例如带有缓冲区和窗口) 的相关文章

  • Android:如何检查ScrollView内的View是否可见?

    我有一个ScrollView其中持有一系列Views 我希望能够确定视图当前是否可见 如果它的任何部分当前由ScrollView 我希望下面的代码可以做到这一点 令人惊讶的是它没有 Rect bounds new Rect view get
  • 如何在 Android 中使用 Gson 和 Retrofit 解析深层嵌套 json 对象中的字段?

    我有一个独特的情况 我必须从 json 的深层嵌套对象中获取某些时间 这有点复杂 我找不到解决方案 所以寻找解决这个问题的想法和方法 我有一个 json 如下 mySpaceId 73220 myBuildingId 14019 myFlo
  • 如何恢复默认状态栏颜色?

    在我的一项活动中 我不希望状态栏有任何颜色 我希望它在我的应用程序运行之前保持为任何颜色 我可以将其更改为黑色 但黑色不是默认值 默认值是透明的 我不想尝试找出哪种透明度是正确的 因为它在其他人的手机上可能有所不同 所以我想在此特定活动中基
  • 为自定义库添加警告“有更新版本的......可用”

    我刚刚发布了版本 1 2 1 的库 代码位于Github https github com UdiOshi85 libSearchToolbar As you can see in the picture 我收到 警告 指出我正在使用旧版本
  • 如何减少 MediaCodec 视频/avc 解码中的延迟

    我执行了一些简单的计时电影播放器 java https github com google grafika blob master src com android grafika MoviePlayer java in the Grafik
  • 如何自定义锁定屏幕?就像 Android 中的 WaveSecure

    我想做一个像 WaveSecure 一样的演示 它以第三名的成绩赢得了 Android Develop Challenge 2 现在我在自定义锁定屏幕时遇到了问题 所以我想知道WaveSecure是如何实现其锁定功能的 如下图所示 当手机锁
  • 为什么即使优化级别为 3,向量分配也需要花费这么多时间?

    以前我在这里问过类似的问题 Android NDK vector resize 太慢 与分配有关 https stackoverflow com q 58745415 5709159 问题是这段代码 include
  • 使用 Retrofit 的 Google 地图方向 API

    我想绘制两个位置之间的路线 我使用retrofit库来调用API 我没有得到任何回应 我需要 ArrayList 中的折线 我怎么做到这一点 还需要帮助来创建 GsonAdapter 谢谢 在活动中 String base url http
  • Android WebView - 带有经过身份验证的代理

    我目前正在尝试调试围绕 WebView 构建的 Android 应用程序 我负责处理的开发网络环境 不是我的选择 这是 企业 安全决策 是WPA WiFi 代理服务器 代理身份验证 虽然a上的说明以前的答案非常有帮助 https stack
  • 如何在 El Capitan (OS X 10.11) 中设置 Android Studio?

    全新安装 El Capitan 10 11 尝试安装 Android Studio 版本 1 21 Error Android Studio was unable to find a valid JVM Please download it
  • readdir:我如何知道它是文件还是目录

    readdir http linux die net man 3 readdir返回有关目录中所有项目的信息 如何判断该项目是文件还是目录 谢谢 编辑 抱歉 忘了说了 我的时间目标平台是iOS和android 检查d type being
  • SQLite 上下文.MODE_PRIVATE

    我想知道 我们可以使用Context MODE PRIVATE in SQLite创建数据库以防止不必要的数据库访问 我在谷歌上没有得到任何例子 如何使用这个Context MODE PRIVATE在数据库中 请帮助我 提供任何链接或示例
  • mapFragment.getMapAsync 处的 NullPointerException

    在解决了与我的标题相关的问题后 我找不到问题的解决方案 我有一个NullPointerException at mapFragment getMapAsync 下面是我的MapActivity code package com exampl
  • 从适用于 Android 的 Facebook SDK 将内容添加到 Facebook feed 对话框

    在我的 Android 应用程序中 我希望用户在他们的墙上 共享 我的应用程序 因此我希望他们在他们的墙上发布预定义的内容状态 如何自定义墙状态 我想添加我的应用程序图标和一些耀斑文本 下载 Facebook SDK 并将其导入到您的项目中
  • 在 Android 中 - 如何使用 ClickableSpan 只注册长点击

    我想注册对包含在 ClickableSpan 中的文本的点击 前提是点击时间超过 1 秒 有什么办法可以做到这一点吗 如果没有 捕获双击也可以 如果 onClick 方法捕获了一个包含有关点击的一些元数据的事件 那就太好了 那么如果点击长度
  • 检测应用程序的阶段(alpha、beta 或生产)

    我正在使用 cordova 开发一个 android 应用程序 我希望使用三个给定的阶段来逐步发布它 IT 测试的 Alpha 合作伙伴测试版 为其他人生产 但是 我正在使用 mixpanel 来跟踪一些用户输入 Mixpanel 需要一个
  • Android 上可靠的重复后台任务

    我正在尝试运行一个后台任务 该任务每分钟左右运行一次 Android 应用程序 但我正在努力确保该任务实际上每分钟运行一次 我尝试了从使用 SystemClock sleep 到 AlarmManager 重复闹钟和固定闹钟 的各种方法 但
  • 使用 IntelliJ 的 Crashlytics 运行 Android 应用程序

    我正在尝试使用 Maven 将 Crashlytics 集成到我们的 Android 项目中 我已设置所有必需的插件并构建执行并完成mvn clean package从命令行 当运行从命令行构建的应用程序时 一切正常 从 IntelliJ
  • 使用本地 SQlite 数据库填充可扩展列表视图的方法

    我的应用程序中有一个 sqlite 数据库 我想用它制作一个可扩展的列表视图 我已经确定了我应该采取的方法 尝试了很多方法来找到相同的教程 但找不到一个使用本地数据库填充可扩展列表的教程 Android 网站上有一个教程 他们使用手机中的联
  • 从 Android 通知中的 URL 加载图像

    在我的 Android 应用程序中 我想动态设置将从 URL 加载的通知图标 为此 我使用了setLargeIcon中的NotificationBuilder的属性receiver 我参考了很多链接并尝试了各种解决方案 但无法获得所需的输出

随机推荐

  • java.lang.OutOfMemoryError:为 ChunkPool::allocate 请求 32756 字节。交换空间不足?

    我正在使用通过 WebLogic 10 3 部署在 HP 服务器上的 java 应用程序 版本信息 WebLogic Version 10 3 OS Version B 11 23 java version java version 1 6
  • 为什么这个工厂返回 $$state 对象而不是 response.data?

    所以我在服务器中有一个对象集合 我想在页面加载时填充 ng repeat 我创建了一个工厂 它从服务器上的资源中获取列表 如下所示 app factory objectArray http function http This is ret
  • 在 Elasticsearch 中搜索所有嵌套子级与给定查询匹配的对象

    给定一个具有以下映射的对象 a properties id type string b type nested properties key type string 我想检索该对象的所有实例 其中所有嵌套子对象都与给定查询匹配 例如 假设我
  • 如何使用 MATLAB 从 WEKA 检索类值

    我正在尝试使用 MATLAB 和 WEKA API 从 WEKA 检索类 一切看起来都很好 但类始终为 0 有什么想法吗 我的数据集有 241 个属性 将 WEKA 应用于该数据集我得到了正确的结果 创建第一个训练和测试对象 然后构建分类器
  • 是/否 - 有没有办法用纯 SVG 工具改进鼠标拖动?

    所以我花了一些时间尝试纯 无外部库 SVG 元素拖动 一般来说 一切正常 但是对于快速移动的鼠标来说存在一个令人讨厌的问题 当用户将可拖动的 SVG 元素靠近其边缘时 然后拖动 鼠标移动 这样的可拖动速度太快 鼠标 失去 可拖动 这里更详细
  • 带有 Picturebox 的 MouseWheel 事件?

    我想将鼠标悬停在图片框 或所有图片和主窗体 上并使用鼠标滚轮滚动 然而我没有运气 我编写了 pictureBox1 MouseWheel 并检查了增量 我在它 0 时设置了一个断点 到目前为止 无论我做什么 我都无法发生任何事情 我也尝试过
  • 在 UI-Grid 标题中实现多列分组有更好的方法吗?

    我尝试使用以下方法在 UI Grid 的列标题级别实现多列分组 我遵循的步骤 包括 UI 网格的以下标题单元格模板以及另一个 UI 网格行 div class ui grid header custom ui grid header div
  • 在动态创建的 Web 应用服务中添加自定义域

    我使用 REST API 创建了 azure Web 应用程序 是否有任何选项可以使用rest api 自定义域映射 通过下面的链接 我创建了新的网络应用服务 https learn microsoft com en us rest api
  • Ionic 3 启用单页滑回功能

    我已在根组件和模块配置中全局禁用 向后滑动
  • 替换三元运算中已弃用的“define(@array)”

    我有以下需要更正的代码 如defined array 在最新的 Perl 中已弃用 my inputs defined padSrc gt inouts padSrc gt inouts defined padSrc gt inputs p
  • 从“void*”到“unsigned char*”的转换无效

    我有以下代码 void buffer operator new 100 unsigned char etherhead buffer 尝试编译时 我收到该行的以下错误 error invalid conversion from void t
  • 上下移动 ListViewItems

    我有一个 ListView WinForms 我想通过单击按钮来上下移动项目 要移动的项目是已检查的项目 因此 如果选择了第 2 6 和 9 项 当我按下向上移动按钮时 它们将变为 1 5 和 8 并且这些位置上的项目将向下移动一步 我觉得
  • 基于标签的 SQL 查询

    我已经有一段时间没有做过任何 SQL 了 我不确定这个问题是否有一个简单的解决方案 我也有点菜鸟 我正在尝试构建一个图像库 允许用户使用标签来搜索图像 然后单击其他标签来优化搜索并减少结果数量 但我在所涉及的查询方面遇到了大问题 这是我当前
  • 从 firebase Swift 加载聊天

    我目前正在做一个聊天信使 我能够检索我发送给其他用户的所有消息 但无法检索他们发送的任何消息 我用来加载消息的代码是 func loadMsg let toId user id let fromId Auth auth currentUse
  • CMake 不再找到静态 Boost 库

    我正在开发一个依赖 Boost 的大型 C 项目 该项目使用 CMake 在各种平台上构建 在我的 Windows 计算机上 我使用 CMake 2 8 9 Visual Studio 2010 和 Boost 1 50 0 从源代码构建
  • 从 Openlayers 3 视口获取所有功能

    我试图找出 Openlayers 3 中图层上可见的所有功能 视口 如果我向地图添加点击事件 我可以找到一个功能 如下所示 但我无法找到视口中可见的所有功能 有人可以帮忙解决这个问题吗 map on click function evt v
  • 如何使用geodjango返回距某个点最近距离的记录?

    我正在使用 geodjango 并在我的数据库中有一个点的集合 为了获取某个区域内的点的查询集 我使用以下命令 queryset Spot objects filter point distance lte origin distance
  • Android 使用包含另一个 hashmap 的 hashmap 实现 Parcelable 对象

    这是一个扩展Android 实现具有 hashmap 的 Parcelable 对象 https stackoverflow com questions 22498746 android implement parcelable objec
  • XSS 背后的一般概念是什么?

    跨站脚本 XSS 是一种类型 计算机安全漏洞 通常出现在网络应用程序中 这使得恶意攻击者能够 将客户端脚本注入网络 其他用户查看的页面 一个 利用跨站点脚本 漏洞可被攻击者利用 绕过访问控制 例如 同源政策 跨站点 在网站上执行的脚本是 大
  • RXJava - 制作一个可暂停的可观察对象(例如带有缓冲区和窗口)

    我想创建执行以下操作的可观察对象 在暂停时缓冲所有项目 立即发出项目 同时它们不会暂停 暂停 恢复触发器必须来自另一个可观察的 必须保存它才能供不在主线程上运行的可观察对象使用 并且必须保存更改主线程的暂停 恢复状态 我想用一个Behavi