Spark:常用算子总结大全

2023-11-10

目录

park的算子的分类

 从大方向来说,Spark 算子大致可以分为以下两类:

1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。

2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

从小方向来说,Spark 算子大致可以分为以下三类:

1)Value数据类型的Transformation算子  

2)Key-Value数据类型的Transfromation算子

3)Action算子

算子介绍

(1)map

(2)flatMap

(3)mapPartitions

(4)glom

(5)union

(6)cartesian

(7)groupBy

(8)filter

(9)distinct

(10)subtract

(11)sample

(12)takeSample

(13)cache

(14)persist

(15)mapValues

(16)combineByKey

(17)reduceByKey

(18)partitionBy

(19)Cogroup

(20)join

(21)leftOutJoin和rightOutJoin

(22)foreach

(23)saveAsTextFile

(24)saveAsObjectFile

(25)collect

(26)collectAsMap

(27)reduceByKeyLocally

(28)lookup

(29)count

(30)top

(31)reduce

(32)fold

(33)aggregate


park的算子的分类

 从大方向来说,Spark 算子大致可以分为以下两类:

1)Transformation 变换/转换算子:这种变换并不触发提交作业,完成作业中间过程处理。

 Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

2)Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。

  Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统

从小方向来说,Spark 算子大致可以分为以下三类:

1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数据项是Value型的数据。
2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理的数据项是Key-Value型的数据对。

3)Action算子,这类算子会触发SparkContext提交Job作业。

 

1)Value数据类型的Transformation算子  

一、输入分区与输出分区一对一型

1、map算子

2、flatMap算子

3、mapPartitions算子

4、glom算子

二、输入分区与输出分区多对一型 

5、union算子

6、cartesian算子

三、输入分区与输出分区多对多型

7、grouBy算子

四、输出分区为输入分区子集型

8、filter算子

9、distinct算子

10、subtract算子

11、sample算子

   12、takeSample算子

五、Cache型

13、cache算子  

14、persist算子

 

2)Key-Value数据类型的Transfromation算子

一、输入分区与输出分区一对一

15、mapValues算子

二、对单个RDD或两个RDD聚集

 单个RDD聚集

16、combineByKey算子

17、reduceByKey算子

18、partitionBy算子

两个RDD聚集

19、Cogroup算子

三、连接

20、join算子

21、leftOutJoin和 rightOutJoin算子

 

3)Action算子

一、无输出

22、foreach算子

二、HDFS

23、saveAsTextFile算子

24、saveAsObjectFile算子

三、Scala集合和数据类型

25、collect算子

26、collectAsMap算子

27、reduceByKeyLocally算子

28、lookup算子

29、count算子

30、top算子

31、reduce算子

32、fold算子

33、aggregate算子

 

 

算子介绍

1. Transformations 算子

(1)map

将原来 RDD 的每个数据项通过map 中的用户自定义函数 f映射转变为一个新的元素。源码中 map 算子相当于初始化一个 RDD, 新 RDD 叫做 MappedRDD(this, sc.clean(f))。

图 1中每个方框表示一个 RDD 分区,左侧的分区经过用户自定义函数 f:T->U映射为右侧的新 RDD 分区。但是,实际只有等到 Action算子触发后,这个 f 函数才会和其他函数在一个stage 中对数据进行运算。在图 1 中的第一个分区,数据记录 V1 输入 f,通过 f 转换输出为转换后的分区中的数据记录 V’1。
\
 

图1 map 算子对 RDD 转换                   

(2)flatMap


将原来 RDD 中的每个元素通过函数 f 转换为新的元素,并将生成的 RDD 的每个集合中的元素合并为一个集合,内部创建 FlatMappedRDD(this,sc.clean(f))。
  图 2 表 示 RDD 的 一 个 分 区 ,进 行 flatMap函 数 操 作, flatMap 中 传 入 的 函 数 为 f:T->U,T和 U 可以是任意的数据类型。将分区中的数据通过用户自定义函数 f 转换为新的数据。外部大方框可以认为是一个 RDD 分区,小方框代表一个集合。 V1、 V2、 V3 在一个集合作为 RDD 的一个数据项,可能存储为数组或其他容器,转换为V’1、 V’2、 V’3 后,将原来的数组或容器结合拆散,拆散的数据形成为 RDD 中的数据项。

