跟进问题https://stackoverflow.com/a/47136941/1776585 https://stackoverflow.com/a/47136941/1776585
我无法使我的集成处理程序在使用时在并行线程中运行Flux
+ split()
+ FluxMessageChannel
.
考虑以下片段:
// ...
.handle(message -> Flux.range(0, 10)
.doOnNext(i -> LOG.info("> " + i))
.subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...
所有日志都在一个线程中输出:
[ parallel-1] d.a.Application : > 0
[ parallel-1] d.a.Application : -> 0
[ parallel-1] d.a.Application : > 1
[ parallel-1] d.a.Application : -> 1
[ parallel-1] d.a.Application : > 2
[ parallel-1] d.a.Application : -> 2
[ parallel-1] d.a.Application : > 3
[ parallel-1] d.a.Application : -> 3
[ parallel-1] d.a.Application : > 4
[ parallel-1] d.a.Application : -> 4
[ parallel-1] d.a.Application : > 5
[ parallel-1] d.a.Application : -> 5
[ parallel-1] d.a.Application : > 6
[ parallel-1] d.a.Application : -> 6
[ parallel-1] d.a.Application : > 7
[ parallel-1] d.a.Application : -> 7
[ parallel-1] d.a.Application : > 8
[ parallel-1] d.a.Application : -> 8
[ parallel-1] d.a.Application : > 9
[ parallel-1] d.a.Application : -> 9
如何强制在多个线程中进行处理?
我尝试过使用.parallel().runOn()
on the Flux
,但这只是并行获取数据,但实际处理仍然在一个线程上运行。
我也尝试过.publishOn(Schedulers.parallel())
on the Flux
没有效果。
并且还添加ExecutorChannel
or a Poller
与处理程序的执行者没有帮助。
这有一些技巧:
.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))
而这些消息被FluxMessageChannel
将与额外的并行ExecutorChannel
.
我认为您所要求的就像是提出上述功能的请求FluxMessageChannel
可配置。还有这样一个subscribeOn/publishOn
等可以在那里配置。
请随意提出JIRA https://jira.spring.io/browse/INT就此事!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)