Kafka新生产者超时

2024-03-25

我正在使用新的 kafka 生产者客户端并将 timeout.ms 属性设置为 50 毫秒。

这是生产者中使用的完整配置:

props.put("acks", "1");
props.put("buffer.memory", "33554432");
props.put("retries", "1");
props.put("batch.size", "16384");
props.put("client.id", "foo");
props.put("linger.ms", "0");
props.put("timeout.ms", "50");

在某些高负载时刻,请求平均响应时间为 4 秒,但我没有收到任何超时错误。

有人知道这个超时是如何计算的,什么时候开始计算,什么时候结束?有没有办法配置从调用生产者的发送方法那一刻开始的超时?


The new timeout.ms财产与ack生产者的配置。例如考虑以下情况

ack = all
timeout.ms = 3000

在这种情况下ack = all意味着领导者在收到完整同步副本集(ISR)的确认之前不会做出响应,并且获得此确认的最大等待时间为3000 ms。如果在给定时间内没有收到预期数量的确认,它将返回错误。

另请注意,此属性不考虑网络延迟。

从文档页面:

该配置控制服务器等待追随者确认的最长时间,以满足生产者使用 acks 配置指定的确认要求。如果超时后未满足请求的确认数量,则会返回错误。该超时是在服务器端测量的,不包括请求的网络延迟。

所以在你的情况下ack=1(我对此不是 100% 确定,如果适用的话,愿意接受任何更正)如果领导者无法在 50 毫秒内响应(将记录写入自己的日志而不等待所有追随者的完全确认),则应抛出错误。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka新生产者超时 的相关文章

  • 增加 Java 中主题的分区数量

    我正在使用名称 卡夫卡 2 12版本 2 3 0 根据我想更改的流量 负载最大分区某个主题的编号 Kafka启动后是否可以进行这种更改 并且可以通过代码完成吗 是的 您可以通过代码增加分区 使用AdminClient createParti
  • RocksDb sst 文件的 GUI 查看器

    我正在与 Kafka 合作 将数据保存到rocksdb 中 现在我想看看 Kafka 创建的数据库键和值 我下载了 FastNoSQL 并尝试但失败了 该文件夹包含 sst 文件 日志文件 当前文件 身份文件 锁定文件 日志文件 清单文件
  • 当我按键对数据进行分区,然后向 Kafka 中的主题添加新分区时,会发生什么?

    当我按键对数据进行分区 然后向 Kafka 中的主题添加新分区时 会发生什么 现有记录是否会发生变化 未来的数据将如何分区 当新分区添加到特定主题时 现有数据的分区不会改变 Kafka 不会尝试重新分发现有记录 此修改只会对新记录产生影响
  • Kafka-python 检索主题列表

    我在用着卡夫卡蟒蛇 http kafka python readthedocs org en 1 0 2 我想知道是否有办法显示所有主题 像这样的事情 bin kafka topics sh list zookeeper localhost
  • Kafka结构化流KafkaSourceProvider无法实例化

    我正在开发一个流项目 其中有一个 ping 统计数据的 kafka 流 如下所示 64 bytes from vas fractalanalytics com 192 168 30 26 icmp seq 1 ttl 62 time 0 9
  • Spring Cloud Stream动态通道

    我正在使用 Spring Cloud Stream 想要以编程方式创建和绑定通道 我的用例是 在应用程序启动期间 我收到要订阅的 Kafka 主题的动态列表 如何为每个主题创建一个频道 我最近遇到了类似的场景 下面是我动态创建 Subscr
  • Kafka Streams 在 HDFS 上查找数据

    我正在使用 Kafka Streams v0 10 0 1 编写一个应用程序 并希望通过查找数据来丰富我正在处理的记录 该数据 带时间戳的文件 每天 或每天 2 3 次 写入 HDFS 目录 我怎样才能将其加载到Kafka Streams应
  • 尝试升级到 flink 1.3.1 时出现异常

    我尝试将集群中的 flink 版本升级到 1 3 1 以及 1 3 2 但我的任务管理器中出现以下异常 2018 02 28 12 57 27 120 ERROR org apache flink streaming runtime tas
  • WARN 获取相关 ID 为 1 的元数据时出错:{MY_TOPIC?=INVALID_TOPIC_EXCEPTION} (org.apache.kafka.clients.NetworkClient)

    当我使用 kafka 运行以下命令时0 9 0 1 我收到这些警告 1 你能告诉我我的主题有什么问题吗 我正在与在 ec2 中运行的 kafka 经纪人交谈 kafka console consumer sh new consumer bo
  • Kafka 连接教程停止工作

    我在此链接中执行了步骤 7 使用 Kafka Connect 导入 导出数据 http kafka apache org documentation html quickstart http kafka apache org documen
  • KeeperErrorCode = /admin/preferred_replica_election 的 NoNode

    当我启动kafka时 zookeeper发生错误 INFO Got user level KeeperException when processing sessionid 0x156028651c00001 type delete cxi
  • 动态创建消费者spring kafka

    我正在创建一个与另一个服务通信的服务 以便识别要收听的 kafka 主题 kafka主题可能有不同的键和值类型 因此 我想为每个配置 主题 键类型 值类型 动态创建不同的 kafka 消费者 其中配置仅在运行时已知 然而在 spring k
  • 在 WSL2 中通过 IDE 连接到 kafka 服务器时出错

    我无法通过在 Windows 上运行的 intellij 或 vscode 连接到在 ubuntu 上运行的 kafka 服务器 我在 WSL2 上尝试的第一个服务器 我什至尝试使用虚拟机的IP 但没有成功 据我了解 我们应该能够根据此文档
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • 无法初始化类 io.confluence.kafka.schemaregistry.client.rest.RestService

    我正在尝试使用 KafkaAvroSerialzer 设置一个卡夫卡生产者以获得价值 当 rit 尝试创建生产者时 我遇到了这个错误 我正在使用 confluence 5 2 1 中提供的所有罐子 java lang NoClassDefF
  • Kafka Java 消费者从未收到任何消息

    我正在尝试设置一个基本的 Java 消费者来接收来自 Kafka 主题的消息 我已经跟踪了样本 https cwiki apache org confluence display KAFKA Consumer Group Example h
  • Kafka Streams 内部数据管理

    在我的公司 我们广泛使用 Kafka 但出于容错的原因 我们一直使用关系数据库来存储多个中间转换和聚合的结果 现在我们正在探索 Kafka Streams 作为一种更自然的方式来做到这一点 通常 我们的需求非常简单 其中一个例子是 监听输入
  • 命名 kafka 主题的最佳实践是什么?

    我们是 kafka 的新手 我们有几个团队正在开发一些相互发布 订阅事件的应用程序 由于kafka主题名称将在团队之间共享 那么命名有什么最佳实践吗 基本上我们不希望看到 A 团队命名主题companyname appname events
  • 从副本消费

    Kafka 将主题的每个分区复制到指定的复制因子 据我所知 所有写入和读取请求都会路由到分区的领导者 有没有办法从追随者那里消费而不是从领导者那里消费 Kafka中的复制只是为了故障转移吗 在 Kafka 2 3 及更早版本中 您只能从领导