\

  图2  flapMap 算子对 RDD 转换


(3)mapPartitions


mapPartitions 函 数 获 取 到 每 个 分 区 的 迭 代器,在 函 数 中 通 过 这 个 分 区 整 体 的 迭 代 器 对整 个 分 区 的 元 素 进 行 操 作。 内 部 实 现 是 生 成
MapPartitionsRDD。图 3 中的方框代表一个 RDD 分区。图 3 中,用户通过函数 f (iter)=>iter.f ilter(_>=3) 对分区中所有数据进行过滤,大于和等于 3 的数据保留。一个方块代表一个 RDD 分区,含有 1、 2、 3 的分区过滤只剩下元素 3。

\

图3 mapPartitions 算子对 RDD 转换

(4)glom

 

glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。 图4中的每个方框代表一个RDD分区。图4中的方框代表一个分区。 该图表示含有V1、 V2、 V3的分区通过函数glom形成一数组Array[(V1),(V2),(V3)]。

\

图 4 glom算子对RDD转换

(5)union


使用 union 函数时需要保证两个 RDD 元素的数据类型相同,返回的 RDD 数据类型和被合并的 RDD 元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重
可以使用 distinct()。同时 Spark 还提供更为简洁的使用 union 的 API,通过 ++ 符号相当于 union 函数操作。
图 5 中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。

含有V1、V2、U1、U2、U3、U4的RDD和含有V1、V8、U5、U6、U7、U8的RDD合并所有元素形成一个RDD。V1、V1、V2、V8形成一个分区,U1、U2、U3、U4、U5、U6、U7、U8形成一个分区。
\
  图 5 union 算子对 RDD 转换 



(6)cartesian


对 两 个 RDD 内 的 所 有 元 素进 行 笛 卡 尔 积 操 作。 操 作 后, 内 部 实 现 返 回CartesianRDD。图6中左侧大方框代表两个 RDD,大方框内的小方框代表 RDD 的分区。右侧大方框代表合并后的 RDD,大方框内的小方框代表分区。图6中的大方框代表RDD,大方框中的小方框代表RDD分区。
例 如: V1 和 另 一 个 RDD 中 的 W1、 W2、 Q5 进 行 笛 卡 尔 积 运 算 形 成 (V1,W1)、(V1,W2)、 (V1,Q5)。
\

图 6 cartesian 算子对 RDD 转换

 

(7)groupBy


  groupBy :将元素通过函数生成相应的 Key,数据就转化为 Key-Value 格式,之后将 Key 相同的元素分为一组。
  函数实现如下:
  1)将用户函数预处理:
  val cleanF = sc.clean(f)
  2)对数据 map 进行函数操作,最后再进行 groupByKey 分组操作。

this.map(t => (cleanF(t), t)).groupByKey(p)
  其中, p 确定了分区个数和分区函数,也就决定了并行化的程度。

图7 中方框代表一个 RDD 分区,相同key 的元素合并到一个组。例如 V1 和 V2 合并为 V, Value 为 V1,V2。形成 V,Seq(V1,V2)。
\
  图 7groupBy 算子对 RDD 转换

(8)filter


filter 函数功能是对元素进行过滤,对每个 元 素 应 用 f 函 数, 返 回 值 为 true 的 元 素 在RDD 中保留,返回值为 false 的元素将被过滤掉。 内 部 实 现 相 当 于 生 成 FilteredRDD(this,sc.clean(f))。
下面代码为函数的本质实现:
deffilter(f:T=>Boolean):RDD[T]=newFilteredRDD(this,sc.clean(f))
  图 8 中每个方框代表一个 RDD 分区, T 可以是任意的类型。通过用户自定义的过滤函数 f,对每个数据项操作,将满足条件、返回结果为 true 的数据项保留。例如,过滤掉 V2 和 V3 保留了 V1,为区分命名为 V’1。

