SparkR 作业 100 分钟超时

2023-12-31

我编写了一个有点复杂的sparkR脚本并使用spark-submit运行它。脚本基本上做的是逐行读取基于 hive/impala parquet 的大表,并生成具有相同行数的新 parquet 文件。 但似乎工作在大约 100 分钟后停止,这似乎有些超时。

  • 对于最多 500K 行,脚本可以完美运行(因为它需要不到 100 分钟)
  • 对于 1、2、3 或更多百万行,脚本将在 100 分钟后退出。

我检查了我知道并测试过的所有可能的参数,其值范围为 100 分钟。但找不到任何解决办法。

[user@localhost R]$ time spark-submit sparkr-pre.R                           
Loading required package: methods

Attaching package: ‘SparkR’

The following objects are masked from ‘package:stats’:

    filter, na.omit

The following objects are masked from ‘package:base’:

    intersect, rbind, sample, subset, summary, table, transform

15/12/30 18:04:27 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
[Stage 1:========================================>                 (7 + 3) / 10]Error in if (returnStatus != 0) { : argument is of length zero
Calls: write.df -> write.df -> .local -> callJMethod -> invokeJava
Execution halted
15/12/30 19:44:52 ERROR InsertIntoHadoopFsRelation: Aborting job.
org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:703)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:702)
        at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
        at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:702)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1514)
        at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1438)
        at org.apache.spark.SparkContext$$anonfun$stop$7.apply$mcV$sp(SparkContext.scala:1724)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1185)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1723)
        at org.apache.spark.SparkContext$$anonfun$3.apply$mcV$sp(SparkContext.scala:587)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:264)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:234)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:234)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:216)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1824)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1837)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1914)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(InsertIntoHadoopFsRelation.scala:108)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:57)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:69)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:933)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:933)
        at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:197)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:146)
        at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1855)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.api.r.RBackendHandler.handleMethodCall(RBackendHandler.scala:132)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:79)
        at org.apache.spark.api.r.RBackendHandler.channelRead0(RBackendHandler.scala:38)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:244)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:308)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:294)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:846)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
15/12/30 19:44:52 ERROR DefaultWriterContainer: Job job_201512301804_0000 aborted.
15/12/30 19:44:52 ERROR RBackendHandler: save on 25 failed

real    100m30.944s
user    1m26.326s
sys     0m19.459s

环境 运行时信息

Name    Value
Java Home   /usr/java/jdk1.8.0_40/jre
Java Version    1.8.0_40 (Oracle Corporation)
Scala Version   version 2.10.4
Spark Properties

Name    Value
spark.akka.frameSize    1024
spark.app.id    application_1451466100034_0019
spark.app.name  SparkR
spark.driver.appUIAddress   http://x.x.x.x:4040
spark.driver.host   x.x.x.x
spark.driver.maxResultSize  8G
spark.driver.memory 100G
spark.driver.port   60471
spark.executor.id   driver
spark.executor.memory   14G
spark.executorEnv.LD_LIBRARY_PATH   $LD_LIBRARY_PATH:/usr/lib64/R/lib:/usr/local/lib64:/usr/lib/jvm/jre/lib/amd64/server:/usr/lib/jvm/jre/lib/amd64:/usr/lib/jvm/java/lib/amd64:/usr/java/packages/lib/amd64:/lib:/usr/lib::/usr/lib/hadoop/lib/native
spark.externalBlockStore.folderName spark-b60f685e-c46c-435d-ab1b-c9d1279f630f
spark.fileserver.uri    http://x.x.x.x:50281
spark.home  /datas/spark-1.5.2-bin-hadoop2.6
spark.kryoserializer.buffer.max 2000M
spark.master    yarn-client
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS  CDHPR1.dc.dialog.lk
spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES  http://CDHPR1.dc.dialog.lk:8088/proxy/application_1451466100034_0019
spark.scheduler.mode    FIFO
spark.serializer    org.apache.spark.serializer.KryoSerializer
spark.sql.parquet.binaryAsString    true
spark.submit.deployMode client
spark.ui.filters    org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.yarn.dist.archives    file:/datas/spark-1.5.2-bin-hadoop2.6/R/lib/sparkr.zip#sparkr
spark.yarn.dist.files   file:/home/inuser/R/sparkr-pre.R
System Properties

