128-152-spark-核心编程-源码

2023-05-16

128-spark-核心编程-源码(主要以了解基本原理和流程为主):

总体相关

​ 1.环境准备(Yarn 集群)
​ (1) Driver, Executor
​ 2.组件通信
​ (1) Driver => Executor
​ (2) Executor => Driver
​ (3) Executor => Executor
​ 3.应用程序的执行
​ (1) RDD 依赖
​ (2)阶段的划分
​ (3) 任务的切分
​ (4)任务的调度
​ (5)任务的执行

​ 4.Shuffle
​ (1) Shuffle 的原理和执行过程
​ (2) Shuffle 写磁盘
​ (3) Shuffle 读取磁盘
​ 5.内存的管理
​ (1)内存的分类
​ (2)内存的配置

起点:org/apache/spark/deploy/SparkSubmit.scala main

java org.apache.spark.deploy.SparkSubmit
java HelloWorld
JVM => Process ( SparkSubmit)
SparkSubmit.main
jps

结合尚硅谷视频讲解图片理解。

#提交命令
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

main->doSubmit->parseArguments->parse(args.asJava)->SparkSubmitArguments.handle(--master --class)
给action赋值action = Option(action).getOrElse(SUBMIT) org.apache.spark.deploy.SparkSubmit#submit ->doRunMain()-org.apache.spark.deploy.SparkSubmit#runMain->prepareSubmitEnvironment(准备提交的环境)

#准备提交的环境
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

#根据环境找到childMainClass
if (isYarnCluster) {childMainClass = YARN_CLUSTER_SUBMIT_CLASS。。。} (YARN_CLUSTER_SUBMIT_CLASS:	org.apache.spark.deploy.yarn.YarnClusterApplication)

#yarnclient创建了资源调度器rmclient
YarnClient.createYarnClient->ApplicationClientProtocol rmClient;->org.apache.spark.deploy.yarn.Client#run
#提交应用程序,返回appid
org.apache.spark.deploy.yarn.Client#submitApplication
#客户端启动      yarnClient.init(hadoopConf)->yarnClient.start()->val newApp = yarnClient.createApplication()创建应用	->	createContainerLaunchContext(创建容器环境)->createApplicationSubmissionContext(创建提交环境)
#连接和提交,yarnClient连接,submitApplication提交
yarnClient.submitApplication(appContext)->Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++  ->amArgs->amClass="org.apache.spark.deploy.yarn.ApplicationMaster"(启动ApplicationMaster)
      


 #启动ApplicationMaster
 org.apache.spark.deploy.yarn.ApplicationMaster#main-》org.apache.spark.deploy.yarn.YarnRMClient#amClient属性。applicationmaster和resourcemaster的链接-》org.apache.spark.deploy.yarn.ApplicationMaster#runDriver-》
 org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication启动用户应用程序-》startUserApplication.start(driver驱动线程初始化sparkcontext以及run mian方法)-》org.apache.spark.deploy.yarn.ApplicationMaster#registerAM(注册到rm,申请资源)-》org.apache.spark.deploy.yarn.YarnRMClient#createAllocator(创建分配器)-》org.apache.spark.deploy.yarn.YarnAllocator#allocateResources(返回可用资源列表)-》
org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers(处理可用的容器)-》
org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers(运行已分配的容器)-》
org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand(准备指令)-》
nmClient.startContainer(container.get, ctx)(向指定的NM启动容器)-》
/bin/java org.apache.spark.executor.YarnCoarseGrainedExecutorBackend(启动Executor)


#启动Executor
org.apache.spark.executor.YarnCoarseGrainedExecutorBackend#main-》  run  ->
org.apache.spark.SparkEnv#createExecutorEnv(创建executorenv环境)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册rpc通讯终端)-》
org.apache.spark.rpc.netty.Inbox#Inbox(收件箱,自己给自己发消息constructor -> onStart -> receive* -> onStop)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart-》
org.apache.spark.rpc.RpcEndpointRef#ask(向driver注册executor)-》
org.apache.spark.scheduler.SchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply(接收和应答)-》
org.apache.spark.rpc.RpcCallContext#reply( context.reply(true)注册成功)-》
org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false,resources = _resources)创建Executor计算对象)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receive(makeOffers(executorId)注册成功)-》

通讯环境:

Netty:通讯框架,AIO异步非阻塞IO,BIO阻塞式IO,NIO非阻塞式IO

Linux对AIO支持不够好,Windows支持,Linux采用Epoll方式模仿AIO操作

