Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

2023-10-29

一、RoundRobin 分区分配策略原理

  • RoundRobin 针对集群中所有Topic而言。
  • RoundRobin 轮询分区策略,是把所有的 partition 和所有的consumer 都列出来,然后按照 hashcode 进行排序,最后通过轮询算法来分配 partition 给到各个消费者。
    在这里插入图片描述

二、RoundRobin分区分配策略代码案例

2.1、创建带有7个分区的sixTopic主题

  • 在 Kafka 集群控制台,创建带有7个分区的sixTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 7 --replication-factor 1 --topic sixTopic
    

    在这里插入图片描述

2.3、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test1”,设置分区分配策略为 RoundRobin。

    package com.xz.kafka.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;
    
    public class CustomConsumer1 {
    
        public static void main(String[] args) {
    
            // 0 配置
            Properties properties = new Properties();
    
            // 连接 bootstrap.servers
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            // 反序列化
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
            // 配置消费者组id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test1");
            // 设置分区分配策略
            properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");
    
            // 1 创建一个消费者  "", "hello"
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
    
            // 2 订阅主题 sixTopic
            ArrayList<String> topics = new ArrayList<>();
            topics.add("sixTopic");
            kafkaConsumer.subscribe(topics);
    
            // 3 消费数据
            while (true){
    
                ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
    
                for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                    System.out.println(consumerRecord);
                }
            }
        }
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;
    
    public class CustomProducerCallback {
    
        public static void main(String[] args) throws InterruptedException {
    
            //1、创建 kafka 生产者的配置对象
            Properties properties = new Properties();
    
            //2、给 kafka 配置对象添加配置信息:bootstrap.servers
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");
    
            //3、指定对应的key和value的序列化类型 key.serializer value.serializer
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
    
            //4、创建 kafka 生产者对象
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
            //5、调用 send 方法,发送消息
            for (int i = 0; i < 200; i++) {
                kafkaProducer.send(new ProducerRecord<>("sixTopic", "hello kafka" + i), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){
                            System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());
                        }
                    }
                });
                Thread.sleep(2);
            }
    
            // 3 关闭资源
            kafkaProducer.close();
        }
    }
    
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、RoundRobin分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费1、4分区的数据;消费者2消费2和5分区的数据;消费者3消费0、3、6分区的数据。
  • 说明:Kafka 采用修改后的RoundRobin分区分配策略。

三、RoundRobin 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 2、5号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 0、3、6号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 1、3、5号分区数据。
    在这里插入图片描述

  • 由下图控制台输出可知:3号消费者 消费到 0、2、4、6号分区数据。
    在这里插入图片描述

3.3、RoundRobin 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者 1 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡) 的相关文章

  • 动态规划python实现

    什么叫动态规划问题 考虑一个场景 当你有去沙漠旅行 你有一个背包和一些物品 背包有最大承受重量 物品也有重量和价值 而物品种类很多 不可能全都装在背包里 如何去选取价值总量最高的物品组合呢 物品价值表 物品名 价值 water 10 boo

