增加 Kafka 消费者在单次轮询中读取的消息数量

2024-03-02

Kafka消费者有一个配置max.poll.records它控制单次调用 poll() 及其返回的最大记录数默认值为 500 https://kafka.apache.org/documentation/#max.poll.records。我已将其设置为非常高的数字,以便我可以在一次轮询中获取所有消息。 然而,即使主题有更多消息,民意调查在一次调用中仅返回几千条消息(大约 6000 条)。

如何进一步增加单个消费者读取的消息数量?


您可以增加消费者poll()通过增加批量大小max.partition.fetch.bytes,但仍然根据文档它有限制fetch.max.bytes这也需要随着所需的批量大小而增加。而且从文档中还有另一个属性message.max.bytes in 主题配置 https://kafka.apache.org/documentation/#topicconfigs and 经纪商配置 https://kafka.apache.org/documentation/#brokerconfigs来限制批量大小。所以一种方法是根据您所需的批量大小增加所有这些属性

In Consumer config 最大分区获取字节数默认值为1048576

服务器将返回的每个分区的最大数据量。记录由消费者批量获取。如果提取的第一个非空分区中的第一个记录批次大于此限制,该批次仍将被返回以确保消费者可以取得进展。代理接受的最大记录批量大小是通过 message.max.bytes (代理配置)或 max.message.bytes (主题配置)定义的。请参阅 fetch.max.bytes 以限制消费者请求大小

In Consumer Config 获取最大字节数默认值为52428800

服务器应为获取请求返回的最大数据量。记录由消费者批量获取,如果获取的第一个非空分区中的第一个记录批次大于该值,仍然会返回该记录批次以确保消费者能够取得进展。因此,这不是绝对最大值。代理接受的最大记录批量大小是通过 message.max.bytes (代理配置)或 max.message.bytes (主题配置)定义的。请注意,消费者并行执行多个提取。

In Broker config 消息最大字节数默认值为1000012

Kafka允许的最大记录批量大小。如果增加此值并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。

在最新的消息格式版本中,为了提高效率,记录总是分组为批次。在以前的消息格式版本中,未压缩的记录不会分组为批次,并且此限制仅适用于这种情况下的单个记录。

这可以使用主题级别 max.message.bytes 配置针对每个主题进行设置。

In Topic config 最大消息字节数默认值为1000012

Kafka允许的最大记录批量大小。如果增加此值并且存在早于 0.10.2 的消费者,则消费者的获取大小也必须增加,以便他们可以获取这么大的记录批次。

在最新的消息格式版本中,为了提高效率,记录总是分组为批次。在以前的消息格式版本中,未压缩的记录不会分组为批次,并且此限制仅适用于这种情况下的单个记录。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

