Kafka分区中消息分布不均匀

2024-03-27

我有一个主题,有 10 个分区,1 个消费者组,有 4 个消费者,工作线程大小为 3。

我可以看到分区中的消息分布不均匀,一个分区有太多数据,而另一个分区是空闲的。

如何让我的生产者将负载均匀分配到所有分区,以便所有分区都得到正确利用?


根据DefaultPartitioner类本身的JavaDoc注释,默认的分区策略是:

  • 如果记录中指定了分区,则使用它。
  • 如果未指定分区但存在密钥,则根据密钥的哈希值选择分区。
  • 如果不存在分区或键,则以循环方式选择分区。

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/ Producer/internals/DefaultPartitioner.java https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java

因此,这里有两个可能导致分布不均匀的原因,具体取决于您在生成消息时是否指定密钥:

  • 如果您指定一个键并且使用 DefaultPartitioner 得到不均匀的分布,最明显的解释是您多次指定相同的键。

  • 如果您没有指定键并使用 DefaultPartitioner,则可能会发生不明显的行为。根据上面的内容,您会期望消息的循环分发,但情况不一定如此。 0.8.0 中引入的优化可能会导致使用相同的分区。检查此链接以获取更详细的说明:https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-当未指定分区键时,为什么数据不会均匀分布在分区中? https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whyisdatanotevenlydistributedamongpartitionswhenapartitioningkeyisnotspecified? .

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka分区中消息分布不均匀 的相关文章

  • Kafka-python 检索主题列表

    我在用着卡夫卡蟒蛇 http kafka python readthedocs org en 1 0 2 我想知道是否有办法显示所有主题 像这样的事情 bin kafka topics sh list zookeeper localhost
  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 处理 Kafka Broker 宕机时的故障

    我有一个 Kafka 代理正在运行 消息已成功消费 但我想处理 Kafka 代理在 Kafka 消费者端出现故障的情况 我读过了this https github com spring projects spring kafka issue
  • 卡夫卡幂等生产者

    卡夫卡文档说 幂等生产者可以使用相同的生产者会话 但我无法理解这一点 比如说 Kafka 为每条消息添加序列号 最后一个序列号保存在 Kafka 中 不确定它在哪里维护 它如何生成序列号以及它保存在哪里 为什么当生产者崩溃并再次出现时它无法
  • Kafka Connect 进入重新平衡循环

    我刚刚部署了 Kafka Connect 我只使用连接源 MQTT 应用程序位于两个实例的集群上 2 个容器上 机器 现在它似乎进入了一种重新平衡循环 我一开始有一点数据 但没有新数据出现 这就是我在日志中得到的内容 2017 08 11
  • KeeperErrorCode = /admin/preferred_replica_election 的 NoNode

    当我启动kafka时 zookeeper发生错误 INFO Got user level KeeperException when processing sessionid 0x156028651c00001 type delete cxi
  • 如何连接Kafka和Elasticsearch?

    我是Kafka的新手 我使用kafka通过logstash收集netflow 可以 并且我想将数据从kafka发送到elasticsearch 但是存在一些问题 我的问题是如何将 Kafka 与 Elasticsearch 连接起来 net
  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 无法向 Kafka 发送大消息

    我想从生产者向 Kafka 发送一条大消息 因此我更改了以下属性 代理 服务器 属性 replica fetch max bytes 317344026 message max bytes 317344026 max message byt
  • Spark:将 bytearray 转换为 bigint

    尝试使用 pyspark 和 Spark sql 将 kafka 键 二进制 字节数组 转换为 long bigint 会导致数据类型不匹配 无法将二进制转换为 bigint 环境详情 Python 3 6 8 Anaconda custo
  • 具有替代方案的重载方法值表

    我有编译器抱怨的以下代码 val state KTable String String builder table BARY PATH Materialized as PATH STORE 错误信息 error home developer
  • 如何在 Spring Kafka 中以编程方式设置 Jsonserializer Type Value 方法

    所以我无法仅使用 yaml 为 JsonSerializer 配置 JavaType 方法 还不确定原因 但与此同时 我如何以编程方式设置它 我在文档中看到了它的代码 但是该代码到底需要在哪里运行 Spring Kafka JsonDese
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • Kafka中如何使用事务以及如何使用abortTransaction?

    我是 kafka 新手 我使用 Kafka Producer Java api 面对Kafka的这个问题 Kafka Invalid transition attempted from state COMMITTING TRANSACTIO
  • 在 Confluence 4.1 + Kafka 1.1 中为 Kafka Connect 打包自定义 Java `partitioner.class` 插件?

    我已经成功地将用 Java 编写的简单自定义 Partitioner 类用于 Confluence 3 2 x Kafka 0 10 x 上的 Kafka Connect 接收器 我想升级到 Confluence 4 1 Kafka 1 1
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • 有没有办法重新分区 Kafka 流中的输入主题?

    我有一个由 byte 键控的主题 我想对其进行重新分区并通过消息正文中字段中的另一个键处理该主题 我发现有KGroupedStream and groupby功能 但它需要一个聚合函数来转换为 KTable KStream 我不需要聚合 我
  • 是否可以使用 Kafka Streams 访问消息头?

    随着添加Headers http apache spinellicreations com kafka 0 11 0 0 javadoc org apache kafka common header Header html到记录 生产者记录

