如何限制flatMap的并发数?

2024-03-26

我正在尝试使用 RxJS 编写一个脚本来处理数百个日志文件,每个日志文件大约 1GB。脚本的骨架看起来像

Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

该代码有效,但请注意,所有日志文件的过滤步骤将同时启动。然而,从文件系统IO性能的角度来看,最好是一个接一个地处理文件(或者至少将并发限制在几个文件,而不是同时打开所有数百个文件)。对此,我该如何以“功能反应式方式”来实现呢?

我曾想过调度程序,但不知道它在这里有何帮助。


您可以使用.merge(maxConcurrent) https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/mergeproto.md来限制并发。因为.merge(maxConcurrent)将元可观察量(可观察量的可观察量)展平为可观察量,您需要替换.flatMap with .map这样输出是一个元可观察的(“不平坦”),然后你调用.merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
   return Rx.Node.fromReadStream(logFilePath)
   .filter(filterLogLine)
})
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)

该代码尚未经过测试(因为我无权访问您拥有的开发环境),但这就是继续的方法。 RxJS 没有很多带有并发参数的运算符,但您几乎总是可以使用.merge(maxConcurrent).

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何限制flatMap的并发数? 的相关文章

  • 如何使用expressjs发送多个文件?

    我希望能够发送许多文件 如果可能的话 发送整个目录 以便我可以在从 html 文件调用的其他 js 文件中访问它 const app require express const http require http Server app co
  • setInterval() 如何影响性能?

    我们正在使用 Twitter Bootstrap 作为框架构建一个 Web 应用程序 但在显示 隐藏工具提示时遇到问题 除了尝试找到实际问题的解决方案之外 我还有一个关于我们同时使用的解决方法的问题 从性能角度来看 使用 setInterv
  • chai-http/superagent : 设置多部分表单字段的 Content-Type

    在集成测试中上传文件如下 chai request server instance post profile photo 0 set Access Token accessToken set API Key testConfig apiKe
  • 当模态打开时,如何将焦点设置在模态内的第一个 TouchableHighlight 组件(或另一个组件,例如由 ref 给出)上?

    当模式打开时 如何将焦点设置到模式内的第一个 或任何给定的 TouchableHighlight 组件 我正在使用方向键 键盘 电视遥控器 让我们使用react native文档模式示例的片段
  • 使用 Bluebird.js 和 Twitter 流的 Promise 和流

    我对 Promises 和 Node 非常陌生 并且对在流中使用 Promise 感到好奇 我可以承诺直播吗 使用 Bluebirdjs 和 Twit 模块 我有以下内容 var Twit require twit var Promise
  • Google javascript 登录 api:无法离线访问

    我正在尝试为服务器端应用程序实现 Google 登录 如 Google 文档中所示 服务器端应用程序的 Google 登录 https developers google com identity sign in web server si
  • webrtc - 获取网络摄像头的宽高比

    我正在尝试学习如何开发 webRTC 应用程序 我想知道是否可以获得相机的宽高比 我不知道它是否有帮助 但我正在使用 webrtc io 但是 if更好 我可以停止使用它 From MDN https developer mozilla o
  • 匹配 JavaScript RegEx 中的不可见字符

    我有一些包含不可见字符的字符串 但它们位于可预测的位置 通常 围绕我想要提取的文本片段 然后在第二次出现之后我想保留文本的其余部分 我似乎不知道如何关闭隐形字符 and将它们从我的结果中排除 为了匹配隐形 我一直在使用这个正则表达式 xA0
  • 如何设置黄瓜环境变量

    我有以下 package json name newcucumber version 1 0 0 main index js scripts test node modules bin cucumber js firefox node mo
  • JSSOR - 无法读取未定义的类型属性“currentStyle”

    我正在尝试将 Jssor 滑块实现到我的页面中 但我不断在标题中出现该错误 我的内容是通过 Javascript 动态创建的 如下所示 var slide app createHTML div id inventorySlides null
  • 如何将多个文件上传到Firebase?

    有没有办法将多个文件上传到 Firebase 存储 它可以在一次尝试内上传单个文件 如下所示 fileButton addEventListener change function e Get file var file e target
  • Node.js 循环发送 http 请求

    我实际上遇到了使用 node js 执行的 javascript 代码的问题 我需要循环发送http请求到远程服务器 我在代码中设置了www google ca 这是我的代码 var http require http var option
  • 如何使用 Selenium webdriver 测试对 SVG 对象的点击?

    我正在尝试编写代码来检查单击 SVG 对象的功能 例如此 URL 上的美国州 http www amcharts com svg maps map usa 这可行 但是有更好的方法吗 不需要物理移动鼠标的东西 robert new Robo
  • Ember:命名出口错误

    我不知道为什么我的模板没有在指定的插座中呈现 这是我第一次尝试学习 ember 我被困在指定的渠道上 我想渲染侧边栏模板 in the outlet sidebar 和内容模板 in the outlet content 但我不断在控制台中
  • 在流星收集加载时显示加载程序

    我有一个模板 task list 看起来像这样 each tasks gt task each Template task list tasks返回一个集合 在用户界面中 加载似乎需要一些时间 当集合正在加载时 我想显示一个加载指示器 关于
  • JQuery 验证不起作用

    我有一种表单 其中一个输入类型的值为 名字 但这可以在 onfocus 函数上更改我想验证此输入字段 如果它为空白或 名字 我有两个 jQuery 文件jquery 1 4 2 min js jquery validate pack js
  • 通过ajax POST提交两次表单

    插入到mysql using php通过文件调用AJAX 前insert语句php代码执行select查询到查找重复记录并继续insert statement Issue 从ajax调用php文件时 它执行了两次并得到作为重复记录的响应 好
  • 如何使用 javascript 迭代文件系统目录和文件?

    我正在使用 Javascript 编写一个应用程序 该应用程序将与 Phonegap 一起使用来制作 Android 应用程序 我正在使用 Phonegap File API 来读取目录和文件 相关代码如下所示 document addEv
  • 谷歌浏览器不显示一个网站的alert()弹出窗口

    我正在开发一个 javascript 循环 该循环会随着循环的进行而提醒每个键值 为了加快速度 我选中了 阻止此页面创建其他对话框 框 通常这只会抑制一个例程的弹出窗口 但它们还没有回来 在 Google Chrome 中 alert 消息
  • 您最喜欢的 JS/CSS 下拉菜单是什么? [关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 希望在网站上实现一个 只是好奇其他人都使用过什么以及他们有过什么样的体验 EDIT我也不是一个超级粉丝

随机推荐

  • Google reCAPTCHA、405 错误和 CORS 问题

    我正在使用 AngularJS 并尝试使用 Google 的 reCAPTCHA 我正在使用 显式呈现 reCAPTCHA 小部件 在我的网页上显示 reCAPTCHA 的方法 HTML 代码
  • 如何使用 ListView 呈现具有多个行跨列的数据表

    我需要在 html 表中显示数据库中的数据 我目前正在使用 ListView 控件 我希望最终的 HTML 表呈现如下所示的内容 其中某些行有一个rowspan属性大于一 原因是有些字段有几行信息 但对应同一个逻辑条目 例如 data da
  • 实现 Win32 消息循环并使用 P/Invoke 创建 Window 对象

    我的主要目标是纯粹使用以下方法实现正确的消息循环P Invoke http en wikipedia org wiki Platform Invocation Services能够处理 USB HID 事件的调用 当然 它的功能应该与以下代
  • vis.js - 如何运行 getSeed() 方法

    我正在使用 vis js 创建一些数据地图 在文档中它说 理想情况下 你尝试使用未定义的种子 重新加载 直到你满意为止 与布局并使用getSeed 确定种子的方法 然而 对于我的一生 我无法弄清楚如何 使用getSeed 方法 我认为它是一
  • 了解 Deferred.pipe()

    我一直在阅读有关 jQuery 中的 deferreds 和 Promise 的内容 但我还没有使用过它 除了方法管道之外 我已经很好地理解了一切 我实在没明白那是什么 有人可以帮助我了解它的作用以及它可以在哪里使用吗 我知道有一个问题的标
  • Angular2 - 多个组件中的“监视”提供程序属性

    我来自 NG1 环境 目前我正在创建具有所有可用功能的 NG2 应用程序 在提出这个问题之前 我正在探索 google 和 stackoverflow 的问题 但没有运气 因为 Angular 2 的 api 架构发展得如此之快 而且大多数
  • ASP.NET Web 部署失败;项目中不存在 AddScheduledJob

    我有一个包含两个 WebJobs 的 ASP NET 项目 其中一个 WebJobs 可以正常发布 但第二个 WebJobs 无法发布 并且收到以下错误消息 项目中不存在目标 AddScheduledJob 两个 WebJobs 都设置为
  • R - 根据多个条件匹配来自 2 个数据帧的值(当查找 ID 的顺序是随机时)

    嗨我有两个数据框 df1 data frame PersonId1 c 1 2 3 4 5 6 7 8 9 10 1 PersonId2 c 11 12 13 14 15 16 17 18 19 20 11 Played together
  • 每条路由的 NancyFx 身份验证

    从我在源代码中看到的 RequiresAuthentication 对整个模块进行身份验证检查 有什么办法可以按路线执行此操作吗 我有同样的问题 然而事实证明RequiresAuthentication在模块级别和路由级别都有效 为了演示
  • CollapsingToolBarLayout 无法实例化

    在过去的几个月里 我一直在使用折叠工具栏布局 没有任何问题 然而今天 每当我打开 xml 文件时都会弹出此错误 确切的错误如下 以下类无法实例化 android support design widget CollapsingToolbar
  • Angular 通用 - 为客户端缓存的服务器端请求

    我看过很多关于在角度通用应用程序中缓存客户端数据的文章 因此它不会重复客户端上已在服务器上解析的请求 我只是不明白数据如何从服务器传输到客户端 我是否将 JSON 注入到预渲染的 HTML 中 还是缺少其他内容 从 Angular 5 开始
  • 如何获取已安装打印机的列表?

    我正在寻找获取已安装打印机列表的可能性 我在 Windows 操作系统中使用 JDK 1 6 有谁知道解决方案吗 先感谢您 只是想添加一个小片段 import javax print class Test public static voi
  • Xcode 4 - 一键构建多个设备?

    在 Xcode 4 中 是否可以通过某种方式进行设置 以便通过单击左上角的 运行 按钮 可以一键将 Xcode 构建到多个设备 这会节省很多时间 不断地点击 构建到iPhone 然后构建到 iPod 然后依次点击 构建到iPad 一键完成所
  • 等待 puppeteer 中的过渡结束

    我正在尝试使用 Puppeteer 测试网站 不幸的是 我在单击工具栏中的元素时遇到问题 工具栏使用 CSS 过渡来优雅地滑入页面 我的代码失败 因为我在页面仍在动画时单击元素将出现的位置 我使用超时作为解决方法 但必须有一个更优雅的解决方
  • 如何选择字段总和大于 MongoDB 中的值的位置

    使用 MongoDB 我将如何编写这个常规 SQL 语句 SELECT FROM table WHERE field1 field2 field3 gt 1 我一直在搞乱 group project add 等 我觉得我在围绕解决方案跳舞
  • 如何额外保护已使用 OAuth 2.0 访问令牌的 REST 服务?

    我有以下 REST 服务 An 聚合器暴露于外界的服务 它由用户 OAuth 2 0 访问令牌保护 这聚合器称为Internal服务 The Internal服务是在网络级别的not暴露于外界 它还由同一用户 OAuth 2 0 访问令牌保
  • 如何在 Delphi 中检测等宽字体?

    如何在 Delphi 中检测等宽字体 TFont Pitch应该fpFixed我想 但它对我来说不适用于 Delphi XE4 var Font TFont begin Font TFont Create Font Name Courier
  • Android 上的“上下文”是什么?

    在Android编程中 到底什么是Context类以及它的用途是什么 我在开发者网站 https d android com reference android content Context 但我无法清楚地理解它 简单来说 顾名思义 它是
  • Hibernate/JPA DB 架构生成最佳实践

    我只是想听听 Hibernate 专家关于基于 Hibernate JPA 的项目的数据库模式生成最佳实践的意见 尤其 项目刚开始时采用什么策略 是否建议让 Hibernate 在这个阶段自动生成架构 还是从项目的最早阶段手动创建数据库表更
  • 如何限制flatMap的并发数?

    我正在尝试使用 RxJS 编写一个脚本来处理数百个日志文件 每个日志文件大约 1GB 脚本的骨架看起来像 Rx Observable from arrayOfLogFilePath flatMap function logFilePath