Spark scala和java的api使用

2023-05-16

Spark scala和java的api使用

1、利用scala语言开发spark的worcount程序(本地运行)


package com.zy.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

//todo:利用scala语言来实现spark的wordcount程序
object WordCount {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName和master  local[2]表示本地采用2个线程去运行任务
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件
    val data: RDD[String] = sc.textFile("D:\\words.txt")

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //按照单词出现的次数降序排列
    val sortRDD: RDD[(String, Int)] = result.sortBy(x => x._2, false)


    //7、收集数据,打印输出
    val finalResult: Array[(String, Int)] = sortRDD.collect()
    finalResult.foreach(println)

    //8、关闭sc
    sc.stop()
  }
}  

 

2、利用scala语言开发spark的wordcount程序(集群运行)


package com.zy.spark

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

//todo:利用scala语言开发spark的wordcount程序(集群运行)
object WordCount_Online {
  def main(args: Array[String]): Unit = {
    //1、创建SparkConf对象,设置appName
    val sparkConf: SparkConf = new SparkConf().setAppName("WordCount_Online")

    //2、创建SparkContext 该对象是所有spark程序的执行入口,它会创建DAGScheduler和TaskScheduler
    val sc = new SparkContext(sparkConf)

    //设置日志输出级别
    sc.setLogLevel("warn")

    //3、读取数据文件 args(0)为文件地址参数
    val data: RDD[String] = sc.textFile(args(0))

    //4、切分每一行获取所有单词
    val words: RDD[String] = data.flatMap(_.split(" "))

    //5、每个单词计为1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))

    //6、相同单词出现的所有的1累加
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)

    //7、把结果数据保存到hdfs上  args(1)是保存到hdfs的目录参数
    result.saveAsTextFile(args(1))

    //8、关闭sc
    sc.stop()
  }

}  

最后打成jar包 到集群上执行


spark-submit --master spark://node1:7077 --class cn.itcast.spark.WordCount_Online --executor-memory 1g --total-executor-cores 2 original-spark_xxx-1.0-SNAPSHOT.jar /words.txt /out  

 

3、利用java语言开发spark的wordcount程序(本地运行)


package com.zy.spark;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

//todo:利用java语言开发spark的wordcount程序(本地运行)
public class WordCount_Java {
    public static void main(String[] args) {
        //1、创建SparkConf对象
        SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

        //2、创建JavaSparkContext对象
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);

        //3、读取数据文件
        JavaRDD<String> data = jsc.textFile("D:\\words.txt");

        //4、切分每一行获取所有的单词
        JavaRDD<String> words = data.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                String[] words = line.split(" ");
                return Arrays.asList(words).iterator();
            }
        });

        //5、每个单词计为1
        JavaPairRDD<String, Integer> wordAndOne = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        //6、相同单词出现1累加
        JavaPairRDD<String, Integer> result = wordAndOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //按照单词出现的次数降序排列 (单词,次数)------>(次数,单词).sortByKey------->(单词,次数)

        JavaPairRDD<Integer, String> reverseRDD = result.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2, t._1);
            }
        });

        JavaPairRDD<String, Integer> sortedRDD = reverseRDD.sortByKey(false).mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return new Tuple2<String, Integer>(t._2, t._1);
            }
        });


        //7、收集数据打印输出
        List<Tuple2<String, Integer>> finalResult = sortedRDD.collect();
        for (Tuple2<String, Integer> tuple : finalResult) {
            System.out.println("单词:" + tuple._1 + " 次数:" + tuple._2);
        }

        //8、关闭jsc
        jsc.stop();
    }
}  

 

