forkJoin, zip, combineLatest区别

2023-10-29

前言
forkJoin, zip, combineLatest是rxjs中的合并操作符,用于对多个流进行合并。很多人第一次接触rxjs时往往分不清它们之间的区别,其实这很正常,因为当你准备用来合并的流是那种只会发射一次数据就关闭的流时(比如http请求),就结果而言这三个操作符没有任何区别。

const ob1 = Rx.Observable.of(1).delay(1000);
const ob2 = Rx.Observable.of(2).delay(2000);
const ob3 = Rx.Observable.of(3).delay(3000);

Rx.Observable.forkJoin(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.zip(ob1, ob2, ob3).subscribe((data) => console.log(data));
Rx.Observable.combineLatest(ob1, ob2, ob3).subscribe((data) => console.log(data));

// [1, 2, 3]
// [1, 2, 3]
// [1, 2, 3]
// 都是在3秒的时候打印

rxjs中很多操作符功能相近,只有当其操作的流会多次发射数据时才会体现出它们之间的区别,下面我们来详细解释forkJoin, zip, 和combineLatest。

一个基本概念
首先我们要知道,一个流(或者说Observable序列)的生命周期中,每次发射数据会发出next信号(Notification),结束发射时会发出complete信号,发生错误时发出error信号,三个信号分别对应observer的三个方法。next信号会由于发射源的不同发射0到多次;而complete和error仅会发射其中一个,且只发射一次,标志着流的结束。
subscribe接收一个observer对象用来处理上述三种信号,只传入一个函数会被认为是next方法,因此传入subscribe的next方法会执行0到N次,N为序列正常发射信号的次数。

forkJoin
用forkJoin合并的流,会在每个被合并的流都发出结束信号时发射一次也是唯一一次数据。假设我们有两个流:

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.forkJoin(ob1, ob2).subscribe((data) => console.log(data));
// ["ob1:2", "ob2:1"]

ob1会在发射完第三个数据时停止发射,ob2会在发射完第二个数据时停止,而forkJoin合并后的流会等到ob1和ob2都结束时,发射一次数据,也就是触发一次subscribe里的回调,接收到的数据为ob1和ob2发射的最后一次数据的数组。

zip 成对组合

zip工作原理如下,当每个传入zip的流都发射完毕第一次数据时,zip将这些数据合并为数组并发射出去;当这些流都发射完第二次数据时,zip再次将它们合并为数组并发射。以此类推直到其中某个流发出结束信号,整个被合并后的流结束,不再发射数据。

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.zip(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:0", "ob2:0"] ob1等待ob2发射数据,之后合并
// ["ob1:1", "ob2:1"] 此时ob2结束,整个合并的流也结束
// "complete"

zip和forkJoin的区别在于,forkJoin仅会合并各个子流最后发射的一次数据,触发一次回调;zip会等待每个子流都发射完一次数据然后合并发射,之后继续等待,直到其中某个流结束(因为此时不能使合并的数据包含每个子流的数据)。

combineLatest 一方变化就组合输出
combineLatest与zip很相似,combineLatest一开始也会等待每个子流都发射完一次数据,但是在合并时,如果子流1在等待其他流发射数据期间又发射了新数据,则使用子流最新发射的数据进行合并,之后每当有某个流发射新数据,不再等待其他流同步发射数据,而是使用其他流之前的最近一次数据进行合并。

const ob1 = Rx.Observable.interval(1000).map(d => `ob1:${d}`).take(3);
const ob2 = Rx.Observable.interval(2000).map(d => `ob2:${d}`).take(2);

Rx.Observable.combineLatest(ob1, ob2).subscribe({
  next: (data) => console.log(data),
  complete: () => console.log('complete')
});
// ["ob1:1", "ob2:0"] ob1等待ob2发射,当ob2发射时ob1已经发射了第二次数据,使用ob1的第二次数据
// ["ob1:2", "ob2:0"] ob1继续发射第三次也是最后一次数据,ob2虽然还未发射,但是可以使用它上一次的数据
// ["ob1:2", "ob2:1"] ob2发射第二次也是最后一次数据,使ob1上一次的数据。
// "complete"

转自:https://segmentfault.com/a/1190000012369871

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

forkJoin, zip, combineLatest区别 的相关文章

  • Angular 2 - FormGroup ValueChanges 取消订阅

    我有一个带有 ValueChanges 事件的 FormGroup 当用户从组件的路由移动到另一个组件然后返回到该组件时 该事件不会从内存中释放 这意味着 如果用户离开组件然后返回组件 5 次 则 onFormChange 方法会触发 5
  • 如果 observable 在 X 时间内没有发出值,则会产生副作用

    我正在研究一个用例 要求如果可观察量在一定时间内没有发出值 那么我们应该做一些副作用 给出一个实际用例 打开网络套接字连接 如果在 X 时间内没有发送 接收消息 则关闭 Web 套接字连接并通知用户 这需要在每个发出的值上以及在初始订阅可观
  • 为什么 RxJS subscribe 允许省略箭头函数和以下方法参数?

    最近需要使用RxJS 我尝试设计一个错误处理流程 但我发现了一些奇怪的语法传递方法参数 subscribe x gt console warn lt Why does this compile and warn is not 7 in de
  • 如何在多个 Angular 组件之间共享 api 响应?

    我试图把我的头绕过去并使用BehaviourSubject在 Angular 中 到目前为止还没有太多运气 我的目标是在我的组件之间共享 api 请求响应 这是我的项目的 stackblitz 设置 https stackblitz com
  • 如何使用 rxjs 在 angular2 中实现输入 keyup 事件的去抖服务

    我正在尝试在输入按键事件上调用服务 The HTML
  • RxJs 将流拆分为多个流

    如何根据分组方法将永无止境的流拆分为多个结束的流 a a a a a b b b b c c c c d d d e gt 到这些可观察到的 a a a a a b b b b c c c c d d d e gt 如您所见 a是在开始的时
  • 我的“zipLatest”运算符是否已经存在?

    关于我自己写的一个运算符的快速问题 请原谅我可怜的大理石图表 zip aa bb cc dd ee ff gg 11 22 33 44 55 a1 b2 c3 d4 e5 combineLatest aa bb cc dd ee ff gg
  • 如何在 RXJS 中启动和停止可观察的间隔?

    我有一个非常简单的 timeInterval 可观察对象 我想在不断开订阅者连接的情况下启动 停止传输 无论可观察状态如何 都应该坐下来等待 有可能吗 如果可以的话怎么办 var source Rx Observable interval
  • RXJS 等待所有 Observable 完成并返回结果

    我正在尝试创建一个 RX 流 该流将异步执行 XHR 调用列表 然后等待它们完成 然后再进行下一个调用 为了帮助解释这一点 可以在普通 JS 中这样写 try await requests map r gt angularHttpServi
  • 如何在角度中对取消订阅功能进行单元测试

    我想找到一种方法来测试订阅和主题上的取消订阅函数调用 我想出了一些可能的解决方案 但每一种都有优点和缺点 请记住 我不想出于测试目的而更改变量的访问修饰符 通过反射访问组件的私有变量 在这种情况下 我有一个存储订阅的私有类变量 组件 ts
  • 如何在计时器触发后正确调用函数进行单元测试。角度 7、RXJS 6

    在我的 Angular 7 应用程序中 我有一个用于跟踪活动用户任务的服务 在该服务中 计时器每秒运行一次 以检查是否有任何任务在 30 秒内仍未完成 如果发现任何任务已过期 则该任务将通过服务上的事件发射器发出 以便在其他地方处理 当应用
  • 获取所有当前(活动)订阅

    是否可以获取所有 活动 订阅而不手动存储它们 我想unsubscribe所有 活动 订阅 并且不想在数组或变量中引用它们中的每一个 我取决于你使用的是Subject还是Observable 但可能没有办法 自动 做到这一点 观测值 我不认为
  • Angular 自定义错误处理程序未从 Promise 获取错误类型

    当从承诺中抛出每个错误时 我的自定义错误处理程序都会丢失其类型 import HttpErrorResponse from angular common http import ErrorHandler Injectable Injecto
  • 非规范化 ngrx 存储 - 设置选择器?

    我目前正在 ngrx 项目中处理一个有点复杂 深层 的结构 它可以被认为是父对象的数组 具有多个级别的子对象 它在服务器端标准化 扁平化 我的商店中的功能看起来像这样 rootObjs level1 byId lvl1 1 id lvl1
  • Angular2 - *ngIf 和异步可观察量

    我在将 ngIf 与可观察变量一起使用时遇到问题 问题是 当我隐藏元素时 ngIf 然后再次显示 值将不会加载 因此 div someObservable async div 基本上当 showDiv 设置为true首先 加载了 someO
  • 角度 2 中的事件发生后重置计时器

    我在 15 分钟空闲会话后实现注销 而不使用第 3 方库或 ngrx 我创建了一个服务 run window onload gt this startTimer window onmousemove gt this resetTimer s
  • Angular2 RxJS从地图函数调用类函数

    我是 Angular 2 和 Observables 的新手 所以如果我的问题微不足道 我深表歉意 无论如何 我正在尝试使用 RxJS 测试 Angular 2 HTTP 客户端 虽然我让它工作了 但我需要向我当前正在处理的服务添加更多逻辑
  • rxjs 7 更新 - 主题 - 预期 1 个参数,但得到 0 个

    我将 rxjs 从版本 6 x x 更新到 7 x x 但出现以下错误 src app app component ts 中出现错误 12 19 预期有 1 个参数 但得到 0 个 当试图next一个空值Subject destroy ne
  • Angular 7通过调用两次服务订阅方法进行通信

    我正在使用角度 尝试与非父子组件进行通信 所以我通过服务来传达它 服务 ts Istoggle false Output change EventEmitter lt boolean gt new EventEmitter toggle t
  • RxJS - 我需要取消订阅吗

    如果我有这样的事情 class MyComponent constructor this interval Observbale interval 1000 const c new MyComponent const subscriptio

随机推荐

  • A complete log of this run can be found in

    安装element ui时遇到的问题 解决方法 接下来测试一下 问题解决
  • 使用OpenCV的GrabCut算法去除图像背景

    使用OpenCV的GrabCut算法去除图像背景 图像分割是计算机视觉中的重要任务之一 它可以将图像分割成不同的区域 并对这些区域进行进一步的分析和处理 其中一种常用的图像分割方法是GrabCut算法 它是一种基于图割的迭代算法 可以有效地
  • 对象引用与对象存放的地址和区别

    在java的学习当中 很多时候并没有能很好分清把对象和对象的引用 如果没能很好认识分清这两者的关系 就可能会很难理解一些指针的移动的代码 JAVA基本类型的变量的时其变量名及值 变量名及值是两个概念 是放在方法栈中 引用类型所声明的变量 该
  • 【mySQL】C++ 操作mySQL

    目录 通过mySQL 库 简介 安装和配置 linux环境 WIN32环境 C 调用mysql 通过Mysql connector c 库 前言 Connector C 使用 3 4 静态库和动态库 动态库 创建项目和配置 代码编写 使用中
  • C51定时器与计数器(学习笔记)

    1 什么是定时器与计数器 1 定时器与计数器都是soc当中的一个内部外设 计数器顾名思义是用来计数的 就和我们的秒表一样 假如定时20秒 当我们按下秒表开始计数时 数秒的过程就是计数 计时器 当秒表数到20时 定时器 就自动暂停 2 工作模
  • Redis系列--新数据类型详解

    一 Bitmaps 一 简介 计算机存储数据时 都是以二进制位表示 Redis提供了Bitmaps这个 数据类型 可以实现对位的操作 1 Bitmaps本身不是一种数据类型 实际上它就是字符串 key value 但是它可以对字符串的位进行
  • matlab 将深度图像转换为点云

    目录 一 功能概述 1 算法概述 2 主要函数 3 参考文献 二 代码实现 三 结果展示 1 深度图像 2 彩色图像 3 生成点云 四 参考链接 一 功能概述 1 算法概述 深度相机能够获取物体到相机的距离信息 可以根据距离信息 计算像素的
  • 递归调用之迷宫问题

    我们假设数字1表示墙 数字0表示可以走 那么就可以用一个二维数组来模拟一个迷宫 并可以用递归调用来求解路线 下面的代码是用Java模拟的一个迷宫 代码很简单 public class MiGong public static void ma
  • button标签的onclick事件

    1 普通的button标签定义onclick事件 1 无参
  • delphi 10.3 FastReport 多设备跨平台 打印之解决方法

    以下能WINDOWS10 DELPHI 10 3 FastReport6 0上顺利通过 基础知识点 需掌握 https blog csdn net qq 25439957 article details 87110559 FastRepor
  • Java天地 学习探讨Java Spring中使用classpath加载配置文件浅析

    Spring中使用classpath加载配置文件浅析 如果您感觉可以请提出您宝贵的意见 在应用Spring的工程中 使用class path的方式加载配置文件应该是最常用的做法 然而对大部分人来说 刚开始使用Spring时 几乎都碰到过加载
  • 免费搜索引擎登录入口

    百度免费登录入口 http www baidu com search url submit htm 新浪免费登录入口 http bizsite sina com cn newbizsite docc index 2jifu 09 htm 搜
  • Vue中@input用法以及v-model示例

  • 外部局域网直接访问WSL2

    1 开启hyper v 1 首先 进入控制面板 程序 启用或关闭windows功能 勾选hyper v 确认后重启电脑 2 打开 Windows PowerShell 输入 systeminfo 命令 能够看到出现了很多处理器的信息 最末尾
  • MongoDB - 安装

    一 Docker安装MongoDB 1 安装 安装版本 7 0 0 docker run itd name mongodb v C data mongodb data data db p 27017 27017 mongo 7 0 0 au
  • QT环境变量配置

    QT官网地址 https my qt io 一 windows系统 1 1 QT下载安装 1 2 进入Qt安装目录 1 3 拷贝gcc所在的路径 不同安装目录有所差异 C Qt Qt5 5 0 Tools mingw492 32 bin 1
  • 数学建模常用Matlab/Lingo/c代码总结系列——旅行商TSP问题

    Lingo代码 MODEL SETS CITY 1 6 U U I sequence no of city LINK CITY CITY DIST The distance matrix X X I J 1 if we use link I
  • vuex中拿不到state中值的问题(state是undefined)

    首先下载vuex然后看一下项目中main文件内有没有引入store文件并且挂载 import Vue from vue import App from App vue import Element from element ui impor
  • IDEA 打包MapReduce程序到集群运行的两种方式以及XShell和Xftp过期的解决

    参考博客 MapReduce打包成jar上传到集群运行 http t csdn cn 2gK1d Xshell7 Xftp7 解决强制更新问题 http t csdn cn rxiBG IDEA打包MapReduce程序 方式一 轻量级打包
  • forkJoin, zip, combineLatest区别

    前言 forkJoin zip combineLatest是rxjs中的合并操作符 用于对多个流进行合并 很多人第一次接触rxjs时往往分不清它们之间的区别 其实这很正常 因为当你准备用来合并的流是那种只会发射一次数据就关闭的流时 比如ht