【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException

2023-11-15

在这里插入图片描述

1.背景

一个flink etl程序,读取一个kafka集群的数据,到两外一个集群,然后报错

2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityalarm.copy -> filter_2 -> Sink: com.dbapp.ailpha.topic.securityalarm.copy (1/1) (e6a0562faebd649e008ca6b5f0e29804) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityevent.copy -> filter_4 -> Sink: com.dbapp.ailpha.topic.securityevent.copy (1/1) (aec115a27429dc50b9539b8ccbac3626) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Try to restart or fail the job execute w11 (66eaf554a91fea36beb582a0392be44b) if no longer possible.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Job execute w11 (66eaf554a91fea36beb582a0392be44b) switched from state FAILING to FAILED.
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
	at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
	at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
	... 13 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.


后面还有一个错误

2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-2][ Class:org.apache.flink.runtime.dispatcher.MiniDispatcher] INFO:Stopping all currently running jobs of dispatcher akka.tcp://flink@1.datanode2:37818/user/dispatcher.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Clean up running containers on stop.
2020-06-06 15:56:00 PM [Thread: AMRM Callback Handler Thread][ Class:org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl] INFO:Interrupted while waiting for queue
java.lang.InterruptedException
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
	at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
	at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Stopping container_1587901917276_0063_01_000002

因为这个是info日志,网上说是没有问题的,请忽略,暂时没找到原因,先记录一下。

参考:https://github.com/DTStack/flinkx/issues/142

2. 第一个错误

根据您提供的日志,可以看到 Flink ETL 程序在从一个 Kafka 集群读取数据并写入另一个集群时发生了错误。错误消息显示了以下几点:

作业中的两个任务(Source、Filter、Sink)都从 CANCELING 状态转换为 CANCELED 状态,表示任务被取消执行。

作业尝试重新启动或失败(Try to restart or fail the job),但可能不再可能进行重新启动。

作业最终从 FAILING 状态转换为 FAILED 状态,表示作业执行失败。

错误堆栈跟踪中显示了 Could not forward element to next operator 的异常,指示在元素传递到下一个操作符时发生了问题。

最后,显示了 org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms 的异常,表示更新元数据超时。

根据这些信息,可能有以下原因导致了错误:

Kafka 集群问题:由于元数据更新超时,可能是源 Kafka 集群无法正确响应 Flink ETL 程序的请求。这可能是由于网络问题、Kafka 集群负载过高或其他原因导致的。

Flink 配置问题:可能是 Flink 的相关配置有问题,导致无法正确连接和操作 Kafka 集群。您可以检查 Flink 和 Kafka 的连接配置,确保其准确性和一致性。

数据处理问题:作业中的操作符可能无法正确处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。这可能是由于数据格式不匹配、数据处理逻辑错误或其他问题导致的。

为了进一步诊断和解决问题,您可以考虑以下步骤:

检查 Kafka 集群的健康状态,确保 Kafka 集群正常运行,并且能够响应 Flink ETL 程序的请求。

检查 Flink 和 Kafka 的连接配置,包括 Kafka 主题、ZooKeeper 地址、消费者组等信息,确保其准确性和一致性。

检查 Flink ETL 程序的数据处理逻辑,确保它能够正确地处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。

增加 Kafka 客户端的超时设置,如果连接和操作 Kafka 的超时值太短,可以尝试增加超时时间以适应网络延迟或 Kafka 集群的响应时间。

查看 Flink 和 Kafka 的日志文件,以获取更详细的错误和警告信息,有助于进一步定位问题所在。

根据具体情况,您可能需要进行更详细的排查和调试,可能需要查看更多的日志、和监控。

2. 第2个错误

根据提供的日志,可以看到以下信息:

MiniDispatcher 正在停止当前正在运行的所有作业,停止正在执行的任务。
NMClientImpl 正在清理正在运行的容器。
AMRMClientAsyncImpl 的回调处理线程被中断,等待队列时发生了中断异常。
Container container_1587901917276_0063_01_000002 被停止。
根据这些信息,可以得出以下结论:

Flink 的 MiniDispatcher 正在停止所有当前正在运行的作业。这可能是由于某种原因触发了作业的停止或终止操作。

