kafka的新API 得到最新一条数据

2023-11-20

业务的需要,需要得到最新的一条消息从kafka中,但是发现ConsumerRecords 这个对象并没有 get(index) 这种方式的获取并且只能 iterator  或者增强for 循环这种方式来循环 记录。

但是有一个count() 可以得到这次拉取的条数因此可以这么写

           

package com.test.kafka;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.JaasUtils;

import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest;
import kafka.utils.ZkUtils;
import scala.collection.JavaConversions;

public class TestOffset {

	private static ZkUtils zkUtils = null;

	@SuppressWarnings("resource")
	public static void main(String[] args) {

		// final String topicName = "ROOM-FFFFFF-999";
		final String topicName = "test111";
		Properties props = new Properties();
		// 构建client name及groupId
		// String topic = topicPartitionInfo.topic;
		// int partitionID = topicPartitionInfo.partitionID;
		// String clientName = this.createClientName(topic, partitionID);
		// String groupId = clientName;

		props.put("bootstrap.servers", "localhost:9092");
		props.put("group.id", "front_ROOM-FFFFFF-999_0");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");

		// 要发送自定义对象,需要指定对象的反序列化类
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

		long earliestTime = OffsetRequest.EarliestTime();
		long lastTime = OffsetRequest.LatestTime();
		long defaultClientId = OffsetRequest.LatestTime();
		final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
		Map<TopicPartition, OffsetAndMetadata> hashMaps = new HashMap<TopicPartition, OffsetAndMetadata>();
		hashMaps.put(new TopicPartition(topicName, 0), new OffsetAndMetadata(0));
		consumer.subscribe(Arrays.asList(topicName)); // consumer 订阅主题

		// assign方法由用户 订阅哪些具体分区 按分区来订阅
		// consumer.assign(Arrays.asList(new TopicPartition(topicName, 0)));

		// 指定从第一条开始读 此topic
		// consumer.seekToBeginning(Arrays.asList(new
		// TopicPartition(topicName,0)));
		// 存在的groupid获取指定的topic任意的offset
		System.out.println("话题 topic " + topicName + "     eraliessTime   " + defaultClientId + "     lastTime :"
				+ earliestTime + "     " + lastTime);
		// consumer.seek(new TopicPartition(topicName,0),lastTime);

		while (true) {
			ConsumerRecords<String, String> records = consumer.poll(500);

			int i = 1;
			for (ConsumerRecord<String, String> record : records) {
				if (records.count() > 0) {
					if (records.count() == 1) {
						// 提交偏移量 这个是如果自动提交偏移量改为false的时候 手动提交需要
						System.out.println(
								"==1==" + records.count() + "----->" + record.value() + "     " + record.offset());
					} else {
						if (i == records.count()) {
							System.out.println(
									"==>2==" + records.count() + "----->" + record.value() + "     " + record.offset());
						}
						i++;
					}
				}
			}
		}

	}

	

}