org.apache.spark.SparkContext#createSparkEnv-》
org.apache.spark.rpc.RpcEnv#create(rpcenv创建)-》
org.apache.spark.rpc.netty.NettyRpcEnv#NettyRpcEnv-》
org.apache.spark.util.Utils$#startServiceOnPort-》
org.apache.spark.rpc.netty.NettyRpcEnv#startServer(创建服务)-》
org.apache.spark.network.server.TransportServer#TransportServer(创建服务)-》
org.apache.spark.network.server.TransportServer#init(初始化)-》
org.apache.spark.network.util.NettyUtils#getServerChannelClass(nio,EPOLL方式模仿异步)-》
org.apache.spark.rpc.netty.Dispatcher#registerRpcEndpoint(注册通讯终端,receive接受数据,收件箱inbox)-》
org.apache.spark.rpc.RpcEndpointRef#ask(发送数据,终端引用,)-》
org.apache.spark.rpc.netty.NettyRpcEnv#outboxes属性,发件箱

应用程序的执行:

应用SparkContext对象重要相关字段

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EjehU7jt-1670772034910)(png/image-20211023100428603.png)]

(1) RDD 依赖

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wCRroNC7-1670772034911)(png/image-20211023101856923.png)]

(2)阶段的划分

org.apache.spark.rdd.RDD#collect(行动算子的触发)-》
org.apache.spark.SparkContext#runJob(运行任务)-》
org.apache.spark.scheduler.DAGScheduler#runJob(有向无环图)-》
org.apache.spark.util.EventLoop#post(将JobSubmitted事件放入事件队列中)-》
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop#doOnReceive(事件队列中取出作业提交)-》
org.apache.spark.scheduler.DAGScheduler#handleJobSubmitted(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#createResultStage(进行阶段的划分)-》
org.apache.spark.scheduler.DAGScheduler#getOrCreateParentStages(获取上级阶段)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RePkKYud-1670772034912)(png/image-20211023101920351.png)]

(3) 任务的切分

每个阶段最后分区的数量即n*2个任务,例如最后一个阶段分为3个分区,最后shufflerdd也会有三个分区

org.apache.spark.scheduler.DAGScheduler#submitStage-》
org.apache.spark.scheduler.DAGScheduler#submitMissingTasks(没上一节阶段提交任务,有上一级阶段提交上一级阶段)-》

(4)任务的调度

org.apache.spark.scheduler.TaskScheduler#submitTasks(任务调度器提交任务)-》
org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks(实现)-》
org.apache.spark.scheduler.TaskSetManager(任务tasksset的管理者)-》
org.apache.spark.scheduler.SchedulableBuilder(调度器)-》
org.apache.spark.scheduler.TaskSchedulerImpl#initialize(初始化调度器,默认FIFO)-》
org.apache.spark.scheduler.Pool#addSchedulable(任务池添加调度)-》
org.apache.spark.scheduler.SchedulerBackend#reviveOffers(取任务)-》org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers(集群模式取)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#makeOffers()-》
org.apache.spark.scheduler.TaskSchedulerImpl#resourceOffers(获取资源调度信息)-》
org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#launchTasks(调度任务)-》
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))(任务池取出发送终端执行)

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HJ6oNHxS-1670772034912)(png/image-20211023104940587.png)]

本地化级别
#补充,本地化级别,首选位置。task任务发送到哪里,数据和节点位置等。效率考虑,移动数据不如移动计算
for(currentMaxLocality<-taskSet.myLocalityLevels) 
移动数据不如移动计算
计算和数据的位置存在不同的级别,这个级别称之为本地化级别
进程本地化:数据和计算在同一个进程中
节点本地化:数据和计算在同一个节点中
机架本地化:数据和计算在同一个机架中
任意

(5)任务的执行

org.apache.spark.executor.CoarseGrainedExecutorBackend#receive(executor接收到消息)-》
org.apache.spark.executor.Executor#launchTask(启动Task)-》
java.util.concurrent.ThreadPoolExecutor#execute(每个线程执行每个task)-》
org.apache.spark.executor.Executor.TaskRunner#run-》
org.apache.spark.scheduler.Task#run

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YeoNG4rn-1670772034914)(png/image-20211023111151468.png)]

Shuffle原理

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pnRL92cj-1670772034914)(png/image-20211023180336159.png)]

详解转变的过程

前提1:一核,一个task,落盘一个文件,三个任务读取数据,但是没法分辨需要的数据是文件中的那些数据

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ULeyvl0C-1670772034915)(png/image-20211023180544869.png)]

前提2:多核,多task,每个任务针对落盘三个文件,导致小文件过多

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NTmqEXci-1670772034915)(png/image-20211023180730597.png)]

