Kafka 生产者超时异常

2024-02-23

我正在运行 Samza 流作业,将数据写入 Kafka 主题。 Kafka 正在运行一个 3 节点集群。 Samza 作业部署在纱线上。我们在容器日志中看到很多这样的异常:

 INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[ContainerHeartbeatMonitor:stop:61] - [main] - Stopping ContainerHeartbeatMonitor
ERROR [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.runtime.LocalContainerRunner:[LocalContainerRunner:run:107] - [main] - Container stopped with Exception. Exiting process now.
org.apache.samza.SamzaException: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.task.AsyncRunLoop.run(AsyncRunLoop.java:147)
        at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:694)
        at org.apache.samza.runtime.LocalContainerRunner.run(LocalContainerRunner.java:104)
        at org.apache.samza.runtime.LocalContainerRunner.main(LocalContainerRunner.java:149)
Caused by: org.apache.samza.SamzaException: Unable to send message from TaskName-Partition 15 to system kafka.
        at org.apache.samza.system.kafka.KafkaSystemProducer$$anon$1.onCompletion(KafkaSystemProducer.scala:181)
        at org.apache.kafka.clients.producer.internals.RecordBatch.done(RecordBatch.java:109)
        at org.apache.kafka.clients.producer.internals.RecordBatch.maybeExpire(RecordBatch.java:160)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.abortExpiredBatches(RecordAccumulator.java:245)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:212)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:135)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 5 record(s) for Topic3-16 due to 30332 ms has passed since last attempt plus backoff time

这 3 种类型的异常出现很多。

59088 org.apache.kafka.common.errors.TimeoutException: Expiring 115 record(s) for Topic3-1 due to 30028 ms has passed since last attempt plus backoff time

61015 org.apache.kafka.common.errors.TimeoutException: Expiring 60 record(s) for Topic3-1 due to 74949 ms has passed since batch creation plus linger time

62275 org.apache.kafka.common.errors.TimeoutException: Expiring 176 record(s) for Topic3-4 due to 74917 ms has passed since last append

请帮助我了解这里的问题是什么。每当发生这种情况时,Samza 容器都会重新启动。


该错误表明某些记录放入队列的速度比从客户端发送的速度快。

当您的生产者发送消息时,它们会存储在缓冲区中(在将消息发送到目标代理之前),并且记录会分组在一起以提高吞吐量。当新记录添加到批次中时,必须在可配置的时间窗口内发送,该时间窗口由request.timeout.ms(默认设置为 30 秒)。如果该批次在队列中的时间较长,则会出现TimeoutException被抛出,然后批处理记录将从队列中删除,并且不会传递给代理。

增加价值request.timeout.ms应该可以帮到你。

如果这不起作用,您也可以尝试减少batch.size以便更频繁地发送批次(但这次将包含更少的消息)并确保linger.ms设置为 0(这是默认值)。

请注意,更改任何配置参数后,您需要重新启动 kafka 代理。