posted @ 2018-02-12 13:22 青衫仗剑 阅读( ...) 评论( ...) 编辑 收藏
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark scala和java的api使用 的相关文章

  • 如何将 Java 赋值表达式转换为 Kotlin

    java中的一些东西就像 int a 1 b 2 c 1 if a b c System out print true 现在它应该转换为 kotlin 就像 var a Int 1 var b Int 2 var c Int 1 if a
  • 如何在 JFace 的 TableViewer 中创建复选框?

    我创建了一个包含两列的 tableViewer 我想将其中一列设为复选框 为此 我创建了一个 CheckBoxCellEditor 但我不知道为什么它不起作用 名为 tableName 的列显示其值正常 色谱柱规格如下 String COL
  • AES 加密 Java/plsql

    我需要在Java和plsql DBMS CRYPTO for Oracle 10g 上实现相同的加密 解密应用程序 两种实现都工作正常 但这里的问题是我对相同纯文本的加密得到了不同的输出 下面是用于加密 解密过程的代码 Java 和 PLS
  • JNI 不满意链接错误

    我想创建一个简单的 JNI 层 我使用Visual studio 2008创建了一个dll Win 32控制台应用程序项目类型 带有DLL作为选项 当我调用本机方法时 出现此异常 Exception occurred during even
  • 如何查找 Android 设备中的所有文件并将它们放入列表中?

    我正在寻求帮助来列出 Android 外部存储设备中的所有文件 我想查找所有文件夹 包括主文件夹的子文件夹 有办法吗 我已经做了一个基本的工作 但我仍然没有得到想要的结果 这不起作用 这是我的代码 File files array file
  • Java8无符号算术

    据广泛报道 Java 8 具有对无符号整数的库支持 然而 似乎没有文章解释如何使用它以及有多少可能 有些函数 例如 Integer CompareUnsigned 很容易找到 并且似乎可以实现人们所期望的功能 但是 我什至无法编写一个简单的
  • 在数据流模板中调用 waitUntilFinish() 后可以运行代码吗?

    我有一个批处理 Apache Beam 作业 它从 GCS 获取文件作为输入 我的目标是根据执行后管道的状态将文件移动到两个 GCS 存储桶之一 如果管道执行成功 则将文件移动到存储桶 A 否则 如果管道在执行过程中出现任何未处理的异常 则
  • 如何在 Java 中禁用 System.out 以提高速度

    我正在用 Java 编写一个模拟重力的程序 其中有一堆日志语句 到 System out 我的程序运行速度非常慢 我认为日志记录可能是部分原因 有什么方法可以禁用 System out 以便我的程序在打印时不会变慢 或者我是否必须手动检查并
  • HDFS:使用 Java / Scala API 移动多个文件

    我需要使用 Java Scala 程序移动 HDFS 中对应于给定正则表达式的多个文件 例如 我必须移动所有名称为 xml从文件夹a到文件夹b 使用 shell 命令我可以使用以下命令 bin hdfs dfs mv a xml b 我可以
  • hibernate总是自己删除表中的所有数据

    您好 我正在开发一个 spring mvc 应用程序 它使用 hibernate 连接到存储文件的 mysql 数据库 我有两个方法 一个方法添加我选择的特定文件路径中的所有文件 另一种方法调用查询以返回从 mysql 存储的文件列表 问题
  • Microsoft Graph 身份验证 - 委派权限

    我可以使用 Microsoft Graph 访问资源无需用户即可访问 https developer microsoft com en us graph docs concepts auth v2 service 但是 此方法不允许我访问需
  • 使用 apply 方法的泛型类型的 Scala 工厂?

    假设我有以下特征 它定义了一个接口并采用几个类型参数 trait Foo A B implementation details not important 我想使用伴随对象作为该特征的具体实现的工厂 我还想强制用户使用Foo接口而不是子类所
  • 迁移到 java 17 后有关“每个进程的内存映射”和 JVM 崩溃的 GC 警告

    我们正在将 java 8 应用程序迁移到 java 17 并将 GC 从G1GC to ZGC 我们的应用程序作为容器运行 这两个基础映像之间的唯一区别是 java 的版本 例如对于 java 17 版本 FROM ubuntu 20 04
  • Spring Data 与 Spring Data JPA 与 JdbcTemplate

    我有信心Spring Data and Spring Data JPA指的是相同的 但后来我在 youtube 上观看了一个关于他正在使用JdbcTemplate在那篇教程中 所以我在那里感到困惑 我想澄清一下两者之间有什么区别Spring
  • 归并排序中的递归:两次递归调用

    private void mergesort int low int high line 1 if low lt high line 2 int middle low high 2 line 3 mergesort low middle l
  • 检查 protobuf 消息 - 如何按名称获取字段值?

    我似乎无法找到一种方法来验证 protobuf 消息中字段的值 而无需显式调用其 getter 我看到周围的例子使用Descriptors FieldDescriptor实例到达消息映射内部 但它们要么基于迭代器 要么由字段号驱动 一旦我有
  • 尝试使用 Ruby Java Bridge (RJB) gem 时出现错误“无法创建 Java VM”

    我正在尝试实现 Ruby Java Bridge RJB gem 来与 JVM 通信 以便我可以运行 Open NLP gem 我在 Windows 8 上安装并运行了 Java 所有迹象 至少我所知道的 都表明 Java 已安装并可运行
  • android Accessibility-service 突然停止触发事件

    我有一个 AccessibilityService 工作正常 但由于开发过程中的某些原因它停止工作 我似乎找不到这个原因 请看一下我的代码并告诉我为什么它不起作用 public class MyServicee extends Access
  • com.jcraft.jsch.JSchException:身份验证失败

    当我从本地磁盘上传文件到远程服务器时 出现这样的异常 com jcraft jsch JSchException Auth fail at org apache tools ant taskdefs optional ssh Scp exe
  • java8 Collectors.toMap() 限制?

    我正在尝试使用java8Collectors toMap on a Stream of ZipEntry 这可能不是最好的想法 因为在处理过程中可能会发生异常 但我想这应该是可能的 我现在收到一个我不明白的编译错误 我猜是类型推理引擎 这是

