大数据组件-Kafka的javaAPI操作,Kafka StreamingAPI开发,

2023-10-26

1.KafkaJavaApi操作

1.添加maven依赖

 <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.0.0</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <!-- java编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

2.生产者代码

kafkaproducerAPI文档

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    /**
     * 实现生产数据到kafka test这个topic里面去
     * @param args
     */

    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("acks", "all"); //消息确认机制
        props.put("retries", 0); //消息发送失败后重试次数
        props.put("batch.size", 16384); //处理一批数据大小
        props.put("linger.ms", 1); //消息每天都进行确认
        props.put("buffer.memory", 33554432); //缓冲区的大小
        //指定k和v序列化类StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //获取kafkaProduce这个类
        Producer<String,String> kafkaProducer = new KafkaProducer<>(props);

        //使用循环发送消失
        for (int i = 0; i < 100; i++) {
            Thread.sleep(1200);
            kafkaProducer.send(new ProducerRecord<String, String>("test","mymessage"+i));//向test这个topic发送messagei这这个信息
        }
        //关闭资源
        kafkaProducer.close();
    }
}

3.1生产者分区策略

  • 如果指定分区号,那么数据直接产生到对应的分区里面去
  • 如果没有指定分区号,通过数据的key取其hashCode来计算数据落到那个分区
  • 如果没有分区号,数据也不存在key,那么使用round-robin轮询来实现
