我们有一个要求,即使用 Kafka Streams 从 Kafka 主题读取数据,然后通过会话池通过网络发送数据。然而,有时,网络调用有点慢,我们需要经常暂停流,以确保网络不会过载。目前,我们将数据捕获到流中并将其加载到执行器服务,然后通过会话池通过网络发送。
如果执行器服务中的数据过高,我们需要暂停流一段时间,然后在清除执行器服务上的积压后恢复流。为了实现这种暂停机制,我们目前正在关闭流,并在积压清除后重新开始。
有什么办法可以暂停kafka流吗?
如果我理解正确的话,你不需要做任何特别的事情。您正在谈论“背压”,Kafka Streams 可以开箱即用地处理它。
可以做的是将这些数据放入具有某个最大大小的队列中,并使用该队列加载执行程序服务。每当队列达到某个阈值时,有两种方法:
- 如果您对将数据放入队列的调用正在阻塞且没有超时,则您无需执行任何操作。只需等到系统重新上线,您的电话
返回,处理将恢复。
- 如果您对将数据放入队列的调用因超时而阻塞,只需发出查找来检查队列的大小。重复此操作,直到系统重新联机且呼叫成功。
唯一需要注意的是,只要您的 Streams 应用程序阻塞,内部使用的 Kafka 消费者客户端就不会向 Kafka 发送任何心跳,并且可能会超时。因此,您需要将超时配置参数设置为高于外部系统的预期最大停机时间。
另一种方法是使用 Kafka-streams 中可用的处理器 API,但通常不推荐这种模式。
如果有帮助请告诉我!
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)