Spring Integration 5.0 + Project Reactor:控制线程

2024-02-29

跟进问题https://stackoverflow.com/a/47136941/1776585 https://stackoverflow.com/a/47136941/1776585

我无法使我的集成​​处理程序在使用时在并行线程中运行Flux + split() + FluxMessageChannel.

考虑以下片段:

// ...
.handle(message -> Flux.range(0, 10)
    .doOnNext(i -> LOG.info("> " + i))
    .subscribeOn(Schedulers.parallel()))
.split()
.channel(new FluxMessageChannel())
.handle(message -> LOG.info(" -> " + message.getPayload())))
// ...

所有日志都在一个线程中输出:

[     parallel-1] d.a.Application    : > 0
[     parallel-1] d.a.Application    :  -> 0
[     parallel-1] d.a.Application    : > 1
[     parallel-1] d.a.Application    :  -> 1
[     parallel-1] d.a.Application    : > 2
[     parallel-1] d.a.Application    :  -> 2
[     parallel-1] d.a.Application    : > 3
[     parallel-1] d.a.Application    :  -> 3
[     parallel-1] d.a.Application    : > 4
[     parallel-1] d.a.Application    :  -> 4
[     parallel-1] d.a.Application    : > 5
[     parallel-1] d.a.Application    :  -> 5
[     parallel-1] d.a.Application    : > 6
[     parallel-1] d.a.Application    :  -> 6
[     parallel-1] d.a.Application    : > 7
[     parallel-1] d.a.Application    :  -> 7
[     parallel-1] d.a.Application    : > 8
[     parallel-1] d.a.Application    :  -> 8
[     parallel-1] d.a.Application    : > 9
[     parallel-1] d.a.Application    :  -> 9

如何强制在多个线程中进行处理?

我尝试过使用.parallel().runOn() on the Flux,但这只是并行获取数据,但实际处理仍然在一个线程上运行。

我也尝试过.publishOn(Schedulers.parallel()) on the Flux没有效果。

并且还添加ExecutorChannel or a Poller与处理程序的执行者没有帮助。


这有一些技巧:

.channel(new FluxMessageChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(message -> LOG.info(" -> " + message.getPayload())))

而这些消息被FluxMessageChannel将与额外的并行ExecutorChannel.

我认为您所要求的就像是提出上述功能的请求FluxMessageChannel可配置。还有这样一个subscribeOn/publishOn等可以在那里配置。

请随意提出JIRA https://jira.spring.io/browse/INT就此事!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spring Integration 5.0 + Project Reactor:控制线程 的相关文章