package it.yuge;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class PartitionProducer {
    /**
     * kafka生成数据
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("acks", "all"); //消息确认机制
        props.put("retries", 0); //消息发送失败后重试次数
        props.put("batch.size", 16384); //处理一批数据大小
        props.put("linger.ms", 1); //消息每天都进行确认
        props.put("buffer.memory", 33554432); //缓冲区的大小
        //指定k和v序列化类StringSerializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        
        //匹配自定义分区类
        props.put("partitioner.class","it.yuge.MyPartition")
        
        //获取kafkaProduce这个类
        Producer<String,String> kafkaProducer = new KafkaProducer<>(props);

        //使用循环发送消失
        for (int i = 0; i < 100; i++) {
            
            //第一种分区策略:即没有指定分区号,又没有指定数据的key,那么使用轮询的方式将数据均匀的发送到不同的分区里面去
            ProducerRecord<String, String> producerRecord1 = new ProducerRecord<>("mypartition", "message" + i);
            //第二种分区策略:没有指定分区号,指定了数据的key,通过key.hashCode % numPartition来计算数据会落到那个分区
            ProducerRecord<String, String> producerRecord2 = new ProducerRecord<>("mypartition", "mykey", "mymessage" + i);
            //第三种分区策略:如果指定了分区号,那么就会将数据直接写入到对应的分区里面去
            ProducerRecord<String, String> producerRecord3 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);
            
            //自定义分区
            ProducerRecord<String, String> producerRecord4 = new ProducerRecord<>("mypartition", 0, "mykey", "mymessage" + i);

            kafkaProducer.send(producerRecord1);//向test这个topic发送messagei这这个信息
        }
        //关闭资源
        kafkaProducer.close();
    }
}

自定义分区类

package it.yuge;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartition implements Partitioner {
    //这个方法就是确定分区数据到哪一个分区里面去
    //直接return 2 表示将数据写入到2号分区里面去
    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

3.消费者代码

cunsumerAPI文档

  • offsit:记录了消息消费到了那一条,下一次来的时候,我们继续从上一次的记录接着消费
  • 自动提交
  • 手动提交

(1)自动提交offset

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class MyConsumer {
    /**
     * 自动提交offset
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group"); //消费组
        props.put("enable.auto.commit", "true"); //允许自动提交
        props.put("auto.commit.interval.ms", "1000"); //自动提交的间隔时间
        props.put("session.timeout.ms", "30000"); //超时时间
        //指定k和v的反序列化类StringDeserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //指定消费那个topic里面的数据
        consumer.subscribe(Arrays.asList("test"));
        //使用死循环来消费test这个topic里面的数据
        while (true) {
            //records是所有拉取到的数据
            ConsumerRecords<String, String> records = consumer.poll(1000); //1000毫秒没拉到数据就认为超时
            for (ConsumerRecord<String, String> record : records) {
                long offset = record.offset();
                String value = record.value();
                System.out.println("消息的offset值为:"+offset+"消息的内容是:"+value);
            }
        }
    }
}

(2)手动提交offset

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class manualConsumer {
    /**
     * 实现手动提交offset
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "false"); //禁用自动提交offset,后期我们手动提交offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        
        //消费者订阅test这个topic
        consumer.subscribe(Arrays.asList("test"));
        
        final int minBatchSize = 100;//达到100条进行批次处理,处理完成后提交offset
        //定义一个集合,用于存储我们的ConsumerRecord(拉取的数据对象)
        List<ConsumerRecord<String, String>> consumerRecordList = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> consumerRecords1 = consumer.poll(1000);
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords1) {
                consumerRecordList.add(consumerRecord); //拉取的一批批数据往集合中存储
                if (consumerRecordList.size() >= minBatchSize) {
                    //如果集合当中的数据大于等于200条,我们批量进行一个处理
                    //将这一批次的数据保存到数据库里面
                    //insertTODb(consumerRecordList);//jdbc-伪代码

                    //提交offset,表示这一批次的数据全部都处理完了
                    //consumer.commitAsync(); //异步提交offset值,异步提交效率更高,不会阻塞代码的执行.

                    //同步提交offset值,同步是一个进入提交就上锁,其他等待,以保障线程安全,但是判断锁,释放锁线程效率低下
                    consumer.commitSync();
                    System.out.println("提交完成");
                    //清空集合数据
                    consumerRecordList.clear();
                }
            }
        }
    }
}

(3)处理完每个分区里面的数据之后,然后就进行一次提交(相比上面两种方式数据更安全)

package it.yuge;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.*;

public class ConmsumerPartition {
    /**
     * 处理完每一个分区里面数据,就马上提交这个分区里面的数据
     * @param args
     */
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "false"); //禁用自动提交offset,后期我们手动提交offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        kafkaConsumer.subscribe(Arrays.asList("mypartition"));
        while (true){
            //通过while true消费数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(1000);
            //获取mypartition这个topic里面所有的分区
            Set<TopicPartition> partitions = consumerRecords.partitions();

            //循环遍历每一个分区里面数据,然后将每一个分区里面的数据进行处理,处理完成后再进行提交
            for (TopicPartition partition : partitions) {
                //获取每一个分区里面的数据
                List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value()+"==="+record.offset());
                }
                //获取我们的分区里面最后一条数据的offset,表示我们已经消费到了这个offset了
                long offset = records.get(records.size() - 1).offset();

                //提交offset,使用Collection创建一个线程安全的map集合
                //提交我们offset,并且给offset值加1,表示我们从下沉没有消费的那一条数据开始消费
                kafkaConsumer.commitSync(Collections.singletonMap(partition,new OffsetAndMetadata(offset+1)));
            }
        }
    }
}

(4)指定消费topic当中某些分区的数据

package it.yuge;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.Arrays;
import java.util.Properties;

public class ConsumerSomePartition {
    //实现消费一个topic里面某些分区的数据
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node01:9092");
        props.put("group.id", "test_group");
        props.put("enable.auto.commit", "true"); //禁用自动提交offset,后期我们手动提交offset
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //获取kafkaConsumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(props);
        
        //通过consumer订阅某一个topic,进行消费,会消费topic里面所有的分区的数据
        //kafkaConsumer.subscribe();
        
        //通过调用assian发法实现消费mypartition这个topic里面0号和1号分区里面的数据
        TopicPartition topicPartition1 = new TopicPartition("mypartition", 0);
        TopicPartition topicPartition2 = new TopicPartition("mypartition", 1);
        kafkaConsumer.assign(Arrays.asList(topicPartition1,topicPartition2));
        
        while (true){
            ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
            //得到一条条的数据redcord
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("数据值为"+record.value()+"偏移量为:"+record.offset());
            }
        }
    }
}

5.kafka Streams API开发

使用场景:
解决这样的需求:使用StreamAPI获取test这个topic当中的数据,然后将数据全部转为大写,写入到test2这个topic当中去
在这里插入图片描述

