Apache Kafka 编程实战-java客户端开发例子(入门教程轻松学)

2023-11-09

作者:稀有气体
来源:CSDN
原文:https://blog.csdn.net/liyiming2017/article/details/82805479
版权声明:本文为博主原创文章,转载请附上博文链接!

本入门教程,涵盖Kafka核心内容,通过实例和大量图表,帮助学习者理解,任何问题欢迎留言。

目录:

本章通过实际例子,讲解了如何使用java进行kafka开发。

准备
添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

创建主题
下面是创建主题的代码:

public class TopicProcessor {
    private static final String ZK_CONNECT="localhost:2181";
    private static final int SESSION_TIME_OUT=30000;
    private static final int CONNECT_OUT=30000;
 
    public static void createTopic(String topicName,int partitionNumber,int replicaNumber,Properties properties){
        ZkUtils zkUtils = null;
        try{
            zkUtils=ZkUtils.apply(ZK_CONNECT,SESSION_TIME_OUT,CONNECT_OUT, JaasUtils.isZkSecurityEnabled());
            if(!AdminUtils.topicExists(zkUtils,topicName)){
             AdminUtils.createTopic(zkUtils,topicName,partitionNumber,replicaNumber,properties,AdminUtils.createTopic$default$6());
            }
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            zkUtils.close();
        }
    }
 
    public static void main(String[] args){
        createTopic("javatopic",1,1,new Properties());
    }
}

首先定义了zookeeper相关连接信息。然后在createTopic中,先初始化ZkUtils,和zookeeper交互依赖于它。然后通过AdminUtils先判断是否存在你要创建的主题,如果不存在,则通过createTopic方法进行创建。传入参数包括主题名称,分区数量,副本数量等。

生产者生产消息
生产者生产消息代码如下

public class MessageProducer {
    private static final String TOPIC="education-info";
    private static final String BROKER_LIST="localhost:9092";
    private static KafkaProducer<String,String> producer = null;
 
    static{
        Properties configs = initConfig();
        producer = new KafkaProducer<String, String>(configs);
    }
 
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ProducerConfig.ACKS_CONFIG,"all");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
        return properties;
    }
 
    public static void main(String[] args){
        try{
            String message = "hello world";
            ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if(null==exception){
                        System.out.println("perfect!");
                    }
                    if(null!=metadata){
                        System.out.print("offset:"+metadata.offset()+";partition:"+metadata.partition());
                    }
                }
            }).get();
        }catch (Exception e){
            e.printStackTrace();
        }finally {
            producer.close();
        }
    }
}

1、首先初始化KafkaProducer对象。

producer = new KafkaProducer<String, String>(configs);

2、创建要发送的消息对象。

ProducerRecord<String,String> record = new ProducerRecord<String,String>(TOPIC,message);

3、通过producer的send方法,发送消息

4、发送消息时,可以通过回调函数,取得消息发送的结果。异常发生时,对异常进行处理。

初始化producer时候,需要注意下面属性设置:

properties.put(ProducerConfig.ACKS_CONFIG,"all");

这里有三种值可供选择:

0,不等服务器响应,直接返回发送成功。速度最快,但是丢了消息是无法知道的
1,leader副本收到消息后返回成功
all,所有参与的副本都复制完成后返回成功。这样最安全,但是延迟最高。

消费者消费消息
我们直接看代码

public class MessageConsumer {
 
    private static final String TOPIC="education-info";
    private static final String BROKER_LIST="localhost:9092";
    private static KafkaConsumer<String,String> kafkaConsumer = null;
 
    static {
        Properties properties = initConfig();
        kafkaConsumer = new KafkaConsumer<String, String>(properties);
        kafkaConsumer.subscribe(Arrays.asList(TOPIC));
    }
 
    private static Properties initConfig(){
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,BROKER_LIST);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG,"test");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        return properties;
    }
 
    public static void main(String[] args){
        try{
            while(true){
                ConsumerRecords<String,String> records = kafkaConsumer.poll(100);
                for(ConsumerRecord record:records){
                    try{
                        System.out.println(record.value());
                    }catch(Exception e){
                        e.printStackTrace();
                    }
                }
            }
 
        }catch(Exception e){
            e.printStackTrace();
        }finally {
            kafkaConsumer.close();
        }
    }
}

代码逻辑如下:

1、初始化消费者KafkaConsumer,并订阅主题。

