FlinkKafkaConsumer 源码剖析
FlinkKafkaConsumer 的继承关系如下图所示。
可以发现几个版本的 FlinkKafkaConsumer 都继承自 FlinkKafkaConsumerBase 抽象类,所以可知 FlinkKafkaConsumerBase 是最核心的类了。FlinkKafkaConsumerBase 实现了 CheckpointedFunction、CheckpointListener 接口,继承了 RichParallelSourceFunction 抽象类来读取 Kafka 数据。
在 FlinkKafkaConsumerBase 中的 open 方法中做了大量的配置初始化工作,然后在 run 方法里面是由 AbstractFetcher 来获取数据的,在 AbstractFetcher 中有用 List> 来存储着所有订阅分区的状态信息,包括了下面这些字段:
private final KafkaTopicPartition partition;
private final KPH kafkaPartitionHandle;
private volatile long offset;
private volatile long committedOffset;
在 FlinkKafkaConsumerBase 中还有字段定义 Flink 自动发现 Kafka 主题和分区个数的时间,默认是不开启的(时间为 Long.MIN_VALUE),像如果传入的是正则表达式参数,那么动态的发现主题还是有意义的,如果配置的已经是固定的 Topic,那么完全就没有开启这个的必要,另外就是 Kafka 的分区个数的自动发现,像高峰流量的时期,如果 Kafka 的分区扩容了,但是在 Flink 这边没有配置这个参数那就会导致 Kafka 新分区中的数据不会被消费到,这个参数由 flink.partition-discovery.interval-millis 控制。
FlinkKafkaProducer 源码剖析
FlinkKafkaProducer 这个有些特殊,不同版本的类结构有些不一样,如 FlinkKafkaProducer011 是继承的 TwoPhaseCommitSinkFunction 抽象类,而 FlinkKafkaProducer010 和 FlinkKafkaProducer09 是基于 FlinkKafkaProducerBase 类来实现的。
在 Kafka 0.11.x 版本后支持了事务,这让 Flink 与 Kafka 的事务相结合从而实现端到端的 Exactly once 才有了可能。
数据 Sink 到下游的 Kafka,可你能会关心数据的分区策略,在 Flink 中自带了一种就是 FlinkFixedPartitioner,它使用的是 round-robin 策略进行下发到下游 Kafka Topic 的分区上的,当然也提供了 FlinkKafkaPartitioner 接口供你去实现自定义的分区策略。
使用 Flink-connector-kafka 可能会遇到的问题
如何消费多个 Kafka Topic
通常可能会有很多类型的数据全部发到 Kafka,但是发送的数据却不是在同一个 Topic 里面,然后在 Flink 处消费的时候,又要去同时消费这些多个 Topic,在 Flink 中除了支持可以消费单个 Topic 的数据,还支持传入多个 Topic,另外还支持 Topic 的正则表达式(因为有时候可能会事先不确定到底会有多少个 Topic,所以使用正则来处理会比较好,只要在 Kafka 建立的 Topic 名是有规律的就行),如下几种构造器可以传入不同参数来创建 FlinkKafkaConsumer 对象。
public FlinkKafkaConsumer011(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
this(Collections.singletonList(topic), valueDeserializer, props);
}
public FlinkKafkaConsumer011(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
this(topics, new KafkaDeserializationSchemaWrapper<>(deserializer), props);
}
public FlinkKafkaConsumer011(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
this(subscriptionPattern, new KafkaDeserializationSchemaWrapper<>(valueDeserializer), props);
}
想要获取数据的元数据信息
在消费 Kafka 数据的时候,有时候想获取到数据是从哪个 Topic、哪个分区里面过来的,这条数据的 offset 值是多少。这些元数据信息在有的场景真的需要,那么这种场景下该如何获取呢?其实在获取数据进行反序列化的时候使用 KafkaDeserializationSchema 就行。
public interface KafkaDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
boolean isEndOfStream(T nextElement);
T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception;
}
在 KafkaDeserializationSchema 接口中的 deserialize 方法里面的 ConsumerRecord 类中是包含了数据的元数据信息。
public class ConsumerRecord<K, V> {
private final String topic;
private final int partition;
private final long offset;
private final long timestamp;
private final TimestampType timestampType;
private final long checksum;
private final int serializedKeySize;
private final int serializedValueSize;
private final K key;
private final V value;
}
所在在使用 FlinkKafkaConsumer011 构造对象的的时候可以传入实现 KafkaDeserializationSchema 接口后的参数对象。
public FlinkKafkaConsumer011(String topic, KafkaDeserializationSchema<T> deserializer, Properties props) {
this(Collections.singletonList(topic), deserializer, props);
}
public FlinkKafkaConsumer011(List<String> topics, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(topics, deserializer, props);
}
public FlinkKafkaConsumer011(Pattern subscriptionPattern, KafkaDeserializationSchema<T> deserializer, Properties props) {
super(subscriptionPattern, deserializer, props);
}
多种数据类型
因为在 Kafka 的数据的类型可能会有很多种类型,比如是纯 String、String 类型的 JSON、Avro、Protobuf。那么源数据类型不同,在消费 Kafka 的时候反序列化也是会有一定的不同,但最终还是依赖前面的 KafkaDeserializationSchema 或者 DeserializationSchema (反序列化的 Schema),数据经过处理后的结果再次发到 Kafka 数据类型也是会有多种,它依赖的是 SerializationSchema(序列化的 Schema)。
序列化失败
因为数据是从 Kafka 过来的,难以避免的是 Kafka 中的数据可能会出现 null 或者不符合预期规范的数据,然后在反序列化的时候如果作业里面没有做异常处理的话,就会导致作业失败重启,这样情况可以在反序列化处做异常处理,保证作业的健壮性。
Kafka 消费 Offset 的选择
因为在 Flink Kafka Consumer 中是支持配置如何确定从 Kafka 分区开始消费的起始位置的。
FlinkKafkaConsumer011<String> myConsumer = new FlinkKafkaConsumer0111<>(...);
consumer.setStartFromEarliest();
consumer.setStartFromLatest();
consumer.setStartFromTimestamp(...);
consumer.setStartFromGroupOffsets();
另外还支持根据分区指定的 offset 去消费 Topic 数据,示例如下:
Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("topic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("topic", 2), 43L);
myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
注意:这种情况下如果该分区中不存在指定的 Offset 了,则会使用默认的 setStartFromGroupOffsets 来消费分区中的数据。如果作业是从 Checkpoint 或者 Savepoint 还原的,那么上面这些配置无效,作业会根据状态中存储的 Offset 为准,然后开始消费。
每个 Kafka 分区的时间戳
当以 Kafka 来作为数据源的时候,通常每个 Kafka 分区的数据时间戳是递增的(事件是有序的),但是当你作业设置多个并行度的时候,Flink 去消费 Kafka 数据流是并行的,那么并行的去消费 Kafka 分区的数据就会导致打乱原每个分区的数据时间戳的顺序。在这种情况下,你可以使用 Flink 中的 Kafka-partition-aware 特性来生成水印,使用该特性后,水印会在 Kafka 消费端生成,然后每个 Kafka 分区和每个分区上的水印最后的合并方式和水印在数据流 shuffle 过程中的合并方式一致。
如果事件时间戳严格按照每个 Kafka 分区升序,则可以使用前面提到的 AscendingTimestampExtractor 水印生成器来为每个分区生成水印。下面代码如何使用 per-Kafka-partition 来生成水印。
FlinkKafkaConsumer011<Event> kafkaSource = new FlinkKafkaConsumer011<>("topic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
@Override
public long extractAscendingTimestamp(Event event) {
return event.eventTimestamp();
}
});
DataStream<Event> stream = env.addSource(kafkaSource);
上面这几种策略是支持可以配置的,需要在作业中指定,具体选择哪种是需要根据作业的业务需求来判断的。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)