java客户端作为kafka消费者测试

2023-11-02

【README】

本文主要对 java客户端作为kafka 消费者进行测试, 生产者由 kafka客户端扮演; 

 

【1】普通消费者

设置消费者组;

重置消费者的offset, 即每次都从最头开始消费(默认仅保持7天内数据) ;

类似于 命令行 --from-beginning

kafka-console-consumer.sh --topic first --zookeeper centos201:2181 --from-beginning

小结:从头开始消费,必须满足2个条件;

条件1: 必须重新换组, 如本文中的消费者组 从 sichuan 更新为 sichuan1 ;
条件2: 需要设置offset, 修改为 earliest, 默认值是 lastest;

/**
 * 普通消费者
 */
public class MyConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2开启自动提交 */
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] " + rd.key() + "--" + rd.value()); 
			}
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

 从官网可以找到以上配置值; https://kafka.apache.org/0110/documentation.html#configuration

 

【2】kafka消费者-手动提交offset 

手动提交offset有3种方式:

  • 方式1:同步手动提交;
  • 方式2:异步手动提交; 
  • 方式3:自定义手动提交策略;

0)为啥需要手动提交?

kafka自动提交是在kafka拉取到数据之后就直接提交,这样很容易丢失数据,尤其是在需要事物控制的时候。
很多情况下我们需要从kafka成功拉取数据之后,对数据进行相应的处理之后再进行提交。如拉取数据之后进行写入mysql这种 , 所以这时我们就需要进行手动提交kafka的offset下标。

这里顺便说下offset具体是什么。
offset:指的是kafka的topic中的每个消费组消费的下标。
简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

1)关闭自动提交(默认为true)

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

第一次启动 consumer 从 90 开始消费;
第2次启动相同 consumer ,还是从90开始消费;

2) 如何使用手动提交?

kafka提供了手动提交offset的api;
方法1:commitSync 同步提交:  ;
方法2:commitAsync 异步提交;
两者相同点:都会将本次 poll  的一批数据最高的偏移量提交; 
不同点是, commitSync 阻塞当前线程,一直到提交成功, 并且会自动失败重试;
而 commitAsync 没有失败重试机制, 可能提交失败; 

3)同步手动提交offset

/**
 * 手动同步提交offset 
 */
public class ManSyncCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ 
			consumer.commitSync();
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

4)异步手动提交offset 

/**
 * 异步手动提交offset  
 */
public class ManASyncCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"));
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【异步提交】 当前线程会阻塞直到 offset提交成功 */  
			consumer.commitAsync(new OffsetCommitCallback() {
				@Override 
				public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets,
						Exception exception) {
					if (exception !=null) {
						System.out.println("异步提交失败");
					} else {
						System.out.println("异步提交成功"); 
					}
				}
			}); 
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

5)自定义手动提交offset策略

5.0)为啥需要自定义?

因为异步提交有一些问题,如下:
先消费数据,后提交offset, 可能导致数据重复消费; 
先提交offset, 后走业务逻辑,可能会丢数据; 

5.1)应用场景:

把 offset 存储到本地库 和 消息消费逻辑 在同一个数据库事务里面;

5.2)如何实现?需要实现 ConsumerRebalanceListener 来实现。

/**
 * 自定义手动提交offset策略  
 */
public class DiyCommitOffsetConsumer {
	public static void main(String[] args) {
		/* 1.创建消费者配置信息 */
		Properties props = new Properties();
		/*2.给配置信息赋值*/
		/*2.1连接的集群*/
		props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos201:9092");
		/*2.2 关闭自动提交(默认为true) */ 
		props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); 
		
		/*2.3 自动提交的延时*/
		props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
		/*2.4 key value的反序列化 */
		props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
		/*2.5 消费者组 */
		props.put(ConsumerConfig.GROUP_ID_CONFIG, "sichuan1"); 
		/*2.6 重置消费者的offset */ 
		props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // 默认值是 lastest 
		
