RxJava 缓冲区直到更改

2023-12-26

我有一个可观察的对象,它会发出大量数据,例如

[1,1,1,2,2,2,3,3,1,1,5,5……]

在RxJava中我们可以使用直到改变() http://reactivex.io/documentation/operators/distinct.html获得一个不同的项目,直到它发生变化,结果会像

[1,2,3,1,5,……]

同样有没有办法buffer直到更改为止的项目?例如我期望像这样的输出

[[1,1,1]、[2,2,2]、[3,3]、[1,1]、[5,5]......]


您可以分享源序列,应用distinctUntilChanged到一条路径,然后将驱动buffer使用的运算符Observable指示边界:

@Test
@SuppressWarnings("unchecked")
public void test() {
    Observable.fromArray(1,1,1,2,2,2,3,3,1,1,5,5)
    .compose(bufferUntilChanged(v -> v))
    .test()
    .assertResult(
            Arrays.asList(1, 1, 1),
            Arrays.asList(2, 2, 2),
            Arrays.asList(3, 3),
            Arrays.asList(1, 1),
            Arrays.asList(5, 5)
        );
}

static final <T, K> ObservableTransformer<T, List<T>> bufferUntilChanged(
        Function<T, K> keySelector) {
    return o -> o.publish(q -> q.buffer(q.distinctUntilChanged(keySelector).skip(1)));
}

The skip(1)是因为第一个项目通过distinctUntilChanged将触发一个新的缓冲区,使第一个缓冲区为空。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

RxJava 缓冲区直到更改 的相关文章

  • RxJava重试时出现奇怪的行为

    我正在玩 RxJava重试时间 http reactivex io documentation operators retry html操作员 网上关于它的资料很少 唯一值得一提的是this http blog danlew net 201
  • HTTP 失败:java.net.SocketException:套接字关闭改造

    尝试了多种方法仍然面临这个问题 我正在使用 RxJava 和 Retrofit 来完成所有网络操作 下面是我的代码 服务 kt GET v1 contact id fun getContactDetails Path id id Strin
  • RxJava中concatMap和flatMap有什么区别

    看起来这两个功能非常相似 他们有相同的签名 接受rx functions Func1
  • 使用 RxJava 实现存储库模式

    我正在尝试找出一种更好的方法来实现 Android 中 RxJava 中的存储库模式之类的功能 这是我到目前为止所拥有的 从here https gist github com pieces029 5e92f9003fa1a4ebc59b
  • Web 响应式编程 - 从 HTTP 客户端的角度来看有哪些优点?

    让我们假设控制器生成一些带有延迟的随机数的这两种情况 1 Reactive Spring 5响应式应用 GetMapping randomNumbers public Flux
  • RxJava 出错后如何继续流式传输项目?

    我是 RxJava 新手 我遇到了以下问题 假设我有一系列项目 并且其中一个项目传播错误 我想忽略它并继续处理其他项目 我有以下片段 Observable from Arrays asList 1 2 3 map x gt if x equ
  • 如何在 RxJava 中计算移动平均线

    在金融领域 我们通常需要从时间序列数据流中计算移动窗口总值 以移动平均线为例 假设我们有以下数据流 T是时间戳 V是实际值 T0 V0 T1 V1 T2 V2 T3 V3 T4 V4 T5 V5 T6 V6 T7 V7 T8 V8 T9 V
  • 何时使用 doOnTerminate 与 doOnUnsubscribe?

    当有人订阅我的可观察对象时 我需要收到通知 我还需要收到通知 可观察对象已出错或已完成 所以我想我应该使用doOnSubscribe 注册观察者订阅时要采取的操作 可观察的 and doOnTerminate 注册一个 Observable
  • RxJava-在 Observable 链中执行 peek() 或 void 操作?

    Java 8 lambda 流有一个peek 运算符允许您对每个项目执行 void 操作 这通常用于调试 但它也是一种欺骗和启动 void 操作而不映射到某些内容的好方法 RxJava 中有类似的东西吗 也许我没有遵循良好的实践或反应性思考
  • 使用 RxJava 进行电子邮件登录验证,可观察对象发出两次

    我正在制作一个简单的登录表单 电子邮件和密码 来尝试增强我的反应式编程技能 我在让电子邮件字段验证按照我想要的方式工作时遇到一些问题 这是我的代码 final Observable
  • 如何过滤 RXJava 中 observable 发出的重复值?

    我有一个对象集合 我想在其中抑制重复的项目 我知道关于Distinct http reactivex io documentation operators distinct html运算符 但如果我没有记错的话 它会通过正确覆盖的哈希码方法
  • 基于 Observable 的 API 和取消订阅问题

    我正在尝试使用 Rx Java 创建一个用于 Android 上位置跟踪的类 我仍然不知道如何正确处理我的 Observable 的生命周期 我想要的是一个 Observable 它在第一次订阅发生时开始跟踪位置 并在最后一次订阅被丢弃时停
  • 如何使用 Retrofit 和 RxJava/RxAndroid 处理响应错误?

    我无法弄清楚如何使用改造和 RxAndroid 处理响应错误 如果存在网络错误等 则会调用 onError 但我需要能够获取响应以检查是否存在身份验证错误 相反 我得到的是一个带有空字符串的令牌 但我找不到原因 解决这个问题的最佳方法是什么
  • 如何使用一个可观察量的状态来跳过另一个可观察量的值?

    通过一个简短的例子可以最好地解释这一点 假设这是我想要过滤的可观察源 Observable interval 1 TimeUnit SECONDS 我使用复选框来处理过滤器状态 当未选中该框时 我想跳过所有值 我使用 RxAndroid 来
  • RxJava 作为事件总线?

    我开始学习 RxJava 到目前为止我很喜欢它 我有一个片段与单击按钮时的活动进行通信 用新片段替换当前片段 谷歌推荐界面 http developer android com training basics fragments commu
  • 仅当其中一个流发生更改时,combineLatest 才会发出

    我有一个具有频繁值的流和一个具有较慢值的流 我想将它们组合起来 但仅当较慢的发出时才发出一个值 所以combineLatest不起作用 就像这样 a1 a2 b1 a2 b1 a3 a4 a5 b2 a5 b2 目前我正在这样做 有没有更干
  • 如何在BehaviorSubject中设置默认值

    可能是一个菜鸟问题 如何为BehaviorSubject 设置默认值 我有一个具有 2 个不同值的枚举 enum class WidgetState HIDDEN VISIBLE 以及发出状态的行为主体 val widgetStateEmi
  • RxJava 相当于 orElse 是什么

    在其他语言的流 函数域中有一个常见的操作 那就是 orElse 它就像一个 if 当当前链没有得到任何结果时 它会更改为备用链 在具有 Maybe 类型的语言中 它基本上会继续 Some 类型的链或更改为 None 类型的 orElse 理
  • Rxjava 中“背压”一词是什么意思?

    我是 RxJava 的初学者 我很好奇 背压 这是否意味着生产者在背后给消费者施压 或者这是否意味着消费者正在向生产者施加压力 反方向施压 RxJava 背压 当你有一个 observable 发射物品的速度太快 以至于消费者无法跟上流量
  • Single.zip - 如何捕获失败的呼叫并继续其余的网络呼叫?

    我正在进行 5 个并行网络调用 模拟其中 4 个成功 其中 1 个失败 失败的调用使整个Single zip 失败 即使其他 4 个网络调用成功 我也无法获得它们的结果 如何处理单个失败的网络调用的错误Single zip 并获得成功者的结