前提3:对前提2的优化,将同核任务的落盘,写相同的文件,但是真实环境task可能会很多,下游任务也可能人多,或者100核数。文件将还是很多。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-DG26XExg-1670772034916)(png/image-20211023180910097.png)]

前提4:对前提3的优化,写到同一个文件,使用index索引文件记录下游任务读取数据的偏移量。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-V85FxisI-1670772034916)(png/image-20211023182401664.png)]

Shuffle实现过程:

org.apache.spark.scheduler.ShuffleMapTask-》
org.apache.spark.scheduler.ShuffleMapTask#runTask-》
org.apache.spark.shuffle.ShuffleWriteProcessor#write(写)-》
org.apache.spark.shuffle.ShuffleManager#getWriter(获取写入的对象)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write()-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(索引文件和数据文件的提交)-》

#读
org.apache.spark.scheduler.ResultTask#runTask(结果任务)-》
org.apache.spark.rdd.RDD#getOrCompute(获取或计算)-》
org.apache.spark.rdd.ShuffledRDD#compute(shufflerdd的计算)-》
org.apache.spark.shuffle.BlockStoreShuffleReader#read(读取数据)

shuffle写:

org.apache.spark.shuffle.ShuffleWriteProcessor(shuffle写的处理器)-》write-》
org.apache.spark.shuffle.ShuffleManager(shuffle管理器,hash早期有,sort现版本)-》
org.apache.spark.shuffle.sort.SortShuffleManager#getWriter(获取到了下面的写对象的SortShuffleWriter)-》
org.apache.spark.shuffle.sort.SortShuffleWriter#write(写)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput(写出)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions(提交)

补充writer写的类型,判断条件org.apache.spark.shuffle.sort.SortShuffleManager#registerShuffle

处理器写对象判断条件
SerializedShuffleHandleUnsafeShuffleWriter1.序列化规则支持重定位操作(java序列化不支持,kryo序列化支持) 2.不能使用预聚合功能 3.如果下游的分区数量小区大(16777215+1=16777216)PackedRecordPointer.MAXIMUM_PARTITION_ID + 1
BypassMergeSortShuffleHandleBypassMergeSortShuffleWriter1.不能使用预聚合 2、如果下游的分区数量小区等于200(可配)
BaseShuffleHandleSortShuffleWriter其他情况

Shuffle归并排序和读

org.apache.spark.util.collection.ExternalSorter#insertAll(插入)-》
org.apache.spark.util.collection.PartitionedAppendOnlyMap(支持预聚合的map结构)
org.apache.spark.util.collection.PartitionedPairBuffer(不支持预聚合的结构)
org.apache.spark.util.collection.SizeTrackingAppendOnlyMap#changeValue(预聚合)-》位置-》
org.apache.spark.util.collection.ExternalSorter#maybeSpillCollection(是否溢写磁盘)-》
org.apache.spark.util.collection.Spillable#maybeSpill-》
org.apache.spark.util.collection.ExternalSorter#spill(溢写)-》
org.apache.spark.util.collection.ExternalSorter#spillMemoryIteratorToDisk(写到磁盘)-》
org.apache.spark.util.collection.ExternalSorter#writePartitionedMapOutput-》
org.apache.spark.util.collection.ExternalSorter#merge(Merge spilled and in-memory data合并溢写和磁盘)-》
org.apache.spark.util.collection.ExternalSorter#mergeSort(归并排序)-》
org.apache.spark.shuffle.sort.io.LocalDiskShuffleMapOutputWriter#commitAllPartitions()-》
org.apache.spark.shuffle.IndexShuffleBlockResolver#writeIndexFileAndCommit(写索引文件和数据文件)->

5.内存的管理
(1)内存的分类

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Ir6j3dQC-1670772034916)(png/image-20211024010705514.png)]

相关位置:

org.apache.spark.memory.UnifiedMemoryManager#apply	->
org.apache.spark.memory.UnifiedMemoryManager#getMaxMemory

#相关参数:org.apache.spark.memory.UnifiedMemoryManager#RESERVED_SYSTEM_MEMORY_BYTES预留内存300M

​ (2)内存的配置

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

