flink 连接被对等方重置

2024-06-23

我有一个 Flink Streaming 作业,它失败了,我得到如下日志。有人能告诉我如何解决这个问题吗? 有时运行一天后失败,有时运行几个小时后失败。

09:30:25 948  INFO (org.apache.flink.runtime.executiongraph.ExecutionGraph:1240) - TriggerWindow(TumblingProcessingTimeWindows(600000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@ece0f926}, ProcessingTimeTrigger(), WindowedStream.process(WindowedStream.scala:563)) -> Filter -> Filter -> Map (40/48) (19ea993ced2b161422c345c9b633853a) switched from RUNNING to FAILED.
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException: Lost connection to task manager . This indicates that the remote task manager was lost.
    at org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler.exceptionCaught(PartitionRequestClientHandler.java:146)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerAdapter.exceptionCaught(ChannelHandlerAdapter.java:79)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:275)
    at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:253)
    at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireExceptionCaught(DefaultChannelPipeline.java:835)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.handleReadException(AbstractNioByteChannel.java:87)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:162)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    at org.apache.flink.shaded.netty4.io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:311)
    at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:241)
    at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    ... 6 more

我最终在作业管理器日志中找到了根本原因:

- Closing TaskExecutor connection container_e06_1554425226316_0158_01_000024 because: Container [pid=14446,containerID=container_e06_1554425226316_0158_01_000024] is running beyond physical memory limits. Current usage: 12.5 GB of 12.5 GB physical memory used; 14.7 GB of 26.2 GB virtual memory used. Killing container.

所以我增加了TM记忆

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

