如何决定spring kafka设置的并发数?

2024-01-10

我正在使用 @KafkaListener 注释编写一个 kafka 消费者,我知道有一种方法可以使用 ConcurrentKafkaListenerContainerFactory 中的方法增加来自不同分区的并发 kafka 消费者的数量

e.g. factory.setConcurrency(3);

setconcurrency 的 Javadoc 是这样说的:-

KafkaMessageListenerContainer 运行的最大并发数。来自同一分区内的消息将按顺序处理。

现在我的问题是

我有一个带有 144 个分区的 kafka 主题,我们的应用程序需要使用消息,并且 3 个应用程序实例正在并行运行。

我想知道如何决定需要下注的并发值

ConcurrentKafkaListenerContainerFactory.setconcurrency (<Value>) 

这样我们就可以在消费消息时实现高吞吐量。

我应该使用 144/3 = 48 作为并发系数还是有公式可以得出这个数字?


是的,最好的方法是将并发设置为48在每个实例中,以便每个分区都将从消费者组中的唯一线程中消耗,并且为了实现高吞吐量,您可以使用批量监听 https://docs.spring.io/spring-kafka/reference/html/#batch-listeners with 更大的批量 https://stackoverflow.com/questions/51753883/increase-the-number-of-messages-read-by-a-kafka-consumer-in-a-single-poll/51755259#51755259

另一个最佳选择是运行更多实例,例如 14 个实例,每个实例的并发级别为 10。在这两种方法中,您还需要考虑每个实例的可用 CPU拥有比 CPU 更高的开销线程不会提供更好的性能 https://stackoverflow.com/questions/36200089/when-does-concurrency-multithreading-help-improve-performance

从版本 1.1 开始,您可以配置 @KafkaListener 方法来接收从消费者轮询中收到的整批消费者记录。要配置监听器容器工厂来创建批量监听器,可以设置batchListener属性

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何决定spring kafka设置的并发数? 的相关文章

  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • Kafka的消息键有什么特别的地方吗?

    我没有看到任何提及消息键 org apache kafka clients producer ProducerRecord key 除了它们可以用于主题分区 我可以自由地将我喜欢的任何数据放入密钥中 还是有一些我应该遵守的特殊语义 该密钥似
  • Kafka Streams - 减少大型状态存储的内存占用

    我有一个拓扑 见下文 可以读取一个非常大的主题 每天超过十亿条消息 这个 Kafka Streams 应用程序的内存使用量相当高 我正在寻找一些关于如何减少状态存储占用空间的建议 更多详细信息如下 Note 我并不是想逃避国有商店 我只是认
  • 如何处理Kafka流中的不同时区?

    因此 我正在评估 Kafka Streams 及其功能 看看它是否适合我的用例 因为我需要每 15 分钟 每小时 每天聚合传感器数据 并发现它由于其窗口功能而很有用 因为我可以通过应用创建窗口windowedBy on KGroupedSt
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 如何使用 haproxy 负载均衡器 Kafka Bootstrap?

    我有一个 kafka 集群 由 3 台在 AWS 上运行的机器组成 卡夫卡1到卡夫卡3 我正在使用新型卡夫卡消费者 gt 0 8 我知道kafka客户端连接到其中一台kafka服务器 获取服务器元数据 然后直接连接到代理 我想确保在代理发生
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • kafka Avro 多个主题的消息反序列化器

    我正在尝试以 avro 格式反序列化 kafka 消息 我使用以下代码 https github com ivangfr springboot kafka debezium ksql blob master kafka research c
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 通过SOCKS代理连接Kafka

    我有一个在 AWS 上运行的 Kafka 集群 我想用标准连接到集群卡夫卡控制台消费者从我的应用程序服务器 应用程序服务器可以通过 SOCKS 代理访问互联网 无需身份验证 如何告诉 Kafka 客户端通过代理进行连接 我尝试了很多事情 包
  • kafka 连接 s3 源无法与 Minio 一起使用

    我已经验证了与 minio 的连接 确保凭据工作正常并且可以访问 minio 另外 如果我尝试任何其他值store url http minio 9000我无法保存配置 所以我猜想在可见性方面不存在问题卡夫卡连接容器和minio容器 我不确
  • 调试自定义 Kafka 连接器的简单有效的方法是什么?

    我正在使用几个 Kafka 连接器 在控制台输出中没有看到它们的创建 部署有任何错误 但是我没有得到我正在寻找的结果 没有任何结果 无论是期望的还是否则 我基于 Kafka 的示例 FileStream 连接器制作了这些连接器 因此我的调试
  • 无法找到任何实现 Connector 且名称与 io.debezium.connector.mysql.MySqlConnector 匹配的类,可用的连接器有

    使用 Kafka MySQL 和 Debezium 设置数据流管道 我是这个版本的 Kafka 3 4 0 MySQL 8 Debezium 2 2 1 Java 11 目标 我想从 MySQL 捕获所有 CDC 并将数据流式传输到 Kaf
  • Kafka Consumer 无法加载任何密钥库类型和路径的 SSL 密钥库(Logstash ArcSight 模块)

    我需要为 Kafka Consumer 提供客户端身份验证证书 但是 它总是失败并出现以下异常 无法加载 SSL 密钥库 ssl cipher suites null ssl enabled protocols TLSv1 2 TLSv1
  • 使用表白名单选项更新 Debezium MySQL 连接器

    我正在使用 Debezium 0 7 5 MySQL 连接器 并且我试图了解如果我想使用以下选项更新此配置 最好的方法是什么table whitelist 假设我创建了一个连接器 如下所示 curl i X POST H Accept ap