随机推荐

  • npm更新成功,但仍然显示旧版本

    当我做一个npm v我发现我仍然使用 5 6 0 并不断收到执行操作的提示npm i npm 但我已经做了一个sudo npm install npm latest g并用 a 进行双重检查npm outdated g depth 0看到我
  • MySQL命名约定,字段名应该包含表名吗?

    有朋友告诉我 我应该在同一个表的字段名中包含表名 我想知道为什么 而且应该是这样吗 例子 Table Users Fields user id username password last login time 我发现前缀 user 毫无意
  • “范围的坐标或尺寸无效”

    我正在开发一个与 REST API 链接并将数据放入 Google 表格的 Google Apps 脚本 我已经成功完成一次此操作 但是在访问一些不同的数据时 我收到错误消息 范围的坐标或尺寸无效 当它们在我的其他脚本上完美运行时 访问的所
  • 假设不变的 ASCII 编码,用 Rubyist 方法解码该编码字符串

    我的程序是二进制协议的解码器 该二进制协议中的字段之一是编码的String 中的每个角色String是可打印的 并且代表一个整数值 根据我正在解码的协议的规范 它表示的整数值取自下表 其中列出了所有可能的字符 Character Value
  • 依赖项的 Maven 项目变量

    我有一个加载小程序的 html 文件 html需要通过名称引用jar 并且由于maven根据artifactid 版本等对其进行命名 因此html需要随着项目的发展动态更新 似乎资源过滤是可行的方法 但我无法弄清楚要插入的变量应该是什么样子
  • jQuery 动画小数递增/递减

    我想一步一步地动画两个十进制数之间的差异 已经发现乔斯 克劳克罗夫特的解决方案 http www josscrowcroft com 2011 code jquery animate increment decrement numeric
  • 数据类中的属性

    描述 我正在尝试实现一个仅包含几个参数的简单数据类 dataclass class ReconstructionParameters img size int CR int denoise bool epochs int learning
  • 如何以可跨 Linux、Windows 和 MacOS 移植的方式收集 Python 3 中的当前架构?

    我正在尝试找到一种可移植的方式来收集当前的架构 例如x86 64 or AArch64 我将用它来填充一个标志 例如is x86 使用Python 3 它看起来像import platform platform machine 函数是正确的
  • 获取 CPU、RAM 和 GPU 信息 - UWP 应用

    是否可以在 UWP 应用程序中获取计算机的 CPU GPU 和 RAM 信息 E 在文本块中显示此信息 我想知道处理器型号 例如 Intel Core i7 xxxx 和总 RAM 我想知道处理器型号 例如 Intel Core i7 xx
  • 启发式参与者永无休止的定期恢复

    几天来我们的日志里一直充斥着这样的消息 2018 06 15 12 19 23 WARN com arjuna ats arjuna Periodic Recovery Transaction 0 ffff0a983f1e 1f3aa2ff
  • IE10 上的 WebSocket 出现 SecurityError

    我目前正在 IE10 在 Windows 8 上 下开发一个网站 使用 JavaScript 中的 WebSockets 它在 Firefox 18 和 Chrome 25 下运行良好 但在 IE10 上建立连接时出现 SecurityEr
  • 如何解决android中的OutOfMemoryError?

    我已经准备了可绘制动画的数量 当应用程序启动时 第一个动画将启动 我有两个按钮 下一个和上一个 具有相同的活动 当我单击下一个按钮时 我遇到了异常 例如 java lang OutOfMemoryError bitmap size exce
  • java中基于表单的身份验证的混乱

    谁能告诉我 我该如何处理j 安全检查java中基于表单的身份验证中的servlet 我是否必须映射 servlet 类j 安全检查web xml 文件中的名称 例如
  • 我如何在 Swift 中投射 @Binding

    很快我就可以用 Int doubleVariable 将 Int 转换为 Double 但是如何将 Binding 转换为 Binding 呢 然后我可以将 Binding var intVar Int 传递给需要 Double 绑定的函数
  • Python 正则表达式模块即使重叠 = True 也找不到所有匹配项

    我正在使用 PyPy正则表达式模块 https pypi org project regex 具有重叠匹配支持 我有以下代码 其中有一个字符串 A 我正在使用正则表达式查找在正则表达式中定义的 DNA 模式 我想找到与我的 RE 的所有匹配
  • RxJs 将流拆分为多个流

    如何根据分组方法将永无止境的流拆分为多个结束的流 a a a a a b b b b c c c c d d d e gt 到这些可观察到的 a a a a a b b b b c c c c d d d e gt 如您所见 a是在开始的时
  • 初始化 selenium webdriver 时如何修复 python-selenium 错误“连接被拒绝”?

    我正在非公共网页上运行非常复杂的 python selenium 测试 在大多数情况下 这些测试运行良好 但有时这些测试之一在 Webdriver 本身的初始化期间会失败 提示 当尝试初始化网络驱动程序时 即执行以下操作时 会发生此错误 S
  • 通过(sails js)水线将值推入mongodb数据库数组

    节点js 帆js 水线 插入后我需要将值更新 或推送 到以下架构中 我将 sailsjs 与 Waterline 和 mongodb 一起使用 countries states statename state districts distn
  • 在 ReleaseMutex 之前 CloseHandle 互斥锁 - 会发生什么?

    如果我在线程完成互斥体之前对互斥体调用 CloseHandle 因此尚未调用 ReleaseMutex 那么预期的行为是什么 CloseHandle 立即销毁传递给它的句柄 ReleaseMutex 然后会失败ERROR INVALID H
  • Kafka新生产者超时

    我正在使用新的 kafka 生产者客户端并将 timeout ms 属性设置为 50 毫秒 这是生产者中使用的完整配置 props put acks 1 props put buffer memory 33554432 props put