flink 连接被对等方重置 的相关文章

  • 是否可以将 Riak CS 与 Apache Flink 一起使用?

    我要配置filesystem状态后端和zookeeper恢复模式 state backend filesystem state backend fs checkpointdir recovery mode zookeeper recover
  • Flink 上运行 Beam pipeline 时出现与内存段相关的 EOFException

    我正在尝试在我们的测试集群上的 Flink 上运行 Apache Beam 管道 它一直失败EOFException at org apache flink runtime io disk SimpleCollectingOutputVie
  • logback 在 Flink 中不起作用

    我有一个单节点 Flink 实例 它在 lib 文件夹中具有 logback 所需的 jar logback classic jar logback core jar log4j over slf4j jar 我已从 lib 文件夹中删除了
  • 根据 Flink 的模式使用 GCS 文件

    由于 Flink 支持 Hadoop 文件系统抽象 并且有一个GCS连接器 https github com GoogleCloudPlatform bigdata interop 在 Google Cloud Storage 之上实现它的
  • 为什么 Flink 在 DataStream join + Global window 上发出重复记录?

    我正在学习 试验 Flink 并且观察到 DataStream 连接的一些意外行为 并且想了解发生了什么 假设我有两个流 每个流有 10 条记录 我想将其加入到id场地 假设一个流中的每条记录在另一个流中都有一个匹配的记录 并且 ID 在每
  • Apache Flink 1.3 中的 Elasticsearch 5 连接器

    通过阅读文档 我了解到使用 Apache Flink 1 3 我应该能够使用 Elasticsearch 5 x 但是 在我的 pom xml 中
  • Flink 中复杂拓扑(多输入)的集成测试

    我需要为 flink 流拓扑编写单元测试 这基本上是一个CoFlatMapFunction 并且它有 2 个输入 我尝试从这个页面中获得一些灵感 https ci apache org projects flink flink docs s
  • 如何在其他流的基础上过滤Apache flink流?

    我有两个流 一个是 Int 另一个是 json 在 json Schema 中 有一个键是一些 int 所以我需要通过与另一个整数流的键比较来过滤 json 流 那么在 Flink 中是否可能 是的 您可以使用 Flink 进行这种流处理
  • Flink 中的水印和触发器有什么区别?

    我读到 排序运算符必须缓冲它接收到的所有元素 然后 当它接收到水印时 它可以对时间戳低于水印的所有元素进行排序 并按排序顺序发出它们 这是正确 因为水印表明不能有更多元素到达并与已排序元素混合 https cwiki apache org
  • Apache Flink 检查点卡住

    我们正在运行一个 ListState 介于 300GB 到 400GB 之间的作业 并且有时该列表可能会增加到数千 在我们的用例中 每个项目都必须有自己的 TTL 因此我们使用 S3 上的 RocksDB 后端为此 ListState 的每
  • Flink CEP:对于不同类型的事件,使用哪种方法加入数据流?

    假设我有两种不同类型的数据流 一种提供天气数据 另一种提供车辆数据 我想使用 Flink 对数据进行复杂的事件处理 Flink 1 3 x 中哪种方法是正确的使用方法 我看到了不同的方法 如 Union Connect Window Joi
  • Flink 检查点到 Google Cloud Storage

    我正在尝试为 GCS 中的 flink 作业配置检查点 如果我在本地运行测试作业 没有 docker 和任何集群设置 一切正常 但如果我使用 docker compose 或集群设置运行它并在 flink 仪表板中使用作业部署 fat ja
  • Flink Logging 获取作业名称或作业 ID

    我正在尝试设置 logback xml 以便它将包含与日志记录关联的 JobName 或 JobId 我还没有找到一种方法来做到这一点 是否可以 最终我想要实现的是能够将日志发送到 ElasticSearch 并用消息标记 JobName
  • Apache Flink 上的 zipWithIndex

    我想为我的输入的每一行分配一个id 这应该是一个数字0 to N 1 where N是输入中的行数 粗略地说 我希望能够执行以下操作 val data sc textFile textFilePath numPartitions val r
  • 创建具有通用返回类型的 FlinkSQL UDF

    我想定义函数MAX BY接受类型值T和类型的订购参数Number并根据排序从窗口返回最大元素 类型为T 我试过了 public class MaxBy
  • 如何正确处理自定义MapFunction中的错误?

    我已经实施了MapFunction对于我的 Apache Flink 流程 它正在解析传入元素并将其转换为其他格式 但有时会出现错误 即传入数据无效 我看到两种可能的处理方法 忽略无效元素 但似乎我无法忽略错误 因为对于任何传入元素 我必须
  • Apache Flink 中的并行度

    我可以为 Flink 程序中任务的不同部分设置不同的并行度吗 例如 Flink 如何解释以下示例代码 两个自定义实践者MyPartitioner1 MyPartitioner2 将输入数据划分为两个4和2个分区 partitionedDat
  • Flink从hdfs读取数据

    我是 Flink 的新生 我想知道如何从 hdfs 读取数据 有人可以给我一些建议或一些简单的例子吗 谢谢你们 如果您的文件采用文本文件格式 则可以使用 ExecutionEnvironment 对象中的 readTextFile 方法 这
  • 我想使用 Flink 的 Streaming File Sink 写入 ORC 文件,但它无法正确写入文件

    我正在从 Kafka 读取数据并尝试将其以 ORC 格式写入 HDFS 文件系统 我使用了他们官方网站上的以下链接参考 但我可以看到Flink为所有数据写入完全相同的内容并生成这么多文件并且所有文件都可以103KB https ci apa
  • Flink Kafka Producer 中的 Exactly-once 语义

    我正在尝试使用 Kafka Source 和 Sink 测试 Flink 的一次性语义 运行 flink 应用程序 只需将消息从一个主题传输到另一个主题 并行度 1 检查点间隔 20 秒 使用 Python 脚本每 2 秒生成递增整数的消息

