Camel 2.11批量聚合如何与单独的路由一起工作?

2023-12-05

首先有一个类似的未回答的问题将路由加入单个聚合器

我们有一些消费者路由(ftp、file、smb)从远程系统读取文件。 简化了直接路由的测试,但与批量消费者的行为类似:

from("direct:"+routeId).id(routeId)
 .setProperty(AGGREGATION_PROPERTY, constant(routeId))
 .log(String.format("Sending (${body}) to %s", "direct:start1"))
 .to("direct:aggregate");

转换后,一次轮询的所有结果都会在单独的路径中按批次聚合:

from("direct:aggregate")
  .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
  .completionFromBatchConsumer()
  .to("log:result", "mock:result");

如果每个消费者都分开运行,那么一切都会正常进行。但如果多个消费者并行运行,聚合将分割轮询。例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,其中 1 个包含来自文件的 500 条消息,1 个包含来自 ftp 的 6 个消息。

测试用例:

public void testAggregateByProperty() throws Exception {
    MockEndpoint result =  getMockEndpoint("mock:result");

    result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");

    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
    template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
    template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
    template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);

    assertMockEndpointsSatisfied();
}

结果是:“A+A”、“B”、“A”、“B”、“A”,而不是预期的“A+A+A”、“B+B”、“A”、“Z” 。 问题:

  1. 我们关于聚合的假设是错误的吗?
  2. 我们如何才能实现预期的行为?
  3. 如果我们设置completionTimeout,那么超时将从第一次交换开始发生——如果仍然有新的交换,则独立?

你几乎已经可以工作了。这是您需要的更改(之后我将解释)。

from("direct:aggregate").id("aggregate")
    .aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregationStrategy())
    .completionSize(property(Exchange.BATCH_SIZE))
    .to("log:result", "mock:result")

结果将是:

Exchange received, body: A+A+A
Exchange received, body: B+B
Exchange received, body: A

注意:您不会收到以下结果:"Z"因为批量大小是7.

解释一下 - 正如您所读到的,聚合器是一个多功能的骆驼组件,正确定义的关键是:

  • 聚合表达式
  • 完成规则

现在,就您而言,您正在聚合一处房产AGGREGATION_PROPERTY这将是A, B or Z。此外,您还要指定批量大小。