随机推荐

  • Java EE 7 属性文件配置的最佳实践建议是什么?

    应用程序配置在现代 Java EE 应用程序中属于什么位置 人们有哪些最佳实践建议 通过应用程序配置 我的意思是诸如与其他设备上的服务的连接设置之类的设置 包括外部设备 例如 Twitter 和我们的内部 Cassandra 服务器 用于主
  • VS2015中如何设置工具提示颜色?

    我刚刚安装了 Visual Studio 2015 总体来说非常好 但是您可以在 工具 gt 选项 gt 字体和颜色 下配置的大量不同内容完全让我感到困惑 我需要找到允许我更改此工具提示颜色的设置 以便我可以实际阅读它 它在哪里 下载颜色主
  • php:: tmp 文件保留多长时间?

    我正在编写上传脚本 如果用户上传一个文件并且该文件已经存在 我想警告用户 这都是通过ajax 并让他们选择替换它或取消 而不是移动文件 我很好奇是否可以将文件保留在 tmp 中并在 ajax 响应中传回该文件的路径 如果用户说覆盖该 aja
  • Android 版 Chrome 无法正确显示 Google 网络字体

    我已经使用 CSS 重现了我遇到的问题font family以及 Android 版 Chrome Web 浏览器无法正确继承字体 而是使用后备字体 http jsbin com iyifah 1 edit http jsbin com i
  • RxJS - 使用成对确认和恢复输入字段

    所以我对可观察的东西有点陌生 我正在努力解决一个场景 我认为它可能是一个很好的候选问题 开始了 场景是这样的 我有一个下拉字段 当它改变时 我想要 检查基于条件以前的值和新的值领域的 如果条件通过 则请求用户确认 并且 如果用户未确认 则恢
  • Android 上的 Bootstrap 3 长模态滚动背景

    我有一个长模态框 无法在我的 Android 移动设备上完全显示 按钮位于屏幕底部下方 模态框根本不滚动 但模态框后面的灰色背景会滚动 是否有任何 css js锁定背景并允许模式在显示时滚动的技巧 可能是因为模态类位置是固定的 尝试将下面的
  • 当某些值丢失时如何绘制谷歌折线图?

    我在以下位置找到了以下 JavaScript 代码谷歌图表工具 http code google com apis chart interactive docs gallery imagelinechart html function dr
  • Colab 上的 gdrive 问题

    安装谷歌驱动器后 我正在使用 colab 在 cifar10 上训练 resnet 我克隆了存储库并且能够运行该脚本 然而 Tensorflow 已加载 数据文件已传递到网络 但我以以下内容结束 tensorflow python fram
  • Java中获取默认根目录

    我正在制作一个基本的文件浏览器 并且想知道如何获取默认根目录 我知道java io File listRoots 给出所有的根 对我来说是A C D E F G H I L T U X Y Z 但我想要用户主要使用的那个 即带有操作系统的那
  • XSD 中缺少响应和 DTO 对象

    我正在使用最新版本的 ServiceStack 和 NuGet 我已经有了一个基本的服务设置 可以与 JsonServiceClient 很好地配合 并且按预期通过了所有单元测试 不幸的是 我还尝试支持 SOAP 和 Visual Stud
  • 适用于 Microsoft Windows 的终端多路复用器 - GNU Screen 或 tmux 的安装程序 [关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我正在寻找适用于 Microsoft Windows 的终端多路复用器 我无法找到 tmux 和 GNU Screen 的 Microso
  • AngularJS 指令根据指令名称的第一个字符而中断

    我编写了一个 Angular 指令 该指令表现出一些奇怪的行为 该指令向 parsers 添加了一个函数 以根据正则表达式模式限制用户输入的内容 如果当前文本与模式不匹配 解析器会将文本恢复为该字段的先前值 因此 当文本恢复时 Angula
  • 特定于平台的 std::chrono::high_resolution_clock::period::num

    我注意到了std chrono high resolution clock period num 1对于我测试过的每个系统 是否存在任何系统 嵌入式 桌面 移动或其他 它恰好是其他数字 在这样的系统上 1 秒不能用刻度来表示 有以下三种实现
  • Chrome 扩展 - 如何使用清单 v3 访问本地 file://

    我有一个 Chrome 扩展程序 可以 如果您允许访问文件 URL 抓取您在 Chrome 中打开的本地 pdf 文件 并将其发送到我们的 API 进行处理 这是通过获取 pdf 来完成的XMLHttpRequest to file Use
  • 时间:2019-03-07 标签:c#thread

    我正在学习有关线程的更多信息 并且我使用以下代码创建了一个相当简单的 WPF 应用程序 x64 平台构建 public partial class MainWindow Window public MainWindow Initialize
  • mysqldump:错误 2020:转储表时数据包大于“max_allowed_pa​​cket”字节

    mysqldump Error 2020 Got packet bigger than max allowed packet bytes when dumping table 发生 当我做一个 mysqldump u root p 数据库
  • 使用 jQuery 每 10 秒用 php 数据刷新一个 div

    我尝试使用 jQuery 每 10 秒刷新一次 div 中存储的数据 我的 HTML 代码是
  • 如何将 Python 中的所有 unicode 小写字符与正则表达式匹配?

    我正在尝试编写一个与 Python 3 中的 Unicode 小写字符匹配的正则表达式 我正在使用re图书馆 例如 re findall some pattern u K 应该返回 u 在 Sublime Text 中 我只需输入 lowe
  • Docker PHP 与 Xdebug 3 env XDEBUG_MODE 不起作用

    我正在尝试配置Xdebug 3在 PHP 容器中 并设置XDEBUG MODE环境变量为off根据文件https xdebug org docs all settings mode https xdebug org docs all set
  • Spring Integration 5.0 + Project Reactor:控制线程

    跟进问题https stackoverflow com a 47136941 1776585 https stackoverflow com a 47136941 1776585 我无法使我的集成 处理程序在使用时在并行线程中运行Flux