随机推荐

  • 放大发布导致 AccessDenied 错误

    我部署了一个简单的网络应用程序S3 via amplify publish 主办方有Cloudfront启用 我在设置托管时选择了放大中的 PROD 环境 并且我正在工作eu central 1地区 但每当我尝试访问Cloudfront网址
  • sed:替换文本块

    我有一堆文件 从一段代码开始 我试图用另一个代码块替换 Replace
  • 安装软件包时出现无法加载软件包 %s 错误

    我正在 Delphi 2007 上进行测试 我的小组项目由 2 个包组成 包运行 bpl 它被标记为 仅运行时 并包含一个名为 uMyTestRun pas 的单元 其中定义了一个空的 TFrame 后代 unit uMyTestRun i
  • NoSuchMethodError:org.slf4j

    我正在将storm与python一起使用 我使用此命令在本地运行拓扑 mvn compile exec java Dexec classpathScope compile Dexec mainClass my Topology 并得到这个错
  • 使用 Authlogic 仅使用用户名进行身份验证

    有一个基于 Java Struts 的母应用程序 它也处理身份验证 我的 Rails 应用程序正在集成到母应用程序中 它使用 authlogic 当然 要求是 一旦有人登录到母应用程序 他们应该能够自动访问我的 Rails 应用程序 而无需
  • Linux - 查找特定文件之后和之前的文件

    我试图在我的计算机上查找特定文件创建前 1 小时和创建后 1 小时创建的文件 这是我尝试过的方法 find root newermt 2012 10 04 1800 and newermt 2012 10 04 2000 exec ls l
  • ins SpriteKit,“选择”我的手指在移动时触摸到的所​​有精灵

    所以我尝试学习 SpriteKit 同时构建一个我认为简单的益智游戏 我有一个 5x5 网格的不同颜色的 SKSpriteNode 我想要的是能够触摸一个节点 然后水平或垂直移动我的手指并检测我的手指正在触摸的所有节点 就像我正在 选择 它
  • 让列在 CSS 网格中换行

    使用时如何指定最大列数display grid 当内容对于空间来说太宽 或小于最小尺寸 时它会自动中断 有没有办法在没有媒体查询的情况下做到这一点 例如 当没有足够的内容空间时 我有以下内容不会中断为单列模式 grid display gr
  • Drupal - 在模板内渲染子视图/部分

    如何创建一个可以在多个模板页面中重复使用并且可以将变量传递到其中的 html 片段 有些像这样 但显然更复杂一些 ul li li ul Thanks Use hook theme http api drupal org api drupa
  • 仅从 URL 获取 URI 段

    我正在尝试使用正则表达式获取 URI 段 示例 URI http abc com hello hi bye humm ok hi ya yaya wow waaah 我在尝试 lt w r g 但它不能正常工作 查询字符串没有被排除 wow
  • Python:写入大文件时,保持文件打开还是打开文件并根据需要追加到文件中?

    我想知道如何最好地处理 python 中的大文件写入 我的Python代码多次循环运行外部程序 古老的Fortran 具有奇怪的输入文件格式 读取其输出 一行文件 进行一些非常简单的处理 然后写入编译后的输出文件 外部程序执行速度很快 远低
  • 如何将枚举绑定到 playframework 表单?

    我有一个以下形式的枚举 object MatchFilterType extends Enumeration type MatchFilterType Value val gt Value gt val lt Value lt val eq
  • TensorFlowdynamic_rnn 回归器:ValueError 维度不匹配

    我想构建一个用于回归的玩具 LSTM 模型 This http mourafiq com 2016 05 15 predicting sequences using rnn in tensorflow html不错的教程对于初学者来说已经太
  • 如何使用java sdk创建AWS Elastic Beanstalk环境?

    任何人都可以帮助我或提供任何资源来使用 java 程序创建 Aws Elastic beanstalk 环境并在其中部署我们的应用程序吗 先感谢您 您可以下载 AWS Java SDKhere http aws amazon com sdk
  • 如何选择PyQt的图形工具包

    我正在为我的客户开发一个 PyQt 应用程序 问题是我的默认图形是 Gnome 而我的客户端是 KDE 所以有些差异是我无法控制的 如何强制 pyQt 选择良好的图形系统 Gnome 而不是默认系统 KDE Use QApplication
  • List.Enumerator IEnumerator.Reset() 方法实现

    尽管事实如此 IEnumerator Reset永远不应该使用方法 https stackoverflow com a 5968973 1163867我发现方法实现的奇怪行为List
  • MsgBox 和 MessageBox.Show 有区别吗?

    以下两者有区别吗 msgbox messagebox show 有些教程使用 msgbox 有些教程使用另一个 messagebox show 我看到两者都可以有可编辑的样式 但我想知道 为什么有两个 是为了适应老程序员 他们学习过旧版本的
  • 对多维数组的键进行递归排序

    我很难尝试对多维数组的键进行递归排序 我尝试过usort 但没有成功 样本数据 first level gt dir 3 gt subdir 1 gt file 2 mp4 gt object name gt file 2 mp4 file
  • Git 将合并恢复到特定父级

    我有一个 git 存储库 但在恢复合并时遇到问题 当前哈希为 0ce2ca0b35f59af267241cf4d40d16a3e13ba6f3 它有两个父母 df1acf5f54426d30f12c6b4558c3dd922297aae3
  • flink 连接被对等方重置

    我有一个 Flink Streaming 作业 它失败了 我得到如下日志 有人能告诉我如何解决这个问题吗 有时运行一天后失败 有时运行几个小时后失败 09 30 25 948 INFO org apache flink runtime ex