kafka中partition数量与消费者对应关系

2023-11-17

   kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。

   kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

消费者组数量的不同以及partition数量的不同对应着不同的消费情况,下面分别进行梳理之:

1、单播模式,只有一个消费者组

  (1)topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

图一
图一

2)该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。
在这里插入图片描述
图二
在这里插入图片描述
图三

2、广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

(1)多个消费者组,1个partition

该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费,如图4所示:
在这里插入图片描述
图四

(2)多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图5:
在这里插入图片描述
3、Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

 @ApiOperation(value = "向具有kafka-2个partition的topic发送信息")
    @RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
    public String testSendMessage(@RequestParam("msg") String msg) {
        KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
        System.out.println("发送的消息是:"+msg);
        return "2个partition的topic数据!--ok";
    }

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

 /**
     * @date 2020-09-24
     * 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持
     * @param consumerRecord
     * @param acknowledgment
     */
    @KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
    public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
        InetAddress address = null;
        try {
            address = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("当前的IP地址是:"+address.getHostAddress());
        System.out.println("监听服务A-收到的消息是::");
        System.out.println(consumerRecord.value().toString());
        System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息
        acknowledgment.acknowledge();
    }

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

  1.  实例1:    
    
    发送的消息是:111
    当前的IP地址是:10.244.3.114
    监听服务A-收到的消息是::
    “111”
    =================== end =================
    发送的消息是:222
    发送的消息是:333
    当前的IP地址是:10.244.3.114
    监听服务A-收到的消息是::
    “333”
    =================== end =================
    发送的消息是:444
    发送的消息是:555
    当前的IP地址是:10.244.3.114
    监听服务A-收到的消息是::
    “555”
    =================== end =================
    发送的消息是:666
    发送的消息是:777
    当前的IP地址是:10.244.3.114
    监听服务A-收到的消息是::
    “777”
    =================== end =================
    发送的消息是:888
    发送的消息是:999
    当前的IP地址是:10.244.3.114
    监听服务A-收到的消息是::
    “999”
    实例2:
    当前的IP地址是:10.244.0.237
    监听服务A-收到的消息是::
    “222”
    =================== end =================
    当前的IP地址是:10.244.0.237
    监听服务A-收到的消息是::
    “444”
    =================== end =================
    当前的IP地址是:10.244.0.237
    监听服务A-收到的消息是::
    “666”
    =================== end =================
    当前的IP地址是:10.244.0.237
    监听服务A-收到的消息是::
    “888”
    发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。

4、 使用kafkaManager为topic增加partition数量
在这里插入图片描述
1)首先点击 Add Partitions 增加partition的数量,然后点击Generate Partition Assignments ,此时系统自动会为每个分区下的副本分配broker, 最后点击 Reassign Partitions,可以平衡集群的负载

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

