1.背景
一个flink etl程序,读取一个kafka集群的数据,到两外一个集群,然后报错
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityalarm.copy -> filter_2 -> Sink: com.dbapp.ailpha.topic.securityalarm.copy (1/1) (e6a0562faebd649e008ca6b5f0e29804) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Source: com.dbapp.ailpha.topic.securityevent.copy -> filter_4 -> Sink: com.dbapp.ailpha.topic.securityevent.copy (1/1) (aec115a27429dc50b9539b8ccbac3626) switched from CANCELING to CANCELED.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Try to restart or fail the job execute w11 (66eaf554a91fea36beb582a0392be44b) if no longer possible.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.flink.runtime.executiongraph.ExecutionGraph] INFO:Job execute w11 (66eaf554a91fea36beb582a0392be44b) switched from state FAILING to FAILED.
java.lang.Exception: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:203)
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:654)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
... 13 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
后面还有一个错误
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-2][ Class:org.apache.flink.runtime.dispatcher.MiniDispatcher] INFO:Stopping all currently running jobs of dispatcher akka.tcp://flink@1.datanode2:37818/user/dispatcher.
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Clean up running containers on stop.
2020-06-06 15:56:00 PM [Thread: AMRM Callback Handler Thread][ Class:org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl] INFO:Interrupted while waiting for queue
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:442)
at org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl$CallbackHandlerThread.run(AMRMClientAsyncImpl.java:287)
2020-06-06 15:56:00 PM [Thread: flink-akka.actor.default-dispatcher-20][ Class:org.apache.hadoop.yarn.client.api.impl.NMClientImpl] INFO:Stopping container_1587901917276_0063_01_000002
因为这个是info日志,网上说是没有问题的,请忽略,暂时没找到原因,先记录一下。
参考:https://github.com/DTStack/flinkx/issues/142
2. 第一个错误
根据您提供的日志,可以看到 Flink ETL 程序在从一个 Kafka 集群读取数据并写入另一个集群时发生了错误。错误消息显示了以下几点:
作业中的两个任务(Source、Filter、Sink)都从 CANCELING 状态转换为 CANCELED 状态,表示任务被取消执行。
作业尝试重新启动或失败(Try to restart or fail the job),但可能不再可能进行重新启动。
作业最终从 FAILING 状态转换为 FAILED 状态,表示作业执行失败。
错误堆栈跟踪中显示了 Could not forward element to next operator 的异常,指示在元素传递到下一个操作符时发生了问题。
最后,显示了 org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms 的异常,表示更新元数据超时。
根据这些信息,可能有以下原因导致了错误:
Kafka 集群问题:由于元数据更新超时,可能是源 Kafka 集群无法正确响应 Flink ETL 程序的请求。这可能是由于网络问题、Kafka 集群负载过高或其他原因导致的。
Flink 配置问题:可能是 Flink 的相关配置有问题,导致无法正确连接和操作 Kafka 集群。您可以检查 Flink 和 Kafka 的连接配置,确保其准确性和一致性。
数据处理问题:作业中的操作符可能无法正确处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。这可能是由于数据格式不匹配、数据处理逻辑错误或其他问题导致的。
为了进一步诊断和解决问题,您可以考虑以下步骤:
检查 Kafka 集群的健康状态,确保 Kafka 集群正常运行,并且能够响应 Flink ETL 程序的请求。
检查 Flink 和 Kafka 的连接配置,包括 Kafka 主题、ZooKeeper 地址、消费者组等信息,确保其准确性和一致性。
检查 Flink ETL 程序的数据处理逻辑,确保它能够正确地处理从源 Kafka 主题读取的数据,并将其传递给下一个操作符。
增加 Kafka 客户端的超时设置,如果连接和操作 Kafka 的超时值太短,可以尝试增加超时时间以适应网络延迟或 Kafka 集群的响应时间。
查看 Flink 和 Kafka 的日志文件,以获取更详细的错误和警告信息,有助于进一步定位问题所在。
根据具体情况,您可能需要进行更详细的排查和调试,可能需要查看更多的日志、和监控。
2. 第2个错误
根据提供的日志,可以看到以下信息:
MiniDispatcher 正在停止当前正在运行的所有作业,停止正在执行的任务。
NMClientImpl 正在清理正在运行的容器。
AMRMClientAsyncImpl 的回调处理线程被中断,等待队列时发生了中断异常。
Container container_1587901917276_0063_01_000002 被停止。
根据这些信息,可以得出以下结论:
Flink 的 MiniDispatcher 正在停止所有当前正在运行的作业。这可能是由于某种原因触发了作业的停止或终止操作。
NMClientImpl 是与 YARN(Apache Hadoop 的资源管理器)相关的组件,它负责与容器进行通信和管理。在停止过程中,正在进行容器的清理工作。
AMRMClientAsyncImpl 是 YARN 中用于与资源管理器进行异步通信的组件。回调处理线程等待队列时发生了中断异常,这可能是由于程序终止或其他中断原因导致的。
Container container_1587901917276_0063_01_000002 被停止。这可能是与 Flink 作业相关的容器,它正在被停止执行。
根据提供的日志,无法确定问题的具体原因。要进一步诊断和解决问题,可以尝试以下步骤:
检查 Flink 作业的配置和代码,确保没有异常或错误逻辑导致作业停止。
检查 YARN 的配置和状态,确保 YARN 正常运行,并且与 Flink 集成正常。
检查作业运行时的日志,查看是否有其他异常或错误信息。
确保系统资源(内存、CPU 等)足够支持作业的执行。
如果问题仍然存在,可能需要进一步调查和分析作业的配置、Flink 和 YARN 的日志以及系统状态,以便确定问题的根本原因。