spring-kafka并行度concurrency在应用集群部署(多个节点)正确设置,附上Kafka Manager监控效果

2023-11-06

我们都知道kafka有topic的概念,为了能够更好的支持水平扩容,topic又分了很多的Partition . 消费者负责消费Partition中的消息,一个Partition只能被一个消费者消费,但是一个消费者可以消费多个partition .
所以提升消费能力可以开多几个消费者来消费partition,从而提高系统性能。

spring.kafka.listener.concurrency就是spring-kafka组件用来开启消费者线程数的参数。应用在单机部署环境下,这个参数很好理解,你想要开几个相应设置几个就行,concurrency数不能大于partition数量,因为partition会尽量平均分配给消费者,多出的会再重新分配给某些消费者,即消费者消费的partition数量会不等。

以下为根据concurrency开启线程数的代码:
org.springframework.kafka.listener.ConcurrentMessageListenerContainer#doStart
在这里插入图片描述
最终容器container是KafkaMessageListenerContainer类,因为它实现了SmartLifecycle接口,所以会自动执行到上面的doStart方法,接着调用KafkaMessageListenerContainer.start方法,最终会调用这个类的doStart方法以下部分开启消费者线程。

this.listenerConsumerFuture = containerProperties
				.getConsumerTaskExecutor()
				.submitListenable(this.listenerConsumer);

下面介绍在应用集群部署环境下的concurrency正确配置方式。
案例:
某天晚上,有一个topic消息数猛增。两个小时内涨了500w+,导致kafka消息堆积。为了临时解决消费慢问题,想到加大concurrency并行度来提升。之前设置的是8,这个topic有24个partition,想让一个消费者处理一个,所以concurrency临时调整为24。接着让运维重启应用,查看消费情况,一顿操作猛如虎,最后我勒个去,还是堆积,并且堆积时间更长了。。。

后面想想,我们应用是集群部署的,不是只有一个节点,所以消费者线程数量= 8 * 节点数,远远高于partition=24,所以加大线程数没意义。反而需要更多CPU消耗,堆积时间变长就足以说明。
其中一个节点的线程情况:调整为24后的
在这里插入图片描述
所以这样开出来的线程是完全浪费且消耗资源的,应该按节点数来设置这个参数才是正确的做法。
假如节点=3,应该设置= 24 / 3 = 8 个就行了。

org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1 说明下其中0-3-C-1第一个0是用来区分不同topic的,假如有两个topic,那么就是0-x-C-1和1-x-C-1
线程名称设置代码如下:
org.springframework.core.task.SimpleAsyncTaskExecutor#doExecute

	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}

为了验证这个观点,我自己本地也搭建了kafka集群。
设置了topic:hd-test-topic, 其中partition10个,我将spring.kafka.listener.concurrency设置为4

模拟应用集群部署,本地起两个端口的应用:
先起第一个,看到partition分配情况如下:
在这里插入图片描述
接着再启动另外一个节点:
在这里插入图片描述
在这里插入图片描述

明显看到之前设置的partition被撤销了,重新进行了分配,两个节点都成为了消费者。
所以证明上面观点是对的,在应用集群环境下concurrency要按应用部署节点数来设置。


以下部分为kafka的监控管理部分Kafka Manager
CMAK (Cluster Manager for Apache Kafka, previously known as Kafka Manager)
项目地址:Kafka Manager

要先下载cmak,我是下载当前最新版本的3.0.0.5 链接:CMAK
另外还要下载jdk11,zookeeper-3.5以上版本

配置好/cmak-3.0.0.5/conf/application.conf

kafka-manager.zkhosts="192.168.153.xxx:2181,192.168.153.xx:2182,192.168.153.xx:2183"
cmak.zkhosts="192.168.153.xx:2181,192.168.153.1xx:2182,192.168.153.1xx:2183"

先启动zk集群,然后启动kafka集群,最后再启动cmak。
kafka的zk集群配置好,在使用cmak过程中发生以下错误:

1.启动cmak后报错如下:
[error] k.m.a.c.BrokerViewCacheActor - Failed to get broker metrics for BrokerIdentity(1,192.168.153.1xx,9999,false,true,Map(PLAINTEXT -> 9092))

因为勾选了以下选项所以出现上面的错误
在这里插入图片描述
解决只需要修改kafka-server-start.sh 和 kafka-run-class.sh
kafka-server-start.sh加上export JMX_PORT=“9999”

if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

kafka-run-class.sh
加上:-Djava.rmi.server.hostname=192.168.153.1xx

# JMX settings
if [ -z "$KAFKA_JMX_OPTS" ]; then
  KAFKA_JMX_OPTS="-Djava.rmi.server.hostname=192.168.153.1xx -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false  -Dcom.sun.management.jmxremote.ssl=false "
fi

Kafka Manager效果图:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

开始我选用CMAK1.3.3.18 版本发现Consumers consuming from this topic这部分显示不出来,后来换了最新版本的,很正常。
在这里插入图片描述

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

spring-kafka并行度concurrency在应用集群部署(多个节点)正确设置,附上Kafka Manager监控效果 的相关文章

  • 淘宝小程序数据流转

    目录 前言 一 淘宝小程序的数据流转 二 更新方法 1 由父及子 1 官方常见式 2 由子及父 反了老子 1 this page setData 3 全局绑定 总结 前言 最近做了好几个小程序 感觉自己review之前代码太难受了 数据和目
  • java基础之 IO 流(输入/出字符流)

    字符基流 FileReader FileWriter 代码示例 package IOTest import java io FileNotFoundException import java io FileReader import jav
  • Linux查看磁盘空间大小的命令

    1 查看磁盘空间大小的命令 df df命令用于查看磁盘分区上的磁盘空间 包括使用了多少 还剩多少 默认单位是KB 比如以下命令 df hl 执行的结果每列的含义 第一列Filesystem 磁盘分区 第二列Size 磁盘分区的大小 第三列U
  • 关机代码(强制关机)

    关机代码 很简单的一个代码 代码如下 from os import 库 system shutdown s t 10 强制关机 可以恶搞朋友 手动狗头 但是有点废朋友 滑稽
  • uni_app“一课一得”

    什么是uni app uni app是一个使用Vue js开发所有前端应用的框架 开发者编写一套代码 可发布到iOS Android Web 响应式 以及各种小程序 微信 支付宝 百度 头条 飞书 QQ 快手 钉钉 淘宝 快应用等多个平台
  • Unity InputSystem (一)

    什么是InputSystem InputSystem 是 2019 年 Unity 新推出的插件 输入系统包实现了一个系统来使用任何类型的输入设备来控制你的 Unity 内容 它旨在成为 Unity 的经典输入管理器更强大 更灵活 更可配置
  • C++调用类成员函数

    ifndef CLASS H define CLASS H class CanExtTxPDO t private unsigned char i public unsigned char buffer 10 CanExtTxPDO t u
  • 数据结构入门-二叉树篇(一)

    144 二叉树的前序遍历 给你二叉树的根节点 root 返回它节点值的前序遍历 示例 输入 root 1 null 2 3 输出 1 2 3 前序遍历是二叉树遍历的一种 首先访问根节点 然后遍历左子树 最后遍历右子树 可以记作根左右 解题思
  • PyTorch自然语言处理

    特点 展示如何使用基于 Python 的深度学习库 PyTorch 应用这些方法 演示如何使用 PyTorch 构建应用程序 探索计算图和监督学习范式 掌握 PyTorch 优化张量操作库的基础知识 概述传统的 NLP 概念和方法 学习构建
  • python将折线平滑为曲线

    目录 曲线的曲率介绍 平滑方法介绍 1 环境及模块介绍 2 代码示例 3 整体代码 曲线的曲率介绍 曲线的曲率 curvature 就是针对曲线上某个点的切线方向角对弧长的转动率 通过微分来定义 表明曲线偏离直线的程度 数学上表明曲线在某一
  • 编译器预定义总结.

    http sourceforge net p predef wiki Compilers ACC Type Macro Identification ACC Altium MicroBlaze C Type Macro Format Des
  • Faq about multimedia

    VCN Video Compression Networking Glossary This is a collection of often used and misused technical terms regarding video
  • 【基础知识】11、github上传本地代码

    第一步 下载git bash 下载链接 按步骤安装即可 第二步 配置git bash 一 输入ssh keygen t rsa C 24428078 qq com 获取钥匙 邮箱为github注册的邮箱 输入上述命令时注意空格 出现下面界面
  • bootstrap引入

  • pww区域连接特征提取算法

    主题思想 任何一个图像 肯定由多个或一个区域 每个区域在横向扫描时 会有分裂和合并 比如圆环 顶部有一个分裂点 底部有一个合并点 没有分裂合并的图形 就是简单的凸图像 很容易通过外形识别 而复杂的图像 就是凹的 就需要分裂合并点来识别 旋转
  • 动作捕捉(Motion Capture)文件BVH的解读笔记

    Bvh里面的JOINT 以及ROOT 都表示一个坐标空间 我们称之为关节坐标空间 在这个坐标空间里 它有下一级的子坐标空间 也就是下一级的JOINT 子坐标空间的原点位置由子JOINT的OFFSET字段指明 也就是说一个JOINT的OFFS
  • Linux系统内核文件Cache管理机制简介

    1 前言 自从诞生以来 Linux 就被不断完善和普及 目前它已经成为主流通用操作系统之一 使用得非常广泛 它与 Windows UNIX 一起占据了操作系统领域几乎所有的市场份额 特别是在高性能计算领域 Linux 已经成为一个占主导地位
  • 一文让前端搞懂shell编程

    概述 前端程序员有时会遇到部署项目的情况 有时需要看懂后台或者运维写的脚本 如果转型AI 数据分析和模型训练也经常用到shell编程 掌握shell编程 你的编程之路会越走越宽 shell 解析器 sudo cat etc shells t

