我有一个 Spark (Spark 1.5.2) 应用程序,它将数据从 Kafka 流式传输到 HDFS。我的应用程序包含两个 Typesafe 配置文件来配置某些内容,例如 Kafka 主题等。
现在我想在集群中使用spark-submit(集群模式)运行我的应用程序。
我的项目的所有依赖项的 jar 文件都存储在 HDFS 上。
只要我的配置文件包含在 jar 文件中,一切就可以正常工作。但这对于测试目的来说是不切实际的,因为我总是必须重建罐子。
因此,我排除了项目的配置文件,并通过“driver-class-path”添加它们。这适用于客户端模式,但如果我现在将配置文件移动到 HDFS 并在集群模式下运行我的应用程序,它将找不到设置。您可以在下面找到我的 Spark-submit 命令:
/usr/local/spark/bin/spark-submit \
--total-executor-cores 10 \
--executor-memory 15g \
--verbose \
--deploy-mode cluster\
--class com.hdp.speedlayer.SpeedLayerApp \
--driver-class-path hdfs://iot-master:8020/user/spark/config \
--master spark://spark-master:6066 \
hdfs://iot-master:8020/user/spark/speed-layer-CONFIG.jar
我已经尝试过使用 --file 参数,但这也不起作用。有人知道我该如何解决这个问题吗?
Update:
我做了一些进一步的研究,发现它可能与 HDFS 路径有关。我将 HDFS 路径更改为“hdfs:///iot-master:8020//user//spark//config 但不幸的是这也不起作用。但也许这可以帮助你。
您还可以在下面看到我在集群模式下运行驱动程序时遇到的错误:
Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.ExceptionInInitializerError
at com.speedlayer.SpeedLayerApp.main(SpeedLayerApp.scala)
... 6 more
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'application'
at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:124)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:145)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:159)
at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:164)
...
试图达到相同的结果我发现了以下内容:
- --files:仅与运行 Spark-submit 命令的计算机上的本地文件关联并转换为
conf.addFile()
。所以除非你能够运行,否则 hdfs 文件将无法工作hdfs dfs -get <....>
在检索文件之前。就我而言,我想从 oozie 运行它,所以我不知道它将在哪台机器上运行,并且我不想将复制文件操作添加到我的工作流程中。
- @Yuval_Itzchakov 引用的引用是指 --jars ,它只处理 jar,因为它转换为
conf.addJar()
据我所知,没有直接的方法从 hdfs 加载配置文件。
我的方法是将路径传递给我的应用程序并读取配置文件并将其合并到参考文件中:
private val HDFS_IMPL_KEY = "fs.hdfs.impl"
def loadConf(pathToConf: String): Config = {
val path = new Path(pathToConf)
val confFile = File.createTempFile(path.getName, "tmp")
confFile.deleteOnExit()
getFileSystemByUri(path.toUri).copyToLocalFile(path, new Path(confFile.getAbsolutePath))
ConfigFactory.load(ConfigFactory.parseFile(confFile))
}
def getFileSystemByUri(uri: URI) : FileSystem = {
val hdfsConf = new Configuration()
hdfsConf.set(HDFS_IMPL_KEY, classOf[org.apache.hadoop.hdfs.DistributedFileSystem].getName)
FileSystem.get(uri, hdfsConf)
}
P.S 该错误仅意味着 ConfigFactory 没有找到任何配置文件,因此他找不到您要查找的属性。
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)