如何对消费者组内的 Kafka 消费者进行负载均衡

2024-02-05

Scenario

  • 同一消费者组内有 10 个 kafka 消费者。
  • Kafka 有 10 个分区 => 这意味着每个分区都会自动分配给组内的单个消费者。
  • 消息以循环方式发送到分区。
  • 有时,一条消息的处理时间会比其他消息长得多。
  • 在这种情况下,下一条消息可能会被分配给仍在忙于工作的消费者,而还有其他空闲消费者

Question

  1. Kafka是否支持自动向消费者空闲的分区发送消息的机制?
  2. 如果没有,这种情况的常见方法是什么?

虽然你可以实现一个custom Assignor class https://kafka.apache.org/31/javadoc/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.html,默认情况下,仅根据分配消耗,不根据负载消耗;此类信息不会反馈给小组协调员。另外,根据负载不断调整可能会导致频繁的组重新平衡,从而导致消耗甚至更慢

关于处理长度,我不知道您的消费者能够在分区分配和轮询此类记录之前检查消息的任何方式。因此,如果您想缩短处理时间,则需要将处理逻辑与实际的轮询循环分离。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何对消费者组内的 Kafka 消费者进行负载均衡 的相关文章

  • 如何评估kafka流应用程序的消耗时间

    我有 1 0 0 kafka 流应用程序 有两个类 如下所示 class FilterByPolicyStreamsApp 和 class FilterByPolicyTransformerSupplier 在我的应用程序中 我读取事件 执
  • 即使没有消费者,消费者群体仍陷入“再平衡”

    我正在使用kafka版本2 4 1 最近从2 2 0升级到2 4 1 并注意到一个奇怪的问题 即使应用程序 kafka Streams 已关闭 没有正在运行的应用程序 但消费者组命令返回状态为重新平衡 我们的应用程序作为 kubernete
  • 仅针对使用通道而定制的 Phoenix 应用程序如何在多台机器上扩展?使用HAProxy?如何向所有节点广播消息?

    我将节点应用程序纯粹用于带有 Redis PubSub 的 socket io 通道 目前我将其分布在 3 台机器上 并由其中一台机器上的 nginx 负载平衡提供支持 我想用 Phoenix 应用程序替换这个节点应用程序 而且我对 erl
  • Kafka 消费者通过 JMX 滞后

    我正在尝试监控 Kafka 0 10 中消费者组的滞后情况 我们的消费者在 Kafka 而不是 ZooKeper 中跟踪他们的偏移量 这意味着我可以使用以下方式获取数据 bin kafka consumer groups sh bootst
  • RSS:刷新率?

    我正在编写一个供自己使用的小应用程序 它将使用公开发布的 RSS 提要 据我所知 该协议中没有订阅 发布机制 我需要让我的应用程序定期通过 HTTP GET 获取 RSS 提要 如果是这样的话 我想每隔十分钟左右就抢一次 但我担心被视为施虐
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 如何在kafka消费组中动态添加消费者

    我应该如何知道何时必须扩展消费者组中的消费者 当存在快速生产者时 消费者扩大规模的触发因素是什么 一种直接的方法是获取消费者延迟 这可以计算为提交的偏移量和开始偏移量之间的差值 如果最后 n 次计算的延迟正在增加 您可以扩大规模 反之亦然
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • 无法向 kafka 主题发送消息

    我正在使用 Kafka Play 以及 Scala 这是我的代码 我想在其中发送消息到kafka服务器 主题名称是 测试主题 尽管我没有在主题中看到我发送的消息 但我没有收到任何错误 这里有什么问题吗 import kafka produc
  • 我们如何读取给定时间范围内的Kafka主题?

    我需要读取 Kafka 主题中给定时间范围内的消息 我能想到的解决方案是首先找出时间范围开始的最大偏移量 然后继续消费消息 直到所有分区上的偏移量超过时间范围的末尾 有没有更好的方法来解决这个问题 谢谢 好吧 您肯定必须首先搜索适合时间范围
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 如何使用 C# 从 Kafka 获取主题列表

    我想从卡夫卡获取主题列表 我正在使用 kafka net 客户端 但无法在有关获取主题列表的文档中找到 您可以使用 Confluence Kafka 包中提供的 AdminClient 列出所有主题 using Confluent Kafk
  • Redis Cluster 与 Pub/Sub 中的 ZeroMQ,用于水平扩展的分布式系统

    如果我要设计一个巨大的分布式系统 其吞吐量应随系统中的订阅者数量和通道数量线性扩展 哪个会更好 1 Redis集群 仅适用于Redis 3 0 alpha 如果是集群模式 您可以在一个节点上发布并在另一个完全不同的节点上订阅 消息将传播并到
  • Strimzi 运算符 Kafka 集群 ACL 未启用类型:简单

    我们知道要启用Kafka ACL属性authorizer class name kafka security auth SimpleAclAuthorizer要添加到server properties但是如果 Kafka 集群由 Strim
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Kafka Streams 如何处理包含不完整数据的分区?

    Kafka Streams 引擎将一个分区映射到一个工作线程 即 Java 应用程序 以便该分区中的所有消息都由该工作线程处理 我有以下场景 并试图了解它是否仍然可行 我有一个主题 A 有 3 个分区 发送给它的消息由 Kafka 随机分区
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • 编辑 Kafka Listener Spring 应用程序以更改阶段/目标

    我可以利用另一个运行 Kafka 应用程序 代码库的团队来使用相同的数据 将其加载到我们的新暂存表中 而不是他们的 他们在 Messages 文件夹中有许多不同的 kafka 侦听器适配器 java 文件 每个文件消耗不同类型的数据 每个

