如何一次处理 RxJS 流 n 个项目,并在完成一个项目后,再次自动填充回 n 个项目?

2024-03-11

我有一个事件流,我想调用一个函数,为每个事件返回一个承诺,问题是这个函数非常昂贵,所以我想一次最多处理 n 个事件。

这个卵石图可能是错误的,但这就是我想要的:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

这是我到目前为止的代码:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

这段代码有两个问题:

  1. 它正在使用inProgressCountvar 随 side 更新 效应函数。
  2. The done$ subscription is only called once when I request more than 1 item from the controlled stream. This is making the inProgressCount var to update incorrectly, this eventually limits the queue to one at a time.

你可以在这里看到它的工作原理:http://jsbin.com/wivehonifi/1/edit?js,控制台,输出 http://jsbin.com/wivehonifi/1/edit?js,console,output

问题:

  1. 有更好的方法吗?
  2. 我怎样才能摆脱inProgressCount多变的?
  3. Why is the done$ subscription only getting called once when requesting multiple items?

Update:
问题#3 的答案:switchMap 与 flatMapLatest 相同,所以这就是为什么我只得到最后一个。将代码更新为 flatMap 而不是 switchMap。


实际上你根本不需要使用背压。有一个运算符叫flatMapWithMaxConcurrent这会为你做这件事。它本质上是调用的别名.map().merge(concurrency)并且它只允许一次传输最大数量的流。

我在这里更新了你的 jsbin:http://jsbin.com/weheyuceke/1/edit?js,输出 http://jsbin.com/weheyuceke/1/edit?js,output

