一、自动提交offset的相关参数
-
官网文档
![在这里插入图片描述](https://img-blog.csdnimg.cn/2bc4eece1aee4a588dda0c9aa5cf1b43.png)
-
参数解释
参数 |
描述 |
enable.auto.commi |
默认值为 true,消费者会自动周期性地向服务器提交偏移量。 |
auto.commit.interval.ms |
如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。 |
-
图解分析
![在这里插入图片描述](https://img-blog.csdnimg.cn/e3bf29455e214aeb95a3ded76305015b.png)
二、消费者(自动提交 offset)代码示例
-
消费者自动提交 offset代码
// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交时间间隔 1秒
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
-
消费者自动提交 offset代码完整代码
package com.xz.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
public class CustomConsumerAutoOffset {
public static void main(String[] args) {
// 配置
Properties properties = new Properties();
// 连接 bootstrap.servers
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
// 反序列化
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 配置消费者组id
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
// 自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
// 提交时间间隔 1秒
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
// 1 创建一个消费者 "", "hello"
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
// 2 订阅主题 first
ArrayList<String> topics = new ArrayList<>();
topics.add("sevenTopic");
kafkaConsumer.subscribe(topics);
// 3 消费数据
while (true){
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
System.out.println(consumerRecord);
}
}
}
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)