余老师带你学习大数据框架全栈第十三章Hudi第一节核心技术

2023-11-11

1.前言

1.1为什么产生数据湖

在这里插入图片描述

数据量比较大,越来越不满足处理结构化的数据,比如说数仓,数仓就是处理结构化数据。什么是结构化数据,就是数据成数据库来的,传统型的数据库有:MySQL数据库、Oracle、SQLserver,从这些库里面过来的数据都是结构化数据。日志、json、xml是属于半结构化数据,结构化数据和半结构化数据就是当前数仓所做的功能。数据湖的产生就是为了解决非结构化数据和二进制数据,主要就是处理非结构化数据,非结构化数据主要是:图片、视频、音频。

1.2数据湖的性能特点

在这里插入图片描述
在这里插入图片描述

1、新增支持特别快的新增和删除的功能

2、要有表的结构信息

3、本身就有小文件管理合并

4、保证语义等

1.3Hudi介绍

Hudi将带来流式处理大数据,提供新数据集,同时比传统批处理效率高一个数据量级。
在这里插入图片描述

1.4Hudi特性

1、快速upsert,可插入索引

2、以原子方式操作数据并具有回滚功能

3、写入器之间的快照隔离

4、savepoint用户数据恢复的保存点

5、管理文件大小,使用统计数据布局

6、数据行的异步压缩和柱状数据

7、时间数据跟踪血统

2.数据湖的定义

1、 数据湖需要提供足够用的数据存储能力,这个存储保存了一个企业/组织中的所有数据。

2、 数据湖可以存储海量的任意类型的数据,包括结构化、半结构化和非结构化数据。

3、 数据湖中的数据是原始数据,是业务数据的完整副本。数据湖中的数据保持了他们在业务系统中原来的样子。

4、 数据湖需要具备完善的数据管理能力(完善的元数据),可以管理各类数据相关的要素,包括数据源、数据格式、连接信息、数据schema、权限管理等。

5、 数据湖需要具备多样化的分析能力,包括但不限于批处理、流式计算、交互式分析以及机器学习;同时,还需要提供一定的任务调度和管理能力。

6、 数据湖需要具备完善的数据生命周期管理能力。不光需要存储原始数据,还需要能够保存各类分析处理的中间结果,并完整的记录数据的分析处理过程,能帮助用户完整详细追溯任意一条数据的产生过程。

7、 数据湖需要具备完善的数据获取和数据发布能力。数据湖需要能支撑各种各样的数据源,并能从相关的数据源中获取全量/增量数据;然后规范存储。数据湖能将数据分析处理的结果推送到合适的存储引擎中,满足不同的应用访问需求。

8、 对于大数据的支持,包括超大规模存储以及可扩展的大规模数据处理能力。

综上,个人认为数据湖应该是一种不断演进中、可扩展的大数据存储、处理、分析的基础设施;以数据为导向,实现任意来源、任意速度、任意规模、任意类型数据的全量获取、全量存储、多模式处理与全生命周期管理;并通过与各类外部异构数据源的交互集成,支持各类企业级应用。

在这里插入图片描述

上图数据湖基本能力示意

这里需要再特别指出两点:

1)可扩展是指规模的可扩展和能力的可扩展,即数据湖不但要能够随着数据量的增大,提供“足够”的存储和计算能力;还需要根据需要不断提供新的数据处理模式,例如可能一开始业务只需要批处理能力,但随着业务的发展,可能需要交互式的即席分析能力;又随着业务的实效性要求不断提升,可能需要支持实时分析和机器学习等丰富的能力。

2)以数据为导向,是指数据湖对于用户来说要足够的简单、易用,帮助用户从复杂的IT基础设施运维工作中解脱出来,关注业务、关注模型、关注算法、关注数据。数据湖面向的是数据科学家、分析师。目前来看,云原生应该是构建数据湖的一种比较理想的构建方式,后面在“数据湖基本架构”一节会详细论述这一观点。

3.数据湖基本架构

数据湖可以认为是新一代的大数据基础设施。为了更好的理解数据湖的基本架构,我们先来看看大数据基础设施架构的演进过程。

1) 第一阶段:以Hadoop为代表的离线数据处理基础设施。如下图所示,Hadoop是以HDFS为核心存储,以MapReduce(简称MR)为基本计算模型的批量数据处理基础设施。围绕HDFS和MR,产生了一系列的组件,不断完善整个大数据平台的数据处理能力,例如面向在线KV操作的HBase、面向SQL的HIVE、面向工作流的PIG等。同时,随着大家对于批处理的性能要求越来越高,新的计算模型不断被提出,产生了Tez、Spark、Presto等计算引擎,MR模型也逐渐进化成DAG模型。DAG模型一方面,增加计算模型的抽象并发能力:对每一个计算过程进行分解,根据计算过程中的聚合操作点对任务进行逻辑切分,任务被切分成一个个的stage,每个stage都可以有一个或者多个Task组成,Task是可以并发执行的,从而提升整个计算过程的并行能力;另一方面,为减少数据处理过程中的中间结果写文件操作,Spark、Presto等计算引擎尽量使用计算节点的内存对数据进行缓存,从而提高整个数据过程的效率和系统吞吐能力。

