我们正在运行一个 ListState 介于 300GB 到 400GB 之间的作业,并且有时该列表可能会增加到数千。在我们的用例中,每个项目都必须有自己的 TTL,因此我们使用 S3 上的 RocksDB 后端为此 ListState 的每个新项目创建一个新的计时器。
目前大约有 140 多个计时器(将在事件.时间戳 + 40 天).
我们的问题是,作业的检查点突然卡住,或者非常慢(比如几个小时内 1%),直到最终超时。它通常会停止(flink 仪表板显示0/12 (0%)
而前面几行显示12/12 (100%)
)在一段非常简单的代码上:
[...]
val myStream = env.addSource(someKafkaConsumer)
.rebalance
.map(new CounterMapFunction[ControlGroup]("source.kafkaconsumer"))
.uid("src_kafka_stream")
.name("some_name")
myStream.process(new MonitoringProcessFunction()).uid("monitoring_uuid").name(monitoring_name)
.getSideOutput(outputTag)
.keyBy(_.name)
.addSink(sink)
[...]
更多信息:
- AT_LEAST_ONCE 检查点模式似乎比 EXACTLY_ONCE 更容易卡住
- 几个月前,状态数据达到 1.5TB,我认为数十亿个计时器没有任何问题。
- 运行两个任务管理器的计算机上的 RAM、CPU 和网络看起来正常
state.backend.rocksdb.thread.num = 4
- 第一个事件发生在我们收到大量事件(大约几分钟内数百万个)但不是前一个事件的时候。
- 所有事件都来自 Kafka 主题。
- 当处于 AT_LEAST_ONCE 检查点模式时,作业仍然正常运行和消耗。
这是我们第二次遇到这样的情况:拓扑运行得非常好,每天有几百万个事件,但突然停止了检查点。我们不知道是什么导致了这种情况。
任何人都可以想到什么可能会突然导致检查点卡住?
一些想法:
如果您有许多计时器或多或少同时触发,那么计时器的风暴将阻止其他任何事情发生——任务将循环调用 onTimer 直到没有更多的计时器被触发,在此期间它们的输入队列将被被忽略,检查点障碍将不会进展。
如果这是您遇到麻烦的原因,您可以向计时器添加一些随机抖动,以便事件风暴以后不会变成计时器风暴。重新组织要使用的东西状态生存时间 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl可能是另一种选择。
如果堆上有很多计时器,这可能会导致非常高的 GC 开销。这不一定会使工作失败,但可能会使检查点不稳定。在这种情况下,将计时器移至 RocksDB 中可能会有所帮助。
另外:由于您使用的是 RocksDB,从 ListState 切换到 MapState,以时间为键,可以让您删除单个条目,而无需在每次更新后重新序列化整个列表。 (使用 RocksDB,MapState 中的每个键/值对都是一个单独的 RocksDB 对象。)以这种方式提高清理效率可能是最好的补救措施。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)