(1)创建一个topic

cd /export/servers/kafka_2.11-0.10.0.0/
bin/kafka-topics.sh --create  --partitions 3 --replication-factor 2 --topic test2 --zookeeper node01:2181,node02:2181,node03:2181

–create表示创建
–partition 3 表示有三个分区
–replication-factor 2 表示有两个副本
–topic test2 表示topic名字叫test2
–zookeeper 指定我们zookeeper的连接地址

(2)开发StreamAPI

public class StreamAPI {
	//通过StreamAPI实现将数据从test里面读取出来,写入到test2里面去
    public static void main(String[] args) {
    	//封装配置信息的方法
        Properties props = new Properties();
        //put一些参数
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");//应用id名称
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092");//指定kafka连接地址
        //数据序列化反序列化
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

		//获取核心类KStreamBuilder
        KStreamBuilder builder = new KStreamBuilder();
        //通过KStreamBuilder调用stream方法,表示从那个topic当中获取数据
        //调用maoValues方法,表示将每一行value都给取出来,做map映射
        //.to("test2")将转成大写的数据写到test2这个topic当中去
        builder.stream("test").mapValues(line -> line.toString().toUpperCase()).to("test2");
        //通过KStreamBuilder和Properties(所有配置文件),来创建KafkaStreams,通过KafkaStreams来实现流式编程的启动
        KafkaStreams streams = new KafkaStreams(builder, props);
        //调用start启动kafka的流API
        streams.start();
    }
}

(3)生产数据

//node01执行以下命令,向test这个topic当中生产数据
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic test

(4)消费数据

//node02执行一下命令消费test2这个topic当中的数据
cd /export/servers/kafka_2.11-0.10.0.0
bin/kafka-console-consumer.sh --from-beginning  --topic test2 --zookeeper node01:2181,node02:2181,node03:2181
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

大数据组件-Kafka的javaAPI操作,Kafka StreamingAPI开发, 的相关文章

