Spark超时可能是由于HDFS中文件超过100万个的binary Files()

2024-04-30

我正在通过以下方式读取数百万个 xml 文件

val xmls = sc.binaryFiles(xmlDir)

该操作在本地运行良好,但在纱线上失败并显示:

 client token: N/A
 diagnostics: Application application_1433491939773_0012 failed 2 times due to ApplicationMaster for attempt appattempt_1433491939773_0012_000002 timed out. Failing the application.
 ApplicationMaster host: N/A
 ApplicationMaster RPC port: -1
 queue: default
 start time: 1433750951883
 final status: FAILED
 tracking URL: http://controller01:8088/cluster/app/application_1433491939773_0012
 user: ariskk
Exception in thread "main" org.apache.spark.SparkException: Application finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:622)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:647)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

在 hadoop/用户日志中,我经常收到以下消息:

15/06/08 09:15:38 WARN util.AkkaUtils: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@2b4f336b,BlockManagerId(1, controller01.stratified, 58510))] in 2 attempts
java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.result(package.scala:107)
at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:427)

我通过spark-submit 运行我的spark 作业,它适用于仅包含37k 文件的其他HDFS 目录。有什么想法如何解决这个问题吗?


好吧,在 Sparks 邮件列表上获得一些帮助后,我发现有两个问题:

  1. src 目录,如果以 /my_dir/ 形式给出,则会导致 Spark 失败并产生心跳问题。相反,它应该以 hdfs:///my_dir/* 的形式给出

  2. 修复 #1 后,日志中出现内存不足错误。这是在纱线上运行的 Spark 驱动程序,由于文件数量而导致内存不足(显然它将所有文件信息保存在内存中)。所以我用 --conf spark.driver.memory=8g 提交了作业,解决了这个问题。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark超时可能是由于HDFS中文件超过100万个的binary Files() 的相关文章

  • hadoop中reducer的数量

    我正在学习hadoop 我发现减速器的数量非常令人困惑 1 reducer的数量与partition的数量相同 2 reducer 的数量是 0 95 或 1 75 乘以 节点数 每个节点的最大容器数 3 减速机数量设定为mapred re
  • 如何从spark管道逻辑模型中提取变量权重?

    我目前正在尝试学习 Spark Pipeline Spark 1 6 0 我将数据集 训练和测试 导入为 oas sql DataFrame 对象 执行以下代码后 生成的模型是oas ml tuning CrossValidatorMode
  • 如何通过sparkSession向worker提交多个jar?

    我使用的是火花2 2 0 下面是我在 Spark 上使用的 java 代码片段 SparkSession spark SparkSession builder appName MySQL Connection master spark ip
  • 如何从字符串列中提取数字?

    我的要求是从列中的评论列中检索订单号comment并且总是开始于R 订单号应作为新列添加到表中 输入数据 code id mode location status comment AS SD 101 Airways hyderabad D
  • HashPartitioner 是如何工作的?

    我阅读了文档HashPartitioner http spark apache org docs 1 3 1 api java index html org apache spark HashPartitioner html 不幸的是 除了
  • 如何读取一次流数据集并输出到多个接收器?

    我有 Spark 结构化流作业 它从 S3 读取数据 转换数据 然后将其存储到一个 S3 接收器和一个 Elasticsearch 接收器 目前 我正在做readStream一次然后writeStream format start 两次 这
  • Hive - 线程安全的自动递增序列号生成

    我遇到一种情况 需要将记录插入到特定的 Hive 表中 其中一列需要是自动递增的序列号 即在任何时间点都必须严格遵循 max value 1 规则 记录从许多并行的 Hive 作业插入到这个特定的表中 这些作业每天 每周 每月批量运行 现在
  • pyspark flatmat 错误:TypeError:“int”对象不可迭代

    这是我书中的示例代码 from pyspark import SparkConf SparkContext conf SparkConf setMaster spark chetan ThinkPad E470 7077 setAppNam
  • Scala:什么是 CompactBuffer?

    我试图弄清楚 CompactBuffer 的含义 和迭代器一样吗 请解释其中的差异 根据 Spark 的文档 它是 ArrayBuffer 的替代方案 可以提供更好的性能 因为它分配的内存更少 以下是 CompactBuffer 类文档的摘
  • 如何将 Pyspark Dataframe 标题设置到另一行?

    我有一个如下所示的数据框 col1 col2 col3 id name val 1 a01 X 2 a02 Y 我需要从中创建一个新的数据框 使用 row 1 作为新的列标题并忽略或删除 col1 col2 等行 新表应如下所示 id na
  • MiniDFSCluster UnsatisfiedLinkError org.apache.hadoop.io.nativeio.NativeIO$Windows.access0

    做时 new MiniDFSCluster Builder config build 我得到这个异常 java lang UnsatisfiedLinkError org apache hadoop io nativeio NativeIO
  • YARN UNHEALTHY 节点

    在我们的 YARN 集群已满 80 的情况下 我们看到一些纱线节点管理器被标记为不健康 在深入研究日志后 我发现这是因为数据目录的磁盘空间已满 90 出现以下错误 2015 02 21 08 33 51 590 INFO org apach
  • Spark 在 WholeTextFiles 上创建的分区少于 minPartitions 参数

    我有一个文件夹 里面有 14 个文件 我在一个集群上使用 10 个执行器运行 Spark Submit 该集群的资源管理器为 YARN 我创建了我的第一个 RDD 如下所示 JavaPairRDD
  • 如何将包含多个字段的大型 csv 加载到 Spark

    新年快乐 我知道以前曾提出 回答过此类类似的问题 但是 我的问题有所不同 我有大尺寸的 csv 有 100 个字段和 100MB 我想将其加载到 Spark 1 6 进行分析 csv 的标题看起来像附件sample http www roc
  • 使用 Python 计算 Spark 中成对 (K,V) RDD 中每个 KEY 的平均值

    我想与 Python 共享这个特定的 Apache Spark 解决方案 因为它的文档非常贫乏 我想通过 KEY 计算 K V 对 存储在 Pairwise RDD 中 的平均值 示例数据如下所示 gt gt gt rdd1 take 10
  • 将 CSV 转换为序列文件

    我有一个 CSV 文件 我想将其转换为 SequenceFile 我最终将使用它来创建 NamedVectors 以在聚类作业中使用 我一直在使用 seqdirectory 命令尝试创建 SequenceFile 然后使用 nv 选项将该输
  • pyspark 将 twitter json 流式传输到 DF

    我正在从事集成工作spark streaming with twitter using pythonAPI 我看到的大多数示例或代码片段和博客是他们从Twitter JSON文件进行最终处理 但根据我的用例 我需要所有字段twitter J
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何从hdfs读取文件[重复]

    这个问题在这里已经有答案了 我在 project1目录下的hadoop文件系统中有一个文本文件名mr txt 我需要编写 python 代码来读取文本文件的第一行 而不将 mr txt 文件下载到本地 但我无法从 hdfs 打开 mr tx
  • 数量重新分配逻辑 - 具有外部数据集的 MapGroups

    我正在研究一种复杂的逻辑 需要将数量从一个数据集重新分配到另一个数据集 在例子中我们有Owner and Invoice 我们需要从数量中减去Invoice准确地Owner匹配 在给定汽车的给定邮政编码处 减去的数量需要重新分配回同一辆车出