然而你并没有表达completionSize()在你的路线中。相反,你正在使用completionFromBatchConsumer- 它做了一些不同的事情(代码指出它寻找一个Exchange#BATCH_COMPLETE属性),因此产生了奇怪的结果。

Anyway, .completionSize(Exchange.BATCH_SIZE)将使您的测试按预期运行。

祝你好运。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Camel 2.11批量聚合如何与单独的路由一起工作? 的相关文章

  • 获取 UndeclaredThrowableException 而不是我自己的异常

    我有以下代码 public Object handlePermission ProceedingJoinPoint joinPoint RequirePermission permission throws AccessException
  • java字符串索引越界异常

    从类中调用函数时出现以下错误 java lang StringIndexOutOfBoundsException 字符串索引超出范围 1 尽管我使用系统打印来查看我在 substring 函数中传递的输入 但一切似乎都是正确的 函数 isC
  • 超立方体错误。非法的最小或最大规格

    尝试从这里运行示例代码http tess4j sourceforge net codesample html http tess4j sourceforge net codesample html我收到一条错误消息 Error Illega
  • Java程序验证signtool签名的数字签名

    我已经使用 SignTool 对文件 exe 或 dll 不是 jar 文件 进行了数字签名 Signtool还可以验证数字签名 但我的要求是使用java程序检查由signtool签名的文件的数字签名 我在互联网上搜索但没有找到任何信息 您
  • jUnit 中每个 @Test 的不同拆卸

    有没有办法为 jUnit 中的每个 Test 定义不同的拆卸 Use the After注释来指示每个之后要运行的方法 Test 像这样的全套注释是 BeforeClass 首先 Tests are run Before 在每个之前 Tes
  • 通过代理从java发送电子邮件

    我使用 Java Mail API 来发送和接收电子邮件 现在我做这个项目的地方有一个代理服务器 我可以知道如何通过代理服务器从java发送电子邮件吗 请参阅此处的常见问题解答 http www oracle com technetwork
  • 使用Keycloak保护Tomcat应用程序时出现HTTP 403禁止错误

    我为这个错误苦苦挣扎了一整天 我一遍又一遍地检查我在tomcat中Keycloak和APP的配置 没有发现错误 下图为测试场景 APP配置 1 Keycloak json是从Keycloak控制台复制的 2 context xml 也正确
  • Hazelcast Jet 变更数据捕获

    我在我的应用程序中使用 Hazelcast 更改数据捕获 CDC 我使用 CDC 的原因是 如果使用 jdbc 或其他替代功能将数据加载到缓存中 会花费大量时间 所以CDC将在数据库和 Hazelcast Jet 之间进行数据同步 Stre
  • Java GUI,根据actionListener更改面板

    我在两个不同的面板中添加了两个按钮 如果单击第一个按钮 则需要转到下一个面板 其中包含第二个按钮 但是当我单击第一个按钮时 该按钮没有被替换 Java GUI import java awt event ActionEvent import
  • Mac OSX 上使用 Java 7 的透明 JFrame/JWindow

    我们有一个屏幕共享小程序 它打开 Swing JFrame 并使用 Robot 类捕获空框架后面的屏幕 用户可以单击框架并与小程序后面的任何内容进行交互 这在 Windows 上运行良好 并且用于 Apple 的 Java 版本 但对于 M
  • 如何知道 Solr Optimize 何时完成?

    我正在使用 Solr php client 通过 php 与 Solr 进行通信 这段代码触发solr优化命令 solr gt optimize 请问有没有什么方法可以确定优化完成了 这都是因为我的网站上有一个管理页面 我每天必须手动优化
  • HttpMediaTypeNotAcceptableException / HttpMediaTypeNotAcceptableException:找不到可接受的表示

    我有一个客户端正在尝试连接的 API 但是它会抛出错误 2015 09 22 04 21 44 297 org springframework web servlet mvc method annotation HttpEntityMeth
  • 使用 spring mvc 的多个域

    假设我有一个应用程序必须缩短 URL 但还要执行其他操作 喜欢google com and goo gl or facebook com and fb me 部署两个应用程序很容易 但 目前 仅部署一个应用程序更简单 使用 spring 和
  • 应返回带有 html 代码的字符串的支持 bean 属性返回空字符串

    我的支持 bean 中有一个返回 html 代码的属性 public String getHtmlPrevisualizar return Hello world 我想要做的是在 iframe 中显示这个 html 代码 我用 JavaSc
  • 选择活动时运行时崩溃

    首先我想说我几乎没有 Android 经验 这是我在 Android 中的第一个项目 而且我的老师不太擅长教学 所以我对任何过度的无知表示歉意 在进一步讨论之前先解释一下 我的应用程序的目标本质上是能够记录您在某些活动上花费了多少时间 记录
  • 如果使用 Maven,是否应该忽略 VCS 中 Eclipse 特定的文件?

    我知道为什么不将 Eclipse IDE 特定的文件提交到像 Git 我实际上正在使用的 这样的 VCS 中 这就是我使用 Maven 并让它为您生成这些文件的原因之一not将它们置于版本控制之下 但我想知道 是否应该在 gitignore
  • 当列表中不存在 X 时,从列表中查找大于 X 的值

    我试图从列表中查找大于特定值 在我的情况下已知 的值 Example Given list 1 2 5 10 15 list is sorted 查找大于的值X 7在这种情况下 期望的结果 返回一个包含值的列表 10 15 我尝试使用jav
  • 为什么我得到:没有有效的 JFX 运行时

    我有一个使用 java 1 6 编译并使用 jnlp webstart 运行的现有应用程序 如果我使用 JRE 1 6 从客户端运行此应用程序 一切都会很好 但是 当我使用 java JDK 7 编译代码并使用 JRE 1 7 67 运行客
  • 如何在 Hibernate 中使用 SELECT 进行 INSERT

    我需要在休眠中实现以下请求 insert into my table max column values select max id from special table where 如何在休眠中使用注释来做到这一点 Special tab
  • 这种说法是否恰当。 if (0 != 表达式或变量) {} 在java中? [关闭]

    就目前情况而言 这个问题不太适合我们的问答形式 我们希望答案得到事实 参考资料或专业知识的支持 但这个问题可能会引发辩论 争论 民意调查或扩展讨论 如果您觉得这个问题可以改进并可能重新开放 访问帮助中心 help reopen questi

随机推荐