Kafka3.0.0版本——消费者(自动提交 offset)

2023-10-29

一、自动提交offset的相关参数

  • 官网文档
    在这里插入图片描述

  • 参数解释

    参数 描述
    enable.auto.commi 默认值为 true,消费者会自动周期性地向服务器提交偏移量。
    auto.commit.interval.ms 如果设置了 enable.auto.commit 的值为 true, 则该值定义了消费者偏移量向 Kafka 提交的频率,默认 5s。
  • 图解分析

    在这里插入图片描述

二、消费者(自动提交 offset)代码示例

  • 消费者自动提交 offset代码

    // 自动提交
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    // 提交时间间隔 1秒
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
  • 消费者自动提交 offset代码完整代码

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumerAutoOffset {
    
        public static void main(String[] args) {
    
            // 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test3");
    
            // 自动提交
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
    
            // 提交时间间隔 1秒
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 first
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sevenTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka3.0.0版本——消费者(自动提交 offset) 的相关文章

  • 【CentOS7离线ansible-playbook自动化安装CDH5.16(内附离线安装包地址,及自动化脚本)】

    CentOS7 离线环境 使用ansible自动部署CDH5 16 前言 本文介绍如何使用作者开发的自动化脚本 离线部署cdh集群 只需要简单的配置下yum源和cdh集群节点IP等几个参数 就可实现一键部署cdh集群 省去配置mysql n
  • Spring Boot Kafka - 序列化和反序列化JSON

    文章目录 Spring Boot Kafka 序列化和反序列化JSON 前言 配置JsonSerializer和JsonDeserializer 定义一个Model类 Producer类 Consumer类 Controller类 测试 小
  • 20道常见的kafka面试题以及答案

    JAVA面试宝典 搞定JAVA面试 不再是难题 系列文章传送地址 请点击本链接 目录 1 kafka的消费者是pull 拉 还是push 推 模式 这种模式有什么好处 2 kafka维护消息状态的跟踪方法 3 zookeeper对于kafk
  • Kafka面试必问几个概念 与 使用场景

    介绍下我写的这个kafka项目 里面做了详细的配置注释已经代码的demo 可供大家学习 项目 地址 springboot kafka集群项目实战 kafka集群批量消费数据去重和一致性 kafka的几个重要概念 接下来围绕下面几个概念来进行
  • kafka知识 --kafka权威指南

    我想既然Kafka是为了写数据而产生的 那么用作家的名字来命名会显得更有意义 我在大学时期上过很多文学课程 很喜欢Franz Kafka 况且 对于开源项目来说 这个名字听起来很酷 因此 名字和应用本身基本没有太多联系 Jay Kreps
  • Kafka3.0.0版本——消费者(消费者组案例)

    目录 一 消费者组案例 1 1 案例需求 1 2 案例代码 1 2 1 消费者1代码 1 2 2 消费者2代码 1 2 3 消费者3代码 1 2 4 生产者代码 1 3 测试 一 消费者组案例 1 1 案例需求 测试同一个主题的分区数据 只
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • Kafka【命令行操作】

    Kafka 命令行操作 Kafka 主要包括三大部分 生产者 主题分区节点 消费者 1 Topic 命令行操作 也就是我们 kafka 下的脚本 kafka topics sh 的相关操作 常用命令行操作 参数 描述 bootstrap s
  • Kafka原理分析

    在基础篇中我们介绍MQ的一些基础原理 这篇文章 我们针对kafka进行较深入的分析 上篇文章中我们提到了kafka中一个名词broker 其实broker可以理解成为一台kafa服务器 kafka的特性和功能 在kafka设计之初是为了实时
  • springboot本机启动elasticjob抛出异常HostException(ip is null)

    1 使用的elasticjob版本为3 0 1 2 本机的IPV4在校验isReachable 返回false 可能是使用无线网 导致ip验证问题 3 最后引入Groovy解决 引入包
  • 黑马头条 热点文章实时计算、kafkaStream

    热点文章 实时计算 1 今日内容 1 1 定时计算与实时计算 1 2 今日内容 kafkaStream 什么是流式计算 kafkaStream概述 kafkaStream入门案例 Springboot集成kafkaStream 实时计算 用
  • Flink设置Source数据源使用kafka获取数据

    流处理说明 有边界的流bounded stream 批数据 无边界的流unbounded stream 真正的流数据 Source 基于集合 package com pzb source import org apache flink ap
  • Kafka入门基础知识学习笔记-Kafka只是消息引擎吗

    学习极客时间 Kafka核心技术与实战 入门 03 05 作者 胡夕 Apache Kafka 的一名代码贡献者 目前在社区的 Patch 提交总数位列第 22 位 应该说算是国内比较活跃的贡献者了 胡夕老师 赠言 聪明人也要下死功夫 最近
  • WebSocket + kafka实时推送数据(springboot纯后台)

    逻辑 kafka订阅消费者主题 消费后通过webSocket推送到前端 kafka vue financial webSocket 学习引用 SpringBoot2 0集成WebSocket 实现后台向前端推送信息 World Of Mos
  • [分布式] zookeeper集群与kafka集群

    目录 一 Zookeeper 概述 1 1 Zookeeper定义 1 2 Zookeeper 工作机制 1 3 Zookeeper 特点 1 4 Zookeeper 数据结构 1 5 Zookeeper 应用场景 1 6 Zookeepe
  • kafka系列——KafkaProducer源码分析

    实例化过程 在KafkaProducer的构造方法中 根据配置项主要完成以下对象或数据结构的实例化 配置项中解析出 clientId 用于跟踪程序运行情况 在有多个KafkProducer时 若没有配置 client id则clientId
  • Kafka——Mac搭建kafka环境

    1 下载Kafka安装包 下载地址 将压缩包移动到 usr local mv kafka 2 12 3 1 0 tgz usr local 解压 tar zxvf kafka 2 12 3 1 0 tgz 2 启动 启动zookeeper
  • Kafka 监控系统Eagle 使用教程 V1.4.0

    1 下载安装zookeeper 2 下载安装kafka 3 下载安装kafka eagle http download kafka eagle org tar zvxf kafka eagle bin 1 4 0 tar gz 4 配置JA
  • MQ - KAFKA 高级篇

    kafak是一个分布式流处理平台 提供消息持久化 基于发布 订阅的方式的消息中间件 同时通过消费端配置相同的groupId支持点对点通信 适用场景 构造实时流数据管道 用于系统或应用之间可靠的消息传输 数据采集及处理 例如连接到一个数据库系
  • 【ranger】CDP环境 更新 ranger 权限策略会发生低概率丢失权限策略的解决方法

    一 问题描述 我们的 kafka 服务在更新 添加 ranger 权限时 会有极低的概率导致 MM2 同步服务报错 报错内容 Not Authorized 但是查看 ranger 权限是赋予的 并且很早配置的权限策略也会报错 相关组件版本

随机推荐

  • 华为一二三面

    目录 机试 关于机考简介 第一次机考之前HR小哥哥发给我的 好像没啥用 但还是放这儿吧 我的机试 2021年9月 一面 10 25 二面 10 26 三面 10 29 机试 关于机考简介 第一次机考之前HR小哥哥发给我的 好像没啥用 但还是
  • 【PyCharm】设置(风格

    设置方法见 分享一款好看的PyCharm风格 转 K Code 博客园
  • Invalid operation: Lob is closed. ERRORCODE=-4470, SQLSTATE=null

    使用db2的过程中出现这个错误 搜索有如下两种解决办法 这里尝试只有一种有效 记录如下 方法1 设置JDBC驱动的progressiveStreaming属性值为2 jdbc db2 localhost 50000 SAMPLE progr
  • Strtus2历史漏洞复现

    Strtus简介 Apache Struts是美国阿帕奇 Apache 软件基金会负责维护的一个开源项目 是一套用于创建企业级Java Web 应用的开源MVC框架 主要提供两个版本框架产品 Struts 1和Struts2 Struts2
  • 软件测试内容

    软件测试涉及以主要方面 需求收集 没有明确的要求 项目就无法起飞 这是最关键的阶段 需要将想法写成格式正确且易于理解的文档 以下生命周期代表了收集需求的关键步骤 收集 记录 分析 论证 验证 追踪 确认 如果错过了任何信息 以下是在此阶段应
  • python版本号比对_比较Python中的版本号

    假设我们必须比较两个版本号version1和version2 如果version1 gt version2 则返回1 否则 返回1 否则 当version1 我们可以假定版本号的每个级别的默认修订号为0 例如 版本号3 4的第一级和第二级修
  • Java从入门到实战总结-4.3、数据库进阶-事务

    Java从入门到实战总结 4 3 数据库进阶 事务 文章目录 Java从入门到实战总结 4 3 数据库进阶 事务 1 事务的语法 2 事务的ACID特性 3 事务的并发问题 4 事务隔离级别 5 不同的隔离级别的锁的情况 了解 6 隐式提交
  • 华为OD机试 - 字符串解密(Java)

    题目描述 给定两个字符串string1和string2 string1是一个被加扰的字符串 string1由小写英文字母 a z 和数字字符 0 9 组成 而加扰字符串由 0 9 a f 组成 string1里面可能包含0个或多个加扰子串
  • 云计算在IT领域的发展和应用

    文章目录 云计算的发展历程 云计算的核心概念 云计算在IT领域的应用 1 基础设施即服务 IaaS 2 平台即服务 PaaS 3 软件即服务 SaaS 云计算的拓展应用 结论 欢迎来到AIGC人工智能专栏 云计算在IT领域的发展和应用 o
  • ARMv8的两种执行状态: AArch64/AArch32

    Copied from ARM Compiler User Guide When compiling code you must first decide which target the executable is to run on A
  • 智能温控风扇设计(采用74ls164移位寄存器)

    温度传感器DS18B20采集环境模拟信号 其输出送入AT89C51 单片机在程序的控制下 将处理过的数据送到移位寄存器74LS164 经74LS164输出后驱动三位数码管显示 当被测温度高于18 时 单片机发出控制信号使降温电扇以自然风的形
  • 电脑能上QQ无法上网页解决方法总结

    一 简单办法 使用腾讯管家 gt 工具箱 gt 电脑诊所 gt 上网异常 gt 能上QQ无法上网页 gt 立即修复 二 较复杂办法 很明显的问题 这个是DNS出了问题 我告诉你一招 基本是100 可以解决的 网上邻居右键属性 找到你的本地连
  • RocketMQ参数配置

    一 MQ启动 gt nohup sh mqnamesrv gt nohup sh mqbroker n 47 97 72 25 9876 c broker properties jps查看启动情况 二 broker properties参数
  • Git - 如何将git修改的文件导出和导入

    1 应用场景 主要用于将git修改的文件导出和导入 帮助提高工作效率 2 学习 操作 1 文档阅读 来自chatGPT的对话 2 整理输出 2 1 如何将git修改的文件导出来 如果您只是想将 Git 仓库中的文件导出到本地磁盘中 而不需要
  • 【PTA】分解质因数

    求出区间 a b 中所有整数的质因数分解 输入格式 输入两个整数a b 数据规模和约定 2 lt a lt b lt 10000 输出格式 每行输出一个数的分解 形如k a1a2a3 a1 lt a2 lt a3 k也是从小到大的 具体可看
  • Solidity 基础(一)

    Solidity 官网 github Solidity是一种静态类型的花括号 curly braces 编程语言 旨在开发运行在以太坊 Ethereum 上的智能合约 Solidity正在迅速发展 作为一门相对年轻的语言 Solidity正
  • 网络工程(计算机网络)毕业论文+PPT【银行内部网络系统规划与设计】

    我有两位优秀的小兄弟毕业了 在他们的允许下 把这两篇毕业论文写成博客 作为网络专业同学的参考 仅供参考 切勿用作其他用途 论文一 网络工程 计算机网络 毕业论文 PPT 银行内部网络系统规划与设计 论文二 网络工程 计算机网络 毕业论文 P
  • Vite 配置 cdn 加载资源

    一 介绍 上篇文章我们从零配置 Vite Vue3 0 开发环境 生产环境 本篇文章我们配置 CDN 加载 因为 Vite 不会重写从外部文件导入的内容 我们需要使用支持 ESM 编译的 CDN 这里我们使用 https esm sh 来引
  • 下拉ajax,ajax下拉框联动

    My JSP car jsp starting page 请选择汽车品牌 宝马 奥迪 奔驰 请选择系列 var xmlHttp 创建XMLHttpRequest对象 function createXMLHttpRequest if wind
  • Kafka3.0.0版本——消费者(自动提交 offset)

    目录 一 自动提交offset的相关参数 二 消费者 自动提交 offset 代码示例 一 自动提交offset的相关参数 官网文档 参数解释 参数 描述 enable auto commi 默认值为 true 消费者会自动周期性地向服务器