随机推荐

  • 直方图均衡化

    https www zhihu com question 37204742 answer 221844779 https zhuanlan zhihu com p 32857009
  • 明文传输漏洞

    业务系统对用户口令等机密信息的保护不足 攻击者可以利用攻击工具 从网络上窃取合法用户的口令数据 从而登录系统执行非法操作 攻击者可以利用监听工具在网络中窃取合法用户的口令数据 从而非法获取系统的访问权限 检测方法 通过burpsuite工具
  • OD-数列还原(python)

    数列还原 题目描述 有一个数列A n 从A 0 开始每一项都是一个数字 数列中A n 1 都是A n 的描述 其中A 0 1 规则如下A 0 1A 1 11 含义其中A 0 1是1个1 即11 表示A 0 从左到右连续出现了1次1A 2 2
  • C语言/C++基础之跨年烟花程序代码(附源码)

    C语言 C 基础之跨年烟花程序代码 程序之美 前言 主体 运行效果 代码实例 结束语 程序之美 前言 元旦将至 新年将至 转眼间2022年即将过去 崭新的一年正向我们缓缓走来 风花雪夜新年临近 入冬寒意随风吹进 繁星点点缀满天际 黎明晨阳元
  • 修改jar包中的class文件

    需求及准备 需求 现在有一个 jar文件 要修改其中某个文件的代码 准备 确保JRE已安装且环境变量已配置 安装Java Decompiler 官方地址为 http java decompiler github io 选择其中的JD GUI
  • Spring循环依赖源码debug详解

    1 什么是循环依赖 在Spring里 指两个或多个bean互相依赖 比如有两个Bean A B A中注入B B中注入A 这样就形成了循环依赖 Spring默认是支持循环依赖的 本文我们就从Spring源码层面对循环依赖进行分析 2 环境构建
  • Node.js学习四(文件流stream)

    文章目录 前言 一 Node处理缓存的方式 二 什么是Node js Stream 流 三 stream 流 的类型 四 创建可读流 五 拷贝文件 六 链式流 1 压缩文件 2 解压文件 前言 在我们学过fs模块后 可以知道读取文件时采用r
  • 12 shell命令之打包

    昨晚写的awk 说实话 对我而言 那是一个最复杂的命令 写得不是很好 可能在结构组织上面有很大的问题 后续有心得会再调整修改 本文将介绍linux的一组打包命令 这其中有我们最常用的tar 也有我们几乎没有见过的mksquansh 接下来就
  • 简明SQL截断和偏移指南:掌握LIMIT实现数据筛选

    以下是用到的表 截断 LIMIT 用于限制查询结果返回的行数 即最多返回多少行数据 例如 返回前两行数据 例如 从第二个数据开始返回两条数据 从0开始计算 偏移 OFFSET 用于指定查询结果的起始位置 即从结果集中的第几行开始返回数据 例
  • Spring使用——通过配置类注入Bean

    配置类 Configuration 告诉spring这是一个配置类 ComponentScan value指定要扫描的包 Filter excludeFilters default 扫描的时候按照规则排除哪些 ComponentScan v
  • App自动化测试 —— Appium的使用

    目录 简介 安装 配置 Run 问题 解决方案 优点 缺点 总结 简介 Appium是一个开源测试自动化框架 用于原生 混合和移动 Web 应用程序 安装 Appium安装方式有两种 一种是通过npm命令行安装 另一种是通过安装可视化工具
  • 华为OD机试真题 Java 实现【查找单入口空闲区域】【2022 Q4 100分】,附详细解题思路

    目录 一 题目描述 二 输入描述 三 输出描述 四 解题思路 五 Java算法源码 六 效果展示 1 输入 2 输出 3 说明 一 题目描述 给定一个 m x n 的矩阵 由若干字符 X 和 O 构成 X 表示该处已被占据 O 表示该处空闲
  • c3p0 mysql 自动重连_C3P0官方对于MySQL8小时问题的解决方案

    前一段时间在做一个发邮件的程序 程序是用定时器 每晚凌晨定时发邮件 邮件内容需要从数据库中获取 运行了一天就出问题了 问题信息如下 com mysql jdbc exceptions jdbc4 CommunicationsExceptio
  • 计算机系统基础课程实验课bomb--phase_1

    首先呢 栈顶减8个字节 然后将 0x402400放入第二个参数以调用
  • 如何打造一个高效的研发团队

    互联网公司的成功很大一部分归结为人才储备 如何打造有活力 持续创新的研发团队 相信很多管理者都比较关心 下面我们从业务支撑 技术架构 团队建设这几个方面做简单剖析 业务支撑 快速发展已经成为互联网公司的一种常态 那么在这么快的节奏下 如何快
  • python自动化赚钱-薅羊毛

    1 目 标 场 景 最近 有一个朋友告诉我 她在某平台上购买了一部手机 收到货之后发现商品质量挺好的 价格也不贵 临了随手给了个好评 商家最后还给她发一个 小红包 她把这个商品分享给了我 本篇文章的目的是利用Python 自动化完成商品购买
  • 好用的vscode vue3插件

    可能不全 但是是自己用的比较舒服的配置 包括高亮 class类名提示 引用跳转 模板快速创建 插件列表 1 别名路径跳转 2 Atom One Light Theme 3 Auto Rename Tag 4 Chinese Simplifi
  • 在R语言中使用stress.labels参数为可视化图像中的强调线添加标签信息

    在R语言中使用stress labels参数为可视化图像中的强调线添加标签信息 在数据可视化中 我们经常需要突出显示某些线条或数据点 以便更清楚地传达信息 在R语言中 我们可以使用stress labels参数来为图像中的强调线添加标签信息
  • 14.学习Camera之——camera基本知识

    一 Camera模组 大家都知道 手机背面的那个小小的孔 就叫摄像头 这个小孔幽幽的泛着光泽 深邃又迷人 如同 一个含苞待放的小萝莉一样 这个小萝莉还是个傲娇娘 像零之使魔的614一样惹人怜爱 而且在小萝莉身体 里面 不对 是在小孔的里面
  • 大数据组件-Kafka的javaAPI操作,Kafka StreamingAPI开发,

    1 KafkaJavaApi操作 1 添加maven依赖