NMClientImpl 是与 YARN(Apache Hadoop 的资源管理器)相关的组件,它负责与容器进行通信和管理。在停止过程中,正在进行容器的清理工作。

AMRMClientAsyncImpl 是 YARN 中用于与资源管理器进行异步通信的组件。回调处理线程等待队列时发生了中断异常,这可能是由于程序终止或其他中断原因导致的。

Container container_1587901917276_0063_01_000002 被停止。这可能是与 Flink 作业相关的容器,它正在被停止执行。

根据提供的日志,无法确定问题的具体原因。要进一步诊断和解决问题,可以尝试以下步骤:

检查 Flink 作业的配置和代码,确保没有异常或错误逻辑导致作业停止。

检查 YARN 的配置和状态,确保 YARN 正常运行,并且与 Flink 集成正常。

检查作业运行时的日志,查看是否有其他异常或错误信息。

确保系统资源(内存、CPU 等)足够支持作业的执行。

如果问题仍然存在,可能需要进一步调查和分析作业的配置、Flink 和 YARN 的日志以及系统状态,以便确定问题的根本原因。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException 的相关文章

  • 为什么 JavaFX API 不包含在 Java 8 J2SE 中? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 有谁知道为什么 JavaFX 8 仍然不是即将推出的 Java 8 中的日常 J2SE API 显示所有 Java 组件的技术图清楚地将 Jav
  • Java - 红、绿、蓝获取RGB

    通过致电getRGB int x int y with a BufferedImage对象 得到一个负数 如何将三个不同的值 红色 绿色和蓝色 转换为这个单个负数 使用颜色类 new Color r g b getRGB
  • 最终字段可能尚未/已经初始化[重复]

    这个问题在这里已经有答案了 可能的重复 如何处理抛出检查异常的静态最终字段初始值设定项 https stackoverflow com questions 1866770 how to handle a static final field
  • APNS(Apple 推送通知服务器)的反馈服务

    我们正在使用Java作为推送通知提供商APNS I我能够将消息发送到APNS但我不知道如何获得该消息的反馈 请帮忙 反馈服务具有类似于用于发送推送通知的接口的二进制接口 您可以通过以下方式访问生产反馈服务feedback push appl
  • Spring @Validated 在服务层

    Hej 我想使用 Validated group Foo class 在执行方法之前验证参数的注释 如下所示 public void doFoo Foo Validated groups Foo class foo 当我将此方法放入 Spr
  • 为什么这个动作不抽象? [关闭]

    Closed 这个问题是无法重现或由拼写错误引起 help closed questions 目前不接受答案 我很难理解为什么一个类中的一个操作是抽象的 而另一个类中的操作不是 源代码1 编译时出错 https gyazo com cd3c
  • 如何添加 Java 正则表达式实现中缺少的功能?

    我是 Java 新手 作为一名 Net 开发人员 我非常习惯Regex Net 中的类 Java 实现Regex 正则表达式 还不错 但它缺少一些关键功能 我想为 Java 创建自己的帮助器类 但我想也许已经有一个可用的了 那么 是否有任何
  • javax.persistence.TransactionRequiredException:没有可用于当前线程的实际事务的 EntityManager

    我使用 Hibernate 创建了我的第一个 Spring MVC 项目 我的 DAO 层使用 JPA EntityManager 与数据库交互 GenericDao java Repository public abstract clas
  • 为什么Java中的文件名与公共类名相同? [关闭]

    Closed 这个问题是基于意见的 help closed questions 目前不接受答案 在Java中 文件名应该与文件名相同public class包含在该文件中 为什么这是一个限制 它有什么目的 Java 有一个有趣的方法 如果给
  • 如何修复maven错误JAVA_HOME环境变量未正确定义

    当我在虚拟环境中检查maven的版本时 出现以下错误 The JAVA HOME environment variable is not defined correctly This environment variable is need
  • 使用java在网页中进行字符编码

    如何使用java找出网页中的字符编码类型 打开与 URL 的连接 使用URL openConnection http download oracle com javase 6 docs api java net URL html openC
  • java数学中的组合“N选择R”?

    java库中是否有内置方法可以为任何N R计算 N选择R 公式 实际上很容易计算N choose K甚至不需要计算阶乘 我们知道 公式为 N choose K is N N K K 因此 公式为 N choose K 1 is N N N
  • java中的第三个布尔状态是什么?

    虽然我知道根据定义 布尔值仅包含两种状态 真或假 我想知道布尔值在用这些状态之一初始化之前有什么值 它默认为 false http java sun com docs books tutorial java nutsandbolts dat
  • 应用程序中空指针异常[重复]

    这个问题在这里已经有答案了 我正在尝试在我的应用程序中实施应用程序内计费 我写了这段代码 public class Settings extends PreferenceFragment ServiceConnection mService
  • Java给定长度的随机数

    我需要在 Java 中生成一个恰好 6 位数字的随机数 我知道我可以在随机发生器上循环 6 次 但是在标准 Java SE 中还有其他方法可以做到这一点吗 要生成 6 位数字 Use Random http download oracle
  • 内部类的访问修饰符[重复]

    这个问题在这里已经有答案了 可能的重复 受保护 公共内部类 https stackoverflow com questions 595179 protected public inner classes 我确信这个问题已经被问过 但我找不到
  • Javac 版本 1.7 无法为目标 1.7 构建

    我试图在 Linux Mint 系统上使用 Sun Java JDK 1 7 0 17 编译 Java 代码 但遇到了这个问题 javac version target 1 7 javac 1 7 0 17 javac invalid ta
  • 一个类中有多个具有相同参数类型的方法

    我知道 至少已经有了关于这个主题的一个问题 https stackoverflow com questions 5561436 can two java methods have same name with different retur
  • 如何让JComboBox中的内容居中显示?

    目前我有这个JComboBox 我怎样才能将其中的内容居中 String strs new String 15158133110 15158133124 15158133458 JComboBox com new JComboBox str
  • Swing:创建可拖动组件...?

    我在网上搜索了可拖动 Swing 组件的示例 但我发现示例不完整或不起作用 我需要的是一个摇摆组件那可以是dragged通过鼠标 在另一个组件内 被拖拽的时候 应该已经 改变它的位置 而不仅仅是 跳 到目的地 我很欣赏无需非标准 API 即

