如何将位于 HDFS 上的类型安全配置文件添加到 Spark-Submit(集群模式)?

2024-04-04

我有一个 Spark (Spark 1.5.2) 应用程序,它将数据从 Kafka 流式传输到 HDFS。我的应用程序包含两个 Typesafe 配置文件来配置某些内容,例如 Kafka 主题等。

现在我想在集群中使用spark-submit(集群模式)运行我的应用程序。 我的项目的所有依赖项的 jar 文件都存储在 HDFS 上。 只要我的配置文件包含在 jar 文件中,一切就可以正常工作。但这对于测试目的来说是不切实际的,因为我总是必须重建罐子。

因此,我排除了项目的配置文件,并通过“driver-class-path”添加它们。这适用于客户端模式,但如果我现在将配置文件移动到 HDFS 并在集群模式下运行我的应用程序,它将找不到设置。您可以在下面找到我的 Spark-submit 命令:

/usr/local/spark/bin/spark-submit \
    --total-executor-cores 10 \
    --executor-memory 15g \
    --verbose \
    --deploy-mode cluster\
    --class com.hdp.speedlayer.SpeedLayerApp \
    --driver-class-path hdfs://iot-master:8020/user/spark/config \
    --master spark://spark-master:6066 \
    hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar

我已经尝试过使用 --file 参数,但这也不起作用。有人知道我该如何解决这个问题吗?

Update:

我做了一些进一步的研究,发现它可能与 HDFS 路径有关。我将 HDFS 路径更改为“hdfs:///iot-master:8020//user//spark//config 但不幸的是这也不起作用。但也许这可以帮助你。

您还可以在下面看到我在集群模式下运行驱动程序时遇到的错误:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
    at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
    ... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...

试图达到相同的结果我发现了以下内容:

  1. --files:仅与运行 Spark-submit 命令的计算机上的本地文件关联并转换为conf.addFile()。所以除非你能够运行,否则 hdfs 文件将无法工作hdfs dfs -get <....>在检索文件之前。就我而言,我想从 oozie 运行它,所以我不知道它将在哪台机器上运行,并且我不想将复制文件操作添加到我的工作流程中。
  2. @Yuval_Itzchakov 引用的引用是指 --jars ,它只处理 jar,因为它转换为conf.addJar()

据我所知,没有直接的方法从 hdfs 加载配置文件。

我的方法是将路径传递给我的应用程序并读取配置文件并将其合并到参考文件中:

private val HDFS_IMPL_KEY = "fs.hdfs.impl"
def loadConf(pathToConf: String): Config = {
   val path = new Path(pathToConf)
   val confFile = File.createTempFile(path.getName, "tmp")
   confFile.deleteOnExit()
   getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))

   ConfigFactory.load(ConfigFactory.parseFile(confFile))
}

def getFileSystemByUri(uri: URI) : FileSystem  = {
   val hdfsConf = new Configuration()
   hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
FileSystem.get(uri, hdfsConf)
}