kafkaConsumer = new KafkaConsumer<String, String>(properties);
kafkaConsumer.subscribe(Arrays.asList(TOPIC));

2、循环拉取消息

ConsumerRecords<String,String> records = kafkaConsumer.poll(100);

poll方法传入的参数100,是等待broker返回数据的时间,如果超过100ms没有响应,则不再等待。

3、拉取回消息后,循环处理。

for(ConsumerRecord record:records){
     try{
            System.out.println(record.value());
        }catch(Exception e){
             e.printStackTrace();
        }
}

消费相关代码比较简单,不过这个版本没有处理偏移量提交。学习过第四章-协调器相关的同学应该还记得偏移量提交的问题。我曾说过最佳实践是同步和异步提交相结合,同时在特定的时间点,比如再均衡前进行手动提交。

加入偏移量提交,需要做如下修改:

1、enable.auto.commit设置为false

2、消费代码如下:

public static void main(String[] args){
    try{
        while(true){
            ConsumerRecords<String,String> records =
                    kafkaConsumer.poll(100);
            for(ConsumerRecord record:records){
                try{
                    System.out.println(record.value());
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
            kafkaConsumer.commitAsync();
        }
 
    }catch(Exception e){
        e.printStackTrace();
    }finally {
        try{
            kafkaConsumer.commitSync();
        }finally {
            kafkaConsumer.close();
        }
    }
}

3、订阅消息时,实现再均衡的回调方法,在此方法中手动提交偏移量

kafkaConsumer.subscribe(Arrays.asList(TOPIC), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                //再均衡之前和消费者停止读取消息之后调用
                kafkaConsumer.commitSync(currentOffsets);
            }
   });

通过以上三步,我们把自动提交偏移量改为了手动提交。正常消费时,异步提交kafkaConsumer.commitAsync()。即使偶尔失败,也会被后续成功的提交覆盖掉。而在发生异常的时候,手动提交 kafkaConsumer.commitSync()。此外在步骤3中,我们通过实现再均衡时的回调方法,手动同步提交偏移量,确保了再均衡前偏移量提交成功。

以上面的最佳实践提交偏移量,既能保证消费时较高的效率,又能够尽量避免重复消费。不过由于重复消费无法100%避免,消费逻辑需要自己处理重复消费的判断。

至此,本系列kafka轻松学教程也就完结了。教程涵盖了Kafka大部分的内容,但没有涉及到流相关的内容。Kafka实现的部分细节也没有做过多的讲解。学习完本教程,如果你能够对kafka的原理有较为深刻的理解,并且能够上手开发程序,目的就已经达到了。如果想继续探寻Kafka工作的细节,可以再看更为深入的资料。相信通过此教程打好基础,再深入学习kafka也会更为容易!

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Kafka 编程实战-java客户端开发例子(入门教程轻松学) 的相关文章