在这里插入图片描述

如上图. Hadoop体系结构示意

2) 第二阶段:lambda架构。随着数据处理能力和处理需求的不断变化,越来越多的用户发现,批处理模式无论如何提升性能,也无法满足一些实时性要求高的处理场景,流式计算引擎应运而生,例如Storm、Spark Streaming、Flink等。然而,随着越来越多的应用上线,大家发现,其实批处理和流计算配合使用,才能满足大部分应用需求;而对于用户而言,其实他们并不关心底层的计算模型是什么,用户希望无论是批处理还是流计算,都能基于统一的数据模型来返回处理结果,于是Lambda架构被提出,如下图所示。

在这里插入图片描述

Lambda架构示意

Lambda架构的核心理念是“流批一体”,如上图所示,整个数据流向自左向右流入平台。进入平台后一分为二,一部分走批处理模式,一部分走流式计算模式。无论哪种计算模式,最终的处理结果都通过服务层对应用提供,确保访问的一致性。

3) 第三阶段:Kappa架构。Lambda架构解决了应用读取数据的一致性问题,但是“流批分离”的处理链路增大了研发的复杂性。因此,有人就提出能不能用一套系统来解决所有问题。目前比较流行的做法就是基于流计算来做。流计算天然的分布式特征,注定了他的扩展性更好。通过加大流计算的并发性,加大流式数据的“时间窗口”,来统一批处理与流式处理两种计算模式。

在这里插入图片描述

Kappa架构示意

综上,从传统的hadoop架构往lambda架构,从lambda架构往Kappa架构的演进,大数据平台基础架构的演进逐渐囊括了应用所需的各类数据处理能力,大数据平台逐渐演化成了一个企业/组织的全量数据处理平台。当前的企业实践中,除了关系型数据库依托于各个独立的业务系统;其余的数据,几乎都被考虑纳入大数据平台来进行统一的处理。然而,目前的大数据平台基础架构,都将视角锁定在了存储和计算,而忽略了对于数据的资产化管理,这恰恰是数据湖作为新一代的大数据基础设施所重点关注的方向之一。

大数据基础架构的演进,其实反应了一点:在企业/组织内部,数据是一类重要资产已经成为了共识:

为了更好的利用数据,企业/组织需要对数据资产

1)进行长期的原样存储;

2)进行有效管理与集中治理;

3)提供多模式的计算能力满足处理需求;

4)以及面向业务,提供统一的数据视图、数据模型与数据处理结果。数据湖就是在这个大背景下产生的,除了大数据平台所拥有的各类基础能力之外,数据湖更强调对于数据的管理、治理和资产化能力。

落到具体的实现上,数据湖需要包括一系列的数据管理组件,包括:

1)数据接入;

2)数据搬迁;

3)数据治理;

4)质量管理;

5)资产目录;

6)访问控制;

7)任务管理;

8)任务编排;

9)元数据管理等。如下图所示,给出了一个数据湖系统的参考架构。对于一个典型的数据湖而言,它与大数据平台相同的地方在于它也具备处理超大规模数据所需的存储和计算能力,能提供多模式的数据处理能力;增强点在于数据湖提供了更为完善的数据管理能力,具体体现在:

1) 更强大的数据接入能力。数据接入能力体现在对于各类外部异构数据源的定义管理能力,以及对于外部数据源相关数据的抽取迁移能力,抽取迁移的数据包括外部数据源的元数据与实际存储的数据。

2) 更强大的数据管理能力。管理能力具体又可分为基本管理能力和扩展管理能力。基本管理能力包括对各类元数据的管理、数据访问控制、数据资产管理,是一个数据湖系统所必须的,后面我们会在“各厂商的数据湖解决方案”一节相信讨论各个厂商对于基本管理能力的支持方式。扩展管理能力包括任务管理、流程编排以及与数据质量、数据治理相关的能力。任务管理和流程编排主要用来管理、编排、调度、监测在数据湖系统中处理数据的各类任务,通常情况下,数据湖构建者会通过购买/研制定制的数据集成或数据开发子系统/模块来提供此类能力,定制的系统/模块可以通过读取数据湖的相关元数据,来实现与数据湖系统的融合。而数据质量和数据治理则是更为复杂的问题,一般情况下,数据湖系统不会直接提供相关功能,但是会开放各类接口或者元数据,供有能力的企业/组织与已有的数据治理软件集成或者做定制开发。