Name    Value
SPARK_SUBMIT    true
SPARK_YARN_MODE true
awt.toolkit sun.awt.X11.XToolkit
file.encoding   UTF-8
file.encoding.pkg   sun.io
file.separator  /
java.awt.graphicsenv    sun.awt.X11GraphicsEnvironment
java.awt.printerjob sun.print.PSPrinterJob
java.class.version  52.0
java.endorsed.dirs  /usr/java/jdk1.8.0_40/jre/lib/endorsed
java.ext.dirs   /usr/java/jdk1.8.0_40/jre/lib/ext:/usr/java/packages/lib/ext
java.home   /usr/java/jdk1.8.0_40/jre
java.io.tmpdir  /tmp
java.library.path   :/usr/lib/hadoop/lib/native:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
java.runtime.name   Java(TM) SE Runtime Environment
java.runtime.version    1.8.0_40-b26
java.specification.name Java Platform API Specification
java.specification.vendor   Oracle Corporation
java.specification.version  1.8
java.vendor Oracle Corporation
java.vendor.url http://java.oracle.com/
java.vendor.url.bug http://bugreport.sun.com/bugreport/
java.version    1.8.0_40
java.vm.info    mixed mode
java.vm.name    Java HotSpot(TM) 64-Bit Server VM
java.vm.specification.name  Java Virtual Machine Specification
java.vm.specification.vendor    Oracle Corporation
java.vm.specification.version   1.8
java.vm.vendor  Oracle Corporation
java.vm.version 25.40-b25
line.separator  
os.arch amd64
os.name Linux
os.version  2.6.32-431.el6.x86_64
path.separator  :
sun.arch.data.model 64
sun.boot.class.path /usr/java/jdk1.8.0_40/jre/lib/resources.jar:/usr/java/jdk1.8.0_40/jre/lib/rt.jar:/usr/java/jdk1.8.0_40/jre/lib/sunrsasign.jar:/usr/java/jdk1.8.0_40/jre/lib/jsse.jar:/usr/java/jdk1.8.0_40/jre/lib/jce.jar:/usr/java/jdk1.8.0_40/jre/lib/charsets.jar:/usr/java/jdk1.8.0_40/jre/lib/jfr.jar:/usr/java/jdk1.8.0_40/jre/classes
sun.boot.library.path   /usr/java/jdk1.8.0_40/jre/lib/amd64
sun.cpu.endian  little
sun.cpu.isalist 
sun.io.unicode.encoding UnicodeLittle
sun.java.command    org.apache.spark.deploy.SparkSubmit sparkr-pre.R
sun.java.launcher   SUN_STANDARD
sun.jnu.encoding    UTF-8
sun.management.compiler HotSpot 64-Bit Tiered Compilers
sun.nio.ch.bugLevel 
sun.os.patch.level  unknown
user.country    US
user.dir    /home/user/R
user.home   /home/user
user.language   en
user.name   inuser
user.timezone   Asia/Colombo
Classpath Entries

Resource    Source
/datas/spark-1.5.2-bin-hadoop2.6/conf/  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/conf/yarn-conf/    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-api-jdo-3.2.6.jar  System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-core-3.2.10.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/datanucleus-rdbms-3.2.9.jar    System Classpath
/datas/spark-1.5.2-bin-hadoop2.6/lib/spark-assembly-1.5.2-hadoop2.6.0.jar   System Classpath

火花默认.conf

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
# spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
# spark.driver.memory              5g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"
#
spark.master            yarn-client
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.driver.memory             100G
spark.executor.memory           14G 
spark.sql.parquet.binaryAsString true
spark.kryoserializer.buffer.max 2000M
spark.driver.maxResultSize      8G
spark.akka.frameSize            1024
#spark.executor.instances       16

我无法公开分享 SparkR 脚本。对此真的很抱歉。但是当代码需要不到 100 分钟才能完成时,它可以完美地工作。


这是 Spark 1.6.0 中的一个已知错误,请参阅:https://issues.apache.org/jira/browse/SPARK-12609 https://issues.apache.org/jira/browse/SPARK-12609。对 SparkR 代码的快速审查还表明,该错误实际上自 Spark 1.4.0 以来就存在。

在他们发布修复程序之前,快速而肮脏的解决方案是增加超时。正如问题中所指出的,有问题的函数是connectBackend该函数可以在运行时使用进行修补assignInNamespace.

下面的代码检索原始函数,然后将其包装在第二个函数中,我们将超时值增加到 48 小时。然后原始函数被包装器替换。

connectBackend.orig <- getFromNamespace('connectBackend', pos='package:SparkR')
connectBackend.patched <- function(hostname, port, timeout = 3600*48) {
   connectBackend.orig(hostname, port, timeout)
}
assignInNamespace("connectBackend", value=connectBackend.patched, pos='package:SparkR')

加载 SparkR 包后放置此代码。

另一种解决方案是修改 SparkR 代码中的超时并重新编译。有关编译说明,请参阅:https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh https://github.com/apache/spark/blob/branch-1.6/R/install-dev.sh

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

