如何运行 Spark Java 程序

2024-03-14

我为 Spark 编写了一个 Java 程序。但是如何从 Unix 命令行运行和编译它。编译运行时是否必须包含任何 jar


结合官方步骤快速入门指南 https://spark.apache.org/docs/latest/quick-start.html and 在 YARN 上启动 Spark https://spark.apache.org/docs/latest/running-on-yarn.html we get:

我们将创建一个非常简单的 Spark 应用程序 SimpleApp.java:

/*** SimpleApp.java ***/
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;

public class SimpleApp {
  public static void main(String[] args) {
    String logFile = "$YOUR_SPARK_HOME/README.md"; // Should be some file on your system
    JavaSparkContext sc = new JavaSparkContext("local", "Simple App",
      "$YOUR_SPARK_HOME", new String[]{"target/simple-project-1.0.jar"});
    JavaRDD<String> logData = sc.textFile(logFile).cache();

    long numAs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("a"); }
    }).count();

    long numBs = logData.filter(new Function<String, Boolean>() {
      public Boolean call(String s) { return s.contains("b"); }
    }).count();

    System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
  }
}

该程序仅计算文本文件中包含“a”的行数和包含“b”的行数。请注意,您需要将 $YOUR_SPARK_HOME 替换为 Spark 的安装位置。与 Scala 示例一样,我们初始化一个SparkContext,尽管我们使用特殊的 JavaSparkContext 类来获得 Java 友好的类。我们还创建 RDD(以 JavaRDD 为代表)并对其运行转换。最后,我们通过创建扩展spark.api.java.function.Function的类将函数传递给Spark。 Java 编程指南更详细地描述了这些差异。

为了构建程序,我们还编写了一个Mavenpom.xml https://maven.apache.org/guides/introduction/introduction-to-the-pom.html将 Spark 列为依赖项的文件。请注意,Spark 工件标有 Scala 版本。

<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <repositories>
    <repository>
      <id>Akka repository</id>
      <url>http://repo.akka.io/releases</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>0.9.0-incubating</version>
    </dependency>
  </dependencies>
</project>

如果您还希望从 Hadoop 的 HDFS 读取数据,您还需要为您的 HDFS 版本添加对 hadoop-client 的依赖:

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-client</artifactId>
  <version>...</version>
</dependency>

我们根据规范的 Maven 目录结构来布置这些文件:

$ find .
./pom.xml
./src
./src/main
./src/main/java
./src/main/java/SimpleApp.java

现在,我们可以使用 Maven 执行应用程序:

$ mvn package
$ mvn exec:java -Dexec.mainClass="SimpleApp"
...
Lines with a: 46, Lines with b: 23

然后按照以下步骤操作在 YARN 上启动 Spark https://spark.apache.org/docs/latest/running-on-yarn.html:

构建支持 YARN 的组装 JAR

我们需要一个整合的 Spark JAR(它捆绑了所有必需的依赖项)来在 YARN 集群上运行 Spark 作业。这可以通过设置 Hadoop 版本和 SPARK_YARN 环境变量来构建,如下所示:

SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

组装好的 JAR 将如下所示:./ assembly/target/scala-2.10/spark- assembly_0.9.0-incubating-hadoop2.0.5.jar。

构建过程现在还支持新的 YARN 版本 (2.2.x)。见下文。

准备工作

  • 构建支持 YARN 的程序集(见上文)。
  • 组装好的jar可以安装到HDFS中或者本地使用。
  • 您的应用程序代码必须打包到单独的 JAR 文件中。

如果您想测试 YARN 部署模式,可以使用当前的 Spark 示例。可以通过运行以下命令生成 Spark-examples_2.10-0.9.0-incubating 文件:

sbt/sbt assembly 

NOTE:由于您正在阅读的文档适用于 Spark 0.9.0-incubating 版本,因此我们假设您已下载 Spark 0.9.0-incubating 或将其从源代码管理中签出。如果使用不同版本的Spark,sbt package命令生成的jar中的版本号显然会不同。

配置

