无法在 Flink 新 Kafka Consumer-api (1.14) 中的检查点上向 Kafka 提交消费偏移量

2024-01-17

我使用以下代码引用 Kafka 源连接器的 Flink 1.14 版本。

我期待以下要求。

  • 在应用程序刚开始时必须读取 Kafka 主题的最新偏移量
  • 在检查点上,它必须将消耗的偏移量提交给 Kafka
  • 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中进行选择,并且应该必须消耗消费者滞后以及此后的新鲜事件源。

使用 Flink 新的 KafkaConsumer API (KafkaSource) 我面临以下问题

  • 能够满足上述要求,但无法在检查点(500ms)上提交消耗的偏移量。它会在 2 秒或 3 秒后提交。

当您在 2 秒/3 秒内手动终止应用程序并重新启动时。由于最后消费的消息未提交,因此它被读取两次(重复)。

为了交叉检查此功能,我尝试使用 Flink Kafka 的旧消费者 API (FlinkKafkaConsumer)。在那里它工作得很好。当一条消息立即被消费时,它会被提交回 Kafka。

遵循的步骤

  • 设置Kafka环境
  • 运行下面的flink代码来消费。代码包括新旧 API。这两个 API 都将从 Kafka 主题消费并在控制台打印
  • 将一些消息推送到 Kafka 主题。
  • 推送一些消息并在控制台中可见后,终止 Flink 作业。
  • 检查两个 API 的 kafka 消费者组。与旧版消费者 api 的 group-id(older_test1) 相比,新的 flink 消费者 api 的 group-id(test1) 消费者滞后 > 0。
  • 当您重新启动 Flink 作业时,您可以在新的 Flink kafka-consumer API 的控制台中看到那些未提交的消息,从而导致重复消息。

如果我缺少任何内容或需要添加任何属性,请提出建议。

 @Test
    public void test() throws Exception {

        System.out.println("FlinkKafkaStreamsTest started ..");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.enableCheckpointing(500);
        env.setParallelism(4);

        Properties propertiesOld = new Properties();
        Properties properties = new Properties();
        String inputTopic = "input_topic";
        String bootStrapServers = "localhost:29092";
        String groupId_older = "older_test1";
        String groupId = "test1";

        propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
        propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);


        /******************** Old Kafka API **************/
        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
                new KRecordDes(),
                propertiesOld);
        flinkKafkaConsumer.setStartFromGroupOffsets();
        env.addSource(flinkKafkaConsumer).print("old-api");


        /******************** New Kafka API **************/
        KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
                .setBootstrapServers(bootStrapServers)
                .setTopics(inputTopic)
                .setGroupId(groupId)
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setProperty("enable.auto.commit", "false")
                .setProperty("commit.offsets.on.checkpoint", "true")
                .setProperties(properties)
                .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));

        KafkaSource<String> kafkaSource = sourceBuilder.build();

        SingleOutputStreamOperator<String> source = env
                .fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");

        source.print("new-api");

        env.execute();
    }
    static class KRecordDes implements  KafkaDeserializationSchema<String>{
        @Override
        public TypeInformation<String> getProducedType() {
            return TypeInformation.of(String.class);
        }
        @Override
        public boolean isEndOfStream(String nextElement) {
            return false;
        }
        @Override
        public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
            return new String(consumerRecord.value());
        }
    }

注意:我还有其他要求,希望在同一代码中使用 Flink Kafka 有界源读取器,这在新的 API(KafkaSource)中可用。


来自 Kafka Source 的文档:

请注意,Kafka 源确实NOT依靠承诺的抵消额 容错能力。提交offset只是为了暴露进度 消费者和消费群体的监控。

当 Flink 作业从故障中恢复时,它不会使用代理上提交的偏移量,而是从最新成功的检查点恢复状态,并从该检查点中存储的偏移量恢复消费,因此检查点之后的记录将被“重放”一点少量。由于您使用的是不支持一次语义的打印接收器,因此您将看到重复的记录,这些记录实际上是最新成功检查点之后的记录。

对于你提到的offset commit延迟2-3秒,是因为执行了SourceReaderBase。简而言之SplitFetcher管理一个任务队列,当一个offset commit任务被推入队列时,直到有正在运行的fetch任务调用时,它才会被执行KafkaConsumer#poll()超时。如果流量很小,延迟可能会更长。但请注意,这不会影响正确性:KafkaSource 不使用提交的偏移量来实现容错。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法在 Flink 新 Kafka Consumer-api (1.14) 中的检查点上向 Kafka 提交消费偏移量 的相关文章

随机推荐