首先有一个类似的未回答的问题将路由加入单个聚合器
我们有一些消费者路由(ftp、file、smb)从远程系统读取文件。
简化了直接路由的测试,但与批量消费者的行为类似:
from("direct:"+routeId).id(routeId)
.setProperty(AGGREGATION_PROPERTY, constant(routeId))
.log(String.format("Sending (${body}) to %s", "direct:start1"))
.to("direct:aggregate");
转换后,一次轮询的所有结果都会在单独的路径中按批次聚合:
from("direct:aggregate")
.aggregate(property(AGGREGATION_PROPERTY), new BodyInAggregatingStrategy())
.completionFromBatchConsumer()
.to("log:result", "mock:result");
如果每个消费者都分开运行,那么一切都会正常进行。但如果多个消费者并行运行,聚合将分割轮询。例如,如果文件消费者轮询 500 条消息,并且第二条路由开始从 ftp 读取 6 个文件,则期望我们得到 2 个聚合,其中 1 个包含来自文件的 500 条消息,1 个包含来自 ftp 的 6 个消息。
测试用例:
public void testAggregateByProperty() throws Exception {
MockEndpoint result = getMockEndpoint("mock:result");
result.expectedBodiesReceived("A+A+A", "B+B", "A", "Z");
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 3);
template.sendBodyAndProperty("direct:B", "B", Exchange.BATCH_SIZE, 2);
template.sendBodyAndProperty("direct:A", "A", Exchange.BATCH_SIZE, 1);
template.sendBodyAndProperty("direct:Z", "Z", Exchange.BATCH_SIZE, 7);
assertMockEndpointsSatisfied();
}
结果是:“A+A”、“B”、“A”、“B”、“A”,而不是预期的“A+A+A”、“B+B”、“A”、“Z” 。
问题:
- 我们关于聚合的假设是错误的吗?
- 我们如何才能实现预期的行为?
- 如果我们设置completionTimeout,那么超时将从第一次交换开始发生——如果仍然有新的交换,则独立?