Spark on YARN 的大多数配置与其他部署相同。有关这些的更多信息,请参阅配置页面。这些是特定于 SPARK on YARN 的配置。

环境变量:

  • SPARK_YARN_USER_ENV,将环境变量添加到在 YARN 上启动的 Spark 进程。这可以是逗号分隔的环境变量列表,例如
SPARK_YARN_USER_ENV="JAVA_HOME=/jdk64,FOO=bar"

系统属性:

  • Spark.yarn.applicationMaster.waitTries,属性设置 ApplicationMaster 等待 Spark Master 的次数,以及等待 Spark Context 初始化的尝试次数。默认值为 10。
  • Spark.yarn.submit.file.replication,应用程序上传到 HDFS 的文件的 HDFS 复制级别。其中包括 Spark jar、应用程序 jar 和任何分布式缓存文件/档案。
  • Spark.yarn.preserve.staging.files,设置为 true 以在作业结束时保留暂存文件(spark jar、app jar、分布式缓存文件),而不是删除它们。
  • Spark.yarn.scheduler.heartbeat.interval-ms,Spark 应用程序主节点向 YARN ResourceManager 发出心跳的时间间隔(以毫秒为单位)。默认值为 5 秒。
  • Spark.yarn.max.worker.failures,应用程序失败之前工作线程失败的最大次数。默认值是请求的工作人员数量乘以 2,最少为 3。

在 YARN 上启动 Spark

确保这件事HADOOP_CONF_DIR or YARN_CONF_DIR指向包含 hadoop 集群(客户端)配置文件的目录。这将用于连接到集群、写入 dfs 并将作业提交到资源管理器。

有两种调度程序模式可用于在 YARN 上启动 Spark 应用程序。

通过 YARN 客户端以纱线独立模式启动 Spark 应用程序。

启动 YARN 客户端的命令如下:

SPARK_JAR=<SPARK_ASSEMBLY_JAR_FILE> ./bin/spark-class org.apache.spark.deploy.yarn.Client \
  --jar <YOUR_APP_JAR_FILE> \
  --class <APP_MAIN_CLASS> \
  --args <APP_MAIN_ARGUMENTS> \
  --num-workers <NUMBER_OF_WORKER_MACHINES> \
  --master-class <ApplicationMaster_CLASS>
  --master-memory <MEMORY_FOR_MASTER> \
  --worker-memory <MEMORY_PER_WORKER> \
  --worker-cores <CORES_PER_WORKER> \
  --name <application_name> \
  --queue <queue_name> \
  --addJars <any_local_files_used_in_SparkContext.addJar> \
  --files <files_for_distributed_cache> \
  --archives <archives_for_distributed_cache>

例如:

# Build the Spark assembly JAR and the Spark examples JAR
$ SPARK_HADOOP_VERSION=2.0.5-alpha SPARK_YARN=true sbt/sbt assembly

# Configure logging
$ cp conf/log4j.properties.template conf/log4j.properties

# Submit Spark's ApplicationMaster to YARN's ResourceManager, and instruct Spark to run the SparkPi example
$ SPARK_JAR=./assembly/target/scala-2.10/spark-assembly-0.9.0-incubating-hadoop2.0.5-alpha.jar \
    ./bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar examples/target/scala-2.10/spark-examples-assembly-0.9.0-incubating.jar \
      --class org.apache.spark.examples.SparkPi \
      --args yarn-standalone \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1

# Examine the output (replace $YARN_APP_ID in the following with the "application identifier" output by the previous command)
# (Note: YARN_APP_LOGS_DIR is usually /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version.)
$ cat $YARN_APP_LOGS_DIR/$YARN_APP_ID/container*_000001/stdout
Pi is roughly 3.13794

上面启动了一个 YARN 客户端程序,该程序启动默认的 Application Master。然后SparkPi将作为Application Master的子线程运行,YARN Client将定期轮询Application Master以获取状态更新并将其显示在控制台中。一旦您的应用程序完成运行,客户端就会退出。