这个是订阅的的简单的实现消费者这种,用了subscribe() 这个方法,就是这种就是订阅topic   消息会实时拉取显示,如果消费者停止,再启动,那么这段时间未读取的消息会一股脑拉取到,如果你这个groupID 新加入的那么之前的消息跟你就没关系了。   我这个就是 topic  和groupID 都是原来的旧的,有时候就会一下显示太多条,然后我就用 int  i=1   来控制这个。

     

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka的新API 得到最新一条数据 的相关文章

  • 计算广告读书笔记

    计算广告 广告主 媒体 用户 用户画像 ROI 进化 合约广告 多个合约在线分配问题 gt 竞价广告 交易终端TD 广告网络ADN gt 实时竞价RTB 广告交易平台ADX 需求方平台DSP 品牌广告 效果广告 点击率CTR 点击价值 到达
  • 【CentOS7离线ansible-playbook自动化安装CDH5.16(内附离线安装包地址,及自动化脚本)】

    CentOS7 离线环境 使用ansible自动部署CDH5 16 前言 本文介绍如何使用作者开发的自动化脚本 离线部署cdh集群 只需要简单的配置下yum源和cdh集群节点IP等几个参数 就可实现一键部署cdh集群 省去配置mysql n
  • kafka如何避免消费组重平衡

    目录 前言 协调者 重平衡的影响 避免重平衡 重平衡发生的场景 参考资料 前言 Rebalance 就是让一个 Consumer Group 下所有的 Consumer 实例就如何消费订阅主题的所有分区达成共识的过程 在 Rebalance
  • kafka知识 --kafka权威指南

    我想既然Kafka是为了写数据而产生的 那么用作家的名字来命名会显得更有意义 我在大学时期上过很多文学课程 很喜欢Franz Kafka 况且 对于开源项目来说 这个名字听起来很酷 因此 名字和应用本身基本没有太多联系 Jay Kreps
  • Kafka——javaAPI

    文章目录 Kafka的JavaAPI 1 未整合版的Kafka的API 1 1 Producer 消息发送端代码 1 2 Consumer 消息消费端代码 2 Spring Boot整合Kafka 2 1 发送者代码 Producer 2
  • Kafka3.0.0版本——消费者(消费者组案例)

    目录 一 消费者组案例 1 1 案例需求 1 2 案例代码 1 2 1 消费者1代码 1 2 2 消费者2代码 1 2 3 消费者3代码 1 2 4 生产者代码 1 3 测试 一 消费者组案例 1 1 案例需求 测试同一个主题的分区数据 只
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • 六、Kafka consumer及ConsumerRebalanceListener实现

    1 comsumer代码示例 public class ConsumerMessage private static final String TOPIC NAME topic 07 public static void main Stri
  • Linux 下搭建 Kafka 环境

    安装步骤 准备软件目录 mkdir datalake 上传之前下载好的安装包到 datalake 目录下 jdk 8u181 linux x64 gz kafka 2 11 2 1 0 tgz zookeeper 3 4 5 tar gz
  • 第十四章 kafka专题之日志数据删除策略

    日志数据清理 为了控制磁盘的容量 需要对过去的消息进行清理 1 内部定时任务检测删除日志 默认是5分钟 2 日志清理参数配置 支持配置策略对数据进行清理 以segment为基本单位进行定期清理 当前正在使用的segment不会被清理 启用c
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • springboot集成kafka实战项目,kafka生产者、消费者、创建topic,指定消费分区

    springboot集成kafka实战项目 kafka生产者 消费者 创建topic 指定消费分区 前言 本项目代码可直接集成到你现有的springboot项目中 功能包括 1 kafka生产者配置 2 kafka消费者配置 指定分区消费
  • kafka + zookeeper下载/安装/使用(超详细)

    kafka是需要zk来支持 所以先下载zk 1 下载安装zookeeper 下载地址 选择不带source的 下载下来解压2次 进入到 D zookeeper apache zookeeper 3 6 1 bin conf 目录下 把zoo
  • java版kafka producer实现

    需求 1 kafka server已经配置完全 且设定了访问限制 基于这一点 必须要设定认证 及预先分配的账号密码 2 由于项目开发环境是java 且不允许使用LogStash 基于这一点 必须实现一个java版的producer 先贴一份
  • explain查看sql语句执行计划

    explain sql 执行结果字段描述 id select唯一标识 select type select类型 table 表名称 type 连接类型 possible keys 可能的索引选择 key 实际用到的索引 key len 实际
  • kafka 监控工具--CMAK

    CMAK previously known as Kafka Manager is a tool for managing Apache Kafka clusters See below for details about the name
  • shell脚本,一次性启动kafka集群

    版本centos6 5 64位操作系统 已配置JDK1 8 三个节点 在s121节点上可以免密登录到另外两个节点 另外kafka0 9 0 1的安装目录相同 修改了主机名 并在每个节点的hosts文件中设置了映射 脚本内容 bin bash
  • MQ - KAFKA 基础篇

    1 KAFKA的核心组件 API Producer API 它允许应用程序向一个或多个 topics 上发送消息记录 Consumer API 允许应用程序订阅一个或多个 topics 并处理为其生成的记录流 Streams API 它允许
  • 从 MySQL 到 DolphinDB,Debezium + Kafka 数据同步实战

    Debezium 是一个开源的分布式平台 用于实时捕获和发布数据库更改事件 它可以将关系型数据库 如 MySQL PostgreSQL Oracle 等 的变更事件转化为可观察的流数据 以供其他应用程序实时消费和处理 本文中我们将采用 De
  • 消息队列选型:Kafka 如何实现高性能?

    在分布式消息模块中 我将对消息队列中应用最广泛的 Kafka 和 RocketMQ 进行梳理 以便于你在应用中可以更好地进行消息队列选型 另外 这两款消息队列也是面试的高频考点 所以 本文我们就一起来看一下 Kafka 是如何实现高性能的

随机推荐