我正在尝试构建并运行一个 akka 流(在 Java DSL 中),其中 2 个 actor 作为源,然后是一个合并结点,然后是 1 个接收器:
Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.backpressure());
Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.backpressure());
Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));
RunnableFlow<BoxedUnit> closed = FlowGraph.factory().closed(sink, (b, out) -> {
UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
b.from(src1).via(merge).to(out);
b.from(src2).to(merge);
});
closed.run(mat);
我的问题是如何获取对源参与者的 ActorRef 引用以便向他们发送消息?如果有 1 个演员,我不会使用图形生成器,然后 .run() 或 runWith() 方法将返回 ActorRef 对象。但是如果源演员很多怎么办?这样的流程有可能实现吗?
回答我自己的问题,以防有人需要。
根据 jrudolph 的建议,我能够使用这样的 actor(在实际代码中,我做了一些比 2 个 ActorRef 的列表更好的事情):
Source<Integer, ActorRef> src1 = Source.actorRef(100, OverflowStrategy.fail());
Source<Integer, ActorRef> src2 = Source.actorRef(100, OverflowStrategy.fail());
Sink<Integer, BoxedUnit> sink = Flow.of(Integer.class).to(Sink.foreach(System.out::println));
RunnableFlow<List<ActorRef>> closed = FlowGraph.factory().closed(src1, src2, (a1, a2) -> Arrays.asList(a1, a2), (b, s1, s2) -> {
UniformFanInShape<Integer, Integer> merge = b.graph(Merge.<Integer>create(2));
b.from(s1).via(merge).to(sink);
b.from(s2).to(merge);
});
List<ActorRef> stream = closed.run(mat);
ActorRef a1 = stream.get(0);
ActorRef a2 = stream.get(1);
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)