在这种模式下,您的应用程序实际上在运行应用程序主机的远程计算机上运行。因此,涉及本地交互的应用程序将无法正常工作,例如火花壳。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何运行 Spark Java 程序 的相关文章

  • Java Sqlite Gradle

    我对 gradle 和 java 还很陌生 我有一个使用 sqlite 的项目 它通过 intellij idea 运行良好 但我无法从终端运行它 它会抛出异常 java lang ClassNotFoundException org sq
  • Java - 从配置文件加密/解密用户名和密码

    我们正忙于为客户开发 Java Web 服务 有两种可能的选择 将加密的用户名 密码存储在Web服务客户端上 从配置中读取 文件在客户端 解密并发送 将加密的用户名 密码存储在 Web 服务器上 从配置中读取 Web 服务器上的文件 解密并
  • H264 字节流到图像文件

    第一次来这里所以要温柔 我已经在给定的 H 264 字节流上工作了几个星期 一般注意事项 字节流不是来自文件 它是从外部源实时提供给我的 字节流使用 Android 的媒体编解码器进行编码 当将流写入扩展名为 H264的文件时 VLC能够正
  • Java中定义类型后同时初始化多个变量?

    这里需要一些语法方面的帮助 我正在尝试在定义类型后重新初始化多个变量 例如 int bonus sales x y 50 这工作正常 但是我想稍后在程序中将不同的值放入其中一些变量中 但我收到语法错误 bonus 25 x 38 sales
  • 过滤字符串上的 Spark DataFrame 包含

    我在用火花1 3 0 http spark apache org releases spark release 1 3 0 html and 火花阿夫罗1 0 0 https github com databricks spark avro
  • 为什么一个线程会中断另一个线程[重复]

    这个问题在这里已经有答案了 在Java多线程应用程序中 我们处理InterruptedThreadException 如果另一个线程中断当前线程 则会抛出此异常 现在 当另一个线程知道它将导致异常时 它可能想要中断当前线程的原因是什么 很多
  • Codility 钉板

    尝试了解 Codility NailingPlanks 的解决方案 问题链接 https app codility com programmers lessons 14 binary search algorithm nailing pla
  • Hystrix是否可以订阅CircuitBreaker开启事件?

    对于单元测试 我希望能够订阅 Hystrix 事件 特别是在断路器打开或关闭时发生事件 我四处寻找示例 似乎解决方法是利用指标流并监视断路器标志 由于 Hystrix 是基于 RxJava 构建的 我认为应该在某个地方有一个事件订阅接口 在
  • 如何在Gradle中支持多种语言(Java和Scala)的多个项目?

    我正在尝试将过时的 Ant 构建转换为 Gradle 该项目包含约50个Java子项目和10个Scala子项目 Java 项目仅包含 Java Scala 项目仅包含 Scala 每个项目都是由 Java 和 Scala 构建的 这大大减慢
  • 用于防止滥用的 Servlet 过滤器? (DoS、垃圾邮件等)

    我正在寻找一个 Servlet 过滤器库 它可以帮助我保护我们的 Web 服务免受未经授权的使用和 DDoS 攻击 我们的网络服务有 授权客户 因此理想情况下 过滤器将帮助检测未经授权或行为不当的客户 或检测使用同一帐户的多个人 此外 我们
  • 如何获取 Android 中临时文件的文件大小?

    如果我使用 openFileOutput 创建并写入临时文件 写入完成后如何获取文件大小 我希望这可以帮助你 File file new File selectedPath int file size Integer parseInt St
  • Java 类:匿名类、嵌套类、私有类

    有人能解释一下Java中匿名类 嵌套类和私有类之间的区别吗 我想知道与每个相关的运行时成本以及每个编译器的方法 这样我就可以掌握哪个最适合用于例如性能 编译器优化的潜力 内存使用以及其他 Java 编码人员的普遍可接受性 我所说的匿名类是指
  • 在多模块项目中访问绑定适配器

    我有一个多模块项目 其中应用程序模块包含我的绑定适配器 而我的功能模块取决于我的应用程序模块 因为它是动态功能模块 应用程序 包含绑定适配器 gt 动态功能模块 存在布局的地方 我在所有模块中启用了数据绑定和 kapt 我无法成功构建应用程
  • Netty中连接关闭后重新连接的最佳方法是什么

    简单场景 扩展 SimpleChannelUpstreamHandler 的较低级别的类 A 此类是发送消息和接收响应的主力 系统其他部分可以使用顶级类 B 来发送和接收消息 可以模拟同步和异步 此类创建 ClientBootstrap 设
  • 如何将模型结果保存到文本文件?

    我正在尝试将从模型生成的频繁项集保存到文本文件中 该代码是 Spark ML 库中 FPGrowth 示例的示例 Using saveAsTextFile直接在模型上写入 RDD 位置而不是实际值 import org apache spa
  • setKeyListener 将覆盖 setInputType 并更改键盘

    大家好 我在两个设备之间遇到问题 在实践中使用InputType和KeyListener我正在操纵一个EditText让它从数字键盘接收逗号和数字 有关更多背景信息 请检查我之前的question https stackoverflow c
  • 在方法内声明类 - Final 关键字 [重复]

    这个问题在这里已经有答案了 给定方法中的以下内部类 IsSomething public class InnerMethod private int x public class Something private int y public
  • Jenkins 管道和 java.nio.file.* 方法的问题

    我正在尝试使用 java nio file 中的方法在 Jenkins 管道中执行一些基本文件操作 无论代码存在于哪个节点块中 代码都在主节点上执行 在管道中 我已经验证了各个节点块都是正确的 它们唯一地标识了特定的节点 但是 pathEx
  • Java中的媒体播放器库[关闭]

    Closed 这个问题正在寻求书籍 工具 软件库等的推荐 不满足堆栈溢出指南 help closed questions 目前不接受答案 我正在评估用于在 Java 中播放音频 视频的库 它不需要 100 Java Java 与本机库的绑定
  • 条件查询:按计数排序

    我正在尝试执行一个标准查询 该查询返回 stackoverflow 中回答最多的问题 例如常见问题解答 一个问题包含多个答案 我正在尝试使用标准查询返回按每个问题的答案数排序的回答最多的问题 任何人都知道我应该在 hibernate cri