\

图 8 filter 算子对 RDD 转换

(9)distinct

distinct将RDD中的元素进行去重操作。图9中的每个方框代表一个RDD分区,通过distinct函数,将数据去重。 例如,重复数据V1、 V1去重后只保留一份V1。
\

图9 distinct算子对RDD转换

(10)subtract

subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。图10中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。 右侧大方框
代表合并后的RDD,大方框内的小方框代表分区。 V1在两个RDD中均有,根据差集运算规则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。
  \

    图10 subtract算子对RDD转换

(11)sample


sample 将 RDD 这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否有放回的抽样、百分比、随机种子,进而决定采样方式。内部实现是生成 SampledRDD(withReplacement, fraction, seed)。
  函数参数设置:
‰   withReplacement=true,表示有放回的抽样。
‰   withReplacement=false,表示无放回的抽样。
  图 11中 的 每 个 方 框 是 一 个 RDD 分 区。 通 过 sample 函 数, 采 样 50% 的 数 据。V1、 V2、 U1、 U2、U3、U4 采样出数据 V1 和 U1、 U2 形成新的 RDD。

\

 图11 sample 算子对 RDD 转换

(12)takeSample

 

takeSample()函数和上面的sample函数是一个原理,但是不使用相对比例采样,而是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行
Collect(),返回结果的集合为单机的数组。
  图12中左侧的方框代表分布式的各个节点上的分区,右侧方框代表单机上返回的结果数组。 通过takeSample对数据采样,设置为采样一份数据,返回结果为V1。

\

图12   takeSample算子对RDD转换

(13)cache


cache将 RDD 元素从磁盘缓存到内存。 相当于 persist(MEMORY_ONLY) 函数的功能。
图13 中每个方框代表一个 RDD 分区,左侧相当于数据分区都存储在磁盘,通过 cache 算子将数据缓存在内存。
\

图 13 Cache 算子对 RDD 转换

(14)persist


persist 函数对RDD 进行缓存操作。数据缓存在哪里依据 StorageLevel 这个枚举类型进行确定。 有以下几种类型的组合(见10), DISK 代表磁盘,MEMORY 代表内存, SER 代表数据是否进行序列化存储。

下面为函数定义, StorageLevel 是枚举类型,代表存储模式,用户可以通过图 14-1 按需进行选择。
  persist(newLevel:StorageLevel)
  图 14-1 中列出persist 函数可以进行缓存的模式。例如,MEMORY_AND_DISK_SER 代表数据可以存储在内存和磁盘,并且以序列化的方式存储,其他同理。
\

      图 14-1 persist 算子对 RDD 转换

图 14-2 中方框代表 RDD 分区。 disk 代表存储在磁盘, mem 代表存储在内存。数据最初全部存储在磁盘,通过 persist(MEMORY_AND_DISK) 将数据缓存到内存,但是有的分区无法容纳在内存,将含有 V1、 V2、 V3 的RDD存储到磁盘,将含有U1,U2的RDD仍旧存储在内存。

\

图 14-2 Persist 算子对 RDD 转换

(15)mapValues


mapValues :针对(Key, Value)型数据中的 Value 进行 Map 操作,而不对 Key 进行处理。

图 15 中的方框代表 RDD 分区。 a=>a+2 代表对 (V1,1) 这样的 Key Value 数据对,数据只对 Value 中的 1 进行加 2 操作,返回结果为 3。

\

图 15 mapValues 算子 RDD 对转换

(16)combineByKey


  下面代码为 combineByKey 函数的定义:
  combineByKey[C](createCombiner:(V) C,
  mergeValue:(C, V) C,
  mergeCombiners:(C, C) C,
  partitioner:Partitioner,
  mapSideCombine:Boolean=true,
  serializer:Serializer=null):RDD[(K,C)]

说明:
‰   createCombiner: V => C, C 不存在的情况下,比如通过 V 创建 seq C。
‰   mergeValue: (C, V) => C,当 C 已经存在的情况下,需要 merge,比如把 item V
加到 seq C 中,或者叠加。
  mergeCombiners: (C, C) => C,合并两个 C。