随机推荐

  • T-SQL删除插入的记录

    我知道标题可能看起来很奇怪 但这就是我想做的 我有很多记录的表 我想获取其中一些记录并将它们插入到其他表中 像这样的东西 INSERT INTO TableNew SELECT FROM TableOld WHERE 棘手的部分是我希望我插
  • Jquery UI 工具提示不支持 html 内容

    今天 我将所有 jQuery 插件升级为 jQuery 1 9 1 我开始将 jQueryUI 工具提示与 jquery ui 1 10 2 一起使用 一切都很好 但是当我在内容中使用 HTML 标签时 在title我正在应用工具提示的元素
  • 我怎样才能使这个模式持久化?

    我正在寻找一种方法 让这种模式在出现后持久存在 正如此处所示 用户只需在 div 外部单击一下即可将其关闭
  • 如何制作一个反应本机输入,向用户提供验证状态反馈。 [有效、Printine、错误、编辑]

    我希望输入能够随着用户键入而不断更新 然后失去焦点 反馈将是输入周围的边框 1 Green when valid 2 Amber when typing and is in error state Green when valid 3 Re
  • 一面一示例 T 测试 Python

    在 Python 中 我使用 SciPy 进行单样本 t 检验 from scipy import stats one sample data 177 3 182 7 169 6 176 3 180 3 179 4 178 5 177 2
  • Checkstyles + Gradle 抛出引起:java.lang.IllegalArgumentException:给定名称 COMPACT_CTOR_DEF

    我最近将 checkstyle 插件添加到项目中以进行静态代码分析 但更新之后google style xml从最新的大师那里 我开始收到以下异常 org gradle api tasks TaskExecutionException Ex
  • grails 2.0 - 正确使用 serverURL 进行生产?

    Grails 2 0 改变了它使用 grails serverURL 进行开发和测试环境的方式 如manual http grails org doc 2 0 x guide single html upgradingFromPreviou
  • Python从视频文件中提取wav

    Related 如何使用python从视频文件中提取音频 https stackoverflow com questions 19216450 how to extract audio from a video file using pyt
  • 如何在 Obj-C 类别中“伪造”ivars (iPhone)

    Update iPhone OS 3 1 有关联的对象 然而 iPhone 模拟器却没有 如果您想在模拟器中测试关联的对象代码 您应该提交错误 请参阅我的问题here https stackoverflow com questions 19
  • 最接近的 3 点组

    是否有一种已知的 有效的算法来查找最接近的组three云中的点 这类似于最近点对问题 http en wikipedia org wiki Closest pair of points problem但我正在寻找三点而不是两点 Edit 最
  • 导入路径 - 正确的方法?

    我知道有很多类似或相同的问题 但我仍然无法理解 找到使用模块的正确方法 Python 是我最喜欢的语言 除了使用导入之外 我喜欢其中的所有内容 递归导入 当您尝试引用尚不存在的名称时 导入路径等 所以 我有这样的项目结构 my projec
  • 类的 += 运算符的规范形式

    我知道尽可能多地使用非成员非友元类的接口是个好主意 而且我刚刚意识到 对于我的 3D 矢量类 Vector3 我可以移动 等运算符从类中删除 只留下构造函数和复制赋值运算符 问题是 这个运算符应该是什么样子 我见过许多其他运算符的规范形式并
  • 列表内部或外部的序言对有什么用

    我在 SWI PL 文档中多次遇到键值对 但无法获得有关它们的详细信息 这是序言中的标准内容还是只是 swi pl 的扩展 主要在这里找到 http www swi prolog org pldoc doc for object keyso
  • 调用带有返回值的存储过程

    我正在尝试从我的 C Windows 应用程序调用存储过程 该存储过程正在 SQL Server 2008 的本地实例上运行 我可以调用该存储过程 但无法从该存储过程检索值 该存储过程应该返回序列中的下一个数字 我在网上做了研究 我见过的所
  • 如何在 Swift 中将 CGRect 相互组合

    我想知道是否有任何方法可以将一个 CGRect 与另一个 CGRect 组合起来以获得一个新的 CGRect swift 是否有任何预设功能来执行此操作 或者是否有其他方法可以实现此目的 let rect1 CGRect x 0 y 0 w
  • 为什么这个交换函数调用不明确? [复制]

    这个问题在这里已经有答案了 我想使用模板交换两个数字 但为什么会这样 swap x y 给出一个错误作为不明确的调用 include
  • Python-3 和字符串编码中的 \x Vs \u Vs \U 以及原因

    为什么 Python 3 中有不同的面向字节的字符串表示形式 使用单一表示而不是多个表示还不够吗 对于打印字符串的 ASCII 范围编号 显示以以下开头的序列 x In 56 chr 128 Out 56 x80 在不同的数字范围内 Pyt
  • 阻止 EBS Linux 2 (Node.js) 尝试执行 npm install?

    我正在尝试在 Elastic Beanstalk 上的 AWS Linux 2 上运行节点应用程序 并且需要使用纱线安装依赖项 如果您尝试使用 npm 而不是纱线来安装依赖项 我的 Node 应用程序会导致错误 我已经想通了如何在 plat
  • 通过XPath获取tr中特定td的索引

    我的表中有 thead 和 tbody Thead 包含几个 s 每个都有一个 id 我需要通过 id 在 thead 中查找 td 的索引 然后在 tbody 中通过索引查找 table thead tr td td td td find
  • 如何决定spring kafka设置的并发数?

    我正在使用 KafkaListener 注释编写一个 kafka 消费者 我知道有一种方法可以使用 ConcurrentKafkaListenerContainerFactory 中的方法增加来自不同分区的并发 kafka 消费者的数量 e