增加 Kafka 消费者在单次轮询中读取的消息数量 的相关文章

  • Kafka Connect - 删除带有配置的连接器?

    我知道如何删除 Kafka 连接器 如此处所述Kafka Connect 如何删除连接器 https stackoverflow com questions 48947250 kafka connect how to delete a co
  • 卡夫卡保留政策

    假设我有一个多代理 在同一主机上运行 Kafka 设置 其中包含 3 个代理和 50 个主题 每个主题配置为具有 7 个分区和 3 的复制因子 我有 50GB 内存可用于 kafka 并确保 Kafka 日志永远不会超过此内存量 因此我想配
  • Spring Kafka 与嵌入式 Kafka 集成测试

    我有一个 Spring Boot 应用程序 它有一个消费者从一个集群中的主题消费并生成到不同集群中的另一个主题 现在我正在尝试使用 Spring 嵌入式 Kafka 编写集成测试用例 但遇到了问题KafkaTemplate could no
  • 何时使用 RabbitMQ 而不是 Kafka? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 我被要求评估 RabbitMQ 而不是 Kafka 但发现很难找到消息队列比 Kafka 更合适的情况 有谁知道消息队列在吞吐量 耐用性 延迟或
  • RocksDb sst 文件的 GUI 查看器

    我正在与 Kafka 合作 将数据保存到rocksdb 中 现在我想看看 Kafka 创建的数据库键和值 我下载了 FastNoSQL 并尝试但失败了 该文件夹包含 sst 文件 日志文件 当前文件 身份文件 锁定文件 日志文件 清单文件
  • Kafka-python 检索主题列表

    我在用着卡夫卡蟒蛇 http kafka python readthedocs org en 1 0 2 我想知道是否有办法显示所有主题 像这样的事情 bin kafka topics sh list zookeeper localhost
  • 使用 Kafka Streams 进行 OpenTracing - 如何?

    我正在尝试将 Jaeger 跟踪集成到 K Streams 中 我计划将跟踪添加到几个最重要的管道中 并且想知道将 Traceid 从一个管道传递到另一个管道的好方法是什么 这是我到目前为止所做的 在流处理管道开始时 我启动一个服务器范围并
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 如何强制消费者读取kafka中的特定分区

    我有一个应用程序 用于从 1 个 Kafka 生产者生成的 URL 流中下载特定的 Web 内容 我创建了一个有 5 个分区的主题 有 5 个 kafka 消费者 但网页下载的超时时间为 60 秒 当下载其中一个 URL 时 服务器会假设消
  • KeeperErrorCode = /admin/preferred_replica_election 的 NoNode

    当我启动kafka时 zookeeper发生错误 INFO Got user level KeeperException when processing sessionid 0x156028651c00001 type delete cxi
  • 无法对 @KafkaListener 带注释的方法进行单元测试

    我正在尝试在 Spring 中对 kafka 消费者类进行单元测试 我想知道如果 kafka 消息发送到它的主题 则侦听器方法被正确调用 我的消费者类注释如下 KafkaListener topics kafka topics myTopi
  • 使用 Kafka Streams 在输出中设置时间戳无法进行转换

    假设我们有一个变压器 用 Scala 编写 new Transformer String V String V var context ProcessorContext override def init context Processor
  • Kafka 一遍又一遍地重放消息 - 心跳会话已过期 - 标记协调器已死亡

    使用 python kafka api 从只有少量消息的主题中读取消息 Kafka 不断地一遍又一遍地重放队列中的消息 它从我的主题接收一条消息 返回每条消息内容 然后抛出ERROR Heartbeat session expired ma
  • Spring Kafka MessageListenerContainer

    我看到 spring Kafka 代码 我有一些疑问 如果我们使用 1 个 kafkaListener 和 2 个主题 那么 spring Kafka 将创建一个 MessageListenerContainer 如果我为每个主题使用单独的
  • Grafana/prometheus 中没有 kafka 指标

    我成功部署了 Helm Chart普罗米修斯操作员 https github com coreos prometheus operator tree master helm prometheus operator kube 普罗米修斯 ht
  • Kafka 0.8.2 中是否可以向现有主题添加分区

    我有一个Kafka https kafka apache org 集群运行有 2 个分区 我一直在寻找一种将分区计数增加到 3 的方法 但是 我不想丢失有关该主题的现有消息 我尝试停下来Kafka https kafka apache or
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • Apache Kafka 消费者组的偏移量如何过期?

    当我注意到一些奇怪的行为时 我正在对一个旧主题进行一些测试 阅读 Kafka 的日志时 我注意到这条 删除了 8 个过期的偏移量 消息 GroupCoordinator 1001 Stabilized group GROUP NAME ge
  • 事务性 Kafka 生产者

    我正在尝试让我的卡夫卡生产者具有事务性 我正在发送 10 条消息 如果发生任何错误 则不应向 kafka 发送任何消息 即不发送或全部消息 我正在使用 Spring Boot KafkaTemplate Configuration Enab