‰   partitioner: Partitioner, Shuff le 时需要的 Partitioner。
‰   mapSideCombine : Boolean = true,为了减小传输量,很多 combine 可以在 map
端先做,比如叠加,可以先在一个 partition 中把所有相同的 key 的 value 叠加,
再 shuff le。
‰   serializerClass: String = null,传输需要序列化,用户可以自定义序列化类:

例如,相当于将元素为 (Int, Int) 的 RDD 转变为了 (Int, Seq[Int]) 类型元素的 RDD。图 16中的方框代表 RDD 分区。如图,通过 combineByKey, 将 (V1,2), (V1,1)数据合并为( V1,Seq(2,1))。
  \

图 16 comBineByKey 算子对 RDD 转换

(17)reduceByKey


reduceByKey 是比 combineByKey 更简单的一种情况,只是两个值合并成一个值,( Int, Int V)to (Int, Int C),比如叠加。所以 createCombiner reduceBykey 很简单,就是直接返回 v,而 mergeValue和 mergeCombiners 逻辑是相同的,没有区别。
函数实现:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
= {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
  图17中的方框代表 RDD 分区。通过用户自定义函数 (A,B) => (A + B) 函数,将相同 key 的数据 (V1,2) 和 (V1,1) 的 value 相加运算,结果为( V1,3)。
\

  图 17reduceByKey 算子对 RDD 转换

(18)partitionBy

partitionBy函数对RDD进行分区操作。
  函数定义如下。
  partitionBy(partitioner:Partitioner)
  如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。
  图18中的方框代表RDD分区。 通过新的分区策略将原来在不同分区的V1、 V2数据都合并到了一个分区。
\

图18  partitionBy算子对RDD转换

(19)Cogroup

cogroup函数将两个RDD进行协同划分,cogroup函数的定义如下。
  cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集合,并且返回两个RDD中对应Key的元素集合的迭代器。
  (K, (Iterable[V], Iterable[W]))
  其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元组。
  图19中的大方框代表RDD,大方框内的小方框代表RDD中的分区。 将RDD1中的数据(U1,1)、 (U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),(2)))。

\

  图19 Cogroup算子对RDD转换

(20)join


join 对两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。
  下 面 代 码 为 join 的 函 数 实 现, 本 质 是通 过 cogroup 算 子 先 进 行 协 同 划 分, 再 通 过flatMapValues 将合并的数据打散。
this.cogroup(other,partitioner).f latMapValues{case(vs,ws) =>for(v<-vs;w<-ws)yield(v,w) }
图 20是对两个 RDD 的 join 操作示意图。大方框代表 RDD,小方框代表 RDD 中的分区。函数对相同 key 的元素,如 V1 为 key 做连接后结果为 (V1,(1,1)) 和 (V1,(1,2))。
\

              图 20 join 算子对 RDD 转换

(21)leftOutJoin和rightOutJoin

LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一侧的RDD元素是否为空,如果为空,则填充为空。 如果不为空,则将数据进行连接运算,并
返回结果。
下面代码是leftOutJoin的实现。
if (ws.isEmpty) {
vs.map(v => (v, None))
} else {
for (v <- vs; w <- ws) yield (v, Some(w))
}

2. Actions 算子
  本质上在 Action 算子中通过 SparkContext 进行了提交作业的 runJob 操作,触发了RDD DAG 的执行。
例如, Action 算子 collect 函数的代码如下,感兴趣的读者可以顺着这个入口进行源码剖析:

/**
* Return an array that contains all of the elements in this RDD.
*/
def collect(): Array[T] = {
/* 提交 Job*/
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}


(22)foreach


  foreach 对 RDD 中的每个元素都应用 f 函数操作,不返回 RDD 和 Array, 而是返回Uint。图22表示 foreach 算子通过用户自定义函数对每个数据项进行操作。本例中自定义函数为 println(),控制台打印所有数据项。
  \