kafka中partition数量与消费者对应关系 的相关文章

  • 如何优雅地结束 spring @Schedule 任务?

    我正在尝试让 Spring Boot 服务优雅地结束 它有一个方法 Scheduled注解 该服务使用 spring data 作为数据库 使用 spring cloud stream 作为 RabbitMQ 在计划的方法结束之前 数据库和
  • spring中rabbitmq监听器的异常处理

    使用spring 我是rabbitmq的新手 我想知道我错在哪里 我编写了一个rabbitmq连接工厂和一个包含侦听器的侦听器容器 我还为侦听器容器提供了错误处理程序 但它似乎不起作用 我的春豆
  • amqp 与 amqplib - 哪个 Node.js amqp 客户端库更好?

    这些 amqp 客户端库之间有什么区别 哪一款最值得推荐 主要区别是什么 我会推荐amqp node https github com squaremo amqp node and bramqp https github com bakke
  • 没有连接的 AMQP/RabbitMQ 通道什么时候会死亡?

    我有一个简单的 RabbitMQ 测试程序 随机将消息排队 另一个读取它们 所有这些都使用 Spring AMQP 如果消费者死亡 例如 在没有机会关闭其连接或通道的情况下终止进程 则它尚未确认的任何消息似乎将永远保持未确认状态 我看过很多
  • 如何根据条件限制并发消息消耗

    场景 我已经简化了事情 许多最终用户可以从前端 Web 应用程序 生产者 开始工作 繁重的工作 例如渲染大型 PDF 这些作业被发送到单个持久的 RabbitMQ 队列 许多工作应用程序 消费者 处理这些作业并将结果写回到数据存储中 这个相
  • 在 Red Hat 上安装 RabbitMQ - 错误的 Erlang 版本

    我正在尝试按照以下说明在 Red Hat Enterprise Linux 7 64 位工作站版本 的评估虚拟机上安装 RabbitMQhttps www rabbitmq com install rpm html https www ra
  • 使用AWS SQS作为Aurora数据库的写入队列来提高系统性能是否有效

    我正在 AWS 上开发一个 Web 应用程序服务器 需要支持高吞吐量的读写 我的老板给了我这样的高级设计 我被困在 写入队列 上 团队告诉我 我们需要它来提高写入性能 因为我们只能有 1 个可以写入的主副本 我对 SQS 和 RabbitM
  • Akka 的语言和产品替代品是什么?

    现在我正在看游戏框架 https www playframework com 并且非常喜欢它 Play 中提供的功能中最受宣传的部分之一是Akka http akka io 为了更好地理解 Akka 以及如何正确使用它 您能告诉我其他语言或
  • Spring AMQP Java 客户端中的队列大小

    我使用 Spring amqp 1 1 版本作为我的 java 客户端 我有一个大约有 2000 条消息的队列 我想要一个服务来检查这个队列大小 如果它是空的 它会发出一条消息说 所有项目已处理 我不知道如何获取当前队列大小 请帮忙 我用谷
  • 列出与rabbitmq java客户端API交换的绑定

    我似乎在文档中找不到任何信息 所以我想知道是否可以通过某种方式使用 java RabbitMQ API 获取与交换相关的所有绑定 我在查询 api bindings 时正在寻找类似 http api 结果的内容 api definition
  • 每次发布后我应该关闭通道/连接吗?

    我在 Node js 中使用 amqplib 但我不清楚代码中的最佳实践 基本上 我当前的代码调用amqp connect 当 Node 服务器启动时 然后为每个生产者和每个消费者使用不同的通道 而不会真正关闭它们中的任何一个 我想知道这是
  • RabbitMQ 启动失败

    RabbitMQ Windows 服务将无法启动 C Program Files x86 RabbitMQ Server rabbitmq server 3 0 4 sbin gt rabbitmq service bat start C
  • Celery 任务状态取决于 CELERY_TASK_RESULT_EXPIRES

    据我所知 任务状态完全取决于 CELERY TASK RESULT EXPIRES 设置的值 如果我在任务完成执行后检查此间隔内的任务状态 则返回的状态为 AsyncResult task id state 是正确的 如果没有 状态将不会更
  • 多个队列在一个通道中消耗

    我使用rabbitMq 来管理和使用队列 我有多个队列 它们的数量并不具体 我使用直接交换来发布消息 我怎样才能仅使用一个队列来消费每个队列的所有消息 基于routing key 渠道 此时我假设我有 5 个队列 我使用了 for 循环并为
  • 使用 Celery(RabbitMQ、Django)检索队列长度

    我在 django 项目中使用 Celery 我的代理是 RabbitMQ 我想检索队列的长度 我浏览了 Celery 的代码 但没有找到执行此操作的工具 我在 stackoverflow 上发现了这个问题 从客户端检查 RabbitMQ
  • 如何使用 Celery、RabbitMQ 和 Django 确保每个用户的任务执行顺序?

    我正在运行 Django Celery 和 RabbitMQ 我想要实现的是确保与一个用户相关的任务按顺序执行 具体来说 一次执行一个 我不希望每个用户执行任务并发 每当为用户添加新任务时 它应该取决于最近添加的任务 如果此类型的任务已为此
  • RabbitMQ Java 客户端自动重新连接

    当我的应用程序失去与 RabbitMQ 的连接时 我将其连接工厂设置为自动尝试并重新连接 ConnectionFactory factory new ConnectionFactory factory setUsername usernam
  • 如何重置rabbitmq管理用户

    使用rabbitmq 我们可以安装管理插件 然后我们通过浏览器访问http localhost 55672 使用访客 访客 问题是 我无法再登录 因为我更改了密码并为角色输入了空白 有没有办法重置rabbitmq管理的用户 您可以通过以下方
  • RabbitMQ:如何创建和恢复备份

    我是 RabbitMQ 的新手 我需要一些帮助 如何备份和恢复到RabbitMQ 以及我需要保存哪些重要数据 谢谢 如果您安装了管理插件 您可以在Overview页 在底部你会看到导入 导出定义您可以使用它来下载代理的 JSON 表示形式
  • Rabbit mq - 等待 Mnesia 表时出错

    我已经在 Kubernetes 集群上使用 Helm Chart 安装了 RabbitMQ rabbitmq pod不断重新启动 在检查 pod 日志时 我收到以下错误 2020 02 26 04 42 31 582 warning lt