随机推荐

  • 翻转视图 iPhone

    请考虑下面的代码 并告诉我我做错了什么 我想在两个 UIView 之间切换 不知何故 当我从初始视图翻转时 我只是得到翻转的视图 没有动画 当我向后翻转时 动画显示得很好 翻转是由视图本身上的按钮触发的 IBAction showMoreI
  • [UIAlertView 显示] 如何工作?

    我想做一些类似于 UIAlertView 的事情 即 不引用任何 UIView 或 UIViewController 使用presentModalViewController 在所有窗口顶部呈现一个 UIViewController 查看文
  • vim,将 script.py 粘贴到 python 解释器会丢失格式

    我正在尝试使用 ConqueTerm 重新映射一个键 以将脚本中选定的文本拉入 放入 Python 解释器中 一切都很好 除了这个 def main print Testing 123 main 变成这样 gt gt gt def main
  • 当数据为0时,如何使chartJs堆叠条形始终四舍五入?

    我想出了这个小提琴 https jsfiddle net 2s09hqLu https jsfiddle net 2s09hqLu 它按照我想要的方式堆叠了圆形图表 但问题是当数据数组中的值为 0 时 它不会使其四舍五入 我总是希望它四舍五
  • python中的连续字母列表并获取它的每个值

    我遇到了几乎同样的问题 如何制作连续的字母列表Python 从a z然后从aa ab ac等 https stackoverflow com questions 29351492 how to make a continuous alpha
  • 放大后删除绘制的矩形缩放框

    我正在尝试编写一个透明的可拖动矩形缩放框 一旦鼠标再次抬起 它就会放大该区域并删除绘制的矩形 我已经可以进行缩放并绘制矩形 但是我不能 1 弄清楚如何使其透明 和 2 弄清楚如何在放大后删除矩形 一旦单击鼠标在放大的图像上绘制另一个缩放框
  • 错误:请求失败,状态代码为 405

    我正在尝试在 Laravel React js 组合中建立 axios 示例 我通过以下命令配置了我的项目 作曲家创建项目 prefer dist laravel laravel react laravel basic 8 php arti
  • 使用for循环遍历java中的列表

    如何使用索引迭代列表数据结构 例如 考虑一个列表形式的句子 每个元素都是一个单词 我可以使用索引逐步浏览每个单词吗 像这样的事情 sentence defined something like this List
  • Django、ModelChoiceField() 和初始值

    我正在使用这样的东西 field1 forms ModelChoiceField queryset 如何使我的表单显示所选的值 如果你想设置默认的初始值 你应该定义initial http docs djangoproject com en
  • 如何将字体精美的图标放入选择下拉菜单中?

    我一直在努力在 vuejs 的选择下拉菜单中设置一个很棒的字体图标 我尝试过一些方法 比如
  • 在javascript中使用map或reduce将二维数组转换为对象

    是否可以转换 35 Bill 20 Nancy 27 Joan to Bill 35 Nancy 20 Joan 27 使用 map or reduce 方法 我可以使用以下方法转换数组 const arr 35 Bill 20 Nancy
  • Corona SDK 的本地通知 (Android)

    有没有办法在使用 Corona SDK 时实现本地通知 这是 Android 特定的 因为我们已经找到了 iOS 方向 Cheers 目前 Corona 上的本地通知仅适用于 iOS Android 版本尚未完成 完成后 我将按照 Andr
  • 将 Numpy 矩阵显示为视频

    我有一个 numpy 矩阵 其中每一行都是一张图片 我可以重塑行并使用 matplotlib pyplot 显示图像 问题是 我不想单独显示图像 我想像视频一样依次显示它们 这在Python中怎么可能呢 好吧 我不知道这是否是最好的方法 但
  • 包含(大多数)所有元素的 HTML 页面,用于样式设置

    是否有人拥有或知道包含所有元素 带有口语文本或其他内容 的 HTML 页面 我可以做一个 但我想一定有人已经这样做了 当开始一个项目时 我喜欢为链接 列表 表格等设置一些基本样式 包含所有元素的 HTML 页面将帮助我加快此过程 我很乐意创
  • 如何在 iOS 6 中启动具有特定地址的 iOS 地图应用程序?

    我有一个应用程序 允许用户启动地图应用程序 Google 或 Apple 来查看地址 我曾经这样做过 Address address self person addresses objectAtIndex 0 NSString addres
  • 访问页面时自动点击页面上的锚链接

    我之前问过这个问题 但一些专家告诉我补充一下 这可以重新加载给定的链接 但我想知道如何在 id 的帮助下单击元素 锚点 是否有任何代码在执行时会单击 id dp99 并且我希望在访问页面时执行此 javascript 这是 HTML a h
  • Cordova、iOS 和 iframe 不会加载内容,除非我允许访问 href="*"

    我有一个网络应用程序 它有一个嵌入式地图字段 它是使用 iframe 实现的https maps google com https maps google com 我正在将我们的应用程序 当前作为主屏幕图标运行 移植到 iOS 上的 Cor
  • 在 C++ 中将数据从一个线程发送到另一个线程的最快方法是什么?

    我尝试过构建一个简单的生产者 消费者程序的实验 它们在单独的线程中运行 生产者生成一些数据 消费者在另一个线程中获取它 我实现的消息传递延迟约为 100 纳秒 谁能告诉我这是否合理或者是否有更快的实现 我没有使用锁 只是简单的内存计数器 我
  • OpenGL 中的动画 [关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 我想请您分享一些关于如何在 OpenGL 应用程序中实现动画支持的想法 例如 如果我们希望在用户输入触发某些事件的情况下 在屏幕上为球
  • 增加 Kafka 消费者在单次轮询中读取的消息数量

    Kafka消费者有一个配置max poll records它控制单次调用 poll 及其返回的最大记录数默认值为 500 https kafka apache org documentation max poll records 我已将其设