结合官方步骤快速入门指南 https://spark.apache.org/docs/latest/quick-start.html and 在 YARN 上启动 Spark https://spark.apache.org/docs/latest/running-on-yarn.html we get:
我们将创建一个非常简单的 Spark 应用程序 SimpleApp.java:
/*** SimpleApp.java ***/
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
public class SimpleApp {
public static void main(String[] args) {
String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
"$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
JavaRDD<String> logData = sc.textFile(logFile).cache();
long numAs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("a"); }
}).count();
long numBs = logData.filter(new Function<String, Boolean>() {
public Boolean call(String s) { return s.contains("b"); }
}).count();
System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
}
}
该程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 $YOUR_SPARK_HOME 替换为 Spark 的安装位置。与 Scala 示例一样,我们初始化一个SparkContext,尽管我们使用特殊的 JavaSparkContext 类来获得 Java 友好的类。我们还创建 RDD(以 JavaRDD 为代表)并对其运行转换。最后,我们通过创建扩展spark.api.java.function.Function的类将函数传递给Spark。 Java 编程指南更详细地描述了这些差异。
为了构建程序,我们还编写了一个Mavenpom.xml https://maven.apache.org/guides/introduction/introduction-to-the-pom.html将 Spark 列为依赖项的文件。请注意,Spark 工件标有 Scala 版本。
<project>
<groupId>edu.berkeley</groupId>
<artifactId>simple-project</artifactId>
<modelVersion>4.0.0</modelVersion>
<name>Simple Project</name>
<packaging>jar</packaging>
<version>1.0</version>
<repositories>
<repository>
<id>Akka repository</id>
<url>http://repo.akka.io/releases</url>
</repository>
</repositories>
<dependencies>
<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>0.9.0-incubating</version>
</dependency>
</dependencies>
</project>
如果您还希望从 Hadoop 的 HDFS 读取数据,您还需要为您的 HDFS 版本添加对 hadoop-client 的依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>...</version>
</dependency>
我们根据规范的 Maven 目录结构来布置这些文件:
$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java
现在,我们可以使用 Maven 执行应用程序:
$ mvn package
$ mvn exec:java -Dexec.mainClass="SimpleApp"
...
Lines with a: 46, Lines with b: 23
然后按照以下步骤操作在 YARN 上启动 Spark https://spark.apache.org/docs/latest/running-on-yarn.html:
构建支持 YARN 的组装 JAR
我们需要一个整合的 Spark JAR(它捆绑了所有必需的依赖项)来在 YARN 集群上运行 Spark 作业。这可以通过设置 Hadoop 版本和 SPARK_YARN 环境变量来构建,如下所示:
SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
组装好的 JAR 将如下所示:./ assembly/target/scala-2.10/spark- assembly_0.9.0-incubating-hadoop2.0.5.jar。
构建过程现在还支持新的 YARN 版本 (2.2.x)。见下文。
准备工作
- 构建支持 YARN 的程序集(见上文)。
- 组装好的jar可以安装到HDFS中或者本地使用。
- 您的应用程序代码必须打包到单独的 JAR 文件中。
如果您想测试 YARN 部署模式,可以使用当前的 Spark 示例。可以通过运行以下命令生成 Spark-examples_2.10-0.9.0-incubating 文件:
sbt/sbt assembly
NOTE:由于您正在阅读的文档适用于 Spark 0.9.0-incubating 版本,因此我们假设您已下载 Spark 0.9.0-incubating 或将其从源代码管理中签出。如果使用不同版本的Spark,sbt package命令生成的jar中的版本号显然会不同。
配置
Spark on YARN 的大多数配置与其他部署相同。有关这些的更多信息,请参阅配置页面。这些是特定于 SPARK on YARN 的配置。
环境变量:
-
SPARK_YARN_USER_ENV,将环境变量添加到在 YARN 上启动的 Spark 进程。这可以是逗号分隔的环境变量列表,例如
SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"
系统属性:
-
Spark.yarn.applicationMaster.waitTries,属性设置 ApplicationMaster 等待 Spark Master 的次数,以及等待 Spark Context 初始化的尝试次数。默认值为 10。
-
Spark.yarn.submit.file.replication,应用程序上传到 HDFS 的文件的 HDFS 复制级别。其中包括 Spark jar、应用程序 jar 和任何分布式缓存文件/档案。
-
Spark.yarn.preserve.staging.files,设置为 true 以在作业结束时保留暂存文件(spark jar、app jar、分布式缓存文件),而不是删除它们。
-
Spark.yarn.scheduler.heartbeat.interval-ms,Spark 应用程序主节点向 YARN ResourceManager 发出心跳的时间间隔(以毫秒为单位)。默认值为 5 秒。
-
Spark.yarn.max.worker.failures,应用程序失败之前工作线程失败的最大次数。默认值是请求的工作人员数量乘以 2,最少为 3。
在 YARN 上启动 Spark
确保这件事HADOOP_CONF_DIR or YARN_CONF_DIR指向包含 hadoop 集群(客户端)配置文件的目录。这将用于连接到集群、写入 dfs 并将作业提交到资源管理器。
有两种调度程序模式可用于在 YARN 上启动 Spark 应用程序。
通过 YARN 客户端以纱线独立模式启动 Spark 应用程序。
启动 YARN 客户端的命令如下:
SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar <YOUR_APP_JAR_FILE> \
--class <APP_MAIN_CLASS> \
--args <APP_MAIN_ARGUMENTS> \
--num-workers <NUMBER_OF_WORKER_MACHINES> \
--master-class <ApplicationMaster_CLASS>
--master-memory <MEMORY_FOR_MASTER> \
--worker-memory <MEMORY_PER_WORKER> \
--worker-cores <CORES_PER_WORKER> \
--name <application_name> \
--queue <queue_name> \
--addJars <any_local_files_used_in_SparkContext.addJar> \
--files <files_for_distributed_cache> \
--archives <archives_for_distributed_cache>
例如:
# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly
# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties
# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.5-alpha.jar \
./bin/spark-class org.apache.spark.deploy.yarn.Client \
--jar examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar \
--class org.apache.spark.examples.SparkPi \
--args yarn-standalone \
--num-workers 3 \
--master-memory 4g \
--worker-memory 2g \
--worker-cores 1
# Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
# (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794
上面启动了一个 YARN 客户端程序,该程序启动默认的 Application Master。然后SparkPi将作为Application Master的子线程运行,YARN Client将定期轮询Application Master以获取状态更新并将其显示在控制台中。一旦您的应用程序完成运行,客户端就会退出。
在这种模式下,您的应用程序实际上在运行应用程序主机的远程计算机上运行。因此,涉及本地交互的应用程序将无法正常工作,例如火花壳。