随机推荐

  • vuex中的mutations的两种调用方法

    直接通过 store commit调用
  • Ubuntu14.04 安装ffmpeg

    一 xvid x264 ffmpeg源码下载 链接 https pan baidu com s 13phSFrLqkGrKDGF3 a2cSA 提取码 ls2s 二 安装 1 xvid tar zxvf xvidcore 1 3 3 tar
  • 一文带你看懂Spring事务!

    点击上方 方志朋 选择 设为星标 做积极的人 而不是积极废人 前言 Spring事务管理我相信大家都用得很多 但可能仅仅局限于一个 Transactional注解或者在XML中配置事务相关的东西 不管怎么说 日常可能足够我们去用了 但作为程
  • 677. 键值映射

    实现一个 MapSum 类 支持两个方法 insert 和 sum MapSum 初始化 MapSum 对象 void insert String key int val 插入 key val 键值对 字符串表示键 key 整数表示值 va
  • 面试之计算机网络

    计算机网络 1 路由选择协议 常见的路由选择协议有 RIP协议 OSPF协议 RIP协议 底层是贝尔曼福特算法 它选择路由的度量标准 metric 是跳数 最大跳数是15跳 如果大于15跳 它就会丢弃数据包 OSPF协议 底层是迪杰斯特拉算
  • IDEA 设置默认Maven的路径

    文件 新项目设置 构建工具 Maven 修改主路径
  • linux调整queue_depth,linux – 无法编辑/ sys / block / sdX / device / queue_depth文件

    我正在尝试使用以下命令增加SSD的队列深度值 echo 64 gt sys block sda device queue depth 但是我收到以下错误 bash echo write error Invalid argument 我尝试使
  • STM32CubeIDE HAL库操作IIC (一)配置篇

    目录 一 MX配置 使能中断 可选 DMA设置 可选 二 生成的代码 三 IIC通信的三种方式 Polling IT DMA 代码源自官方例程 1 Polling 常用 2 IT 开启中断 接收到数据时会调用回调函数 3 DMA模式 回调函
  • Qt 如何使用正则表达式 正则表达式 密码 email

    Qt 正则表达式 regular expression 详细用法查看此博客 https blog csdn net dongdong csdn article details 78574168 QRegExp regExpPsw 正则表达式
  • pytorch每日一学24(torch.quantize_per_tensor()、torch.quantize_per_channel())使用映射过程将tensor进行量化

    第24个方法 torch quantize per tensor input scale zero point dtype Tensor torch quantize per channel input scales zero points
  • 用JSP实现简单的四则运算

    用JSP实现简单的四则运算 作者 GGG166 首先定义一个Java的CompuerBean类放在beans包中用来计算两个数的四则运算 代码如下 作者 GGG166 package beans public class CompuerBe
  • 机器学习实战第一章——读书笔记

    数据挖掘 使用机器学习方法挖掘大量数据来帮助发现不太明显的规律 这称作数据挖掘 机器学习分类标准 一 有监督学习和无监督学习 1 有监督学习 提供带标签的训练集 k 近邻算法 线性回归 逻辑回归 支持向量机 SVM 决策树和随机森林 神经网
  • blender 渲染预览按钮不见了怎么办

    如果你在使用 Blender 时发现渲染预览按钮不见了 你可以尝试以下方法来解决这个问题 检查是否切换到了其他工作区 Blender 中有多个工作区 每个工作区都有自己的工具栏和面板 如果你切换到了其他工作区 渲染预览按钮可能不在屏幕上 检
  • 计算机网络基础知识--应用层协议HTTP、FTP、SMTP

    目录 1 HTTP 协议 HTTP 特点 HTTP 与HTTPS 的区别 HTTP 请求报文 HTTP 响应报文 2 FTP 协议 与TFTP 协议 FTP 协议 TFTP 协议 3 SMTP 协议 POP3协议与IMAP协议 SMTP协议
  • C语言中,数组首地址,数组元素首地址,数组名的区别

    目录 前言 一 什么是指针 二 正式介绍 1数组名和数组首元素地址 2 arr和 arr的区别 总结 前言 刚刚接触指针 一直想搞明白数组首地址 数组元素首地址 数组名的区别 花了点时间还是搞懂了 一 什么是指针 其实指针就是地址 地址就是
  • 表单嵌套表单涉及的校验和数据回显

    props中接收父组件的传值 data中定义表单的初始值 不能写成空对象 会报错 在created中对编辑的情况进行赋值 注意不能直接赋值 数据不是响应式的 需要使用到this set props formData type Object
  • Java-JavaWeb—(12)Maven

    1 Maver简介 1 1Maver是什么 Maven的本质是一个项目管理工具 将项目开发和管理过程抽象成一个项目对象模型 POM Maven是用Java语言编写的 他管理的东西统统以面向对象的形式进行设计 最终他把一个项目看成一个对象 而
  • JVM之垃圾回收机制

    垃圾回收机制 垃圾回收时机 System gc JVM垃圾回收机制决定 垃圾回收策略 如何判断对象已死 引用计数算法 可达性分析算法 需要垃圾回收的内存 方法区 元空间 堆 新生代 Young Generation 老年代 Old Gene
  • 深度学习基础学习-注意力机制(计算机视觉中)

    在网上看到很多关于注意力机制的说明 下面自己总结一下 大佬绕道 下面放几个文章的链接 添深度学习中的注意力模型 计算机视觉中的注意力机制 图像处理注意力机制Attention汇总 注意力机制详述 注意力机制总结 空间注意力机制和通道注意力机
  • Apache Kafka 编程实战-java客户端开发例子(入门教程轻松学)

    作者 稀有气体 来源 CSDN 原文 https blog csdn net liyiming2017 article details 82805479 版权声明 本文为博主原创文章 转载请附上博文链接 本入门教程 涵盖Kafka核心内容