图 22 foreach 算子对 RDD 转换

 

(23)saveAsTextFile


  函数将数据输出,存储到 HDFS 的指定目录。

下面为 saveAsTextFile 函数的内部实现,其内部
  通过调用 saveAsHadoopFile 进行实现:
this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
将 RDD 中的每个元素映射转变为 (null, x.toString),然后再将其写入 HDFS。
  图 23中左侧方框代表 RDD 分区,右侧方框代表 HDFS 的 Block。通过函数将RDD 的每个分区存储为 HDFS 中的一个 Block。

\

      图 23 saveAsHadoopFile 算子对 RDD 转换

(24)saveAsObjectFile

saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。
  下面代码为函数内部实现。
  map(x=>(NullWritable.get(),new BytesWritable(Utils.serialize(x))))
  图24中的左侧方框代表RDD分区,右侧方框代表HDFS的Block。 通过函数将RDD的每个分区存储为HDFS上的一个Block。

\

      图24 saveAsObjectFile算子对RDD转换

(25)collect


  collect 相当于 toArray, toArray 已经过时不推荐使用, collect 将分布式的 RDD 返回为一个单机的 scala Array 数组。在这个数组上运用 scala 的函数式操作。
  图 25中左侧方框代表 RDD 分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到 Driver 程序所在的节点,以数组形式存储。

\

图 25 Collect 算子对 RDD 转换 

(26)collectAsMap

collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。 对于重复K的RDD元素,后面的元素覆盖前面的元素。
  图26中的左侧方框代表RDD分区,右侧方框代表单机数组。 数据通过collectAsMap函数返回给Driver程序计算结果,结果以HashMap形式存储。

\

    图26 CollectAsMap算子对RDD转换

(27)reduceByKeyLocally

实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收集所有结果返回为一个HashMap。

(28)lookup

下面代码为lookup的声明。
lookup(key:K):Seq[V]
Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。 这个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然后返回由(K,V)形成的Seq。 如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描处理,搜索指定K对应的元素。
  图28中的左侧方框代表RDD分区,右侧方框代表Seq,最后结果返回到Driver所在节点的应用中。

\

图28 lookup对RDD转换

(29)count


  count 返回整个 RDD 的元素个数。
  内部函数实现为:
  defcount():Long=sc.runJob(this,Utils.getIteratorSize_).sum
  图 29中,返回数据的个数为 5。一个方块代表一个 RDD 分区。
\

图29 count 对 RDD 算子转换

(30)top

top可返回最大的k个元素。 函数定义如下。
top(num:Int)(implicit ord:Ordering[T]):Array[T]

相近函数说明如下。
·top返回最大的k个元素。
·take返回最小的k个元素。
·takeOrdered返回最小的k个元素,并且在返回的数组中保持元素的顺序。
·first相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T]。
返回的是一个含前k个元素的数组。

(31)reduce

reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。 函数实现如下。
  Some(iter.reduceLeft(cleanF))
  reduceLeft先对两个元素进行reduce函数操作,然后将结果和迭代器取出的下一个元素进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。在RDD中,先对每个分区中的所有元素的集合分别进行reduceLeft。 每个分区形成的结果相当于一个元素,再对这个结果集合进行reduceleft操作。
  例如:用户自定义函数如下。
  f:(A,B)=>(A._1+”@”+B._1,A._2+B._2)
  图31中的方框代表一个RDD分区,通过用户自定函数f将数据进行reduce运算。 示例
最后的返回结果为V1@[1]V2U!@U2@U3@U4,12。

\

图31 reduce算子对RDD转换

(32)fold

fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一个元素是zeroValue。
  图32中通过下面的用户自定义函数进行fold运算,图中的一个方框代表一个RDD分区。 读者可以参照reduce函数理解。
  fold((”V0@”,2))( (A,B)=>(A._1+”@”+B._1,A._2+B._2))
 

\

    图32 fold算子对RDD转换

(33)aggregate

aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操作。
  aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数据聚集,这种聚集是并行化的。 而在fold和reduce函数的运算过程中,每个分区中需要进行串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结果。
  函数的定义如下。
aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B
  图33通过用户自定义函数对RDD 进行aggregate的聚集操作,图中的每个方框代表一个RDD分区。
  rdd.aggregate(”V0@”,2)((A,B)=>(A._1+”@”+B._1,A._2+B._2)),(A,B)=>(A._1+”@”+B_1,A._@+B_.2))
  最后,介绍两个计算模型中的两个特殊变量。
  广播(broadcast)变量:其广泛用于广播Map Side Join中的小表,以及广播大变量等场景。 这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。
Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。 相比Hadoo的distributed cache,广播的内容可以跨作业共享。 Broadcast的底层实现采用了BT机制。
\

  图33 aggregate算子对RDD转换

②代表V。
  ③代表U。
  accumulator变量:允许做全局累加操作,如accumulator变量广泛使用在应用中记录当前的运行指标的情景。

 

原文参考:https://www.2cto.com/net/201805/742582.html 

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark:常用算子总结大全 的相关文章

  • Spark集群安装部署

    目录 一 环境准备 二 安装步骤 三 使用Standalone模式 四 使用Yarn模式 一 环境准备 由于Spark仅仅是一种计算机框架 不负责数据的存储和管理 因此 通常都会将Spark和Hadoop进行统一部署 由Hadoop中的HD
  • Spark数据处理实战之列空值&新增列处理

    在spark dataset dataframe操作过程中 我们经常会遇到对于一个列值的一些判断情况 是否为NULL 创建一个新列等 本文讲解了常用的增加列的方法 并且对于列空值判断 填充处理以及查询的api做了详细的描述和实例讲解 阅读完
  • Spark on Kubernetes 与 Spark on Yarn 不完全对比分析

    前言 Apache Spark 是目前应用最广泛的大数据分析计算工具之一 它擅长于批处理和实时流处理 并支持机器学习 人工智能 自然语言处理和数据分析应用 随着 Spark 越来越受欢迎 使用量越来越大 狭义上的 Hadoop MR 技术栈
  • Compressed Sparse Column format(CSC)

    CSR Compressed Sparse Row format 和CSC Compressed Spare Column format 都是一种稀疏矩阵的存储格式 这里分别给出实例 假设有如下矩阵 1360
  • 学习大数据spark——心得体会

    总结与体会 1 项目总结 本次项目实现了Spark 单机模式Python版的安装 介绍了与Spark编程有关的一些基本概念 特别对RDD的创建 转换和行动操作做了比较详细的说明 对从RDD 到DataFrame的实现进 行了案例训练 包括
  • 【Spark系列2】reduceByKey和groupByKey区别与用法

    在spark中 我们知道一切的操作都是基于RDD的 在使用中 RDD有一种非常特殊也是非常实用的format pair RDD 即RDD的每一行是 key value 的格式 这种格式很像Python的字典类型 便于针对key进行一些处理
  • Kafka/Spark消费topic到写出到topic

    1 Kafka的工具类 1 1 从kafka消费数据的方法 消费者代码 def getKafkaDStream ssc StreamingContext topic String groupId String consumerConfigs
  • dolphinschedule使用shell任务结束状态研究

    背景 配置的dolphin任务 使用的是shell shell里包含了spark submit 如下截图 dolphin shell 介绍完毕 开始说明现象 有天有人调整了集群的cdp配置 executor cores max 1 我之前这
  • 基于Spark的电商用户行为实时分析可视化系统(Flask-SocketIO)

    基于Spark的电商用户行为实时分析可视化系统 Flask SocketIO 项目简介 该项目已上线蓝桥课程 有需要的可凭邀请码 UB5mdLbl 学习哦 有优惠 课程地址 https www lanqiao cn courses 2629
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • Spark大数据分析与实战笔记(第一章 Scala语言基础-3)

    文章目录 1 3 Scala的数据结构 1 3 1 数组 数组的遍历 数组转换 1 3 2 元组 创建元组 获取元组中的值 拉链操作 1 3 3 集合 List Set Map 1 3 Scala的数据结构 对于每一门编程语言来说 数组 A
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • 数据中台-让数据用起来-6

    文章目录 第六章 数据开发 数据价值提炼工厂 6 1 数据计算能力的4种类型 6 1 1 批计算 6 1 2 流计算 6 1 3 在线查询 6 1 4 即席分析 6 2 离线开发 1 作业调度 2 基线控制 3 异构存储 4 代码校验 5
  • 【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)

    欢迎关注博客主页 https blog csdn net u013411339 本文由 王知无 原创 首发于 CSDN博客 本文首发CSDN论坛 未经过官方和本人允许 严禁转载 欢迎点赞 收藏 留言 欢迎留言交流 声明 本篇博客在我之前发表
  • Spark常用参数解释

    Spark的默认配置文件位于堡垒机上的这个位置 SPARK CONF DIR spark defaults conf 用户可以自行查看和理解 需要注意的是 默认值优先级最低 用户如果提交任务时或者代码里明确指定配置 则以用户配置为先 用户再
  • spark SQL基础教程

    1 sparkSQL入门 sparksql专门用于处理结构化的数据 而RDD还可以处理非结构化的数据 sparksql的优点之一是sparkfsql使用统一的api读取不同的数据 第二个优点是可以在语言中使用其他语言 例如python 另外
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • Spark SQL 项目:实现各区域热门商品前N统计

    一 需求1 1 需求简介这里的热门商品是从点击量的维度来看的 计算各个区域前三大热门商品 并备注上每个商品在主要城市中的分布比例 超过两个城市用其他显示 1 2 思路分析使用 sql 来完成 碰到复杂的需求 可以使用 udf 或 udaf查
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可

