Apache Camel:带有聚合的多播 - AggregationStrategy 调用过于频繁

2024-02-11

对于多播+聚合,我有以下奇怪的(或者至少对我来说不清楚)行为。考虑以下路线:

    from("direct:multicaster")
                .multicast()
                .to("direct:A", "direct:B")
                .aggregationStrategy(new AggregationStrategy() {
                    @Override
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange == null) {
                            List firstResult = newExchange.getIn().getBody(List.class);
                            newExchange.getIn().setBody(ImmutableList.copyOf(firstResult));
                            return newExchange;
                        } else {
                            List oldResults = oldExchange.getIn().getBody(List.class);
                            List newResults = newExchange.getIn().getBody(List.class);
                            ImmutableList aggResult = ImmutableList.copyOf(Iterables.concat(oldResults, newResults));
                            oldExchange.getIn().setBody(aggResult);
                            return oldExchange;
                        }
                    }
                })
                .end()
//                .to("log:bla")

本质上,该路由接受一个输入,将其发送到direct:A and direct:B,期望来自这两个端点的列表并将它们连接起来(最后一行中的注释存在的原因我将在稍后解释)。

现在假设这两个端点分别“返回”列表 [A] 和 [B]。如果我发送消息M to direct:multicaster,然后聚合器被调用一次oldExchange = null and newExchange.in.body=[A],然后用oldExchange.in.body=[A] and newExchange.out.body=[B](正如它应该做的那样)。

到目前为止一切都很好。但是聚合器再次被调用oldExchange.in.body=[A,B] and newExchange.in=M (M是初始消息)。这看起来类似于包含的丰富模式。

您可以通过删除最后一行中的注释来获得预期的行为,即简单地添加一个虚拟to("log:bla")。有了这个,一切都会按预期进行。

更新:尝试(参见克劳斯提供的提示)

            .multicast()
            .aggregationStrategy(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

and

            .multicast(aggStrategy)
            .to("direct:A", "direct:B")
            .end()

两者都会导致相同的行为。

这里发生了什么事——我做错了什么?

提前致谢 马库斯


我尝试重现该问题,但没有成功。这就是我所做的:

路线:

public class MulticastRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        AggregationStrategy myAggregationStrategy = new MyAggregationStrategy();
        List<String> listA = Lists.newArrayList("A");
        List<String> listB = Lists.newArrayList("B");
        from("direct:multicast").routeId("multicastRoute").multicast(myAggregationStrategy).to("direct:A", "direct:B").end();

        from("direct:A").setBody(constant(listA));
        from("direct:B").setBody(constant(listB));
    }

    class MyAggregationStrategy implements AggregationStrategy {
        @Override
        public org.apache.camel.Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            System.out.println("Aggregate called with oldExchange = " + (oldExchange == null ? "null" :
                    oldExchange.getIn().getBody().toString()) + ", newExchange = " +
                    newExchange.getIn().getBody().toString());
            return newExchange;
        }
    }
}

创建一个简单的测试只是为了运行该路线。

考试:

public class MulticastRouteTest extends CamelTestSupport {
  @Test
    public void testMulticastRoute() throws Exception {
        context.addRoutes(new MulticastRoute());
        template.sendBody("direct:multicast", null);
    }
}

这打印:

Aggregate called with oldExchange = null, newExchange = [A]
Aggregate called with oldExchange = [A], newExchange = [B]

这是我们所期望的。希望对你有帮助。我看不出我做事的方式有什么不同,但希望你能发现它。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Camel:带有聚合的多播 - AggregationStrategy 调用过于频繁 的相关文章

随机推荐