SparkR 作业 100 分钟超时 的相关文章

  • 如何在 Apache Spark 中通过 DStream 使用特征提取

    我有通过 DStream 从 Kafka 到达的数据 我想进行特征提取以获得一些关键词 我不想等待所有数据的到达 因为它是可能永远不会结束的连续流 所以我希望以块的形式执行提取 如果准确性会受到一点影响 对我来说并不重要 到目前为止 我整理
  • 带有安全 Kafka 抛出的 Spark 结构化流:无权访问组异常

    为了在我的项目中使用结构化流 我正在 hortonworks 2 6 3 环境上测试 Spark 2 2 0 和 Kafka 0 10 1 与 Kerberos 的集成 我正在运行下面的示例代码来检查集成 我能够在 Spark 本地模式下的
  • 如何使用 Spark 2 屏蔽列?

    我有一些表 我需要屏蔽其中的一些列 要屏蔽的列因表而异 我正在读取这些列application conf file 例如 对于员工表如下所示 id name age address 1 abcd 21 India 2 qazx 42 Ger
  • pyspark flatmat 错误:TypeError:“int”对象不可迭代

    这是我书中的示例代码 from pyspark import SparkConf SparkContext conf SparkConf setMaster spark chetan ThinkPad E470 7077 setAppNam
  • Spark 中的 Distinct() 函数如何工作?

    我是 Apache Spark 的新手 正在学习基本功能 有一个小疑问 假设我有一个元组 键 值 的 RDD 并且想从中获取一些唯一的元组 我使用distinct 函数 我想知道该函数基于什么基础认为元组是不同的 是基于键 值还是两者 di
  • 根据 pyspark 中的条件从数据框中删除行

    我有一个包含两列的数据框 col1 col2 22 12 2 1 2 1 5 52 1 2 62 9 77 33 3 我想创建一个新的数据框 它只需要行 col1 的值 gt col2 的值 就像注释一样col1 很长类型和col2 有双
  • Spark 中的 StandardScaler 未按预期工作

    知道为什么 Spark 会这样做吗StandardScaler 根据定义StandardScaler StandardScaler 将一组特征标准化为均值为零 标准差为 1 withStd 标志将数据缩放为 单位标准差 而标志 withMe
  • 从 pyspark.sql 中的列表创建数据框

    我完全陷入了有线的境地 现在我有一个清单li li example data map lambda x get labeled prediction w x collect print li type li 输出就像 0 0 59 0 0
  • 从 PySpark RDD 中的每个组中取出前 N 个元素(不使用 groupByKey)

    我有一个如下所示的 RDD dataSource sc parallelize user1 3 blue user1 4 black user2 5 white user2 3 black user2 6 red user1 1 red 我
  • 将 CSV 转换为序列文件

    我有一个 CSV 文件 我想将其转换为 SequenceFile 我最终将使用它来创建 NamedVectors 以在聚类作业中使用 我一直在使用 seqdirectory 命令尝试创建 SequenceFile 然后使用 nv 选项将该输
  • 在spark-kafka中使用schema将ConsumerRecord值转换为Dataframe

    我正在使用 Spark 2 0 2 和 Kafka 0 11 0 并且 我正在尝试在火花流中使用来自卡夫卡的消息 以下是代码 val topics notes val kafkaParams Map String Object bootst
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • pyspark加入多个条件

    我如何指定很多条件 当我使用pyspark时 join 例子 与蜂巢 query select a NUMCNT b NUMCNT as RNUMCNT a POLE b POLE as RPOLE a ACTIVITE b ACTIVIT
  • 我可以在没有 Hadoop 的情况下使用 Spark 作为开发环境吗?

    我对大数据和相关领域的概念非常陌生 如果我犯了一些错误或拼写错误 我很抱歉 我想了解阿帕奇火花 http spark apache org 并使用它仅在我的电脑中 在开发 测试环境中 由于Hadoop包含HDFS Hadoop分布式文件系统
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • HDFS 中的文件数量与块数量

    我正在运行单节点 hadoop 环境 当我跑的时候 hadoop fsck user root mydatadir block 我真的对它给出的输出感到困惑 Status HEALTHY Total size 998562090 B Tot
  • 更改 Spark Streaming 中的输出文件名

    我正在运行一个 Spark 作业 就逻辑而言 它的性能非常好 但是 当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时 输出文件的名称格式为 part 00000 part 00001 等 有没有办法更改输出文件名 谢谢
  • Spark:导入UTF-8编码的文本文件

    我正在尝试处理一个包含很多特殊字符的文件 例如德语变音符号 o 等 如下所示 sc hadoopConfiguration set textinputformat record delimiter r n r n sc textFile f
  • Spark-1.6.1 上的 DMLC 的 XGBoost-4j

    我正在尝试在 Spark 1 6 1 上使用 DMLC 的 XGBoost 实现 我能够使用 XGBoost 训练我的数据 但在预测方面面临困难 我实际上想以在 Apache Spark mllib 库中完成的方式进行预测 这有助于计算训练
  • Hive:如何分解嵌入 CSV 文件中的 JSON 列?

    从 CSV 文件 带有标题和管道分隔符 中 我得到了以下两个内容 其中包含一个 JSON 列 内部有一个集合 如下所示 第一种情况 使用没有名称的 JSON 集合 ProductId IngestTime ProductOrders 918

随机推荐