随机推荐

  • 从N个整数中判断是否有三个整数能组成三角形

    解决这个问题 可以用斐波那契数列 Fibonacci sequence 原因 斐波那契数列中的数是不可能组成三角形的 而我们只要在这些数列里面加一个数就可以有一个三角形可以组成 有了这个原因我们就可以写一个非常快速就可以判断出结果的函数 如
  • C#:递归汉诺塔

    一 运行效果截图 二 实验要求 如果n 1 直接将金片从A针移到C针上 函数输出 Form A to C 如果n gt 1时 移动过程分解为以下几个步骤 1 将A上的n 1片金片借助C针移到B针上 2 把A针上剩下的一片金片由A针移到C针上
  • stomp.min.js(stomp协议的客户端脚本)、sockjs.min.js(SockJS的客户端脚本)以及jQuery

    stomp min js stomp协议的客户端脚本 sockjs min js SockJS的客户端脚本 以及jQuery 点击打开链接
  • [ C++ ] — 智能指针

    一 三种智能指针 auto ptr unique ptr shared ptr auto ptr是C 98提供的方案 C 11已经将其摒弃 以下只是示例 实际不要使用auto ptr 使用new和delete管理动态内存常出现的问题 1 忘
  • STM32MP1开发环境搭建

    STM32MP1 wiki教程 stm32mpu 按照教程的介绍 开发MPU需要在linux环境下 一般选择在VMware虚拟机环境下安装Ubuntu 安装步骤 1 安装VMware 我安装的是VMware 10 0 0 链接 https
  • jQuery使用手册

    官方网站 http jquery com jQuery是一款优秀js开发库类 特别是对css和XPath的支持 使我们写js变得更加方便 如果你不是个js高手又想写出优 秀的js效果 jQuery可以帮你达到目的 下载地址 Starterk
  • C语言数据结构问题:停车场问题(栈和队列)

    试题描述 设停车场只有一个可停放几辆汽车的狭长通道 只有一个大门可供汽车进出 汽车在停车场内按车辆到达的先后顺序依次排列 若车场内已停满几辆汽车 则后来的汽车只能在门外的便道上等候 一旦停车场内有车辆开走 则排在便道上的第一辆汽车即可进入
  • ARP(地址解析协议)协议和RARP协议(逆地址解析协议)

    ARP协议 地址解析协议 及ARP 是根据IP地址获取物理地址的一个TCP IP协议 主机发送信息是将包含将包含目标IP地址的APR请求广播到局域网络上的所有主机 并接收返回消息 以此确定目标的物理地址 受到返回消息的时候将IP地址和物理地
  • 接口自动化之测试数据动态生成并替换

    一 测试数据 1 随机库random 查看内置random方法 该方法自行学习 不再介绍 show 2 Faker库 pip install faker showHttps github com joke2k faker 3 应用到项目中
  • Java 反射机制 与 工厂设计模式

    什么是反射 Java反射机制是在运行状态中 对于任意类 都能知道这个类的全部属性和方法 对于任意对象 都能够调用它的任何一个方法或属性 这种动态获取的信息以及动态调用对象的方法的功能 称为Java语言的反射机制 Class类 Class 是
  • MPI与main()程序中的其他函数执行次数

    我原先以为只有在MPI代码区域 即MPI Init argc argv 到MPI Finalize 中的代码才会涉及到进程通信的问题 但实际上在MPI区域外的代码依然受到影响 执行的次数与开启的进程数有关 为此可以使用MPI 秩 rank
  • AttributeError: 'Function' object has no attribute 'fn' [in caffe]

    n global pool prob3 L Sigmoid n global pool up3 name global pool prob3 ntop 0 top global pool up3 n att repmat3 L Tile n
  • 智能语音技术栈

    识别原理 硬件数据采集 软件数据处理 目前主流的开源平台包括CMU Sphinx HTK Kaldi Julius iATROS CNTK TensorFlow等 CMU Sphinx是离线的语音识别工具 支持DSP等低功耗的离线应用场景
  • 推荐系统 用户画像 标签聚类 个性化搜索

    最近在做短视频推荐 和别的部门配合着做 我们部门做用户画像这一部分 回头看看 我们部门以前做的用户画像只能称之为 所谓的用户画像 如果一个人不懂用户画像还好指挥来指挥去真的让人无言 不知道其他公司的有没有这样的人儿那 哈哈 扯远了 言归正传
  • linux系统编程(五)针对linux系统中文件的IO操作

    文章目录 1 系统调用 2 C标准库文件IO函数 3 open close函数 3 1 函数原型 3 2 常用参数 3 3 open常见错误 4 文件描述符 4 1 PCB进程控制块 4 2 文件描述图表 4 3 最大打开文件数 4 4 F
  • LVS负载均衡之--Keepalived模式(具详细)

    前言 前面和拐友们一起掌握了NAT和DR模式 这章来看一下负载均衡里的最后一种Keepalived模式 在生产中这个模式用的是还是比较广泛的 目录 一 Keepalived概述 1 2Keepalived的工作原理 1 3Keepalive
  • 登录时发起的请求是Get还是Post?Get和Post的区别

    为了保证信息的安全性 注册 登录等操作通常都会使用POST请求 GET请求一般用来获取信息 1 根据HTTP规范 GET用于信息获取 GET请求的数据会附在URL之后 就是把数据放置在HTTP协议头中 以 分割URL和传输数据 参数之间以
  • CSS总结第七天

    day09 前端基础CSS第七天 学习目标 能够使用精灵图 能够使用字体图标 能够写出 CSS 三角 能够写出常见的 CSS 用户界面样式 能够说出常见的布局技巧 1 精灵图 重点 1 1 为什么需要精灵图 一个网页中往往会应用很多小的背景
  • IDEA 类名及方法名为红色,但是能正常启动-处理办法

    今天在切换分支过后 idea里面很多类名 方法名报红 提示类等找不到 但是不影响功能 解决办法 点击 idae 的 File gt Invalideate Caches Restart 清除缓存并重启即可 UG7O9VKKH6 eyJsaW
  • Spark:常用算子总结大全

    目录 park的算子的分类 从大方向来说 Spark 算子大致可以分为以下两类 1 Transformation 变换 转换算子 这种变换并不触发提交作业 完成作业中间过程处理 2 Action 行动算子 这类算子会触发 SparkCont