但我在下面注释了重要的一点:

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>   
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription                    
      Rx.Observable.fromPromise(() => { 
        //Notify the ui that this task is in progress                                 
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
     }));
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何一次处理 RxJS 流 n 个项目,并在完成一个项目后,再次自动填充回 n 个项目? 的相关文章

  • TinyMCE 输入以相反顺序写入

    我面临这个问题 每当我输入 TinyMCE 时 我的光标会自动向左移动 最终从右向左写入文本 它只发生在我部署的应用程序中 但如果我在我的机器上本地运行代码 同样可以正常工作 发生这种情况的任何可能原因 相同的屏幕截图 https i st
  • 将字符串转换为变量名。 (JavaScript)

    我确实查看了前面的问题 但它们是针对整数值的 我需要文本值的答案 我在本周早些时候问了一个与此相关的问题 但现在是这样 如下所示 我使 Make x 等于某个字符串值 Acura Honda Toyota 当我将 Make x 传递到函数
  • 将 FBX 文件转换为 .gltf 后,模型非常小,为什么?

    问题 将 FBX 文件转换为 gltf 后 模型非常小 为什么 我尝试用以下方法缩放模型frontObject scale set 1000 1000 1000 但我收到以下错误 TypeError Cannot read property
  • RequireJS 不遵循设置了 baseUrl 的 data-main 的相对路径

    使用 requireJS 我尝试为我的数据主指定一个与 baseUrl 不同的路径 看来 requireJS 会忽略我在文件名之前输入的任何内容 并始终在 baseUrl 文件夹中查找该文件 我有以下文件夹结构 index html scr
  • TypeScript 中类和命名空间的区别

    到底有什么区别classes and namespaces在打字稿中 我知道 如果您创建一个带有静态方法的类 您可以在不实例化该类的情况下访问它们 这正是我猜想的命名空间的要点之一 我还知道你可以创建多个同名的命名空间 并且它们的方法在编译
  • IE8 中的 Javascript 消息超出堆栈空间

    我正在使用 Breeze 1 4 1 Internet Explorer 8 和 ASP NET MVC 4 Web API 我在查询时收到以下消息 查询失败 localhost port breeze Data Metadata 元数据导
  • Angular 7 Guard 重定向仅适用于双击

    问题是我已经实现了一个 Guard 旨在处理特定的目录 如果当前用户名的角色等于 2 它应该返回 true 如果没有 那么它不应该重定向 这是我的 app routing module ts 文件 问题出在 userlist 路径中 我们是
  • Javascript 根据字段值任意排序数组

    所以我有一个对象数组 如下所示 var myArray priority low priority critical priority high 我需要以这种方式排序 1 关键 2 高和3 低 如何才能做到这一点 我建议使用一个对象来存储排
  • jQuery 问题:它的真正含义是什么?

    function window undefined jquery code jQuery window 它到底意味着什么 是不是也意味着 document ready 或者只是两种不同的东西 已经有两个答案 但这是我对代码缺失端的猜测 fu
  • Angular UI.Bootstrap 单选按钮在 ng-repeat 中表现得很奇怪[重复]

    这个问题在这里已经有答案了 我在 Angular 的 ui bootstrap 中动态生成无线电模型的选项时遇到问题 我想我可以简单地对数组进行 ng repeat 使用 btn radio 属性的内容 如下所示 in the contro
  • JQuery _renderItem 没有被调用

    我正在尝试使用 renderItem 函数创建自定义 ui menu item 元素 但经过可能尝试后 我什至无法调用该函数 自动完成功能正在工作 但就像 renderItem 函数不存在一样 这是我的脚本部分
  • 从 UnityWebGL jslib 返回字符串

    我想使用 jslib 来获取网址参数 像这样的代码 jslib GetUrl function var s var strUrl window location search var getSearch strUrl split var g
  • 如何混淆或使 JavaScript 文件不可读?

    我的应用程序中有 JavaScript 脚本 其中包含 JavaScript 和 jQuery 函数 所有用户与我的应用程序的交互都是动态的 并且通过 jQuery 传递到应用程序 我意识到 当我在客户端运行我的应用程序时 客户端可以通过查
  • 传单 - 导入 Geojson - Angular 6

    我尝试将 GeoJson 文件导入到 Angular 的应用程序 6 中的传单中 通过这个解决方案 我的 geojson 是在 leafletmap 中绘制的 但我有这个错误 我无法构建我的应用程序 有人知道一种解决方案吗 错误 TS234
  • 自动调整元素 (div) 大小以适合水平内容

    我尝试谷歌搜索 但没有得到太多结果 我正在构建一个水平轮播 它在浮动的 LI 中显示图像 我想解决的问题是 每次我向轮播添加缩略图 我是延迟加载 时 我都需要重新计算轮播的宽度 以便所有浮动缩略图很好地并排排列 其一 我宁愿不必在 JS 中
  • Node.js - Async.js:并行执行如何工作?

    我想知道 async js 中并行执行是如何工作的 async require async async parallel function callback for var i 0 i lt 1000000000 i Do nothing
  • NodeJS 中的缩进多行日志记录

    我要打印JSON stringify d 反对控制台 将上下文作为 Mocha 测试套件输出的一部分 当测试缩进时 我希望对象日志行向右缩进足够远 例如 3 4 个制表符空格 以便它们可以识别地位于右侧describe group 我怎样才
  • router.navigate 使用查询参数 Angular 5

    我在使用查询参数路由到路由时遇到问题我有一个像这样的函数 goToLink link this router navigate link split 0 queryParams this sortParams link 和这个功能 sort
  • 在 HTML5 画布上创建颜色选择器

    如何在 HTML5 画布上绘制颜色选择器 一个基本的例子是使用getImageData http jsfiddle net eGjak 60 http jsfiddle net eGjak 60 var ctx cv get 0 getCo
  • ES6解构对象赋值函数参数默认值

    您好 我正在查看在传递函数参数时使用对象解构的示例对象解构演示 https developer mozilla org en US docs Web JavaScript Reference Operators Destructuring