		/* 创建消费者 */
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); 
		/* 订阅主题 */
		consumer.subscribe(Arrays.asList("first", "second"), new ConsumerRebalanceListener() {
			@Override
			public void onPartitionsRevoked(Collection<TopicPartition> partitions) { // 在 rebalance方法【前】调用
				
			}
			@Override
			public void onPartitionsAssigned(Collection<TopicPartition> partitions) { // 在 rebalance方法【后】调用  
				/* 分区分配方法 */
				for (TopicPartition partition :  partitions) { 
					/*定位到某个 offset*/
					consumer.seek(partition, 1); // TODO: 这里需要输入1  
				}
			}  
		});
		/* 循环拉取 */ 
		while(true) {
			/* 消费消息-获取数据 */
			ConsumerRecords<String, String> consumerRds  = consumer.poll(100);
			/* 解析并打印 ConsumerRecords  */
			/* 遍历 ConsumerRecords*/
			for(ConsumerRecord<String, String> rd : consumerRds) {
				System.out.println("[消费者] [partition]" + rd.partition() + " [offset]" + rd.offset() + rd.key() + "--" + rd.value()); 
			}
			/* 【同步提交】,当前线程会阻塞直到 offset提交成功 */ 
			consumer.commitSync();
		} 
		/* 关闭消费者 */ 
//		consumer.close(); 
	}
}

补充: 消费者rebalance 是什么?

消费者 rebalance, 什么时候触发 rebalance?  如 同一个消费者组下的 某个消费者机器宕机,或新增一个消费者机器,都会触发 rebalance,即重新分配  kafka分区数据与 消费者的对应关系; 

 

 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

java客户端作为kafka消费者测试 的相关文章

  • Kafka Producer 发送数据

    Kafka Producer 发送数据 1 生产者概览 1 不同的应用场景对消息有不同的需求 即是否允许消息丢失 重复 延迟以及吞吐量的要求 不同场景对Kafka生产者的API使用和配置会有直接的影响 2 Kafka发送消息的主要步骤 消息
  • Zookeeper的常见面试题

    1 Zookeeper 1 1 Zookeeper基本概念 Zookeeper作为一个优秀高效且可靠的分布式协调框架 ZooKeeper 在解决分布式数据一致性问题时并没有直接使用Paxos算法 而是专门定制了一致性协议叫做 ZAB Zoo
  • librdkafka的使用和介绍

    librdkafka的使用介绍 librdkafka是kafka的c语言接口 下面简单的介绍一下其接口 1 rd kafka conf set设置全局配置 2 rd kafka topic conf set设置topic配置 3 rd ka
  • Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

    一 案例说明 现有一电商网站数据文件 名为buyer favorite1 记录了用户对商品的收藏数据 数据以 t 键分割 数据内容及数据格式如下 二 前置准备工作 项目环境说明 Linux Ubuntu 16 04 jdk 7u75 lin
  • kafka创建话题遇到的错误

    确定Kafka安装和启动正确 ZooKeeper可以查到所有的Brokers 但执行 kafka topics sh create zookeeper localhost 2181 replication factor 3 partitio
  • kafka知识 --kafka权威指南

    我想既然Kafka是为了写数据而产生的 那么用作家的名字来命名会显得更有意义 我在大学时期上过很多文学课程 很喜欢Franz Kafka 况且 对于开源项目来说 这个名字听起来很酷 因此 名字和应用本身基本没有太多联系 Jay Kreps
  • Kafka:主题创建、分区修改查看、生产者、消费者

    文章目录 Kafka后台操作 1 主题 2 分区 3 生产者 4 消费者组 Kafka后台操作 1 主题 1 创建主题 bin kafka topics sh create bootstrap server hadoop102 9092 r
  • 六、Kafka consumer及ConsumerRebalanceListener实现

    1 comsumer代码示例 public class ConsumerMessage private static final String TOPIC NAME topic 07 public static void main Stri
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • Kafka剖析(一):Kafka背景及架构介绍

    转载自 http www infoq com cn articles kafka analysis part 1 Kafka 是由 LinkedIn 开发的一个分布式的消息系统 使用 Scala 编写 它以可水平扩展和高吞吐率而被广泛使用
  • Kafka一致性

    一 存在的一致性问题 1 生产者和Kafka存储一致性的问题 即生产了多少条消息 就要成功保存多少条消息 不能丢失 不能重复 更重要的是不丢失 其实就是要确保消息写入成功 这可以通过acks 1来保证 保证所有ISR的副本都是一致的 即一条
  • Kafka一文懂

    初识 Kafka 什么是 Kafka Kafka 是由 Linkedin 公司开发的 它是一个分布式的 支持多分区 多副本 基于 Zookeeper 的分布式消息流平台 它同时也是一款开源的基于发布订阅模式的消息引擎系统 Kafka 的基本
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • win10系统下安装Kafka 的详细步骤

    Win10 系统下要使用Kafka需要经过以下三个步骤 1 安装JDK 需要安装依赖java JDK 2 安装zookeeper 资源协调 分配管理 3 安装Kafka 一 安装 Java SE Development Kit 13 0 1
  • 附录:kafka源码启动

    本文以源码2 8为例 准备如下 idea 2019 1 4 jdk 1 8 scala 2 12 8 gradle 6 8 1 zookeeper 3 4 10 kafka2 8源码 注意 以下安装都需要装在没有空格的路径上 比如D Pro
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • kafka的新API 得到最新一条数据

    业务的需要 需要得到最新的一条消息从kafka中 但是发现ConsumerRecords 这个对象并没有 get index 这种方式的获取并且只能 iterator 或者增强for 循环这种方式来循环 记录 但是有一个count 可以得到
  • MQ - KAFKA 基础篇

    1 KAFKA的核心组件 API Producer API 它允许应用程序向一个或多个 topics 上发送消息记录 Consumer API 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流 Streams API 它允许
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们