随机推荐

  • 金山WPS笔试题总结

    第一题 分别输出什么 var arr arr a 1 console log arr length 0 arr 2 2 console log arr length 3 arr length 0 console log arr a 1 这题
  • Mac安装protobuf 流程

    1 brew安装protobuf 1 安装brew ruby e curl fsSL https raw githubusercontent com Homebrew install master install 2 使用brew安装pro
  • 【超详细】记录从零开始学mmdetection(一)

    一 环境配置 本专题是为了记录学习mmdetection的过程 包括mmdetection的配置 代码的讲解 如何使用mmdetection训练自己的数据集 本节只记录第一部分 环境配置过程 本专题主要是在Linux下配置的 因为mmdet
  • Vue3.0开发之整合vue-admin-template模板

    起源 vue admin template模板算是一个比较好的前端开发模板 不过作者好像没有出vue3 0版本的 所以刚好自己在学习vue3 0 就想到开发一个vue3 0的模板 当然大部分代码都是参照vue admin template模
  • Threejs入门教程

    一 本地搭建Threejs官方文档网站 1 官网地址 Github https github com mrdoob three js 我们可以直接下载压缩包并解压或使用 git clone
  • python批量提取视频帧

    python批量提取视频帧 python批量提取视频帧 两种提取方式 按帧数提取 每个视频提取固定帧数 若所取帧数超过视频总帧数 则截取视频所有帧 按时间间隔提取 每个time提取一帧 1 使用示例 python video set py
  • 平衡小车设计_4_PID实现

    平衡小车设计 4 PID实现 参考平衡之家的算法实现 首先明确三个环的PID都是位置式PID 1 角度环 PD g tPidA actual roll g tPidA err g tPidA actual g tPidA set g tPi
  • Ubuntu20.04 搭建repo + gitlab的代码管理系统

    Ubuntu20 04 搭建repo gitlab的代码管理系统 1 为什么要用gitlab GitLab 是一个用于仓库管理系统的开源项目 使用Git作为代码管理工具 并在此基础上搭建起来的Web服务 安装方法是参考GitLab在GitH
  • 算法基础:k最近邻算法

    本博客所有内容均整理自 算法图解 欢迎讨论交流 了解过机器学习这个概念 一定知道有一种名为k最近邻的算法 简称KNN 对于k最近邻算法的定义 百度百科是这样给出的 K最近邻 k Nearest Neighbor KNN 分类算法 是一个理论
  • 异常类的定义、种类、抛出、声明和捕获

    目录 异常类定义 异常处理的必要性 异常处理 Java常见异常种类 1 Error 2 Exception 1 运行异常类Runtime Exception 2 非运行异常类Non RuntimeException 常见异常类 抛出异常th
  • 大话oracle rac 集群,大话Oracle RAC:集群、高可用性、备份与恢复

    第1部分 集群理论篇 第1章 RAC初体验 2 1 1 本书使用环境 3 1 1 1 硬件环境 3 1 1 2 软件环境 4 1 1 3 本书使用的环境 6 1 2 如何在PC机上搭建RAC环境 8 1 2 1 需要下载的软件 8 1 2
  • 拒绝服务攻击 - 学习笔记

    拒绝服务攻击 学习笔记 前言 概述 拒接服务攻击是目前来说 较为难以防御的攻击方式之一 其防御的难点在于难以分辨 正常用户与恶意用户 同时 随着攻击模式的进步 花样也越来越多 但是 服务器方也升级了安全策略 传统上 单机的 Dos 攻击已经
  • java设计6大原则总结

    1 开闭原则 Open Close Principle 定义 一个软件实体如类 模块和函数应该对扩展开放 对修改关闭 开放 封闭原则的意思就是说 你设计的时候 时刻要考虑 尽量让这个类是足够好 写好了就不要去修改了 如果新需求来 我们增加一
  • 【亲测有效新手教程】Vscode连接远程服务器编辑并运行深度学习代码

    文章目录 前言 使用步骤 1 安装OpenSSH 2 安装Remote SSH 参考链接 前言 在服务器上通过桌面已经配置好了深度学习的环境 并且存放了相关代码以及数据集 之前使用向日葵连接服务器进行代码编写 使用下来发现有延迟 不够丝滑
  • Kotlin如何提供与Java的100%互操作性?

    Kotlin与Java可以100 互操作 当您在每篇博客文章 社区话题或YouTube上首次搜索有关Kotlin的信息时 如果该话题是关于Kotlin的 那么这个词肯定在每个人的名单上都表明Kotlin可与Java 100 互操作 让我们看
  • 几个值得研究的工具

    一 DLIB库 C 实现的机器学习算法库 有离线的人脸识别SDK 比openCV里的人脸识别库效果好 除了这个库 要找到合适的离线版本的人脸识别库很难 国内的阿里 腾讯 百度 都没有离线SDK 都是在线的 二 KEEL 工具 是一款开源的数
  • php使用smtp发送邮件(ssl链接方式)

    在这里我要介绍的是如何使用smtp进行发送邮件 分别介绍了普通链接和ssl链接两种方式 一 准备材料 smtp类下载地址 https download csdn net download panjiapengfly 10688054 二 代
  • 甲骨文发布适用于 MongoDB 的 Oracle Database API;Chrome 和 Edge 互相“拉踩”;树莓派驱动程序现可在 Android 上运行

    整理 宋彤彤 责编 屠敏 开源吞噬世界的趋势下 借助开源软件 基于开源协议 任何人都可以得到项目的源代码 加以学习 修改 甚至是重新分发 关注 开源日报 一文速览国内外今日的开源大事件吧 一分钟速览新闻点 倪光南 可适当聚焦 RISC V
  • C++随机数之default_random_engine

    头文件 include
  • kafka中partition数量与消费者对应关系

    kafka是由Apache软件基金会开发的一个开源流处理平台 kafka是一种高吞吐量的分布式发布订阅消息系统 它可以处理消费者在网站中的所有动作流数据 kafka中partition类似数据库中的分表数据 可以起到水平扩展数据的目的 比如