随机推荐

  • MySQL 查询 - 基于两个因素连接数据,然后根据值自定义数据排序方式

    首先 我是查询多个表的新手 所以如果这是一个有点愚蠢的问题 我很抱歉 但我们都必须从某个地方开始 我制作了一张图片 应该更容易理解 http www mediumsliced co uk temp mysqlhelp jpg http ww
  • 为什么 valarray 这么慢?

    我正在尝试使用 valarray 因为它在操作向量和矩阵时非常类似于 MATLAB 我首先做了一些性能检查 发现valarray无法达到书中声明的性能C 编程语言 https en wikipedia org wiki The C 2B 2
  • 为什么 IIS 中的授权规则不会限制对我的 WCF 服务的访问?

    我有一个托管在 IIS 10 中的独立 WCF 服务 我想将对 Web 服务的访问限制为选定的用户组 我可以通过在 IIS 中执行以下操作来为 Web 应用程序执行此操作 身份验证 仅 Windows 身份验证 禁用匿名身份验证 授权规则
  • JAXB:类转换异常,但类具有相同的名称

    我有一个有趣的问题 当我启动 glassfish 服务器时 一切正常 但是 我更改了一些代码并发布了服务器 然后运行我的客户端 SistemGirisClientKullaniciDogrula 应用程序抛出此异常 java lang Cl
  • UnicodeDecodeError:“utf-8”编解码器无法解码位置 34 中的字节 0xe3:无效的连续字节

    我想用以下代码在 python 文件中打开一些波斯语文本文件 for line in codecs open 0001 txt encoding UTF 8 lines appends line 但它给了我这个错误 gt Traceback
  • Primefaces - 用于数据表中实时过滤的自定义组件

    PrimeFaces 对 p dataTable 的过滤器做得非常好 UX 网站很棒 因为过滤器字段位于列标题中 因此毫无疑问您正在过滤什么 并且它正在实时工作 数据会随着您的输入而变化 好吧 只有当您短暂停顿时 但它在我的意见正是用户所期
  • ASP.NET MVC Html 帮助程序

    我尝试创建一些 Html Helpers 它们将具有开始标记和结束标记 其中将包含其他内容 如 Html BeginForm 那样 例如 在 Razor 中 我们可以使用 Html BeginForm 帮助器 其语法如下 using Htm
  • 如何将外部JS脚本添加到VueJS组件中?

    我必须为支付网关使用两个外部脚本 现在两者都被放入index html file 但是 我不想在开始时加载这些文件 仅当用户打开特定组件时才需要支付网关 using router view 有办法实现这个目标吗 Thanks 解决这个问题的
  • 如何手动填充 ViewModel(不使用 AutoMapper!)

    我知道有很多关于这个主题的帖子 但我找不到一个可以帮助我做我想做的事情 我知道我最终会使用 Automapper 但在开始使用它之前 我想学习如何手动执行操作 我想创建一个 ViewModel 通过存储库用我的实体中的值填充它并将其发送到我
  • 从 Mathematica 中的 Web 设置用户代理导入

    当我使用 Mathematica 连接到我的网站时 Import mysite Data 并查看我的 Apache 日志 我看到 99 XXX XXX XXX 22 May 2011 19 36 28 0200 GET HTTP 1 1 2
  • 我可以向 JLabel 添加操作侦听器吗?

    我想用 JLabel 替换 JButton 并且希望我的代码在单击 JLabel 时执行某些操作 当我拥有 JButton 时 我使用操作侦听器来处理按钮上的点击 myButton addActionListener new clicksL
  • 如何在Robot Framework中将图像添加到html日志中?

    如何将图像添加到机器人框架的html日志中 我想在 Robot Framework 的 html 日志中添加一些图片 有人可以对此有一些想法吗 Keyword Log来自内置库有html参数可能可以满足您的需要 参见文档 http robo
  • DDD:通过身份引用聚合根内的实体

    我一直在寻找正确的参考方式entities位于一个聚合根 当我们只得到他们的身份来自 URL 参数 我问了一个上一个问题 https stackoverflow com questions 7196820 update an entity
  • C++ 方法调用中前导“::”的目的是什么[重复]

    这个问题在这里已经有答案了 我一直在使用 Boost 库 在 Boost Exception 中 我注意到如下代码 define BOOST THROW EXCEPTION x boost throw exception x 只是出于好奇
  • 避免控制台消息形式封装函数

    我正在使用一个包函数 coreenv 来自 seewave 它在控制台中创建一条 请稍候 消息 正如我反复所说的那样 该消息非常烦人 所以 我需要一种方法 从我的代码中 暂时禁止控制台消息 OR 访问功能代码并取消消息行 以下不是我的真实代
  • 为什么 JSON 比 XML 更轻量?

    我发现 JSON 和 XML 之间的区别 因为 两者都是为了系统之间的数据交换 但是JSON和XML之间有一个很大的区别 即JSON比XML更轻量级 但我无法找到 JSON 轻量级的真正原因 是什么让 JSON 变得轻量级 我发现的一个答案
  • 在 Mac OS X 上使用 pip 安装 pycrypto 时出现 Broken Pipe 错误

    我正在尝试通过 pip 在 OS X 上安装 pycrypto 版本 2 3 当编译器尝试编译 MD2 c 时 我收到 Broken pipeline 错误 使用 easy install 时我遇到了非常类似的错误 这是我收到的错误 bas
  • 在 Three.js 中针对“子场景”进行光线投射

    因此 我正在使用 Three js 示例中的 webgl interactive cubes html 并且我有一个相对简单的问题 是否可以测试光线与对象的子对象的相交 例如 如果我做类似的事情 for var i 0 i lt 2000
  • IPython 的历史向后搜索未按预期工作

    IPython 的history search backward功能是我最喜欢的功能之一 history search backward允许您键入命令的一部分 然后在阅读行历史记录中向后搜索以该命令的该部分开头的命令 默认情况下 我相信 这
  • Kafka分区中消息分布不均匀

    我有一个主题 有 10 个分区 1 个消费者组 有 4 个消费者 工作线程大小为 3 我可以看到分区中的消息分布不均匀 一个分区有太多数据 而另一个分区是空闲的 如何让我的生产者将负载均匀分配到所有分区 以便所有分区都得到正确利用 根据De