我使用以下代码引用 Kafka 源连接器的 Flink 1.14 版本。
我期待以下要求。
- 在应用程序刚开始时必须读取 Kafka 主题的最新偏移量
- 在检查点上,它必须将消耗的偏移量提交给 Kafka
- 重新启动后(当应用程序手动终止/系统错误时),它必须从最后提交的偏移量中进行选择,并且应该必须消耗消费者滞后以及此后的新鲜事件源。
使用 Flink 新的 KafkaConsumer API (KafkaSource) 我面临以下问题
- 能够满足上述要求,但无法在检查点(500ms)上提交消耗的偏移量。它会在 2 秒或 3 秒后提交。
当您在 2 秒/3 秒内手动终止应用程序并重新启动时。由于最后消费的消息未提交,因此它被读取两次(重复)。
为了交叉检查此功能,我尝试使用 Flink Kafka 的旧消费者 API (FlinkKafkaConsumer)。在那里它工作得很好。当一条消息立即被消费时,它会被提交回 Kafka。
遵循的步骤
- 设置Kafka环境
- 运行下面的flink代码来消费。代码包括新旧 API。这两个 API 都将从 Kafka 主题消费并在控制台打印
- 将一些消息推送到 Kafka 主题。
- 推送一些消息并在控制台中可见后,终止 Flink 作业。
- 检查两个 API 的 kafka 消费者组。与旧版消费者 api 的 group-id(older_test1) 相比,新的 flink 消费者 api 的 group-id(test1) 消费者滞后 > 0。
- 当您重新启动 Flink 作业时,您可以在新的 Flink kafka-consumer API 的控制台中看到那些未提交的消息,从而导致重复消息。
如果我缺少任何内容或需要添加任何属性,请提出建议。
@Test
public void test() throws Exception {
System.out.println("FlinkKafkaStreamsTest started ..");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
env.enableCheckpointing(500);
env.setParallelism(4);
Properties propertiesOld = new Properties();
Properties properties = new Properties();
String inputTopic = "input_topic";
String bootStrapServers = "localhost:29092";
String groupId_older = "older_test1";
String groupId = "test1";
propertiesOld.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
propertiesOld.put(ConsumerConfig.GROUP_ID_CONFIG, groupId_older);
propertiesOld.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
/******************** Old Kafka API **************/
FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(inputTopic,
new KRecordDes(),
propertiesOld);
flinkKafkaConsumer.setStartFromGroupOffsets();
env.addSource(flinkKafkaConsumer).print("old-api");
/******************** New Kafka API **************/
KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
.setBootstrapServers(bootStrapServers)
.setTopics(inputTopic)
.setGroupId(groupId)
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("enable.auto.commit", "false")
.setProperty("commit.offsets.on.checkpoint", "true")
.setProperties(properties)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST));
KafkaSource<String> kafkaSource = sourceBuilder.build();
SingleOutputStreamOperator<String> source = env
.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source");
source.print("new-api");
env.execute();
}
static class KRecordDes implements KafkaDeserializationSchema<String>{
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public boolean isEndOfStream(String nextElement) {
return false;
}
@Override
public String deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return new String(consumerRecord.value());
}
}
注意:我还有其他要求,希望在同一代码中使用 Flink Kafka 有界源读取器,这在新的 API(KafkaSource)中可用。