随机推荐

  • 系统分析与设计——UML图总结

    一 前言 期末考试之前 我复习系统分析与设计的时候对UML图做了一些知识汇总 现在放到博客上 如果有不对或不恰当的地方 欢迎各位指正 本文仅仅起记录作用 可随意转载 荣幸之至 截图来自网络或是老师的PPT 二 概述 系统模型的三个主要部分
  • flutter 键盘挡住输入框问题

    bool isShowKeyboard false double keyboardSize 260 软键盘高度 类添加with WidgetsBindingObserver 生命周期监听器 class SendRedEveDialogSta
  • 浏览器上实现CNN可视化——清楚看到每一层卷积后的图

    目录 本文作用 CNN神经网络可视化工具1 解释器学习笔记 CNN神经网络可视化工具2 本文作用 学习卷积神经网络时 我们只知道输入一张图片后 通过一顿操作 便可以提取图片中的特征 我们对于其内部的操作 只有理论了解 并没有做到眼见为实 这
  • 使用cloudflare tunnel免费内网穿透,实现网站的外网访问和远程桌面

    前言 Cloudflare Tunnel是Cloudflare Zero Trust中的一个产品 它能够帮助用户将位于内网中的服务暴露到公网上 从而使得外部用户可以通过互联网访问这些服务 相比较于frp ngrok等内网穿透工具 使用Clo
  • 人工智能数学基础--概率与统计9:概率运算、加法公理、事件的独立性、概率乘法定理、条件概率、全概率公式以及贝叶斯公式

    一 概述 这大半年都很忙 学习时间太少 导致概率论的学习停滞不前 期间AI大佬herosunly推荐了陈希孺老先生的概率论教材 与最开始学习的美版M R 斯皮格尔等著作的 概率与统计 表示差异比较大 具体请见 人工智能数学基础 概率与统计7
  • ESP32-IDF环境搭建以及使用

    1默认已经安装了esp32 idf和vscode配置 离线版的esp32idf安装 windows eap32安装这里参考博客ESP32c3开发环境搭建 IDF V4 4离线版安装使用 esp idf v4 4 2 可能会遇到的问题 问题篇
  • 修改elementUI样式未生效问题(挂载到了body标签上)

    修改挂载到body标签上elementUI样式问题 目录 修改挂载到body标签上elementUI样式问题 前言 一 适用范围 二 示例 1 目标 2 实现思路 修改自带样式方法 最后看效果 总结 前言 在使用element ui库的时候
  • Aspose.Slides for Java Crack

    Aspose Slides for Java Crack Added support for changing the color of leader lines in pie charts Added new AfterAnimation
  • 2012年终总结 - I T征途

    2012年终总结 I T征途 在2012年年初的时候 自己曾写了一个规划 2012 这一年我该做些啥 里面简单的介绍了一下2012年 我应该做的事儿 如今到了为2012结账的时候 我想借助那篇文档来总结这一年我的所作所为 2012年 我该给
  • 用Sipp 对Asterisk 进行性能测试的工作笔记-1

    公司需要 对Asterisk 进行一定的性能测试 测试目标 1 IVR 支持多少路2 一对一通话 支持多少路3 不同编解码的性能影响 4 通话中 录音 支持多少路 测试工具 sipp http sipp sourceforge net 辅助
  • createrepo:创建本地源

    4月20日 createrepo 创建本地源 repodata作为软件的仓库 其目录下有四个必要文件 filelists xml gz other xml gz primary xml gz 和repomd xml md 意思是 metad
  • IDEA 中 .properties文件的中文显示乱码问题的解决办法

    今天使用IDEA 搭建Spring Boot 项目 配置application properties 配置文件 录入中文 在右下角出现如下截图提示语 重新打开application properties 文件出现汉字乱码 依据提示信息修改源
  • “你爱我,我爱你,蜜雪冰城甜蜜蜜“秋天的第一杯奶茶!Python安排!!

    立秋了 大家秋天的第一杯奶茶都安排上了么 前一段时间我相信很多人都被 你爱我 我爱你 蜜雪冰城甜蜜蜜 这首歌洗脑了 所以今天就爬取了某度地图上蜜雪冰城门店分布 看看全国有多少家蜜雪冰城 能不能满足大家的需求啦 哈哈哈 数据采集 首先 我们打
  • Linux部署宝塔

    1 linux服务器安装宝塔 宝塔地址 https www bt cn new download html 点击上方地址 进入下方页面 点击安装版本 复制第一个命令 得确认你服务器是centos 远程连接服务器 复制此命令运行 运行成功后
  • [CISCN2019 华东南赛区]Web11 SSTI

    这道SSTI 差点给我渗透的感觉了 全是API 我还想去访问API看看 发现这里读取了我们的ip 我们抓包看看是如何做到的 没有东西 我们看看还有什么提示 欸 那我们可不可以直接修改参数呢 我们传递看看 发现成功了 是受控的 这里我就开始没
  • mysql某批量更新导致死锁

    查询当前数据库全部线程 show full processlist 查询当前运行的全部事务 select from information schema innodb trx 查询锁情况 select from information sc
  • 碰撞改变材质颜色_bp

    感谢来自程序员的暴击 学习资料来于 https www bilibili com video BV125411h7c4 p 22 最大的收获是 材质编辑器上 1维向量到4维向量的生成 会者不难 难者不会 方法很简单 鼠标左键 数字1就会生成
  • 2023年电赛E题完整设计暨电赛全记录

    目录 一 2023年E题完整设计 lt 1 gt 选择方案 任务一 实现按键按下复位 基础部分 任务二 实现激光点绕边框一周 基础部分 任务三 实现激光点绕A4纸边缘一周 基础部分 任务四 实现绿色激光追踪红色激光 发挥部分 lt 2 gt
  • 【信号与系统】傅里叶变换

    傅里叶变换 文章目录 傅里叶变换 傅里叶级数 基本公式 常用公式 基本性质 其他公式 卷积公式 周期信号的傅里叶变换 抽样信号的傅里叶变换 提供延时的理想滤波器 无失真传输 傅里叶级数 https blog csdn net lafea a
  • 【Flink】Flink 消费kafka报错 AMRMClientAsyncImpl Interrupted while waiting for queue InterruptedException

    1 背景 一个flink etl程序 读取一个kafka集群的数据 到两外一个集群 然后报错 2020 06 06 15 56 00 PM Thread flink akka actor default dispatcher 20 Clas