我之前问过这个问题 https://stackoverflow.com/questions/22844717/how-do-you-execute-map-reduce-operations-with-the-reactor-framework对于 Reactor 1.x:
假设我有一个Collection<Map>
。我想要:
改造每一个Map
类型对象的实例Foo
并发(每个实例完全独立于另一个实例 - 无需串行/迭代地转换每个实例)。
当它们全部转换后,我想要一个 a 方法,onReduce(Collection<Foo> foos)
,被调用 - 参数包含所有结果Foo
实例。
但我们似乎找不到 Reactor 2.x 的等效解决方案 - 只是单线程。
如何在 Reactor 2.x 中执行多线程 map/reduce?例如,如何使用基于 ExecutorService 的调度程序来做到这一点?
现在使用 Reactor 2.0 实际上非常容易。你可以这样做:
List<Map<String, Object>> data = readData(); // <1>
Streams.from(data)
.flatMap(m -> Streams.just(m)
.dispatchOn(Environment.cachedDispatcher()) // <2>
.map(ignored -> Thread.currentThread().getName()))
.buffer() // <3>
.consume(s -> System.out.println("s: " + s)); // <4>
- 创建一个
Stream
基于输入数据。
- 创建一个新的
Stream
对于每个Map
并在给定的上调度地图操作Dispatcher
.
- 缓冲所有值直到完成,当集合清空时,这些值将被发送到下游。
- 使用列表,它是子流负载平衡转换的结果。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)