3) 可共享的元数据。数据湖中的各类计算引擎会与数据湖中的数据深度融合,而融合的基础就是数据湖的元数据。好的数据湖系统,计算引擎在处理数据时,能从元数据中直接获取数据存储位置、数据格式、数据模式、数据分布等信息,然后直接进行数据处理,而无需进行人工/编程干预。更进一步,好的数据湖系统还可以对数据湖中的数据进行访问控制,控制的力度可以做到“库表列行”等不同级别。

在这里插入图片描述

数据湖组件参考架构

还有一点应该指出的是,上图的“集中式存储”更多的是业务概念上的集中,本质上是希望一个企业/组织内部的数据能在一个明确统一的地方进行沉淀。事实上,数据湖的存储应该是一类可按需扩展的分布式文件系统,大多数数据湖实践中也是推荐采用S3/OSS/OBS/HDFS等分布式系统作为数据湖的统一存储。

我们可以再切换到数据维度,从数据生命周期的视角来看待数据湖对于数据的处理方式,数据在数据湖中的整个生命周期如图6所示。理论上,一个管理完善的数据湖中的数据会永久的保留原始数据,同时过程数据会不断的完善、演化,以满足业务的需要。

在这里插入图片描述

数据湖中的数据生命周期示意

3.Upsert场景执行流程介绍

对于Hudi Upsert 操作整理了比较核心的几个操作如下图所示

在这里插入图片描述

1.开始提交:判断上次任务是否失败,如果失败会触发回滚操作。然后会根据当前时间生成一个事务开始的请求标识元数据。
2.构造HoodieRecord Rdd对象:Hudi 会根据元数据信息构造HoodieRecord Rdd 对象,方便后续数据去重和数据合并。
3.数据去重:一批增量数据中可能会有重复的数据,Hudi会根据主键对数据进行去重避免重复数据写入Hudi 表。
4.数据fileId位置信息获取:在修改记录中可以根据索引获取当前记录所属文件的fileid,在数据合并时需要知道数据update操作向那个fileId文件写入新的快照文件。
5.数据合并:Hudi 有两种模式cow和mor。在cow模式中会重写索引命中的fileId快照文件;在mor 模式中根据fileId 追加到分区中的log 文件。
6.完成提交:在元数据中生成xxxx.commit文件,只有生成commit 元数据文件,查询引擎才能根据元数据查询到刚刚upsert 后的数据。
7.compaction压缩:主要是mor 模式中才会有,他会将mor模式中的xxx.log 数据合并到xxx.parquet 快照文件中去。
8.hive元数据同步:hive 的元素数据同步这个步骤需要配置非必需操作,主要是对于hive 和presto 等查询引擎,需要依赖hive 元数据才能进行查询,所以hive元数据同步就是构造外表提供查询。

在这里插入图片描述

介绍完Hudi的upsert运行流程,再来看下Hudi如何进行存储并且保证事务,在每次upsert完成后都会产生commit 文件记录每次重新的快照文件。

例如上图

时间1初始化写入三个分区文件分别是xxx-1.parquet;

时间3场景如果会修改分区1和分区2xxx-1.parquet的数据,那么写入完成后会生成新的快照文件分别是分区1和分区2xxx-3.parquet文件。(上述是COW模式的过程,而对于MOR模式的更新会生成log文件,如果log文件存在并且可追加则继续追加数据)。

时间5修改分区1的数据那么同理会生成分区1下的新快照文件。

可以看出对于Hudi 每次修改都是会在文件级别重新写入数据快照。查询的时候就会根据最后一次快照元数据加载每个分区小于等于当前的元数据的parquet文件。Hudi事务的原理就是通过元数据mvcc多版本控制写入新的快照文件,在每个时间阶段根据最近的元数据查找快照文件。因为是重写数据所以同一时间只能保证一个事务去重写parquet 文件。不过当前Hudi版本加入了并发写机制,原理是Zookeeper分布锁控制或者HMS提供锁的方式, 会保证同一个文件的修改只有一个事务会写入成功。

下面将根据Spark 调用write方法深入剖析upsert操作每个步骤的执行流程。

3.1开始提交&数据回滚