随机推荐

  • 它们真的是虚拟代码吗?

    某些按键的虚拟按键代码 例如移位 Del 等与 C C 相比 在 java 中显示为不同的值 例如 Key Java C C Shift 16 160 91 219 93 221 92 220 Del 127 46 Window 524 9
  • 如何为另一个类型类中的所有类型编写实例?

    我必须定义一个类型类Truthy其中包含一个方法true将类型类的实例转换为Bool value 我的类型类声明 class Truthy a where true a gt Bool 接下来 我必须为各种类型定义此类的实例 包括列表和数字
  • 这段代码使用 wstring 和 MultiByteToWideChar 安全吗?

    Using std wstring我现在的样子MultiByteToWideChar std wstring widen const std string in int len MultiByteToWideChar CP UTF8 0 i
  • Pandas:将列的值分配给字典值设置的限制

    我怎样才能删除iterrows 使用 numpy 或 pandas 可以更快地完成此操作吗 import pandas as pd import numpy as np df pd DataFrame A foo bar foo bar f
  • 获取没有滚动条的UITableView的高度

    我需要获得一个的完整高度UITableView 即没有更多内容可滚动的高度 有什么办法可以做到这一点吗 我试过了 tableView sizeThatFits CGSizeZero 但只返回 0x0CGSize Try the conten
  • GitHub 项目最新版本的下载链接

    我正在尝试向我的网站添加一个项目最新 github 版本的下载链接 例如链接https github com mongodb mongo archive r3 0 0 rc7 zip https github com mongodb mon
  • 无法恢复几何备份 MySQL 5.7 错误

    我一直在从 Mysql 升级网站5 6 to 5 7 当从以下位置恢复备份时mysqldump 在Mysql 5 1 5 6下工作了10年 不变 在MySQL下不再工作5 7 具体来说 第一行几何数据恢复失败 ERROR 1416 2200
  • 特殊字符(夏威夷语“Okina”)导致奇怪的字符串行为

    The 夏威夷语报价 https en wikipedia org wiki CA BBOkina当 T SQL 与字符串函数结合使用时 它会出现一些奇怪的行为 这里发生了什么 我错过了什么吗 其他角色是否也遇到同样的问题 SELECT U
  • 打包时只生成一场战争

    默认情况下 JHipster 在打包阶段生成 2 个 war your project version war 和 your project version war original 第一个是可执行 jar 第二个是可以在 servlet
  • Clojure 中如何泄漏内存?

    为了周四在湾区 Clojure 聚会上的演讲 我正在整理一份 Clojure 中泄漏内存的方法列表 到目前为止我有 抓住无限序列的头部 通过在循环中调用 lambda 创建大量泛型类 这仍然是一个问题 保存对未使用数据的引用 还有什么 通过
  • 显式遵守 Codable 删除了结构上的成员初始化程序生成

    Given struct Foo let bar Bar 我得到了一个方便的初始化程序来使用 let foo Foo bar Bar But if Bar不是它本身Codable 或者由于某些其他原因我需要明确实现Codable on Fo
  • React - 反应脚本 publicPath

    是否有可能覆盖开发环境的反应脚本中的 publicPath 我使用 symfony 并且在 twig 中包含了 React 应用程序 所以我必须更改资产来提供服务http localhost 3000 static js bundle js
  • SetWindowPos() 函数不移动窗口?

    我有一个对话框 我想将其放置在另一个对话框中 并相对于主对话框上的其中一个控件进行定位 void CspAceDlg DrawResultsArea CWnd pTabCtl GetDlgItem IDC BUILDTABS CRect r
  • 由于 GCM SenderId Android 无法生成 APK 版本

    我在我的应用程序中实现了 GCM Google Cloud Messaging Google Play 服务库已自动生成values xml其中我的senderId is
  • 如何在android VideoView中显示Youtube视频? [关闭]

    很难说出这里问的是什么 这个问题是含糊的 模糊的 不完整的 过于宽泛的或修辞性的 无法以目前的形式得到合理的回答 如需帮助澄清此问题以便重新打开 访问帮助中心 help reopen questions 我正在使用 android 应用程序
  • 使用 Jquery 更改选择值

    我正在尝试使用 Jquery 更改表单中 选择 输入的值 但是当它更改时 该更改的函数不起作用 HTML
  • servlet 的根 URl

    我想从其中一个 servlet 获取 Web 应用程序的根 url 如果我将应用程序部署在 www mydomain com 中 我想获得像 http www mydomain com http www mydomain com 同样的事情
  • 使用 Neo4j 的 Cypher 返回我的朋友和朋友的朋友

    我有具有双向关系的节点 如下 我正在尝试创建一个查询 我想返回特定节点的所有后续节点及其后续节点的后续节点 3 希望深度 例如假设这些关系 gt 符号跟随 A gt B B gt A B gt C C gt B C gt D D gt C
  • Spring Data Mongo - 如何映射继承的 POJO 实体?

    我对 Spring 还很陌生 但我想在这个项目上尝试一下 我有一个 MongoDB 数据库 其中填充了相当复杂的文档 我想使用 Spring data Mongo 来查询 没有其他 CRUD 操作 数据库 我已经使用 POJO 描述了我的文
  • 如何一次处理 RxJS 流 n 个项目,并在完成一个项目后,再次自动填充回 n 个项目?

    我有一个事件流 我想调用一个函数 为每个事件返回一个承诺 问题是这个函数非常昂贵 所以我想一次最多处理 n 个事件 这个卵石图可能是错误的 但这就是我想要的 x x xxxxxxx x gt Events p p pppp p p p p