如果您仍然收到错误消息,我认为您的网络出现问题。您启用了 SSL 吗?

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Kafka 生产者超时异常 的相关文章

  • 由于 jaas.conf 不正确而导致 Kafka TopicAuthorizationException

    我指的是JAAS登录配置文件 https docs oracle com javase 7 docs technotes guides security jgss tutorials LoginConfigFile html 它讨论了两种指
  • 在SSL模式下使用apache kafka

    我正在尝试在 SSL 1 way 模式下设置 kafka 我已经阅读了官方文档并成功生成了证书 我将记下两种不同情况的行为 此设置只有一名经纪人和一名动物园管理员 案例 1 经纪人间通信 明文 我的相关条目server properties
  • 安装 confluence-kafka 时“文件名或扩展名太长”?

    我在使用 pip install confluence kafka 安装 confluence kafka 时遇到一些问题 但我收到此错误 文件名或扩展名太长 详细信息如下 Collecting confluent kafka Using
  • Apache Kafka 与 Apache Storm

    Apache Kafka 分布式消息系统Apache Storm 实时消息处理 我们如何在实时数据管道中使用这两种技术来处理事件数据 就实时数据管道而言 在我看来 两者的工作都是相同的 我们如何在数据管道上使用这两种技术 您可以使用 Apa
  • 如何复制或配置kafka connect插件文件?

    我已经从以下位置下载了插件文件https www confluence io connector kafka connect cdc microsoft sql https www confluent io connector kafka
  • 找不到 io.confluence:kafka-protobuf-serializer:6.0.0

    直接的问题是 为什么 Gradle 没有解决我添加的这个依赖关系 dependencies kafka protobuf serializer implementation io confluent kafka protobuf seria
  • 我可以限制kafka-node消费者的消费吗?

    这看起来像我的 kafka 节点消费者 var kafka require kafka node var consumer new Consumer client 在某些情况下 获取的消息数量超出了我的处理能力 有没有办法限制它 例如每秒接
  • Kafka:隔离级别的影响

    我有一个用例 我需要 Kafka 分区中的 100 可靠性 幂等性 无重复消息 以及顺序保留 我正在尝试使用事务 API 来建立概念验证来实现这一目标 有一个名为 isolation level 的设置 我很难理解 In this arti
  • Kafka JDBC Sink Connector 对于具有可选字段的模式的消息给出空指针异常

    Kafka JDBC Sink Connector 对于具有可选字段 parentId 的模式的消息给出空指针异常 我错过了什么吗 我正在使用开箱即用的 JSONConverter 和 JDBC Sink Connector 关于 Kafk
  • Kafka Streams - 跳跃窗口 - 去重键

    我正在 4 小时窗口上进行跳跃窗口聚合 每 5 分钟前进一次 由于跳跃窗口重叠 我得到了具有不同聚合值的重复键 TimeWindows of 240 60 1000L advanceBy 5 60 1000L 如何消除具有重复数据的重复键或
  • 为什么卡夫卡这么快[关闭]

    Closed 这个问题需要多问focused help closed questions 目前不接受答案 如果我有相同的硬件 请使用 Kafka 或我们当前的解决方案 ServiceMix Camel 有什么区别吗 Kafka 能处理比它
  • Apache Kafka Streams 将 KTable 物化到主题似乎很慢

    我正在使用 kafka 流 并试图将 KTable 具体化为一个主题 它有效 但似乎每 30 秒左右完成一次 Kafka Stream 如何 何时决定将 KTable 的当前状态具体化为主题 有没有什么办法可以缩短这个时间 让其更加 实时
  • Kafka Producer配置重试策略

    需要更改 Kafka Producer 配置的哪些参数 以便生产者应该 1 重试n次 2 n个间隔后 如果代理关闭 也会收到相同的消息 我需要处理与此相关的情况 https github com rsyslog rsyslog issues
  • Kafka Connect Confluence S3 Sink 连接器:找不到类 io.confluence.connect.avro.AvroConverter

    使用此 Kafka Connect 连接器 https www confluence io hub confluenceinc kafka connect s3 https www confluent io hub confluentinc
  • 将数据从 Kafka 存储传输到 Kafka 主题

    我想在卡夫卡做这样的事情 继续将数据存储在 KStream Ktable Kafka store 中 当我的应用程序收到特定事件 数据时 仅将上述存储中的特定数据集发送到主题 我们可以在卡夫卡中做到这一点吗 我认为单独使用 Kafka 消费
  • 从 Apache Kafka 中的主题删除消息

    所以我是 Apache Kafka 的新手 我正在尝试创建一个简单的应用程序 以便我可以更好地理解 API 我知道这个问题在这里被问了很多 但是如何清除存储在主题上的消息 记录 我看到的大多数答案都说要更改消息保留时间或删除并重新创建主题
  • 使用 offsets_for_times 从时间戳消费

    尝试使用 confluence kafka AvroConsumer 来消费给定时间戳的消息 if flag creating a list topic partitons to search list map lambda p Topic
  • 使用 Spring Boot 进行 Kafka 流

    我想在我的 Spring Boot 项目中使用 Kafka Streams 实时处理 所以我需要 Kafka Streams 配置或者我想使用 KStreams 或 KTable 但我在互联网上找不到示例 我做了生产者和消费者 现在我想实时
  • kafka消费者群体正在重新平衡

    我正在使用 Kafka 9 和新的 java 消费者 我正在循环内进行轮询 当代码尝试执行 Consumer commitSycn 时 由于组重新平衡 我收到 commitfailedexcption 请注意 我将 session time
  • 了解Kafka流groupBy和window

    我无法理解 kafka 流中的 groupBy groupById 和窗口的概念 我的目标是聚合一段时间内 例如 5 秒 的流数据 我的流数据看起来像 value 0 time 1533875665509 value 10 time 153