在构造好spark 的rdd 后会调用 df.write.format(“hudi”) 方法执行数据的写入,实际会调用Hudi源码中的HoodieSparkSqlWriter#write方法实现。在执行任务前Hudi 会创建HoodieWriteClient 对象,并构造HoodieTableMetaClient调用startCommitWithTime方法开始一次事务。在开始提交前会获取hoodie 目录下的元数据信息,判断上一次写入操作是否成功,判断的标准是上次任务的快照元数据有xxx.commit后缀的元数据文件。如果不存在那么Hudi 会触发回滚机制,回滚是将不完整的事务元数据文件删除,并新建xxx.rollback元数据文件。如果有数据写入到快照parquet 文件中也会一起删除。

在这里插入图片描述

3.2 构造HoodieRecord Rdd 对象

HoodieRecord Rdd 对象的构造先是通过map 算子提取spark dataframe中的schema和数据,构造avro的GenericRecords Rdd,然后Hudi会在使用map算子封装为HoodierRecord Rdd。对于HoodileRecord Rdd 主要由currentLocation,newLocation,hoodieKey,data 组成。HoodileRecord数据结构是为后续数据去重和数据合并时提供基础。

在这里插入图片描述

•currentLocation 当前数据位置信息:只有数据在当前Hudi表中存在才会有,主要存放parquet文件的fileId,构造时默认为空,在查找索引位置信息时被赋予数据。

•newLocation 数据新位置信息:与currentLocation不同不管是否存在都会被赋值,newLocation是存放当前数据需要被写入到那个fileID文件中的位置信息,构造时默认为空,在merge阶段会被赋予位置信息。
•HoodieKey 主键信息:主要包含recordKey 和patitionPath 。recordkey 是由hoodie.datasource.write.recordkey.field 配置项根据列名从记录中获取的主键值。patitionPath 是分区路径。Hudi 会根据hoodie.datasource.write.partitionpath.field 配置项的列名从记录中获取的值作为分区路径。
•data 数据:data是一个泛型对象,泛型对象需要实现HoodieRecordPayload类,主要是实现合并方法和比较方法。默认实现OverwriteWithLatestAvroPayload类,需要配置hoodie.datasource.write.precombine.field配置项获取记录中列的值用于比较数据大小,去重和合并都是需要保留值最大的数据。

3.3 数据去重

在upsert 场景中数据去重是默认要做的操作,如果不进行去重会导致数据重复写入parquet文件中。当然upsert 数据中如果没有重复数据是可以关闭去重操作。配置是否去重参数为hoodie.combine.before.upsert,默认为true开启。

在这里插入图片描述

在Spark client调用upsert 操作是Hudi会创建HoodieTable对象,并且调用upsert 方法。对于HooideTable 的实现分别有COW和MOR两种模式的实现。但是在数据去重阶段和索引查找阶段的操作都是一样的。调用HoodieTable#upsert方法底层的实现都是SparkWriteHelper。

在去重操作中,会先使用map 算子提取HoodieRecord中的HoodieatestAvroPayload的实现是保留时间戳最大的记录。这里要注意如果我们配置的是全局类型的索引,map 中的key 值是 HoodieKey 对象中的recordKey。因为全局索引是需要保证所有分区中的主键都是唯一的,避免不同分区数据重复。当然如果是非分区表,没有必要使用全局索引。

3.4 数据位置信息索引查找

对于Hudi 索引主要分为两大类:

•非全局索引:索引在查找数据位置信息时,只会检索当前分区的索引,索引只保证当前分区内数据做upsert。如果记录的分区值发生变更就会导致数据重复。
•全局索引:顾名思义在查找索引时会加载所有分区的索引,用于定位数据位置信息,即使发生分区值变更也能定位数据位置信息。这种方式因为要加载所有分区文件的索引,对查找性能会有影响(HBase索引除外)。

在这里插入图片描述

Spark 索引实现主要有如下几种

•布隆索引(BloomIndex)
•全局布隆索引(GlobalBloomIndex)
•简易索引(SimpleIndex)
•简易全局索引(GlobalSimpleIndex)
•全局HBase 索引(HbaseIndex)
•内存索引(InMemoryHashIndex)。

3.4.1 索引的选择

普通索引:主要用于非分区表和分区不会发生分区列值变更的表。当然如果你不关心多分区主键重复的情况也是可以使用。他的优势是只会加载upsert数据中的分区下的每个文件中的索引,相对于全局索引需要扫描的文件少。并且索引只会命中当前分区的fileid 文件,需要重写的快照也少相对全局索引高效。但是某些情况下我们的设置的分区列的值就是会变那么必须要使用全局索引保证数据不重复,这样upsert 写入速度就会慢一些。其实对于非分区表他就是个分区列值不会变且只有一个分区的表,很适合普通索引,如果非分区表硬要用全局索引其实和普通索引性能和效果是一样的。

全局索引:分区表场景要考虑分区值变更,需要加载所有分区文件的索引比普通索引慢。

