如何在可观察流中以不同方式处理前 n 项和剩余一项

2024-06-28

例如,

给定一定数量 (m) 的数字流 (m1, m2, m3, m4, m5, m6...),并对前 n 个项目应用变换 (2 * i)(n 可以小于、等于或大于 m),对其余项目应用另一个变换 (3 * i)。和

返回结果:m1*2、m2*2、m3*3、m4*3、m5*3、m6*3...(此处假设 n=2)。

我试图使用 take(n) 和skip(n),然后使用 concatwith,但看起来 take(n) 会删除序列中的剩余项目,并在之后使skip(n) 什么都不返回。


您可以分享您的 m 的直播,然后重新合并在一起take() and skip()流,像这样:

    int m = 10;
    int n = 8;
    Observable<Integer> numbersStream = Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .publish();

    Observable<Integer> firstNItemsStream = numbersStream.take(n)
            .map(i -> i * 2);

    Observable<Integer> remainingItemsStream = numbersStream.skip(n)
            .map(i -> i * 3);

    Observable.merge(firstNItemsStream, remainingItemsStream)
            .subscribe(integer -> System.out.println("result = " + integer));
    numbersStream.connect();

EDIT:
正如@A.E. 所指出的。芫,share()将开始与第一个订阅者一起发送,因此如果 Observable 已经开始发送项目,第二个订阅者可能会错过通知,因此在这种情况下还有其他可能性:
cache()- 将回复所有缓存发出的项目并将其回复给每个新订阅者,但会牺牲取消订阅的能力,因此需要谨慎使用。
reply().refCount()- 将创建Observable that reply()每个新订阅者的所有先前项目(类似于缓存),但当最后一个订阅者取消订阅时,将取消订阅。

在这两种情况下,都应考虑内存Observable将在内存中缓存所有发出的项目。

publish()- 在不缓存所有先前项目的情况下,另一种可能性是使用publish()创造ConnectableObservable,并称之为connect()方法在所有所需订阅者订阅后开始发射,从而获得同步并且所有订阅者将正确收到所有通知。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何在可观察流中以不同方式处理前 n 项和剩余一项 的相关文章

随机推荐

  • 使用 Auth0、withAuthenticationRequired 登录不会显示在 Gatsby 中

    我将 Gatsby 与 auth0 一起使用 当我用withAuthenticationRequired 然后我得到一个空白页 上面写着 重定向 import as React from react import withAuthentic
  • Python:pip 找不到 setup.py

    我怎样才能直接pip找到setup py My setup py文件位于 setuptools 3 5 1 I ran dustin dustin python setuptools 3 5 1 setup py egg info runn
  • 以编程方式添加超链接到列表项

    我想以编程方式获得以下 HTML ul li a href a li ul 我可以添加 li to ul But a to li 不可能 My code BulletedList ul new BulletedList ListItem l
  • React Table - useRowSelect 的单选输入

    如何在 React Table 中使用单选输入而不是复选框作为可选表 有一个复选框但没有单选按钮的示例 https github com tannerlinsley react table blob master examples row
  • 无法访问内存-gdb

    这是我的disas代码 Dump of assembler code for function main 0x00000000000006b0 lt 0 gt push rbp 0x00000000000006b1 lt 1 gt mov
  • Java进程的dump文件分析?

    如果我使用 Windbg 转储 Windows 上运行的 Java 进程 我可以 容易吗 分析 Java 堆 对象和线程吗 就像我可以使用 SOS 进行 Net 进程一样吗 否则 如何离线调试生产系统上发生的问题 Thanks Window
  • 等待异步TaskEx

    What is TaskEx In http www i programmer info programming c 1514 async await and the ui problem html start 1 http www i p
  • 类型错误:序列项 0:预期字符串,未找到 NoneType

    我正在努力改进战舰游戏 原始版本工作正常 没有错误 我编写了代码来帮助克服第一个版本每次都将船只放置在同一个位置的事实 因此我从一艘船 由两个方块组成 开始 我通过创建两个函数来完成此操作 第一个函数生成一个随机坐标 Destroyer 2
  • catch(...) 没有捕获异常,我的程序仍然崩溃

    我的测试仪遇到问题 我的应用程序在初始化时崩溃 我添加了更多的日志记录和异常处理 但它仍然崩溃并显示通用的 此程序已停止工作 消息 而不是触发我的错误处理 鉴于我的 main 看起来像这样并且有catch 什么情况下不会触发 try sim
  • Collectors.groupingBy() 返回按升序排序的结果 java

    我按降序发送结果 但得到的输出按升序排列 List
  • 如何使用RxJsdistinctUntilChanged?

    我正在开始使用 RxJs 使用 v5 beta 但不知何故我不知道如何使用distinctUntilChanged 如果我在 babel node 中运行下面的代码 其输出是 a 1 key a state 1 Next value 42
  • Boto 与 EC2 IAM 角色间歇性“无法加载凭证”

    我使用 Elastic Beanstalk 环境来部署 Web 应用程序 并为应用程序将在其上运行的实例设置了 IAM 角色 99 99 的时间里一切都完美无缺 但是我会间歇性地在日志中看到错误 请求失败显示 botocore 错误 如下所
  • Python - 比较2个文件并输出差异

    我的目标是编写一个脚本来比较文件中的每一行 并根据此比较创建一个新文件 其中包含第二个文件中不存在的文本行 例如 File 1 Bob 20 Dan 50 Brad 34 Emma 32 Anne 43 File 2 Dan 50 Emma
  • SQL Server 2008:TOP 10 和不同的一起

    正如标题所示 我正在使用 SQL Server 2008 如果这个问题非常基本 我深表歉意 我才使用 SQL 几天 现在我有以下查询 SELECT TOP 10 p id pl nm pl val pl txt val from dm la
  • excel vba范围单元格错误对象定义[重复]

    这个问题在这里已经有答案了 我一直在 Excel 中开发一个宏 该宏对一张工作表 次要 中的表格进行排序 当满足条件时 它应该将该表中的数据添加到第二张工作表 Sheet1 中的另一个表格中 但是我一直运行时出现 1004 错误 对象未定义
  • 需要更好的等待解决方案

    最近 我一直在用 C 编写一个程序 它可以 ping 三个不同的网站 然后根据通过或失败的情况 它将等待 5 分钟或 30 秒 然后再次尝试 目前我一直在使用ctime库和以下函数来处理我的等待 然而 根据我的 CPU 计量表 这是一个不可
  • Jetpack Compose Navigation - 将参数传递给 startDestination

    我正在构建的应用程序使用带有路线的组合导航 挑战在于起始目的地是动态的 这是一个最小的例子 class MainActivity ComponentActivity override fun onCreate savedInstanceSt
  • Stream API - 如果紧随其后放置 filter(),sorted() 操作如何工作?

    采取以下代码 对列表进行排序 然后对其进行过滤 public static void main String args List
  • XPS 文件中打印的重复图像

    首先 我想指出 我已将此作为错误向 Microsoft 提出 但他们目前不愿意修复它 我正在寻找一种解决方法或更好的方法来实现我想要做的事情 因为我们的客户认为这是一个相当重要的问题 The code 主窗口 xaml
  • 如何在可观察流中以不同方式处理前 n 项和剩余一项

    例如 给定一定数量 m 的数字流 m1 m2 m3 m4 m5 m6 并对前 n 个项目应用变换 2 i n 可以小于 等于或大于 m 对其余项目应用另一个变换 3 i 和 返回结果 m1 2 m2 2 m3 3 m4 3 m5 3 m6