128-152-spark-核心编程-源码 的相关文章

  • spark性能优化调优指导性文件

    1 让我们看一下前面的核心参数设置 num executors 10 20 executor cores 1 2 executor memory 10 20 driver memory 20 spark default parallelis
  • SparkStreaming知识总结

    一 流式计算的概述 1 1 什么是流式计算 1 数据流与静态数据的区别 数据流指的就是不断产生的数据 是源源不断 不会停止 静态数据指的就是存储在磁盘中的固定的数据 2 流式计算的概念 就是对数据流进行计算 由于数据是炼苗不断的产生的 所以
  • cdh下spark2-yarn运行sparkstreaming获取kafka数据使用spark-streaming-kafka-0-10_2.11报错解决

    报错问题 20 07 15 17 20 51 INFO utils AppInfoParser Kafka version 0 9 0 kafka 2 0 0 20 07 15 17 20 51 INFO utils AppInfoPars
  • 重新定义分析 - EventBridge 实时事件分析平台发布

    对于日志分析大家可能并不陌生 在分布式计算 大数据处理和 Spark 等开源分析框架的支持下 每天可以对潜在的数百万日志进行分析 事件分析则和日志分析是两个完全不同的领域 事件分析对实时性的要求更高 需要磨平事件领域中从半结构化到结构化的消
  • scala和spark的下载与安装

    简易安装scala和spark 一 安装scala 1 安装scala scala下载注意和jdk的版本号 下载地址 https www scala lang org download 2 上传到linux虚拟机里 可通过rz方式上传 上传
  • 浅谈Hadoop体系和MPP体系

    浅谈Hadoop体系和MPP体系 引言 如题 在大数据发展至今 为了应对日益繁多的数据分析处理 和解决客户各种奇思妙 怪 想需求 形形色色的大数据处理的框架和对应的数据存储手段层出不穷 有老当益壮的Hadoop体系 依靠Hadoop巨大的社
  • dolphinschedule使用shell任务结束状态研究

    背景 配置的dolphin任务 使用的是shell shell里包含了spark submit 如下截图 dolphin shell 介绍完毕 开始说明现象 有天有人调整了集群的cdp配置 executor cores max 1 我之前这
  • spark报Got an error when resolving hostNames. Falling back to /default-rack for all

    一 报错代码如下 21 06 01 20 13 36 INFO yarn SparkRackResolver Got an error when resolving hostNames Falling back to default rac
  • Spark 源码阅读一-启动脚本

    Spark Complile Help Links Because spark 1 5 need maven version 3 3 3 so i track the branch 1 4 git branch a git checkout
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 记一次Spark打包错误:object java.lang.Object in compiler mirror

    使用maven compile和package 一直报错scala reflect internal MissingRequirementError object scala runtime in compiler mirror not f
  • Spark 从入门到放弃(一)Spark基础概念

    一 Spark基础概念 1 Application Spark应用程序 application 应用 其实就是用spark submit提交的程序 一个application通常包含三部分 从数据源 比方说HDFS 取数据形成RDD 通过R
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • 使用Flink1.16.0的SQLGateway迁移Hive SQL任务

    使用Flink的SQL Gateway迁移Hive SQL任务 前言 我们有数万个离线任务 主要还是默认的DataPhin调度CDP集群的Hive On Tez这种低成本任务 当然也有PySpark 打Jar包的Spark和打Jar包的Fl
  • spark groupByKey和groupBy,groupByKey和reduceByKey的区别

    1 groupByKey Vs groupBy 用于对pairRDD按照key进行排序 author starxhong object Test def main args Array String Unit val sparkConf n
  • 11.Linux下Spark的安装配置以及spark-shell的启动和 Spark集群环境搭建

    本案例软件包 链接 https pan baidu com s 1zABhjj2umontXe2CYBW DQ 提取码 1123 若链接失效在下面评论 我会及时更新 目录 1 安装Spark 1 先用xftp将安装包传到home hadoo
  • spark-3.1.2兼容多版本hive

    2 3 9版本Hive的支持 直接在实例化SparkSession时 启用hive支持即可 例如 val spark SparkSession builder appName Spark Hive Example config spark
  • 2023_Spark_实验二十九:Flume配置KafkaSink

    实验目的 掌握Flume采集数据发送到Kafka的方法 实验方法 通过配置Flume的KafkaSink采集数据到Kafka中 实验步骤 一 明确日志采集方式 一般Flume采集日志source有两种方式 1 Exec类型的Source 可
  • 阿里技术官亲笔力作:Kafka限量笔记,一本书助你掌握Kafka的精髓

    前言 分布式 堪称程序员江湖中的一把利器 无论面试还是职场 皆是不可或缺的技能 而Kafka 这款分布式发布订阅消息队列的璀璨明珠 其魅力之强大 无与伦比 对于Kafka的奥秘 我们仍需继续探索 要论对Kafka的熟悉程度 恐怕阿里的大佬们
  • spark相关

    提示 文章写完后 目录可以自动生成 如何生成可参考右边的帮助文档 文章目录 前言 一 pandas是什么 二 使用步骤 1 引入库 2 读入数据 总结 前言 提示 这里可以添加本文要记录的大概内容 例如 随着人工智能的不断发展 机器学习这门

随机推荐