随机推荐

  • Win7下使用U盘安装Ubuntu16.04双系统图文教程(亲测)

    安装步骤 1 下载Ubuntu 16 04镜像软件 2 使用ultraISO软件制作U盘启动盘 3 利用U盘启动盘来安装Ubuntu系统 4 使用EasyBCD创建启动系统启动引导 可以省略 5 重启系统即可 一 下载ubuntu16 04
  • 使用Semaphore 实现一个简单的限流器

    使用Semaphore 实现一个简单的限流器 java api Java的api中 提供了semaphore这个线程同步的辅助类 用来控制同时访问共享资源的线程数量 Semaphore提供的主要方法如下 void acquire 获取一个信
  • NUC10 i7 黑苹果Big Sur 11.4 + win10 双系统安装指南

    说明 硬件 Intel NUC 10代 i7 10710U 系统说明 Mac OS Big Sur 11 4 Windows 10 注 本文默认已经安装好windows 安装过程就和普通的单系统安装步骤一样 安装步骤略 其他说明 由于个人知
  • 协同过滤与矩阵分解

    协同过滤算法的基本原理 用户行为数据是推荐系统最常用 也是最关键的数据 用户的潜在兴趣 用户对物品的评价好坏都反映在用户的行为历史中 而协同过滤算法 就是一种完全依赖用户和物品之间行为关系的推荐算法 我们从它的名字 协同过滤 中 也可以窥探
  • 2022高教杯思路合集!!全国大学生数学建模竞赛

    2022高教杯将于9 15开赛 思路贴将于晚10点前发布 粉丝可见 17日0 00转免 国一F奖3年数学建模经验团队 交流Q群 882663918 下文是2022年美赛的思路示例 要求由于公司的规定 ICM公司无法与您的团队分享关于他们的人
  • android 全局悬浮窗 可点击_Hi Translate 全局翻译外语 App 上的内容

    英文不太好的朋友可能在使用某些国外 App 在无中文语言支持的时候 也许 Hi Translate 这款 Android 应用可以帮助到你 这款应用不是简单的外语翻译 它可以在你使用满是外语的 App 时 帮助你实现全局页面翻译成中文 Hi
  • IDEA 集成 Sonar 完整流程

    目录 背景 相关的模块及关系 插件安装 SonarQube 启动 SonarQube 创建工程 插件配置 1 打开插件通用配置界面 2 点击 号添加 SonarServer 3 下一步配置认证信息 4 SonarLint 项目配置 mave
  • C++ char*两种初始化为零的方式

    C 中 char 两种初始化为零的的常用方式有以下两种 char data new char 50 char data memset data 0 50
  • ios-消息中心 NSNotificationCenter 的介绍

    1 通知中心概述 通知中心实际上是在程序内部提供了消息广播的一种机制 通知中心不能在进程间进行通信 实际上就是一个二传手 把接收到的消息 根据内部的一个消息转发表 来将消息转发给需要的对象 通知中心是基于观察者模式的 它允许注册 删除观察者
  • C++17入门经典

    C 17入门经典 注意 第1章 基本概念 第2章 基本数据类型 第3章 处理基本数据类型 第4章 决策 第5章 数组和循环 第6章 指针和引用 第7章 操作字符串 第8章 定义函数 第9章 函数模板 第10章 程序文件和预处理指令 第11章
  • python 修饰器 参数_Python修饰器讲解

    转自 http www cnblogs com rollenholt archive 2012 05 02 2479833 html 文章先由stackoverflow上面的一个问题引起吧 如果使用如下的代码 makebold makeit
  • 语义分割--PANet和Understanding Convolution for Semantic Segmentation

    语义分割 PAN Pyramid Attention Network for Semantic Segmentation FCN作为backbone的结构对小型目标预测不佳 论文认为这存在两个挑战 物体因为多尺度的原因 造成难以分类 针对这
  • J2EE规范技术

    原文 http blog csdn net erikxu archive 2004 12 07 208170 aspx J2EE的13种核心技术 1 JDBC Java Data Base Connectivity java数据库连接 是一
  • Unity3D 中使用OnTiggerEnter遇到的不触发问题

    移动GameObject 绑定BoxCollider Istrigger选中 固定GameObject 绑定BoxCollider 刚体属性 IsKinematic选中 此种情况下 移动GameObject中的OnTriggerEnter
  • 数据结构(1)前言

    1 学习数据结构前 需要掌握结构体和指针的使用 需要了解typedef这个关键字 对这部分知识欠缺的可以查看 C语言结构体详解 何为指针 与数组名有什么区别 2 作为一名想成为嵌入式软件工程师的人而言 很多像电气工程 电子信息等专业的人在大
  • Windows11 文件选择打开方式时卡死 解决

    发生的现象 在 打开方式 窗口的地址栏粘贴应用的地址 gt 打开方式界面卡死 完整步骤 左键点击打开epub文件 gt 跳出 寻找一个应用以打开此 epub文件 gt 选择 在电脑上选择应用 gt 弹出 打开方式 窗口 gt 在 打开方式
  • ❤ 15个基于Vue3.0全家桶的优秀开源项目

    15个基于Vue3 0全家桶的优秀开源项目 Vue Admin Better github https github com chuzhixin vue admin better vue admin better 对比其他来源 admin
  • 雨停了,我将雨伞收起

    天空中已有阳光从厚重的云层间隙射向大地 我将手中的雨伞收起 伞面上留存着的雨滴正沿着伞面的褶皱流下打在地上 我望向远方 一辆汽车沿着自己的轨迹行驶 路边小坑中的积水在车轮的驱赶下纷纷跳上灌木丛的绿叶寻找庇护 一阵清脆的自行车铃在身后响起 一
  • Android BaseQuickAdapter万能适配器

    RecycleView万能适配器 一导入 implementation com github CymChad BaseRecyclerViewAdapterHelper 2 9 24 implementation com android s
  • spring-kafka并行度concurrency在应用集群部署(多个节点)正确设置,附上Kafka Manager监控效果

    我们都知道kafka有topic的概念 为了能够更好的支持水平扩容 topic又分了很多的Partition 消费者负责消费Partition中的消息 一个Partition只能被一个消费者消费 但是一个消费者可以消费多个partition