随机推荐

  • 如何设置shell脚本的进程组

    如何设置shell脚本的进程组 我还希望所有子进程都位于同一个进程组中 我期望类似的东西setpgid in C As 普斯科西克指出 https stackoverflow com a 45112755在大多数 shell 中 通过激活作
  • 使用 WebFlux 的 Spring Boot 在测试中总是抛出 403 状态

    非常感谢您查看我的问题 我有一些奇怪的主题 我的 Spring Boot 测试不起作用 它们启动成功 但在向任何控制器发出请求时总是抛出 403 HTTP 状态 我有一些具有下一个依赖项的项目 buildscript ext kotlin
  • 多索引数据框 pandas 中的操作

    我需要处理大数据 csv 中的地理和统计数据 它包含来自地理行政和地统计的数据 城市 区位 地统计基本区划和区块构成层次指标 我必须为地理索引中数据的最大值的每个元素创建一个新列 data2 并将每个块值除以该值 对于每个索引级别 索引级别
  • 如何在不使用库(Metrics)的情况下制作MAE和RAE的函数?

    我的目标是创建平均绝对误差 MAE 和相对绝对误差 RAE 的函数 而不使用任何类型的库 例如库 Metrics 我尝试在 MAE 和 RAE 的函数内输入公式 mae lt function a b mean abs a b rae lt
  • Android WebView HTTP Cookie 在 API 21 中不起作用

    我有一个使用 WebView 和 HTTP cookie 的 Android 应用程序 此应用程序适用于运行 API 19 或更低版本的 Android 设备 API 21 不会保存 http cookie 以供以后参考 Android W
  • ffmpeg 连接并保留元数据流

    我正在尝试连接 GoPro Hero6 分割电影的多个文件以避免 FAT 4GB 限制 ffmpeg 非常适合此目的 但我需要在元数据流中编码的遥测数据 而 ffmpeg 默认情况下似乎不保留此数据 使用ffprobe命令你可以看到源视频有
  • 如何在 MDriven 中设置日期和时间选择器?

    我试图在 MDriven 中捕获日期和时间 但数据类型 DateTime 的默认值仅显示日期选择器 在 Web 中 但时间存储在持久层中 我又如何捕捉时间 我在wiki mdriven net https wiki mdriven net
  • 在xml中定义没有class属性的bean

    我是 Spring 世界的新手 在一次采访中 有人问我们是否可以在 XML 中创建一个 bean 而不指定class 也就是说 bean 只会有一个id属性 我对此没有答案 请告知我们是否可以在 Spring 中以 XML 形式创建一个 b
  • Android NDK - 在配置更改时强制库重建

    在 Eclipse 中更改构建配置时 有没有办法强制 Android NDK 重建特定库 我正在使用 Android NDK 构建一个 Android 项目来构建 C 库 我正在使用带有 Sequoyah 插件的 Eclipse 一切都已设
  • 是否有现有的 gem 或脚本可以将数字转换为 comp-3/压缩十进制格式?

    继续我将 COBOL 转换为 Ruby 程序的冒险 我必须将十进制数字转换为 comp 3 压缩十进制格式 有人知道一个简单的 Ruby 脚本或 gem 可以做到这一点吗 伯恩斯 Ruby 知道如何打包半字节 因此结果非常简单 def pa
  • fparsec 解析字符串序列

    我有一个用户输入文本 例如 abc def ghi 我想解析它以获取字符串列表 abc def I tried let str Parser lt gt many1Chars noneOf let listParser Parser lt
  • 如何识别访客用户的时间比会话通常存在的时间长

    我知道 我可以使用 Session getId 但它会随着时间的推移而改变 也许我不明白这些会议 据我所知 它在 php 运行时启动 并在 php 代码完成时删除 另一方面 我读到会话 ID 存储在 cookie 中 当用户再次打开您的网站
  • 使用类实例作为 Typescript 映射中的键

    当获取和设置映射值时 映射必须以某种方式知道键是否等于另一个已设置的键 如何在 Typescript 中实现复杂数据类型 自定义类 的相等性 在Java中我会重写equals方法 打字稿中有等价的方法吗 就我而言 我有以下课程 export
  • cplex boolVarArray 给出双精度值

    我一直在尝试使用 CPLEX Java 实现 ILP 并且长期以来一直被一个问题困扰 以下是 ILP 的几个变量 IloIntVar above new IloIntVar numRect IloIntVar below new IloIn
  • 酿造安装 nvm。 nvm:找不到命令

    使用brew安装nvm并运行后nvm 它说nvm command not found 我怎样才能得到要执行的命令 使用brew 安装nvm 有两个步骤 首先使用brew安装应用程序 brew install nvm 然后查看brew 信息的
  • 我可以使用ASP.Net Core 3.0中的IEmailSender接口向多个接收者发送电子邮件吗

    我是 ASP Net core 的初学者 实际上我正在使用 ASP Net Core 3 0 我想向多个收件人发送电子邮件 我可以使用IEmailSender接口吗 或者有什么建议吗 我的 IEmailSender 实现是这样的 publi
  • 根据太阳位置(方位角和仰角)以及纬度和经度计算日期和时间

    与此相关非常有帮助question https stackoverflow com questions 8708048 position of the sun given time of day latitude and longitude
  • 应包含哪个 aSmack jar 文件(android-14、android-15...)以支持 SDK 版本 14-19?

    我陷入了两个不同的错误之间 无法实例化活动 ComponentInfo https stackoverflow com questions 16610296 android unable to instantiate activity cl
  • 为什么rvm需要登录shell?

    据我所知 rvm是一组bash脚本 为什么需要登录 shell 哪些仅存在于登录 shell 中的属性对于 rvm 是必需的 相关帖子 rvm 安装无法正常工作 RVM 不是一项功能 https stackoverflow com ques
  • Kafka 生产者超时异常

    我正在运行 Samza 流作业 将数据写入 Kafka 主题 Kafka 正在运行一个 3 节点集群 Samza 作业部署在纱线上 我们在容器日志中看到很多这样的异常 INFO 2018 10 16 11 14 19 410 U 2 151