随机推荐

  • 通过 FontAwesome 进行星级评定的 CSS

    我已经通过不同的方法尝试了 CSS 星级评定的一些变体 并且尝试通过 FontAwesome 而不是使用精灵来实现以下内容 我希望能够理想地包含半星 但这就是下面的示例失败的地方 这是我到目前为止所尝试过的 我无法让半 部分星在这里正常工作
  • 使用 EPSG:25832 投影在 Leaflet 中垂直对齐 TMS 图块

    我使用 Leaflet 和 Proj4Leaflet 来处理 25832 中的图块 该应用程序相当简单 我尝试将 EPSG 25832 中的图块叠加到全比例底图上 我已从瓷砖地图元信息中复制了各个分辨率和来源 我面临的问题是地图未对齐 一旦
  • R 和带有循环的网页抓取

    我正在抓取一个网站urls http example com post X 在哪里X是从1 5000开始的数字我可以使用刮擦rvest使用此代码 website lt html http www example com post 1 Nam
  • 如何加载 rjags

    与许多其他人一样 我在加载和安装 rjags 时遇到问题 我收到错误 library rjags Error onLoad failed in loadNamespace for rjags details call fun libname
  • StartSSL 证书在 Firefox 和 Android 上不受信任

    Apache 服务器 遵循此处的指南 https www startssl com Support v 21 https www startssl com Support v 21 httpd conf SSLEngine on SSLPr
  • Android 中按 MIME 类型获取联系人

    我想根据 Android 中的 MIME 类型获取联系人列表 例如 我想要一个具有电子邮件地址的联系人列表 您应该使用以下方法读取原始联系人以及与其关联的所有数据ContactsContract RawContacts Entity目录 如
  • Java FX 应用程序 onload 事件

    我正在使用 Google 地图 API 在 java swing java fx 中实现一个应用程序 我的问题是这样的 我需要在加载地图时运行 JavaScript 我一直在阅读该方法 webEngine getLoadWorker Sta
  • 如何在 $or 中使用 $regex 作为聚合表达式

    我有一个查询 允许用户使用如下格式按某些字符串字段进行过滤 如果最近一次检查的描述是以下任何一项 foo or bar 这对于以下查询非常有效 db getCollection permits find expr let vars late
  • PyYAML 转储格式

    我知道 StackOverflow 上有一些与此相关的问题 但我找不到我想要的内容 我在用着pyyaml http pyyaml org读书 load a yml文件 修改或添加密钥 然后写入 dump 再次 问题是我想保留转储后的文件格式
  • 尝试运行 XQuery 脚本以单点登录 MarkLogic 时出错

    我正在研究使用 xquery 重写器脚本来自动登录单点登录解决方案 我从端口 8001 的管理应用程序服务器开始 在应用程序服务器配置中 我有 设置身份验证 application level 设置 url 重写器 rewriter xqy
  • WinForms 文本框自动完成事件

    我的表单上有一个具有自动完成功能的 NET 文本框 该表格还具有AcceptButton and CancelButton定义的 如果我尝试提交建议Enter键或关闭下拉菜单Esc 我的表格关闭 我的想法是创建继承自的自定义文本框TexBo
  • 问题“线程“main”中出现异常# START NON-TRANSLATABLEjava.lang.NoClassDefFoundError: G€“Xmx3072m”(MyEclipse 12.0 Blue 和 Websphere7)

    我正在使用 MyEclipse 14 0 Blue 和 Websphere7 我正在尝试通过 服务器 选项卡部署到 Websphere 当它开始部署时 我遇到了以下问题 Exception in thread main START NON
  • grpc-java 的 BindableService 问题

    我正在尝试使用 grpc java v1 1 2 下面的 build gradle 部分 但是当我尝试为示例应用程序运行 fat jar 时 它会抛出下面给出的异常 编译应用程序时我没有看到任何问题 构建 gradle 部分 apply p
  • 从一个子域到另一个子域的 AJAX 请求,但在同一域上

    我知道除非使用 代理 一台服务器 否则无法执行跨域 ajax 请求 但是 如果我从 server1 example com 向 server2 example 发出 ajax 请求 即使它位于 example com 的同一域中 这是否不起
  • 如何使 switch 语句不区分大小写?

    在下面的例子中 有时 var可以是 Value value 甚至 VALUE switch var case value Value and VALUE don t seem to match here break 比较似乎区分大小写 仅匹
  • WooCommerce 3 中产品变体的 Ajax 添加到购物车按钮

    我这里有这个按钮 该按钮的用途是将产品 id 为 237 variation id 为 208673 attribute pa option 为蓝牙的产品添加到购物车 有没有办法 AJAX 这个 div class btnss span c
  • 如何在 Django 环境中使 html 从外部 js 文件调用 javascript 函数

    我正在使用 Django 开发一个应用程序 我希望用户一登陆某个 HTML 页面就弹出一个警报窗口 调用使窗口弹出的javascript函数funprova这个函数存储在一个名为的 js 文件中prova js 在路径上 静态 js pro
  • Ajax 调用 Github API 失败

    所以我对此有点陌生 到目前为止我的代码还不能工作 但如果有人能告诉我我错过了什么 我将不胜感激 基本上 我正在尝试调用 github 的 api 它返回 json 数据 我最终想解析它并仅显示特定信息 但目前我只是试图让数据显示在我的浏览器
  • 如何异步调用方法

    我尝试过点击此链接异步调用 http support microsoft com kb 315582但有些类已经过时了 所以我想要我的项目的确切答案 public class RegisterInfo public bool Registe
  • 如何对消费者组内的 Kafka 消费者进行负载均衡

    Scenario 同一消费者组内有 10 个 kafka 消费者 Kafka 有 10 个分区 gt 这意味着每个分区都会自动分配给组内的单个消费者 消息以循环方式发送到分区 有时 一条消息的处理时间会比其他消息长得多 在这种情况下 下一条