随机推荐

  • 如何改变Flutter对话框的位置

    我创建了一个对话框 在注册新用户时 Firestore 中存在号码时会显示该对话框 然而 默认情况下 Android 似乎将对话框定位在显示屏的中央 有没有办法将对话框定位在其调用的小部件的位置 对于我的情况 它是 凸起 按钮回调 还想知道
  • 如何在matplotlib中制作具有不同y轴的堆积折线图?

    我想知道如何制作堆积折线图 该图将在 matplotlib 中采用不同的列 关键是当我们进行聚合时 我需要在两个不同的列上进行数据聚合 我想我需要制作一个用于绘图的大数据框 我没有找到更漂亮 更方便的方法来在 pandas matplotl
  • 使用 Google Calendar API 返回 401(未经授权)

    我尝试通过 JavaScript 使用 Google Calendar API 并在使用以下代码时不断收到 401 错误响应 ajax dataType json url https www googleapis com calendar
  • 如何在 HTML 5 Web Worker 中访问 jQuery

    我无法在 HTML5 中访问 jQuery网络工作者 https en wikipedia org wiki Web worker 我有办法做到这一点吗 长话短说 包括这个脚本 http fel 8u cz workerrjs main w
  • 检索 arangodb 中没有链接边的顶点

    检索相关edge collection中没有边的所有顶点的最佳方法是什么 我尝试使用以下代码 但自 arangodb 2 8 以来 它变得非常慢 在以前的版本中并不是很快 但比现在快了大约 10 倍 对于大约 1000 个边和大约 3000
  • 将非泛型类扩展为泛型类

    org apache commons collections buffer 包中的 Java 类 CircularFifoBuffer 是非泛型的 可以存储任何类的对象 我想创建一个通用版本 它只能保存类 T 的对象 我的第一个想法是扩展
  • 为什么我可以通过带有太多参数的指针调用函数?

    假设我有这个功能 int func2 printf func2 n return 0 现在我声明一个指针 int fp double 这应该指向一个函数 该函数需要double参数并返回一个int func2没有任何论点 但当我写时仍然 f
  • str.isdecimal() 和 str.isdigit() 区别示例

    阅读 python 文档我了解到 isdecimal 和 isdigit 字符串函数 但我没有发现文献对它们的可用区别太清楚 有人可以向我提供这两个函数的区别的代码示例吗 类似行为 gt gt gt str isdecimal 1 True
  • DHCP 服务器将任何 url 重定向到登陆页面

    我有一个 Linux DHCP 服务器 我需要将所有网络流量重定向到一个登陆页面 该页面将包含有关如何在网络上注册计算机的说明 无论用户输入什么 URL 都需要将用户重定向到网页 在 DHCP 服务器上 即 用户输入 google com
  • 具有模板的 C++ 类找不到其构造函数

    我有一个问题我不太明白 我有一个节点类 template
  • CSS 背景图像尺寸过渡

    我正在研究一个简单的标记 可以调整 div 背景图像的大小 看小提琴 http jsfiddle net zeYZL http jsfiddle net zeYZL 我需要使用简单的 CSS 过渡来为其设置动画 我尝试这样做 tile ho
  • 如何阻止远程表单提交?

    我有一个可以远程和正常使用的表格 form for comment html class comment form remote request xhr do f f text area body f submit 我希望仅在以下情况下提交
  • 如何将“原始”字符串转换为普通字符串? [复制]

    这个问题在这里已经有答案了 在Python中 我有一个像这样的字符串 x89 n 如何将其解码为普通字符串 例如 x89 n 如果您的输入值为str字符串 使用codecs decode 转换 import codecs codecs de
  • 有 PayPal IPN 的示例吗

    我有一个 Asp Net WEB API 2 项目 我想实现一个即时付款通知 IPN 侦听器控制器 我找不到任何示例和 nuget 包 我所需要的只是确认用户使用 Paypal 上的标准 html 按钮付款 这很简单 所有 nuget 包都
  • 初始化列表中的依赖关系

    这种行为定义明确吗 class Foo int A B public Foo int Bar B Bar A B 123 int main Foo MyFoo 0 return 0 不 它是未定义的 A将首先初始化 它是类定义中的第一个 并
  • 博客的 mongodb 架构设计

    您将如何为具有基于文档的数据库 mongodb 的类似博客的网站设计架构 该站点具有以下对象 用户 文章 评论 用户可以向文章添加评论 每个用户还可以为每个评论投票一次 我希望能够有效地执行这些查询 1 获取文章A 文章A的评论以及每个评论
  • 从 R 中的选项卡式文本创建树形图

    我想制作以下数据的树 流程图 并用制表符正确缩进 Vertebrates fish goldfish clownfish amphibian frog toad reptiles snake lizard turtle tortoise b
  • python kivy在矩形内添加文本

    如何在矩形内添加文本 我使用下面的代码并在画布内添加了一个标签 希望它能显示在矩形内 import kivy from kivy app import App from kivy uix anchorlayout import Anchor
  • 如何编写看起来像方法的 lambda 表达式?

    我一直在疯狂地试图解决这个问题 考虑以下代码 我假设已定义前向引用 Signature representing a pointer to a method call typedef void MyClass MyMethod int cl
  • Spark超时可能是由于HDFS中文件超过100万个的binary Files()

    我正在通过以下方式读取数百万个 xml 文件 val xmls sc binaryFiles xmlDir 该操作在本地运行良好 但在纱线上失败并显示 client token N A diagnostics Application app