随机推荐

  • iOS 7.1:当应用程序在后台时获取核心运动数据(加速计、陀螺仪)

    我想知道当应用程序处于后台模式时如何继续接收运动传感器值 我意识到那里已经有几个帖子了 例如 我尝试过iPhone 上的 Nike GPS 如何在后台接收加速度计更新 https stackoverflow com questions 87
  • Bouncy Castle 在 CBC 模式下使用 AES 进行基于密码的加密

    我最近遇到了一段在 CBC 模式下使用 BouncyCastle 的 PBE 和 AES 的代码 PBEWithSHA1And256BitAES CBC BC public static final String ALGORITHM PBE
  • 给出很多间隔 [ai, bi],找到与最多间隔数相交的间隔

    给定很多间隔 ai bi 找到与间隔数量最多的间隔 我们能在 O nlogn 或更好的时间内做到这一点吗 我只能想到 n 2 方法 假设间隔给出为 a1 b1 an bn 制作一个长度已排序的数组2n关系被打破的地方 if ai aj 然后
  • PostgreSQL - jsonb_each

    我刚刚开始在 postgres 上使用 jsonb 并在网上很难找到示例 因为它是一个相对较新的概念 我尝试使用 jsonb each text 打印出键和值表 但在单列中获取 csv 我将下面的 json 保存为 jsonb 并用它来测试
  • ggplot 中仅一个图例的标签为斜体

    我正在尝试格式化一个侧面有两个独立图例的绘图 我有一个用于所有不同分类单元的形状图例 以及一个用于它们所属类别的颜色图例 我只想将形状图例中的分类单元名称斜体化 而不将颜色图例中的类别名称斜体化 到目前为止 我可以使用这一行将所有图例条目设
  • 如何以自定义方式从主题恢复全局存储?

    假设我在从主题获取数据后将数据存储在 Globalstore 中时正在进行一些自定义处理 即我正在根据 message 的值创建自定义键 在本地删除状态后 它会以相同的方式再次恢复 Globalstore 吗 override def pr
  • 无法在 Android Studio 中使用 compose 检查器 (Flamingo 2022.2.1)

    设备的网络正常工作 但如屏幕截图所示 我无法连接到以下 URL maven google com 导致撰写检查器无法正常工作 我尝试修改旧版本 Compose 的版本 但没有解决问题 我可以以某种方式设置 URL 或其他解决方案 以便我可以
  • 如何使用express启用cors nodejs?

    总之 我正在使用一个像 api 这样的 dicom 文件查看器 称为 Cornstone 为此 我连接到 dc4chee 的 WADO 服务来获取 dicom dcm4chee 运行端口 8080 而我在节点上的应用程序使用端口 3000
  • 加载.csv文件时如何将当前系统时间戳插入db2数据库基列

    下面的类将把 csv 导入到数据库表中 它工作正常 现在我需要更新同一个表中的另一列 其中当前系统时间戳需要获取 当该程序在数据库表的相应列中执行时得到更新 示例 在 Db2 表中 主题列为 英语社会数学时间戳 在 CSV 文件中只有 3
  • 实体框架代码优先迁移保留现有数据

    我正在使用 EF 6 1 并且对包含生产环境中的数据的现有数据库使用代码优先方法 是否有可能迁移模型更改并保留现有客户的数据 是的 但是 根据具体情况 更改的复杂性 它可能会很复杂 概述 https learn microsoft com
  • dojo 中的状态服务器端过滤

    我正在 dojo 1 10 版本的增强网格中进行服务器端过滤 Here https dojotoolkit org reference guide 1 10 dojox grid EnhancedGrid plugins Filter ht
  • 如何自动删除项目的所有system.out.println语句,包括所有多行Sop语句[重复]

    这个问题在这里已经有答案了 实际上我想自动化删除所有的过程System out println 在将项目交付给客户之前 对整个项目进行陈述 怎么做 按 ctrl H 转到文件搜索 将包含文本填充为System out println 文件名
  • 将 std::bind 与成员函数一起使用,该参数是否使用对象指针?

    使用时std bind要绑定成员函数 第一个参数是对象this指针 然而 它可以将对象作为指针传递 也可以不传递 例如 请参见以下程序 include
  • 使用转换后的边界进行布局

    我已经缩放了窗格中的一个节点 但窗格的布局考虑了边界 没有任何转换 我希望它考虑到转换后的边界 例如 和代码 import javafx application Application import javafx geometry Pos
  • 如何在 Postgres.app 中降级/使用以前版本的 Postgres DB

    我已经从这里安装了 Postgres app http postgresapp com http postgresapp com 几天以前 它附带 Postgres 9 4 4 今天我发现我使用的软件官方只支持Postgres 9 3 9
  • Javascript导入包无法解析模块说明符

    我正在尝试导入使用 npm 下载的模块 我的 json 文件是 name nodejs web app1 version 0 0 0 description NodejsWebApp1 main server js author name
  • 致命异常:iOS 上的 NSInternalInconsistencyException 崩溃

    几天来我一直在尝试重新创建和研究这个问题 但我不会去任何地方 这是堆栈跟踪 任何人都可以阐明正在发生的事情吗 我认为这与远程通知有关 不久前 当我安装 firebase sdk 时 问题就开始了 当时事故发生的次数并不多 但现在 事故发生的
  • 带有 pandas 数据框的子图

    我想使用 pandas 数据框 称为 df 在图上创建多个子图 我原来的情节在这里 df plot x month y number title open by month color blue 我在本网站的 使用图形和子图 部分尝试了多次
  • 我如何定义“其余宽度”?

    这是我的代码 function tags input on focusout function var txt this value replace a z0 9 ig allowed characters if txt span span
  • 如何运行 Spark Java 程序

    我为 Spark 编写了一个 Java 程序 但是如何从 Unix 命令行运行和编译它 编译运行时是否必须包含任何 jar 结合官方步骤快速入门指南 https spark apache org docs latest quick star