Spark Job写文件个数的控制以及小文件合并的一个优化

2023-11-18

背景说明

在大数据领域,平台小文件治理一直是一个非常重要的问题。我司大佬在Spark平台里,在向目标表中增加一个Shuffle,然后在Reduce端合并数据,以实现将小文件合并成大文件,来减少平台中的小文件。
我司还对单个任务写HDFS文件个数做了限制,同时限制了单个Task 和 单次Job 可写的HDFS个数限制。

通过引入额外Shuffle对写入数据进行合并

最终实现效果如下

== Optimized Logical Plan ==
CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
+- Relation p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] parquet

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- Execute CreateDataSourceTableAsSelectCommand `P_WAKUN_T`.`user_part_2`, ErrorIfExists, [id, name, dt]
   +- CustomShuffleReader coalesced
      +- ShuffleQueryStage 0
         +- Exchange RoundRobinPartitioning(10000), REPARTITION_BY_NONE, [id=#30214771]
            +- *(1) ColumnarToRow
               +- FileScan parquet p_wakun_t.user_part[id#18274940,name#18274941,dt#18274942] Batched: true, DataFilters: [], Format: Parquet, Location: xx

EnsureRepartitionForWriting Rule

EnsureRepartitionForWriting 中对 DataWritingCommandExec 进行数据写入之前,增加一个Shuffle。
当然 Rule 中还要考虑Partition table, Bucket table 的Shuffle 方式,不能把数据给搞混了。

CoalesceShufflePartitions Rule

CoalesceShufflePartitions Rule 会根据Shuffle结果,coalesce 数据到合适的 Partition 个数。

OptimizeShuffleWithLocalRead Rule

Local Shuffle Read 是Spark新增的对Spark Shuffle 过程进行优化的Rule,当Shuffle required distribution 不需要按照Hash分布的约束,以及满足其他的一些条件时,Reduce 端修改为连续读某一个Map 的Shuffle Output,这样会有更好的数据本地性,Shuffle 性能也会有提升。
这个Rule 之前叫 OptimizeLocalShuffleReader Rule。
其他应用条件:

  • 如果是 DataWritingCommandExec, 只能优化它的Child 节点
  • 如果是 Shuffle Query Stage, Shuffle 类型只能是 ENSURE_REQUIREMENTSREPARTITION_BY_NONE

分布式数据写控制

在 Hadoop 的 MapReduce 中,通过 FileOutputCommitter 控制分布式数据写Job setup,Task commit, 以及Job commit.
FileOutputCommitter 的 v1 算法对task 输出做两次rename 控制, v2算法对task输出做一次rename控制。

在Spark中有一套新的 FileCommitProtocol, 组合使用了 Hadoop 的 FileOutputCommitter 来控制Job 的写过程。上面要实现的控制单 Task 和 Job 输出文件个数的实现也就是在这里实现的。
通过下面的时序图可以看到,Task端可以通过创建新的文件 newTaskTempFile() 时check task file number; SparkContext.runJob() 方法有一个参数 resultHandler 用于处理Task 执行完成后 result 的回调。写数据的Task 最终返回的结果就是 WriteTaskResult (内部包含写的文件个数),在 resultHandler 中对所有Tasks 的写文件个数进行累加。当超过 maxCreatedFilesInDynamicPartition 报错。

在这里插入图片描述

FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter createCommitter() setupJob(jobContext) setupCommitter() setupJob() runJob() executeTask() setupTask() setupTask() Execute Task write() newTaskTempFile() create new file && fileCounter += 1 loop [call DynamicPartitionDataWriter to write data] commitTask() commitTask() abortTask() abortTask() alt [task success] [task fail] WriteTaskResult onTaskCommit() check file numbers commitJob() commitJob() move all stage files to final directory abort() abortJob() alt [job success] [job fail] FileFormatWriter SparkContext Task SQLHadoopMapReduceCommitProtocol OutputCommitter DynamicPartitionDataWriter
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark Job写文件个数的控制以及小文件合并的一个优化 的相关文章

  • 解决Scrapy使用pipline保存到数据库后返回None

    这也不算报错哈 解决方法 在process item处理完成后返回item即可 return item 案例分析 比如下面的pipeline class MyPipeline object def init self host 127 0
  • 【Spark NLP】第 7 章:分类和回归

    大家好 我是Sonhhxg 柒 希望你看完之后 能对你有所帮助 不足请指正 共同学习交流 个人主页 Sonhhxg 柒的博客 CSDN博客 欢迎各位 点赞 收藏 留言 系列专栏 机器学习 ML 自然语言处理 NLP 深度学习 DL fore
  • 城市污水管网监测系统解决方案

    一 方案概述 在经济快速发展和政府政策的推动下 以产业聚焦为核心的城市园区经济发展迅速 由于在城市园区企业 工厂在生产制造过程产生了大量的废水等其他污染物都是由污水管进行排放 一旦发生井下污水管网堵塞 会造成废水中的气体等其他有害物质的传播
  • 全球及中国冷链物流产业需求前景与投资竞争力研究报告2022版

    全球及中国冷链物流产业需求前景与投资竞争力研究报告2022版 HS HS HS HS HS HS HS HS HS HS HS HS 修订日期 2021年11月 搜索鸿晟信合研究院查看官网更多内容 第一章 冷链物流相关概述 1 1 冷链物流
  • Hudi和Kudu的比较

    与Kudu相比 Kudu是一个支持OLTP workload的数据存储系统 而Hudi的设计目标是基于Hadoop兼容的文件系统 如HDFS S3等 重度依赖Spark的数据处理能力来实现增量处理和丰富的查询能力 Hudi支持Increme
  • spark_hadoop集群搭建自动化脚本

    bin bash 脚本使用说明 1 使用脚本前需要弄好服务器的基础环境 2 在hadoop的每个节点需要手动创建如下目录 data hdfs tmp 3 修改下面的配置参数 4 脚本执行完备后需要收到格式化namenode
  • 大数据—— Flink 的优化

    目录 一 Flink内存优化 1 1 Flink 内存配置 二 配置进程参数 2 1 场景 2 2 操作步骤 三 解决数据倾斜 3 1 场景描述 3 2 解决方式 3 2 1 数据源的消费不均匀 调整并发度 3 2 2 数据分布不均匀 四
  • Hbase Sehll基本命令

    进入hbase shell命令 hbase shell 1 status 查看hbase运行状态 2 version 查看hbase版本 3 list 列出hbase所有的 表 4 创建表 create info member member
  • 2020-10-24 大数据面试问题

    上周面试数据开发职位主要从公司的视角讲一下记录下面试流水 1 三面技术一轮hr 面到了cto 整体来看是这一周技术含量最高信息量最大的一个 1到4轮过了4个小时 技术上的问题主要问的对数据分层的理解 1 一面自我介绍 目前团队的规模多大 2
  • 中国智慧能源行业行情监测及未来动向规划预测报告2022-2028年

    中国智慧能源行业行情监测及未来动向规划预测报告2022 2028年 报告目录 第一章 智慧能源的基本概述 第二章 2021 2021年全球智慧能源产业发展分析 2 1 2021 2021年全球智慧能源产业发展综况 2 1 1 全球智慧能源网
  • Spark 任务调度机制

    1 Spark任务提交流程 Spark YARN Cluster模式下的任务提交流程 如下图所示 图YARN Cluster任务提交流程 下面的时序图清晰地说明了一个Spark应用程序从提交到运行的完整流程 图Spark任务提交时序图 提交
  • 通过yarn提交作业到spark,运行一段时间后报错。

    加粗样式
  • Flink_06_ProcessAPI(个人总结)

    声明 1 本文为我的个人复习总结 并非那种从零基础开始普及知识 内容详细全面 言辞官方的文章 2 由于是个人总结 所以用最精简的话语来写文章 3 若有错误不当之处 请指出 侧输出流 SideOutput 即分支流 可以用来接收迟到数据 也可
  • 大数据简介

    预备篇 目录 知识 大数据简介 计算机单位 大数据的五个 v Hadoop Hadoop概述 Hadoop的历史 Hadoop三大发行版本 1 Apache Hadoop 2 Cloudera Hadoop 3 Hortonworks Ha
  • docker搭建hadoop hdfs完全分布式集群

    1 制作hadoop镜像 参见 https www cnblogs com rmxd p 12051866 html 该博客中只参考制作镜像部分 固定IP及启动集群的部分应该跳过 这里注意 在做好的镜像里 要安装 which 工具 否则在执
  • 华为云,站在数字化背后

    一场新的中国数字化战斗 正在被缓缓拉开帷幕 作者 裴一多 出品 产业家 如果说最近的讨论热点是什么 那无疑是互联网云 在数字化进入纵深的当下 一种市面上的观点是互联网的云业务由于盈利等问题 正在成为 被抛弃 的一方 互联网公司开始重新回归T
  • 大数据之hive(数据仓库工具)的分组和分区操作

    注 在对hive的概念 优缺点 安装部署和参数配置在之后再进行总结 本小节主要对hive中的分组和分区进行总结 一 分组 1 group by语句 group by通常和聚合函数一起使用 按照一个或者多个列进行分组 然后对每个组进行聚合操作
  • 利用人工智能技术普及教学应用、拓展教师研训应用、增强教育系统监测能力

    2019年 中国教育现代化2035 指出 以人才培养为核心 通过提升校园智能化水平 探索新型教学形式 创新教育服务业态 推进教育治理方式变革 智能驱动教育创新发展 2021年教育部等六部门发布 关于推进教育新型基础设施建设构建高质量教育支撑
  • JAVA 安装与简单使用

    JAVA简易安装 下载安装 环境变量 进入变量界面 设置变量 验证JAVA环境 运行Java程序 个人站 ghzzz cn 还在备案 很快就能访问了 下载安装 第一步当然是从官网下载安装java了 网上有很多的教程 这里简单的写一下 在这里
  • ETL.NET 助力海量数据轻松处理

    ETL NET 助力海量数据轻松处理 什么是 ETL EtlT About ETL About EtlT 谈谈 ETL 作用 ETL 对企业的作用 ETL 对个人职业发展的作用 ETL NET 介绍

随机推荐

  • 视频无损放大修复工具:Topaz Video AI对Mac和Windows的系统要求

    Topaz Video AI是一款基于人工智能技术的视频增强软件 旨在提供高质量的视频修复 增强和转换功能 它可以通过智能算法和图像处理技术 改善视频的清晰度 稳定性 降噪效果 还能进行视频转码和格式转换 Mac Topaz Video A
  • 奇迹服务器维护,奇迹MU 3月31日服务器维护更新公告

    尊敬的用户 为了奇迹 mu 服务器能够始终保持高效和稳定的运行 使玩家能够在更好的游戏网络环境中享受游戏的乐趣 我们将于2021年3月30日 周二 9 00开始进行游戏服务器的维护工作 维护时间为大约为6个小时 维护期间官方主页的充值 登陆
  • Acwing2554. 排列数

    在一个排列中 一个折点是指排列中的一个元素 它同时小于两边的元素 或者同时大于两边的元素 对于一个 1 n 的排列 如果可以将这个排列中包含 t 个折点 则它称为一个 t 1 单调序列 例如 排列 1 4 2 3 是一个 3 单调序列 其中
  • SDN/NFV标准组织&SDN架构

    标准组织 1 ONF 开放网络基金会 2 ODL OpenDayLight 3 ETSI 欧洲电信标准协会 作为标准制定的依据 2012年成立 由运营商主导 通信设备 信息设备等厂家共同参与 推动NFV标准研究和产业进程的临时性组织 4 I
  • ERP的灵魂

    ERP应该是有灵魂的 这个灵魂就是规划 开发和完善时的理念 用土话说 上ERP到底是为了啥 有了灵魂 ERP的开发和实施就不会摇摆不定 灵魂源于初心 要回归本源 注意这个本源不是数字化 也不是上档升级 这些只是手段 结果或目的 而不是本源
  • 内容多,鼠标略过显示内容

    格式化单元格提示信息 function formatCellTooltip value return span title value span th 异常类型 th
  • knime工具介绍(1)

    本文旨在介绍knime在数据分析中可具体扮演的角色 安利给大家这个超好用数据分析工具 截图部分转自亚洲数析协会公开课截图 如有侵权请及时私信处理 因为内容比较多 先慢慢更新 未完待续9 14 一 数据分析的全流程均可以用到这个工具 台湾数析
  • 解决:修改JAVA_HOME后,Java版本无法正常切换

    经验总结 步骤1 检查路径是否正确 步骤2 将JAVA HOME配置到path最前面 步骤3 删除 C ProgramData Oracle Java javapath 目录下三个 exe 文件 步骤4 重新测试是否 可正常切换Java 版
  • 软件测试从自学到工作,软件测试学习到底要怎样进行?

    前言 首先 请不要奢望有多么简单的办法 学习没有捷径 这里只是让你明白这一点 顺便根据个人经验帮你理一下学习的过程 其实有文章是说怎么学习以及学习什么的 但是可能还是有些抽象 或者内容有点多 有点杂 以至于不少朋友仍然觉得不知道如何下手 大
  • R语言描述性统计

    使用Hmisc这个包 只需要调用 my data read csv test csv Hmisc describe my data 可以打印出各个变量的均值方差等信息
  • mysql远程连接权限grant all privileges on *.* to ‘root‘@‘%‘ identified by ‘123456‘ with grant option语句报错

    mysql远程连接权限grant all privileges on to root identified by 123456 with grant option语句报错 记录一下自己安装mysql遇到的小坑 grant all privi
  • Integer中缓存池讲解

    文章目录 一 简介 二 实现原理 三 修改缓存范围 一 简介 Integer缓存池是一种优化技术 用于提高整数对象的重用和性能 在Java中 对于整数值在 128 到 127 之间的整数对象 会被放入缓存池中 以便重复使用 这是因为在这个范
  • Centos7操作系统服务器优化流程(关闭防火墙、关闭selinux、更换yum源、安装Docker和docker-compose)

    Centos7 测试环境服务器优化流程 本文讲解内容 将Centos7操作系统作为公司开发环境或者自学者搭建DevOps流程而优化的几项内容 生产环境慎用 防止被网络攻击 纯干货教程 已在本地操作多次 请放心使用 推荐一个笔者长期使用的ss
  • 卡西欧casio手表质量怎么样

    Casio的仿货 淘宝在300以上的质量都还可以 500以上手感就挺好了 我买了一个4折的 没问题 绝对真货 有真货单的 带激光防伪标 好像是广东出的 就是没发票 不过店家保一年 但我觉得casio的质量还是可以的 一年内不会有问题 1年后
  • Jupyter 配置默认工作目录(起始位置)

    没有配置文件 1 安装了 Anaconda 在Anaconda prompt中输入以下命令 也可以用来查找已有配置文件路径 jupyter lab jupyter lab generate config jupyter notebook j
  • OVP保护芯片首选ETA7008,耐压36V,过压保护点可调

    产品描述主要特点 低成本 过压保护点可调 高耐压 低内阻 快速响应ETA7008是一款低侧过压保护 OVP IC 仅具有34mohm开关电阻 确保非常低的导通电阻和高保护电压 负端保护 耐压36V 过压保护点可设 导通内阻小 可蕞大过4A电
  • clang-format configurator - 交互式创建 clang-format 格式配置文件

    clang format configurator 交互式创建 clang format 格式配置文件 clang format configurator https zed0 co uk clang format configurator
  • Apache APISIX 默认密钥漏洞(CVE-2020-13945)

    Vulhub Apache APISIX 默认密钥漏洞 CVE 2020 13945 文章目录 Vulhub Apache APISIX 默认密钥漏洞 CVE 2020 13945 APISIX简介 漏洞复现 payload分析 APISI
  • PCB板框文件丢失的问题

    问题 PCB 板框文件丢失的问题 在制作好PCB并导出Gerber文件后 送厂制板的时候审查被提醒说没有边框文件 缺少 GM1 层 解决办法 经过反复检查 确定添加了边框文件 BOARD GEOMETRY CUT Design outlin
  • Spark Job写文件个数的控制以及小文件合并的一个优化

    文章目录 背景说明 通过引入额外Shuffle对写入数据进行合并 EnsureRepartitionForWriting Rule CoalesceShufflePartitions Rule OptimizeShuffleWithLoca