布隆索引:加载fileid 文件页脚布隆过滤器,加载少量数据数据就能判断数据是否在文件存在。缺点是有一定的误判,但是merge机制可以避免重复数据写入。parquet文件多会影响索引加载速度。适合没有分区变更和非分区表。主键如果是类似自增的主键布隆索引可以提供更高的性能,因为布隆索引记录的有最大key和最小key加速索引查找。

全局布隆索引:解决分区变更场景,原理和布隆索引一样,在分区表中比普通布隆索引慢。

简易索引:直接加载文件里的数据不会像布隆索引一样误判,但是加载的数据要比布隆索引要多,left join 关联的条数也要比布隆索引多。大多数场景没布隆索引高效,但是极端情况命中所有的parquet文件,那么此时还不如使用简易索引加载所有文件里的数据进行判断。

全局简易索引:解决分区变更场景,原理和简易索引一样,在分区表中比普通简易索引慢。建议优先使用全局布隆索引。

HBase索引:不受分区变跟场景的影响,操作算子要比布隆索引少,在大量的分区和文件的场景中比布隆全局索引高效。因为每条数据都要查询hbase ,upsert数据量很大会对hbase有负载的压力需要考虑hbase集群承受压力,适合微批分区表的写入场景 。在非分区表中数量不大文件也少,速度和布隆索引差不多,这种情况建议用布隆索引。

内存索引:用于测试不适合生产环境

3.5数据合并

COW模式和MOR模式在前面的操作都是一样的,不过在合并的时候Hudi构造的执行器不同。对于COW会根据位置信息中fileId 重写parquet文件,在重写中如果数据是更新会比较parquet文件的数据和当前的数据的大小进行更新,完成更新数据和插入数据。而MOR模式会根据fileId 生成一个log 文件,将数据直接写入到log文件中,如果fileID的log文件已经存在,追加数据写入到log 文件中。与COW 模式相比少了数据比较的工作所以性能要好,但是在log 文件中可能保存多次写有重复数据在读log数据时候就不如cow模式了。还有在mor模式中log文件和parquet 文件都是存在的,log 文件的数据会达到一定条件和parqeut 文件合并。所以mor有两个视图,ro后缀的视图是读优化视图(read-optimized)只查询parquet 文件的数据。rt后缀的视图是实时视图(real-time)查询parquet 和log 日志中的内容。

3.5.1 Copy on Write模式

COW模式数据合并实现逻辑调用BaseSparkCommitActionExecutor#excute方法,实现步骤如下:

在这里插入图片描述

1.通过countByKey 算子提取分区路径和文件位置信息并统计条数,用于后续根据分区文件写入的数据量大小评估如何分桶。
2.统计完成后会将结果写入到workLoadProfile 对象的map 中,这个时候已经完成合并数据的前置条件。Hudi会调用saveWorkloadProfileMetadataToInfilght 方法写入infight标识文件到.hoodie元数据目录中。在workLoadProfile的统计信息中套用的是类似双层map数据结构, 统计是到fileid 文件级别。
3.根据workLoadProfile统计信息生成自定义分区 ,这个步骤就是分桶的过程。首先会对更新的数据做分桶,因为是更新数据在合并时要么覆盖老数据要么丢弃,所以不存在parquet文件过于膨胀,这个过程会给将要发生修改的fileId都会添加一个桶。然后会对新增数据分配桶,新增数据分桶先获取分区路径下所有的fileid 文件, 判断数据是否小于100兆。小于100兆会被认为是小文件后续新增数据会被分配到这个文件桶中,大于100兆的文件将不会被分配桶。获取到小文件后会计算每个fileid 文件还能存少数据然后分配一个桶。如果小文件fileId 的桶都分配完了还不够会根据数据量大小分配n个新增桶。最后分好桶后会将桶信息存入一个map 集合中,当调用自定义实现getpartition方法时直接向map 中获取。所以spark在运行的时候一个桶会对应一个分区的合并计算。

4.分桶结束后调用handleUpsertPartition合并数据。首先会获取map 集合中的桶信息,桶类型有两种新增和修改两种。如果桶fileid文件只有新增数据操作,直接追加文件或新建parquet文件写入就好,这里会调用handleInsert方法。如果桶fileid文件既有新增又有修改或只有修改一定会走handUpdate方法。这里设计的非常的巧妙对于新增多修改改少的场景大部分的数据直接可以走新增的逻辑可以很好的提升性能。对于handUpdate方法的处理会先构造HoodieMergeHandle对象初始化一个map集合,这个map集合很特殊拥有存在内存的map集合和存在磁盘的map 集合,这个map集合是用来存放所有需要update数据的集合用来遍历fileid旧文件时查询文件是否存在要不要覆盖旧数据。这里使用内存加磁盘为了避免update桶中数据特别大情况可以将一部分存磁盘避免jvm oom。update 数据会优先存放到内存map如果内存map不够才会存在磁盘map,而内存Map默认大小是1g 。DiskBasedMap 存储是key信息存放的还是record key ,value 信息存放value 的值存放到那个文件上,偏移量是多少、存放大小和时间。这样如果命中了磁盘上map就可以根据value存放的信息去获取hoodieRecord了。

