我使用了下面的方法,但它使用了大量的 CPU,所以我想使用 Actor.Ask 而不是下面的方法,任何人都可以帮助我如何使用 Actor.Ask
KafkaConsumer.PlainSource(
consumerSettings, subscription)
.RunForeach(result =>
{
_ActorRef.Tell(result.Message.Value);
}, materializer);
我之前的答案是通过回复流来让目标参与者背压,这是最可靠的方法。但由于 OP 表示他们的演员今天没有发回任何回复,因此还有另一种方法。
KafkaConsumer.PlainSource(
consumerSettings, subscription)
.Throttle(100, TimeSpan.FromSeconds(1), 100, ThrottleMode.Shaping)
.RunForeach(result =>
{
_ActorRef.Tell(result.Message.Value);
}, materializer);
这将强制执行每秒 100 条消息的最大输出限制,一旦达到该阈值,该阶段将对 Kafka 施加背压,以保持较低的内存和 CPU 消耗。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)