随机推荐

  • ‘cmake' 不是内部或外部命令 也不是可运行的程序 或批处理文

    在 Win7下的命令行模式下 输入cmake相关命令 出现如下错误 cmake 不是内部或外部命令 也不是可运行的程序 或批处理文件 解决方法 在环境变量中添加cmake的文件路径 计算机 右键 属性 高级系统设置 高级 环境变量 系统变量
  • pytorch 多GPU训练总结(DataParallel的使用)

    参考 主页 PyTorch中文文档 前言 博主最近搭建网络的时候 需要调用不同的GPU 实现训练的加速 有时间会出现显卡现存分布不均的情况 有时间有的显卡温度特别高 博客持续更新 一更 2022 09 01 DP模式见本文 使用最少的代码实
  • Go语言上手-实战案例(1)

    猜谜游戏 在这个游戏里面 程序首先会生成一个介于1 100之间的随机整数 然后提示玩家进行猜测 玩家每次输入一个数字 程序就会告诉玩家这个猜测的值是高于还是低于那个秘密的随机数 并且让玩家再次猜测 如果猜对了就告诉玩家胜利并且退出程序 生成
  • 搜索神器Everything的功能技巧(非NTFS文件搜索,FTP/HTTP服务)

    Everything这个搜索神器估计大家都听过 磁盘上的任何文件只要输入后基本就是秒搜 但Everything除了搜索 还自带了一些好用的功能 1 添加非NTFS格式的驱动器索引 默认Everything只会索引查询本地的NTFS格式磁盘
  • Linux云计算命令大全

    云计算命令总结 一 系统命令精讲 二 目录和文件管理 三 安装及管理程序 四 账号管理 五 权限及归属管理 六 磁盘管理 七 文件系统与LVM 八 服务器RAID及配置实战 九 引导过程与服务控制 十 进程和计划任务管理 十一 系统安全及应
  • 【linux】nginx: [emerg] the “ssl“ parameter requires ngx_http_ssl_module

    1 概述 我使用 Linux centos8 安装nginx详细步骤 这个安装了一个nginx 然后启动如下 root zdh2 nginx 1 18 0 sudo usr local nginx sbin nginx c usr
  • class与prototype

    创建实例对象 ES5中常用的构造函数模式 function Person name this name name this getName function return this name ES6 通过class定义类 class Per
  • selenium处理登陆爬虫(维持登陆状态请求页面)

    selenium在处理需要登陆的时候 需要修改浏览器请求头参数cookie或token 在请求需要登陆的页面时 添加参数 跳过登陆 直接获取登陆后的内容 直接在driver对象内添加cookie参数绕开登陆 处理思路 浏览器先登陆 请求同一
  • umi如何实现鉴权

    什么是jwt鉴权 JWT JSON Web Token 本质就是一个字符串书写规范 作用是用来在用户和服务器之间传递安全可靠的信息 在目前前后端分离的开发过程中 使用token鉴权机制用于身份验证是最常见的方案 流程如下 服务器当验证用户账
  • 2.查询分离:表数据量大读写缓慢如何优化?

    查询分离 表数据量大读写缓慢如何优化 01 讲中我们提到过 冷热分离解决方案的性价比高 但它并不是一个最优的方案 仍然存在诸多不足 比如 查询冷数据慢 业务无法再修改冷数据 冷数据多到一定程度系统依旧扛不住 我们如果想把这些问题一一解决掉
  • 如何设置电脑永不熄屏

    1 win q调出搜索框 输入系统 点击系统 2 电源和睡眠 两个选项改为从不 如果是虚拟机 设置永不熄屏的方法 设置
  • 猿创征文

    文章目录 1 PolarDB X是什么 2 PolarDB X架构 3 PolarDB X架构优势 4 PolarDB X核心特性 5 PolarDB X部署 5 1 通过PXD部署集群 5 2 通过 K8S 部署 5 3 通过编译安装 1
  • 【Mybatis-puls 】返回map下划线自动转成驼峰

    文章目录 问题描述 1 yml配置解决方案 错误分析 解决方案 转换器代码 ConfigurationPropertiesBinding的作用 2 通过Java配置bean解决 觉得第一种麻烦的直接用第二种 问题描述 VO实体类自动转换驼峰
  • 使用plsql工具查看oracle中的blob字段的可视化值

    SELECT utl raw cast to varchar2 dbms lob substr t detailsql from Voucher t
  • 每天一个设计模式——装饰模式(C++实现)

    设计模式的代码十分难写的 要充分的体现可复用性 网上有着大量关于设计模式的代码 其中很多的代码违背了很多设计原则 比如依赖倒置原则 开放封闭原则 需要我们明辨是非 设计模式的原则大于使用哪个设计模式 类的组合关系也大于类的继承 通过不断的写
  • ECCV 2022

    作者 机器之心编辑部 来源 机器之心 如何将现有的图像 文本多模态大模型 例如 OpenAI CLIP 用于视频内容理解 是一个非常实用且具有前景的研究课题 它不仅可以充分挖掘图像大模型的潜力 还可以为视频大模型的设计和研究铺平道路 在视频
  • 年轻人还记得KCP吗?什么是KCP,怎么使用呢!!!

    一 什么是KCP KCP是一种网络传输协议 A Fast and Reliable ARQ Protocol 可以视它为TCP的代替品 但是它运行于用户空间 它不管底层的发送与接收 只是个纯算法实现可靠传输 它的特点是牺牲带宽来降低延迟 因
  • C51单片机学习笔记(二)——花样流水灯的实现

    C51单片机学习笔记 二 花样流水灯的实现 文章目录 C51单片机学习笔记 二 花样流水灯的实现 1 单片机引脚 晶振 复位的作用 2 流水灯原理图 3 单片机的周期 4 延时函数的编写 5 使用 位操作 控制流水灯 6 使用字节控制 并行
  • lenovo联想笔记本ThinkBook 14 Gen5+ IRH(21HW)原装Win11系统镜像原厂OEM恢复出厂状态

    LENOVO联想笔记本电脑 ThinkBook 14 Gen5 IRH 21HW 原厂Windows11原装OEM系统 恢复出厂时状态系统 系统自带所有驱动 出厂主题壁纸LOGO Office办公软件 联想电脑管家等预装程序 所需要工具 1
  • java客户端作为kafka消费者测试

    README 本文主要对 java客户端作为kafka 消费者进行测试 生产者由 kafka客户端扮演 1 普通消费者 设置消费者组 重置消费者的offset 即每次都从最头开始消费 默认仅保持7天内数据 类似于 命令行 from begi