5.构造sparkMergHelper 开始合并数据写入到新的快照文件。在SparkMergHelper 内部会构造一个BoundedInMemoryExecutor 的队列,在这个队列中会构造多个生产者和一个消费者(file 文件一般情况只有一个文件所以生产者也会是一个)。producers 会加载老数据的fileId文件里的数据构造一个迭代器,执行的时候先调用producers 完成初始化后调用consumer。而consumer被调用后会比较数据是否存在ExternalSpillableMap 中如果不存在重新写入数据到新的快照文件,如果存在调用当前的HoodileRecordPayload 实现类combineAndGetUpdateValue 方法进行比较来确定是写入老数据还是新数据,默认比较那个数据时间大。这里有个特别的场景就是硬删除,对于硬删除里面的数据是空的,比较后会直接忽略写入达到数据删除的目的。

3.5.2 Merge on Read模式

在MOR模式中的实现和前面COW模式分桶阶段逻辑相同,这里主要说下最后的合并和COW模式不一样的操作。在MOR合并是调用AbstarctSparkDeltaCommitActionExecutor#execute方法,会构造HoodieAppaendHandle对象。在写入时调用append向log日志文件追加数据,如果日志文件不存在或不可追加将新建log文件。

分桶相关参数与COW模式通用

在这里插入图片描述

3.6索引更新

在这里插入图片描述

数据写入到log文件或者是parquet 文件,这个时候需要更新索引。简易索引和布隆索引对于他们来说索引在parquet文件中是不需要去更新索引的。这里索引更新只有HBase索引和内存索引需要更新。内存索引是更新通过map 算子写入到内存map上,HBase索引通过map算子put到HBase上。

3.7完成提交

3.7.1 提交&元数据信息归档

上述操作如果都成功且写入时writeStatus中没有任何错误记录,Hudi 会进行完成事务的提交和元数据归档操作,步骤如下

1.sparkRddWriteClient 调用commit 方法,首先会向Hdfs 上提交一个.commit 后缀的文件,里面记录的是writeStatus的信息包括写入多少条数据、fileID快照的信息、Schema结构等等。当commit 文件写入成功就意味着一次upsert 已经成功,Hudi 内的数据就可以查询到。
2.为了不让元数据一直增长下去需要对元数据做归档操作。元数据归档会先创建HoodieTimelineArchiveLog对象,通过HoodieTableMetaClient 获取.hoodie目录下所有的元数据信息,根据这些元数据信息来判断是否达到归档条件。如果达到条件构造HooieLogFormatWrite对象对archived文件进行追加。每个元数据文件会封装成 HoodieLogBlock 对象批量写入。

在这里插入图片描述

3.7.2 数据清理

元数据清理后parquet 文件也是要去清理,在Hudi 有专有spark任务去清理文件。因为是通过spark 任务去清理文件也有对应XXX.clean.request、xxx.clean.infight、xxx.clean元数据来标识任务的每个任务阶段。数据清理步骤如下:

1.构造baseCleanPanActionExecutor 执行器,并调用requestClean方法获取元数据生成清理计划对象HoodieCleanPlan。判断HoodieCleanPlan对象满足触发条件会向元数据写入xxx.request 标识,表示可以开始清理计划。
2.生成执行计划后调用baseCleanPanActionExecutor 的继承类clean方法完成执行spark任务前的准备操作,然后向hdfs 写入xxx.clean.inflight对象准备执行spark任务。
3.spark 任务获取HoodieCleanPlan中所有分区序列化成为Rdd并调用flatMap迭代每个分区的文件。然后在mapPartitions算子中调用deleteFilesFunc方法删除每一个分区的过期的文件。最后reduceBykey汇总删除文件的结果构造成HoodieCleanStat对象,将结果元数据写入xxx.clean中完成数据清理。

在这里插入图片描述

3.7.3 数据压缩

数据压缩是mor 模式才会有的操作,目的是让log文件合并到新的fileId快照文件中。因为数据压缩也是spark 任务完成的,所以在运行时也对应的xxx.compaction.requet、xxx.compaction.clean、xxx.compaction元数据生成记录每个阶段。数据压缩实现步骤如下:

