对于多播+聚合,我有以下奇怪的(或者至少对我来说不清楚)行为。考虑以下路线:
from("direct:multicaster")
.multicast()
.to("direct:A", "direct:B")
.aggregationStrategy(new AggregationStrategy() {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
List firstResult = newExchange.getIn().getBody(List.class);
newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
return newExchange;
} else {
List oldResults = oldExchange.getIn().getBody(List.class);
List newResults = newExchange.getIn().getBody(List.class);
ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
oldExchange.getIn().setBody(aggResult);
return oldExchange;
}
}
})
.end()
// .to("log:bla")
本质上,该路由接受一个输入,将其发送到direct:A
and direct:B
,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释存在的原因我将在稍后解释)。
现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我发送消息M
to direct:multicaster
,然后聚合器被调用一次oldExchange = null
and newExchange.in.body=[A]
,然后用oldExchange.in.body=[A]
and newExchange.out.body=[B]
(正如它应该做的那样)。
到目前为止一切都很好。但是聚合器再次被调用oldExchange.in.body=[A,B]
and newExchange.in=M
(M
是初始消息)。这看起来类似于包含的丰富模式。
您可以通过删除最后一行中的注释来获得预期的行为,即简单地添加一个虚拟to("log:bla")
。有了这个,一切都会按预期进行。
更新:尝试(参见克劳斯提供的提示)
.multicast()
.aggregationStrategy(aggStrategy)
.to("direct:A", "direct:B")
.end()
and
.multicast(aggStrategy)
.to("direct:A", "direct:B")
.end()
两者都会导致相同的行为。
这里发生了什么事——我做错了什么?
提前致谢
马库斯