我正在尝试使用 RxJS 编写一个脚本来处理数百个日志文件,每个日志文件大约 1GB。脚本的骨架看起来像
Rx.Observable.from(arrayOfLogFilePath)
.flatMap(function(logFilePath){
return Rx.Node.fromReadStream(logFilePath)
.filter(filterLogLine)
})
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)
该代码有效,但请注意,所有日志文件的过滤步骤将同时启动。然而,从文件系统IO性能的角度来看,最好是一个接一个地处理文件(或者至少将并发限制在几个文件,而不是同时打开所有数百个文件)。对此,我该如何以“功能反应式方式”来实现呢?
我曾想过调度程序,但不知道它在这里有何帮助。
您可以使用.merge(maxConcurrent) https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/mergeproto.md来限制并发。因为.merge(maxConcurrent)
将元可观察量(可观察量的可观察量)展平为可观察量,您需要替换.flatMap
with .map
这样输出是一个元可观察的(“不平坦”),然后你调用.merge(maxConcurrent)
.
Rx.Observable.from(arrayOfLogFilePath)
.map(function(logFilePath){
return Rx.Node.fromReadStream(logFilePath)
.filter(filterLogLine)
})
.merge(2) // 2 concurrent
.groupBy(someGroupingFunc)
.map(someFurtherProcessing)
.subscribe(...)
该代码尚未经过测试(因为我无权访问您拥有的开发环境),但这就是继续的方法。 RxJS 没有很多带有并发参数的运算符,但您几乎总是可以使用.merge(maxConcurrent)
.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)