1.sparkRDDwirteClient 调用compaction方法构造BaseScheduleCompationActionExecutor对象并调用scheduleCompaction方法,计算是否满足数据压缩条件生成HoodieCompactionPlan执行计划元数据。如果满足条件会向hdfs 写入xxx.compation.request元数据标识请求提交spark任务。
2.BaseScheduleCompationActionExecutor会调用继承类SparkRunCompactionExecutor类并调用compact方法构造HoodieSparkMergeOnReadTableCompactor 对象来实现压缩逻辑,完成一切准备操作后向hdfs写入xxx.compation.inflight标识。
3.spark任务执行parallelize加载HooideCompactionPlan 的执行计划,然后调用compact迭代执行每个分区中log的合并逻辑。在 compact会构造HoodieMergelogRecordScanner 扫描文件对象,加载分区中的log构造迭代器遍历log中的数据写入ExtemalSpillableMap。这个ExtemalSpillableMap和cow 模式中内存加载磁盘的map 是一样的。至于合并逻辑是和cow模式的合并逻辑是一样的,这里不重复阐述都是调用cow模式的handleUpdate方法。
4.完成合并操作会构造writeStatus结果信息,并写入xxx.compaction标识到hdfs中完成合并操作。

在这里插入图片描述

3.8Hive元数据同步