随机推荐

  • rabbit-mq 本地环境搭建

    一 xff1a 安装RabbitMQ需要先安装Erlang语言开发包 xff0c 直接下载地址 xff1a http erlang org download otp win64 18 3 exe 尽量安装时不选择C盘 xff0c 避免操作系
  • oracle小数点前面没有0,纠结解惑

    一1 天前台人找到我 xff0c 说我们安装的数据库有问题 xff0c 为什么小数点前面是0就不显示呢 xff0c 我去看了一下 xff0c command窗口要显示 SQL gt create table ml test num numb
  • 网监后台管理系统设计思路

    本次做的是网监系统saas服务平台的后台管理系统 xff0c 不涉及复杂功能逻辑 就是从菜单 模板 系 统 组织架构 角色 用户的设计思路 产品需求 xff1a 在各个省市网监系统的数量不断增长 xff0c 且系统逻辑和功能模块大致相同 x
  • 分类算法 c4.5 详解

    C4 5是一系列用在机器学习和数据挖掘的分类问题中的算法 它的目标是监督学习 xff1a 给定一个数据集 xff0c 其中的每一个元组都能用一组属性值来描述 xff0c 每一个元组属于一个互斥的类别中的某一类 C4 5的目标是通过学习 xf
  • 正则表达式匹配Html代码中图片路劲

    正则表达匹配图片路径 public static string GetHtmlImageUrlList string sHtmlText 定义正则表达式用来匹配 img 标签 Regex regImg 61 new Regex 64 34
  • (GPU版)Pytorch+pycharm+jupyter安装记录(截至23年3月14日)

    由于搞了一台旧主机 xff0c 主机上没有pytorch等软件程序 xff0c 所以重新装一遍 xff0c 顺便记录一下 xff01 一 安装显卡GPU的驱动程序 xff0c 搞定CUDA先 WIN 43 R打开命令行 xff0c 输入命令
  • Alibaba官方最新发布的这份Java学习导图+彩版手册,真不是吹的

    时间飞逝 xff0c 转眼间毕业七年多 xff0c 从事 Java 开发也六年了 我在想 xff0c 也是时候将自己的 Java 整理成一套体系 这一次的知识体系面试题涉及到 Java 知识部分 性能优化 微服务 并发编程 开源框架 分布式
  • linux下使用第三方商店安装应用

    安装 snap store 进行下载 xff0c 相当与第三方应用商店 xff0c 但是往往比某一个官方软件源里面的应用要丰富或更实用 到 snap docs 中选择你的 linux 版本进入安装文档 xff0c 根据指示一步一步安装即可
  • Centos7离线安装sqlserver2017

    Centos7离线安装sqlserver2017 根据操作系统版本选择下载匹配的sqlserver版本 可以在这里找一下https packages microsoft com config 我选择是先在一台有网的机器上下载好rpm安装包之
  • HC-05蓝牙模块配置

    目录 1 连接蓝牙模块a 蓝牙模块通过USB转TTL连接电脑b 打开串口助手 xff0c 波特率设置为38400c 检验是否连接成功 2 配置波特率3 修改密码4 设置主从模式5 设置蓝牙连接模式6 查询自身地址7 添加配对蓝牙地址8 测试
  • Windows沙盒技术调研

    转载自 xff1a 移动云开发者社区 一 Windows沙盒技术介绍 Windows沙盒提供了轻型桌面环境 xff0c 可安全地隔离运行应用程序 沙盒环境中Windows软件保持 34 沙盒 34 状态 xff0c 并独立于主机运行 沙盒是
  • OS + Linux Shell bash / sh / ksh / csh / tcsh / adb shell

    s Android adb shell ADB Android debug bridge Android手机实际是基于Linux系统的 Google提供的ADB工具包带有fastboot exe rar http dl iteye com
  • kali利用CVE_2019_0708(远程桌面代码执行漏洞)攻击win7

    一 漏洞说明 2019年5月15日微软发布安全补丁修复了CVE编号为CVE 2019 0708的Windows远程桌面服务 xff08 RDP xff09 远程代码执行漏洞 该漏洞在不需身份认证的情况下即可远程触发 危害与影响面极大 目前
  • 数据库系统原理1

    第一章 数据库管理技术发展的不同阶段形成不同的特点 数据描述经历了三个阶段对应于三个数据模型 第二章 数据库系统的生命周期 xff0c 书中可能和我们学习软工的时候有些出入 xff0c 其实就是不同时间有不同的理解 xff0c 横看成岭侧成
  • ssh详解

    SSH ssh secure shell protocol 22 tcp 安全的 具体的软件实现 xff1a OpenSSH ssh协议的开源实现 xff0c CentOS dropbear xff1a 另一个开源实现 SSH协议版本 v1
  • spring框架的简单配置步骤——小马同学@Tian

    spring框架配置步骤 1 导入jar包 本教程使用spring5 1 5 xff0c 在pom xml中进行导入依赖 Maven方式 xff1a span class token tag span class token tag spa
  • PSReadLine - Powershell 的强化工具

    PSReadLine Powershell 的强化工具 UPDATE 2022 3 4 根据其 Github README 的说明 xff0c If you are using Windows PowerShell on Windows 1
  • 美化 PowerShell

    美化 PowerShell UPDATE 2022 3 4 本文使用的 oh my posh 基于 V2 版本 xff0c 而更新且功能更强大的新版本已经发布 xff0c 如需使用请参考其官方文档 1 准备工作 Step1 下载并安装 Po
  • nltk下载语料库

    1 首先我们使用命令pip list查看是否安装了nltk模块 xff0c 如果没有 xff0c 则执行命令pip3 install nltk进行安装 2 之后 xff0c 我们在Jupyter Notebook中进行语料库的安装 impo
  • Spark scala和java的api使用

    Spark scala和java的api使用 1 利用scala语言开发spark的worcount程序 xff08 本地运行 xff09 package com zy spark import org apache spark rdd R