随机推荐

  • Scala基础介绍

    Scala是一门主要以Java虚拟机 JVM 为目标运行环境 并将面向对象和函数式编程有机的结合在一起 因为Scala运行于JVM上 所以Scala可以访问任何Java类库 并且能够与Java框架进行互操作 Scala既有动态语言的灵活简洁
  • Javacv在Windows下正常运行,在Linux上报异常~Could not initialize class org.bytedeco.javacv.FFmpegFrameGrabber

    1 问题描述 今天来分享一个违背Java跨平台的问题 在学习Java第一课老师肯定就是吹嘘Java如何强大 如何跨平台 如何一次编译 到处执行 本文就遇见了在本地windows环境开发没有问题 在Linux的服务器上运行各种异常 这不是有点
  • NodeJs session中间件 及应用(简单的登录与登出)

    session中间件用于为了保存用户数据提供一个session管理器 虽然session中的数据与cookie分开保存 但是session中的数据经过加密处理后默认保存在一个cookie中 因此 在使用session中间件之前必须使用cok
  • layui 中的一些问题

    使用layui 版本号2 4 5 在引入js css之后 1 水平导航栏二级菜单一直不能显示 解决办法 最后发现layui all js引入不能放
  • 有趣的数据结构算法2——快速排序

    有趣的数据结构算法2 快速排序 题目复述 题目分析 具体实现代码 GITHUB下载连接 题目复述 数据排序算法是一类常见算法 其适用范围深入编程的方方面面 常见的数据排序算法有冒泡排序 堆排序 简单选择排序等等 各个适用范围不同 快速排序由
  • 13-Error接口错误处理以及go Module

    Error接口和错误处理 Go语言中的错误处理和其他语言不一样 它把错误当成一种值来处理 更强调判断错误 处理错误 而不是一股脑的catch错误 error接口 Go语言中把错误当成一种特殊的值来处理 不支持其他语言的try catch 捕
  • 【matlab】常用快捷键汇总

    记录自己常用的快捷键 冲冲冲 命令行窗口的常用快捷键 上下光标键 调用Matlab最近使用过的历史命令 便于快速重新执行 编辑器的常用快捷键 Ctrl R 快速注释代码段 拖动鼠标选中需要注释的代码行 Ctrl T 快速取消注释代码段 ht
  • Linux命令之tftp常用参数说明

    tftp 文件传输 实际操作 put 上传 get 下载 mode 文件传输模式 connect 连接 quit 退出 参数 g 下载文件 p 上传文件 l 本地文件名 r 远程主机文件名 常用命令 tftp g r a 10 0 0 1
  • ROS2 驱动思岚G4雷达(ydlidar)- Rviz显示

    记录G4雷达的配置 系统环境为 Ubuntu22 04 配置步骤 1 安装雷达SDK 2 构建 G4 雷达 ROS2 项目工程文件 3 使用Rviz可视化界面显示 1 安装雷达SDK 1 1 安装CMake YDLidar SDK需要CMa
  • 从服务器上复制文件复制不出来的,从远程服务器上复制文件

    从远程服务器上复制文件 内容精选 换一换 问题描述 将在x86平台上经过sh加密的XXX sh x文件复制到基于鲲鹏的服务器中 无法执行 例如 将test sh x从x86平台复制鲲鹏平台云服务器执行 test sh x回显内容如下 问题原
  • 毕设分享javaWeb的网络考试系统的设计与实现

    文章目录 1 项目简介 2 实现效果 3 系统设计 4 关键代码 5 论文概览 6 最后 1 项目简介 Hi 各位同学好呀 这里是L学长 今天向大家分享一个今年 2022 最新完成的毕业设计项目作品 毕设分享javaWeb的网络考试系统的设
  • C#this关键字的四种用法

    用法一 this代表当前类的实例对象 namespace Demo public class Test private string scope 全局变量 public string getResult string scope 局部变量
  • sqli-labs:less-32(宽字节注入)

    宽字节注入 产生原因 1 mysql 在使用 GBK 编码的时候 会认为两个字符为一个汉字 例如 aa 5c 就是一个汉字 前一个 ascii 码大于 128 才能到汉字的范围 2 mysqli real escape string 函数转
  • Helix QAC — 软件静态测试工具

    Helix QAC 是Perforce 公司 原PRQA 公司 产品 主要用于C C 代码的完全自动化静态分析工作 可以提供编码规则检查 代码质量度量 软件结构分析 测试结果管理等功能 Helix QAC 能够全面而正确地发现软件中潜在的问
  • Jmeter系列(33)- 跨平台运行 Jmeter,CSV 文件路径如何设置?

    抛出问题 上一篇文章中详细讲解了 CSV 数据文件设置的用法 https www cnblogs com poloyy 通常 我们编写 调试脚本都是在 Window 机器上 而真正性能测试时 脚本几乎都在 Linux 下运行 使用 CSV
  • 图像处理中 光场(Light Field)简介及理解

    1 光场 Light Field 是一个四维的参数化表示 是空间中同时包含位置和方向信息的四维光辐射场 简单地说 涵盖了光线在传播中的所有信息 光线携带二维位置信息 u v 和二维方向信息 x y 在光场中传递 光场是光线在空间传播中四维的
  • 语法怎么学

    为什么要学非谓语动词 太常见了 非谓语动词可以做句子中 除了谓语的任何成分 是一个非常重要课题 弄懂了这个 就相当于弄懂了语法的90 特别是语法题 怎样学习 我在小破站上查了不少教程 大多数都很好 但不太适合新手 讲得很复杂 我看到一个视频
  • 【Bugly干货分享】手把手教你逆向分析 Android 程序

    很多人写文章 喜欢把什么行业现状啊 研究现状啊什么的写了一大通 感觉好像在写毕业论文似的 我这不废话 先直接上几个图 感受一下 第一张图是在把代码注入到地图里面 启动首页的时候弹出个浮窗 下载网络的图片 苍老师你们不会不认识吧 第二张图是微
  • Pygame实战:Python做一款超好玩的滑雪大冒险小游戏,超会玩【附源码】

    导语 冬日当然要和心爱的人一起去滑雪 徜徉在雪白的世界 浪漫又刺激 唯有爱和滑雪不可辜负 不但风景绝美 而且还超 会 玩 现在还不是时候 但秋天已过半动冬天还会远吗 既然不能现在去滑雪 但是小编可以先让大家身临其境 带大家做一款超好玩的滑雪
  • Kafka3.0.0版本——消费者(RoundRobin分区分配策略以及再平衡)

    目录 一 RoundRobin 分区分配策略原理 二 RoundRobin分区分配策略代码案例 2 1 创建带有7个分区的sixTopic主题 2 3 创建三个消费者 组成 消费者组 2 3 创建生产者 2 4 测试 2 5 RoundRo