实现原理比较简单就是根据Hive外表和Hudi表当前表结构和分区做比较,是否有新增字段和新增分区如果有会添加字段和分区到Hive外表。如果不同步Hudi新写入的分区无法查询。在COW模式中只会有ro表(读优化视图,而在mor模式中有ro表(读优化视图)和rt表(实时视图)。

3.9提交成功通知回调

当事务提交成功后向外部系统发送通知消息,通知的方式有两种,一种是发送http服务消息,一种是发送kafka消息。这个通知过程我们可以把清理操作、压缩操作、hive元数据操作,都交给外部系统异步处理或者做其他扩展。也可以自己实现HoodieWriteCommitCallback的接口自定义实现。

4.大数据技术和工具归类

在这里插入图片描述

部分术语翻译:

Administration: 管理平台(此处应指大数据管理平台)

Data Security: 数据安全

Data Governance: 数据管控

Data Computing: 数据计算

Data Collection: 数据采集

Data Storage: 数据存储

BI/DATA Visualization: 商务智能可视化/数据可视化

5.数据湖的功能

Data Ingestion(获取数据)

Data Storage(数据存储)

Data Auditing(数据审计)

Data Exploration(数据探索)

Data Lineage(数据继承)

Data Discovery(数据挖掘)

Data Governance(数据管理与处理)

Data Security(数据安全)

Data Quality(数据质量评估)

在这里插入图片描述

6.最后

本节篇幅过长,感谢阅读希望对您的学习有所帮助。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

余老师带你学习大数据框架全栈第十三章Hudi第一节核心技术 的相关文章

随机推荐

  • DC-DC电源模块输出先放大电容还是小电容

    最好的资料是电容厂家的设计指南 1 电容简单的等效模型是C ESL ESR 2 通常电解电容容量越大 ESR越小 ESL越大 承受纹波电流越大 3 电流流经阻抗最小路径 4 大电流 PCB走线电阻不能忽略 高频纹波电流PCB走线电感不能忽略
  • C语言之结构体内存的计算

    结构体的内存 一 提出疑问 结构体占用的是一片连续的内存空间 大小是由成员变量的类型决定的 但并不是计算所有成员变量的类型大小之和那么简单 先举一个实例 struct student int age 4个字节 int telephone 4
  • win系统使用frp端口映射实现内网穿透,配置“任务计划程序”提高稳定性

    Github下载最新版frp https github com fatedier frp releases download v0 48 0 frp 0 48 0 windows amd64 zip 解压把frpc exe和frpc ini
  • 【2】Python爬虫:分析AJAX传递的JSON获取数据-初步分析动态网页(1)

    前言 这是本人写的第二篇文章 希望能够帮助到一些和我一样的python爬虫初学者 在第一篇文章中 我总结了最近学到的利用requests和bs4第三方库共同作用 基本可以应对python获取静态网页数据的相关问题 但是如果现实中的网页往往比
  • JVM 四. 对象布局

    目录 一 对象实例化相关 创建对象的步骤 二 对象的内存布局 三 对象的访问定位 一 对象实例化相关 有哪些方式可以创建一个对象 new 方式创建一个对象 由new方式创建对象又延伸出 Builder建造者方式 Factory工厂方 等静态
  • 链表排序——选择排序法(纯C语言版)

    链表选择排序 链表的排序 功能 选择排序 由小到大 返回 指向链表表头的指针 选择排序的基本思想就是反复从还未排好序的那些节点中 选出键值 就是用它排序的字段 我们取学号num为键值 最小的节点 依次重新组合成一个链表 我认为写链表这类程序
  • 表弟高中毕业,半路学Python爬虫,现在月薪20—30k,嫉妒使人面目全非

    python近几年越来越火爆 爬虫工程师也很火 市场需求挺大 工资还算可观 很多不是计算机专业出身的人被薪资吸引 也开始转战python爬虫 那么半路学Python爬虫 学到什么程度可以面试爬虫工程师呢 首先要明确一点 python只不过是
  • Git第十四讲 Git标签管理

    Git标签是用于标记项目中的特定版本的重要工具 它们通常用于标识发布版本或里程碑 本文将介绍如何在Git中创建 查看和管理标签 创建标签 要在Git中创建一个标签 可以使用git tag命令 有两种类型的标签 轻量标签和附注标签 轻量标签
  • ML/DL-复习笔记【十】- 分组卷积和深度可分离卷积的区别

    本节为ML DL 复习笔记 十 分组卷积和深度可分离卷积的区别 主要内容包括 分组卷积与深度可分离卷积的参数量分析 最早出现分组卷积是AlexNet 由于单块GPU显存的限制 需要将网络部署在两张显卡上 分别进行训练最后再融合 Alex认为
  • 鸿蒙出来后H5足以取代原生app

    本地模式的H5渲染效果 其实已经在性能上无限接近于原生 也就是说当你使用file 而非http 访问H5页面的时候 打开速度也是非常快的 而且现在安卓和ios里面 提供了js访问原生代码的能力 目前有很多H5开发框架 集成了访问原生的能力
  • 在线标签云词云

    易词云 https www yciyun com 优点 可显示中文词条 可下载 缺点 字体大小控制不精准 下载高清图像要收费 wordart https wordart com 优点 免费 功能强大 缺点 不可以显示中文 可能需要添加中文字
  • 凌晨睡不着,想起了童年,写首诗吧,就叫《童年》

    雪夜里轻快又谨慎的踩雪声 印出稚嫩的脚印 一个胆大又怯懦的孩童 把手电筒插在帽兜里当做矿灯照明 微声哼着只有自己能听到的歌 沧桑的锁有节奏地拍打着大门 发出心里人的归家之念 炕上坐着的妇女 闻声而来 夏日晌午的小孩 在庭院里逗着毛虫 趁着无
  • SOTA模型训练笔记(完善中)

    文章目录 记录感知SOTA模型训练的过程 1 语义分割 1 PolarNet 2 Cylinder3D 2 视觉人体姿态识别 1 ViTPose 3 点云目标检测 1 centerpoint 记录感知SOTA模型训练的过程 1 语义分割 1
  • windows 使用docker安装elasticsearch报错

  • 使用ensp搭建简单校园网拓扑

    使用ensp搭建简单校园网拓扑 一 校园网拓扑 1 每台电脑代表一个vlan 2 二层交换机向下的每个端口需要做access 向上的每个端口需要做trunk 3 三层交换机向下的每个端口需要做trunk 而且需要为每个vlan接口配置ip地
  • Intellij IDEA2017.3.5安装

    1 下载安装包及 链接 https pan baidu com s 16az6tmQub bOn2CFOXLa2g 提取码 7689 复制这段内容后打开百度网盘手机App 操作更方便哦 2 将下载的JetbrainsCrack 2 7 re
  • js中(...)用法

    1 深拷贝一个对象 如上图所示 obj和tmp是完全两个独立的对象 互不影响 2 数组复制 3 函数形参中的使用 这里的 args 是对test函数中多余的参数进行收集 并转换成数组的形式进入函数体中 4 一种特殊情况 当数组里面套对象的时
  • sql 时间函数(全)

    Cite http www jb51 net article 20832 htm 1 当前系统日期 时间 select getdate 2 dateadd 在向指定日期加上一段时间的基础上 返回新的 datetime 值 例如 向日期加上2
  • C++结构体对齐问题

    规则1 结构体成员的内部偏移量 内部地址 要被这个成员的数据类型大小整除 规则2 整个结构体的大小 必须是最大成员的size整数倍 否则就需要在末尾补充空白字节 规则3 对于结构体中的结构体 按照结构体展开之后的内存对齐来处理 规则4 人为
  • 余老师带你学习大数据框架全栈第十三章Hudi第一节核心技术

    1 前言 1 1为什么产生数据湖 数据量比较大 越来越不满足处理结构化的数据 比如说数仓 数仓就是处理结构化数据 什么是结构化数据 就是数据成数据库来的 传统型的数据库有 MySQL数据库 Oracle SQLserver 从这些库里面过来