P.S 该错误仅意味着 ConfigFactory 没有找到任何配置文件,因此他找不到您要查找的属性。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将位于 HDFS 上的类型安全配置文件添加到 Spark-Submit(集群模式)? 的相关文章

  • 非时间戳列上的 Spark 结构化流窗口

    我收到以下形式的数据流 id timestamp val xxx 1 12 15 25 50 1 2 12 15 25 30 1 3 12 15 26 30 2 4 12 15 27 50 2 5 12 15 27 30 3 6 12 15
  • 获取 Spark 中组的最后一个值

    我有一个 SparkR DataFrame 如下所示 Create R data frame custId lt c rep 1001 5 rep 1002 3 1003 date lt c 2013 08 01 2014 01 01 20
  • 在Spark的客户端模式下,驱动程序需要网络访问远程执行程序?

    使用火花时在客户端模式 例如yarn client 运行驱动程序的本地计算机是否直接与运行远程执行程序的集群工作节点通信 如果是 是否意味着机器 运行驱动程序 需要具有对工作节点的网络访问权限 那么master节点向集群请求资源 并将wor
  • AWS EMR PySpark 连接到 mysql

    我正在尝试使用 jdbc 通过 pyspark 连接到 mysql 我可以在 EMR 之外完成此操作 但是当我尝试使用 EMR 时 pyspark 无法正确启动 我在我的机器上使用的命令 pyspark conf spark executo
  • 将数据作为 RDD 保存回 Cassandra

    我试图从 Kafka 读取消息 处理数据 然后将数据添加到 cassandra 中 就像它是 RDD 一样 我的麻烦是将数据保存回 cassandra 中 from future import print function from pys
  • 在 Hadoop 中处理带标头的文件

    我想在 Hadoop 中处理很多文件 每个文件都有一些头信息 后面跟着很多记录 每个记录都存储在固定数量的字节中 对此有何建议 我认为最好的解决方案是编写一个自定义的InputFormat http hadoop apache org co
  • 如何查找组中第一个非空值? (使用dataset api进行二次排序)

    我正在研究一个代表事件流的数据集 例如从网站跟踪事件时触发 所有事件都有一个时间戳 我们经常遇到的一个用例是尝试查找给定字段的第一个非空值 例如 类似的东西最能让我们到达那里 val eventsDf spark read json jso
  • Hadoop安装问题:

    我跟着this http www bogotobogo com Hadoop BigData hadoop Install on ubuntu single node cluster phpHadoop 安装教程 不幸的是 当我运行全部启动
  • Spark:有没有办法打印出spark-shell和spark的类路径?

    我可以在 Spark shell 中成功运行 Spark 作业 但是当它打包并通过 Spark submit 运行时 我收到 NoSuchMethodError 这向我表明类路径存在某种不匹配 有没有办法可以比较两个类路径 某种日志记录语句
  • hive查询无法通过jdbc生成结果集

    我是 Hive 和 Hadoop 的新手 在我的教程中 我想将表创建为 import java sql SQLException import java sql Connection import java sql ResultSet im
  • Spark 中的广播 Annoy 对象(对于最近邻居)?

    由于 Spark 的 mllib 没有最近邻居功能 我正在尝试使用Annoy https github com spotify annoy为近似最近邻 我尝试广播 Annoy 对象并将其传递给工人 然而 它并没有按预期运行 下面是可重复性的
  • Python Spark DataFrame:用 SparseVector 替换 null

    在 Spark 中 我有以下名为 df 的数据框 其中包含一些空条目 id features1 features2 185 5 0 1 4 0 1 0 null 220 5 0 2 3 0 1 0 10 1 2 6 0 1 225 null
  • 司机下令停车后 Spark 工作人员停下来

    基本上 主节点也充当从节点之一 一旦主服务器上的从服务器完成 它就会调用 SparkContext 来停止 因此该命令传播到所有从服务器 从而在处理过程中停止执行 其中一名工作人员登录时出错 信息 SparkHadoopMapRedUtil
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • Spark 执行器 STDOUT 到 Kubernetes STDOUT

    我在 Spark Worker 中运行的 Spark 应用程序将执行程序日志输出到特定文件路径 worker home directory app xxxxxxxx 0 stdout I used log4j properties将日志从
  • 为什么 PySpark 中的 agg() 一次只能汇总 DataFrame 的一列? [复制]

    这个问题在这里已经有答案了 对于下面的数据框 df spark createDataFrame data Alice 4 300 Bob 7 677 schema name High 当我尝试找到最小值和最大值时 我只得到输出中的最小值 d
  • 将日期字符串转换为“MM/DD/YY”格式

    我刚刚看到这个例子 我该如何解决这个问题 Hive 元存储包含一个名为 Problem1 的数据库 其中包含一个名为 customer 的表 customer 表包含 9000 万条客户记录 90 000 000 每条记录都有一个生日字段
  • 这个 Java 语法是什么意思? [复制]

    这个问题在这里已经有答案了 可能的重复 java中的是什么意思 https stackoverflow com questions 12649572 what does the type in java mean 在下面的代码中 Itera
  • 如何在 Mac 上使用 homebrew 安装 apache-spark 2.3.3

    brew install apache spark只安装最新版本的 Spark 2 4 和 brew search apache spark没有给出任何其他选项 有没有办法用自制程序安装旧版本的 Spark Type brew tap ed
  • Spark:并行转换多个数据帧

    了解如何在并行转换多个数据帧时实现最佳并行性 我有一系列路径 val paths Array path1 path2 我从每个路径加载数据帧 然后转换并写入目标路径 paths foreach path gt val df spark re

随机推荐