随机推荐

  • 如何禁用 FlipView 上的箭头?

    我有一个 FlipView 它工作得很好 但我想禁用滚动时淡入的 箭头 我认为这会打破键盘 鼠标用户的情况 不是吗 如果确实需要 您可以从控件模板中删除按钮
  • Android dex问题:嵌套类+最终布尔值:com.android.dex.util.ExceptionWithContext

    我正在尝试构建我的 Android 项目 但在构建步骤中遇到了这个问题 INFO UNEXPECTED TOP LEVEL EXCEPTION INFO com android dex util ExceptionWithContext I
  • 使用 C 或 C++ 从 USB 设备接收数据

    我需要所有插入的 USB 设备的列表 并让用户选择一个 让控制台应用程序接收 USB 设备发送的任何数据 然后我可以开始处理程序中的数据 我不想使用库 只想使用标准 C 函数 并且该程序应该在 Windows 98 中运行 这是一个very
  • 如何使选项菜单始终显示在屏幕上

    我需要始终在屏幕上显示选项菜单 我已经编写了在活动启动时打开选项菜单的代码 Override public void onAttachedToWindow openOptionsMenu 但是 单击屏幕上的另一个项目时 菜单会下降 我希望菜
  • 保留history.pushState ie8-9的黑客

    在大多数现代浏览器上 我可以使用 history pushState Our Work url path 显然 IE 不支持这一点 但我想知道为什么我的简单 hack 不起作用 history pushState function stat
  • 无法删除 matplotlib 在 imshow() 图周围的填充

    我正在将 matplotlib 嵌入到我的 PyQt4 GUI 中 我玩得很开心 我可以显示图像 但它在我想要删除的内容周围添加了非常厚的填充 这就是我正在做的 from PyQt4 QtCore import from PyQt QtGu
  • 使用 MethodHandle::invokeExact 作为方法引用引起的 LambdaConversionException 导致的 BootstrapMethodError

    我试图检查是否可以使用 MethodHandle invoke 或 MethodHandle invokeExact 作为接受 MethodHandle 并返回通用输出的功能接口的方法引用 我知道 invoke 和 invokeExact
  • 关于高分辨率性能计数器及其与.NET Stopwatch相关的存在的解释?

    静态内部Stopwatch在构造函数中我们可以看到下面的代码 它主要检查高分辨率性能计数器是否存在 static Stopwatch if SafeNativeMethods QueryPerformanceFrequency out Fr
  • 但是, import sklearn 仍然给我错误。下面给出更多细节。我该如何解决这个问题?

    我在 Windows 7 上使用 python3 6 之前尝试过 python3 8 来解决同样的问题 我已经安装了 joblib 0 14 0 numpy 1 17 4 scikit learn 0 22 和 scipy 1 3 3 用于
  • 导入 igraph 时出错

    在 python 中导入 igraph 时 出现错误 见下文 由于 igraph 不是 anaconda 的一部分 因此我执行了以下概述的安装步骤 libglpk 35 dylib是什么 我应该如何加载它 为什么会出现这个问题 igraph
  • Visual Studio 可扩展性,如何枚举解决方案中的项目?

    只是想加快 SDK 的使用速度 所以 我创建了自己的工具窗口 现在我想迭代当前加载的解决方案中的现有项目并在工具窗口中显示它们的名称 但不太确定枚举项目的最佳方式是什么 有什么线索吗 检查这个微软的代码 https github com M
  • 使用 OpenCV 生成鸟瞰图/顶视图

    我正在尝试从图像生成鸟瞰图 对于相机的内在特性和变形 我使用的是从驾驶模拟器中检索到的硬编码值 该模拟器的顶部安装了相机 代码的基础来自 使用 OpenCV 库学习 OpenCV 计算机视觉 第 409 页 当我在包含每行 3 个内角 每列
  • ggplot 指定分类 x r 的垂直线段的位置

    我正在绘制行数据 并为统计计算的拟合值添加了一段可信区间和一个黑点 我的问题是我想让这些线 和黑点 相对于行数据稍微移动 水平 我尝试了抖动及其所有可能的组合 结果很糟糕 因为我从不同的列中获取了 y start 和 end 的值 因此 由
  • 泛型中奇怪的嵌套结构类型

    有人可以解释一下嵌套在泛型中的结构类型的奇怪构造吗 implicit def Function1Functor R Functor type R gt new Functor type R gt 这个例子来自Scalaz库 函子 scala
  • 毫秒是python中箭头时间库的一个特性吗

    使用这种方法 arrow utcnow format YYYY MM DD HH mm ss 或者箭头中类似的东西是否可以将毫秒添加到时间中 事实上 您最多可以使用 6 位数字来表示秒的小数部分 gt gt gt now arrow utc
  • 使用正则表达式有效测试“EndsWith”

    我需要构建一个正则表达式 NET 语法 来确定字符串是否以特定值结尾 具体来说 我需要测试文件是否具有特定扩展名 或一组扩展名 我试图修复的代码使用的是 png jpg gif 在我的场景中 对于失败的匹配来说 速度非常慢 可能是由于回溯
  • 工作表、单元格和范围的默认范围是什么?

    当您仅键入 worksheets 时 默认范围 ActiveWorkbook 或 ThisWorkbook 是什么 对于那些不了解这些区别的人来说 它们非常重要 尤其是在 Excel 2013 中 当您希望在切换到不同工作簿时运行宏时 在标
  • 如何高效调试webpack应用?

    我正在尝试在我的项目中采用 webpack 开发服务器 我知道它被广泛采用 因此令我惊讶的是调试应用程序似乎非常困难 由于 webpack 默认情况下会生成一个巨大的包 因此源映射是必须的 我对他们有一个很大的问题 根据devtool模式下
  • 用索引展平嵌套列表

    给定一个任意大小的任意深度嵌套列表的列表 我想要一个对树中所有元素进行平面 深度优先的迭代器 但也具有路径索引 使得 for x y in flatten L x L y 0 y 1 y 1 That is L 1 2 3 4 5 6 7
  • RxJava 缓冲区直到更改

    我有一个可观察的对象 它会发出大量数据 例如 1 1 1 2 2 2 3 3 1 1 5 5 在RxJava中我们可以使用直到改变 http reactivex io documentation operators distinct htm