一种Hudi on Flink动态同步元数据变化的方法

2023-05-16

文章目录

  • 一、背景
  • 二、官方Schema Evolution例子
  • 三、Flink + Hudi实现Schema Evolution
  • 四、`HoodieFlinkStreamer`流程浅析及扩展方法
    • 4.1 FlinkKafkaConsumer
    • 4.2 RowDataToHoodieFunction
    • 4.3 StreamWriteFunction
    • 4.4 StreamWriteOperatorCoordinator
    • 4.5 Compaction及Clean类
  • 五、MOR rt表查询bug解决
    • 5.1 分析
    • 5.2 修改
  • 六、总结
  • 参考资料

一、背景

一个需求,需要同步MySQL数据到Hive,包括DDL与DML,所以需要动态同步元数据变化。

二、官方Schema Evolution例子

从Hudi官方文档Schema Evolution(https://hudi.apache.org/docs/next/schema_evolution)可知通过Hudi可实现源端添加列、int到long列类型转换等DDL操作同步到目标端,且该文档提供了一个Spark+Hudi写数据的例子,先定义一个Schema,写入了3条数据;然后定义newSchema,比schema多一个newField字段,且intToLong字段类型由Integer变为了Long,又upsert了3条数据到同一个Hudi表中。最终查询出该Hudi表的结构与newSchema一致,即使用新的schema写数据,实现了元数据的更新。

三、Flink + Hudi实现Schema Evolution

由于多种原因(省略一万字)选择了Flink+Hudi,且为了实现一些逻辑更自由,选择了DataStream API而不是Flink SQL,在群里从@玉兆大佬处了解到Hudi中使用DataStream API操作的类org.apache.hudi.streamer.HoodieFlinkStreamer。通过两次启动任务传入的修改后的Avro Schema,也能实现官方文档Schema Evolution中例子类似的功能。

但是按这种方式,只能通过重新定义schema并重启Flink任务,才能将源表新增的列同步到目标Hive表中,无法在启动任务后自动同步schema中未定义的源表中新增的列。所以需要对HoodieFlinkStreamer的功能进行改进。先说方案,经过对HoodieFlinkStreamer分析,其中用到的一些主要Function(Source、Map、Sink等)都是在最初定义时传入参数配置类,要么在构造方法中、要么在open方法中,根据配置(包含schema)生成deserialer、converter、writeClient等对数据进行反序列化、转换、保存的实例,由于是根据最初schema生成的实例,即使数据中有新增的字段,转换后新增的字段也没有保留。所以采用的方式是,在各个Function处理数据的方法中,判断如果数据中的schema与当前Function中的schema不一致(一种简单的优化),就使用数据中的schema重新生成这些deserialer、converter、writeClient,这样数据经过处理后,就有新增的字段。方法很简单,主要是需要了解一下HoodieFlinkStreamer的处理流程。

四、HoodieFlinkStreamer流程浅析及扩展方法

调试环境:可先在Idea中建个Flink Demo/QuickStart项目,导入Hudi的hudi-flink-bundle模块、及对应的Hadoop、Hive相关依赖。先在Idea中操作方便调试、以及分析依赖冲突等问题。当前使用分支release-0.10.0,Hive版本2.1.1-cdh6.3.0,hadoop版本3.0.0-cdh6.3.0。

扩展可将流程中关键的类(及必须依赖的类)从Hudi源码拷贝出来(或集成,视各类的依赖关系而定),修改相应逻辑,再使用修改后的类作为处理函数

经过亿些分析及调试后,梳理出HoodieFlinkStreamer的大致流程如下图。
HoodieFlinkStreamer流程

4.1 FlinkKafkaConsumer

  • Function功能说明

数据来源,从Kafka中读取数据,HoodieFlinkStreamer类中,指定的反序列化类为org.apache.flink.formats.json.JsonRowDataDeserializationSchema,将Json数据转换为org.apache.flink.table.data.RowData

  • 处理逻辑扩展

对source函数扩展,主要就是修改反序列化类,数据可选择Debezium发送到Kafka中的带Schema格式的Json数据,反序列化类使用org.apache.flink.formats.json.debezium.DebeziumJsonDeserializationSchema并修改部分逻辑,在DebeziumJsonDeserializationSchemadeserialize(byte[] message, Collector<RowData> out)方法中,先通过message获取到数据中的schema,再参考构造方法重新生成成员变量this.jsonDeserializer、this.metadataConverters。

  • 输出扩展

DebeziumJsonDeserializationSchema本身输出的是实现了RowData接口的org.apache.flink.table.data.GenericRowData(逻辑在emitRow方法中),改为通过复制或继承GenericRowData定义的SchemaWithRowData,添加一个字符串成员变量保存当前数据的schema,以便下游的函数能根据schema重新生成数据处理实例。

4.2 RowDataToHoodieFunction

  • Function功能说明

将DataStream转换为后面HudiAPI操作需要的DataStream。

  • 处理逻辑扩展

O map(I i)方法中,首先根据上游发来的SchemaWithRowData中的schema,参考open方法重新生成this.converter等成员变量。

RowDataToHoodieFunction类中有一个org.apache.flink.configuration.Configuration类型的成员变量config,保存了任务配置的参数,后面流程中函数基本都有这个成员变量,且很多函数也从该配置中读取schema信息,所以在更新schema时,可以首先设置this.config.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema);(任务启动通过--source-avro-schema传入参数,所以schema存在config的FlinkOptions.SOURCE_AVRO_SCHEMA这个key中)。后续不再赘述。

  • 输出扩展

与前面类似,为了让下游函数获取到schema,在toHoodieRecord方法中,修改返回值为继承了HoodieRecord类的带有schema信息的自定义类SchemaWithHoodieRecord

4.3 StreamWriteFunction

(其实前面还有个BucketAssignerFunction,看起来没有直接修改或转换当前从流中接收到的数据的各字段值,只是设置了location。也添加了更新schema逻辑,重新生成了bucketAssigner成员变量。)

  • Function功能说明

将流中的数据写入HDFS。数据缓存在this.buckets中,由bufferRecord方法的注释可知,缓存的记录数大于FlinkOptions.WRITE_BATCH_SIZE配置的值、或缓冲区大小大于FlinkOptions.WRITE_TASK_MAX_SIZE时,调用flushBucket将缓存的数据写入文件。

在每次checkpoint时,snapshotState方法也会调用flushRemaining方法将缓存的记录写入文件。

  • 处理逻辑扩展

仍然在processElement方法中,首先通过接收到的SchemaWithHoodieRecord中的schema信息,更新this.writeClient,先关闭再重新生成。

  • 输出扩展

Hudi官方代码中,只在processElement中调用了bufferRecord(所以图中画的虚线)。为了让下游的compact和clean函数接收到新的schema,可直接转发:out.collect(value);(可优化,比如只传schema)

改到当前Function为止,数据已经能写入文件(MOR表的log文件、COW表的parquet文件),但是在Hive中查询不出来

结合StreamWriteFunction类的注释,及一些日志,及一些调试分析。了解到数据写入文件后,会通知StreamWriteOperatorCoordinator保存hudi表相关参数,如提交instant更新Timeline相关记录,相关元数据等,应该就是操作hudi表目录下的.hoodie目录。StreamWriteFunctionflushBucketflushRemaining方法最后调用this.eventGateway.sendEventToCoordinator(event);org.apache.hudi.sink.event.WriteMetadataEvent发到StreamWriteOperatorCoordinatorStreamWriteOperatorCoordinatororg.apache.hudi.sink.common.WriteOperatorFactorygetCoordinatorProvider方法中实例化,也是传入的初始配置,为了能保存最新的元数据,所以也要将schema发过去,在StreamWriteOperatorCoordinator中主要使用WriteMetadataEventwriteStatuses成员变量,所以将schema存在writeStatuses中。

4.4 StreamWriteOperatorCoordinator

  • Function功能说明

主要逻辑在notifyCheckpointComplete方法中,即每次checkpoint完成后执行,总体分为2部分,commitInstant和hive同步。

commitInstant方法中,从eventBuffer中读取WriteMetadataEventwriteStatus,若前面的步骤中真的有数据处理,这里获取到的writeResults不为空,则调用doCommit方法提交相关信息。

如果commitInstant真的提交了数据,返回true,则会调用syncHiveIfEnabled方法执行hive同步操作。最终其实调用到HiveSyncToolsyncHoodieTable方法,从这个方法可以看到Hive同步支持的一些功能,自动建数据库、自动建数据表、自动建分区;元数据同步功能通过将hudi表最新的提交中的元数据从Hive metastore查出的表的元数据对比,如果不同则将元数据变化同步到Hive metastore中。

  • 处理逻辑扩展

上面提到,commitInstant方法中,如果writeResults不为空,则会调用doCommit方法,所以在调用doCommit之前添加更新schema的逻辑,从自定义的SchemaWithWriteStatus中读取schema,参考start方法的逻辑重新生成writeClient。

StreamWriteOperatorCoordinator修改后,在之前数据已经写入文件的基础上,新增的字段、修改的类型已经能同步到hudi表(指.hoodie目录)及hive metastore中,在Hive中COW表也能正常查询。但是对于MOR表,新增的字段只是写到了增量日志文件中,读优化表(_ro)查不到新增字段的数据,所以还要修改Compaction处理类。

4.5 Compaction及Clean类

如上面流程图,compaction分三步:生成压缩计划、执行压缩、提交压缩执行结果。都是类似的操作,先是CompactionPlanOperator接收到DataStream<SchemaWithHoodieRecord>,更新元数据,后续的CompactionPlanEventCompactionCommiEvent也可带上schema,更新各自的table、writeClient等成员。CleanFunction也类似。

到目前为止,MOR表的读优化表在Hive也能查询到新增列的数据,历史parquet文件中没有新增字段,查询结果中新增字段为null。但是实时表(_rt)查询还有点问题。如果查询rt表涉及历史的parquet文件(没有新增字段,至于为什么肯定是Parquet文件,后面会说到通过调试发现,如果以后发现有其他情况再补充),则会报类似这样的错误:

"Field new_col2 not found in log schema. Query cannot proceed! Derived Schema Fields: ..."

五、MOR rt表查询bug解决

5.1 分析

在Hudi源码中搜索该报错信息,找到两个位置,实时表对应的位置是org.apache.hudi.hadoop.utils.HoodieRealtimeRecordReaderUtils#generateProjectionSchema

  public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
                                                List<String> fieldNames) {
    /**
     * ......
     */
    List<Schema.Field> projectedFields = new ArrayList<>();
    for (String fn : fieldNames) {
      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
      if (field == null) {
        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
      } else {
        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }

    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
        writeSchema.getNamespace(), writeSchema.isError());
    projectedSchema.setFields(projectedFields);
    return projectedSchema;
  }

遍历了fieldNames,如果schemaFieldsMap中找不到这个字段则报错,所以fieldNames包含新增的字段,schemaFieldsMap为历史parquet文件中读取出来的字段信息,

该方法在org.apache.hudi.hadoop.realtime.AbstractRealtimeRecordReader#init 中被调用。源码中也写了一个TODO还没有DO:

    // TODO(vc): In the future, the reader schema should be updated based on log files & be able
    // to null out fields not present before

即未来基于日志文件更新reader schema,并且会支持将新增的字段置为空值。

AbstractRealtimeRecordReader#init方法中看到,HoodieRealtimeRecordReaderUtils#generateProjectionSchema的fieldNames参数从jobConf中读取,包含新增的字段。通过调试HiveServer2,发现jobConf的properties中以下几个key的值包含字段信息(new_col2为新增字段):

hive.io.file.readcolumn.names -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_name,last_name,alias,new_col,new_col2

schema.evolution.columns -> _hoodie_commit_time,_hoodie_commit_seqno,_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,id,first_name,last_name,alias,new_col,new_col2

serialization.ddl -> struct customers1_rt { string _hoodie_commit_time, string _hoodie_commit_seqno, string _hoodie_record_key, string _hoodie_partition_path, string _hoodie_file_name, i32 id, string first_name, string last_name, string alias, double new_col, double new_col2}

schema.evolution.columns.types -> string,string,string,string,string,int,string,string,string,double,double

fieldNames参数就使用到了hive.io.file.readcolumn.names的值。schema.evolution.columns中包含了新增字段,且与之对应的schema.evolution.columns.types中包含了字段的类型(看起来是Hive中的类型)。所以尝试将HoodieRealtimeRecordReaderUtils#generateProjectionSchema中抛异常的位置改为在projectedFields中仍然添加一个字段,默认值为null,字段schema通过schema.evolution.columns.types中的类型转换而来。

经过测试,即使不解决这个bug,更新一下历史的数据也可以,但是实际情况中肯定不会用这种方法

5.2 修改

AbstractRealtimeRecordReader#init方法中调用HoodieRealtimeRecordReaderUtils#generateProjectionSchema的位置改成:

readerSchema = HoodieRealtimeRecordReaderUtils.generateProjectionSchema(writerSchema, schemaFieldsMap, projectionFields,
          jobConf.get("schema.evolution.columns"), jobConf.get("schema.evolution.columns.types"));

HoodieRealtimeRecordReaderUtils#generateProjectionSchema改为:

public static Schema generateProjectionSchema(Schema writeSchema, Map<String, Schema.Field> schemaFieldsMap,
                                                List<String> fieldNames, String csColumns, String csColumnTypes) {
    /**
     * ...
     */
    List<Schema.Field> projectedFields = new ArrayList<>();
    Map<String, Schema.Field> fieldMap = getFieldMap(csColumns, csColumnTypes);
    for (String fn : fieldNames) {
      Schema.Field field = schemaFieldsMap.get(fn.toLowerCase());
      if (field == null) {
//        throw new HoodieException("Field " + fn + " not found in log schema. Query cannot proceed! "
//            + "Derived Schema Fields: " + new ArrayList<>(schemaFieldsMap.keySet()));
        projectedFields.add(fieldMap.get(fn));
      } else {
        projectedFields.add(new Schema.Field(field.name(), field.schema(), field.doc(), field.defaultVal()));
      }
    }

    Schema projectedSchema = Schema.createRecord(writeSchema.getName(), writeSchema.getDoc(),
        writeSchema.getNamespace(), writeSchema.isError());
    projectedSchema.setFields(projectedFields);
    return projectedSchema;
  }

其中getFieldMap方法为:

  private static Map<String, Schema.Field> getFieldMap(String csColumns, String csColumnTypes) {
    LOG.info(String.format("columns:%s\ntypes:%s", csColumns, csColumnTypes));
    Map<String, Schema.Field> result = new HashMap<>();
    String[] columns = csColumns.split(",");
    String[] types = csColumnTypes.split(",");
    for (int i = 0; i < columns.length; i++) {
      String columnName = columns[i];
      result.put(columnName, new Schema.Field(columnName,toSchema(types[i]), null, null));
    }
    return result;
  }

  private static Schema toSchema(String hiveSqlType) {
    switch (hiveSqlType.toLowerCase()) {
      case "boolean":
        return Schema.create(Schema.Type.BOOLEAN);
      case "byte":
      case "short":
      case "integer":
        return Schema.create(Schema.Type.INT);
      case "long":
        return Schema.create(Schema.Type.LONG);
      case "float":
        return Schema.create(Schema.Type.FLOAT);
      case "double":
      case "decimal":
        return Schema.create(Schema.Type.DOUBLE);
      case "binary":
        return Schema.create(Schema.Type.BYTES);
      case "string":
      case "char":
      case "varchar":
      case "date":
      case "timestamp":
      default:
        return Schema.create(Schema.Type.STRING);

    }
  }

修改后,重新将依赖包部署到Hive中,rt表也能正常查询。从parquet文件中读取的数据,新增的字段则显示的空值

六、总结

通过这种方法,实现了元数据动态同步到Hive。

对有些地方的源码细节了解得还不透彻,以后可以再多看看,以便能发现和解决更多的问题。

期待Hudi官方以后新增加这种功能,看是否有更好的方案。

参考资料

  • Schema Evolution
  • Apache Hudi + Flink作业运行指南
  • HUDI FLINK 答疑解惑
  • 超详细步骤!整合Apache Hudi + Flink + CDH
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

一种Hudi on Flink动态同步元数据变化的方法 的相关文章

随机推荐

  • Android RecyclerView Item点击事件

    RecyclerView控件出来后可以代替ListView xff0c 功能更强大 xff0c 但是RecyclerView却没有ListView的setOnItemClickListener和setOnLongClickListener方
  • GitHub上README.md编写教程(基本语法)

    今天在编写github中的readme md文档时 xff0c 发现自己写的和别人写的样式完全不一样 xff0c 自己的好难看 xff0c 于是百度了一番 xff0c 很多都是转载的别人13年的博客 xff0c 其中还发现一片17年写的原创
  • android 平板适配

    首次进行平板开发 xff0c 从一开始就在琢磨适配是怎么弄的 xff0c 百度 google 群一大圈 xff0c 还是没人告诉具体是怎么做的 xff0c 都是基本的概念性的讲述怎么适配 xff0c 写了一个界面发现存在很大适配问题 xff
  • Linux下常用的优秀软件

    Linux下优秀软件介绍 常用软件工具软件美化图像视频相关音频播放器下载工具科研利器终端Windows下常用软件运行 常用软件 下面的软件均是本人使用过的 xff0c 如果大家有自己觉得很好的软件 xff0c 欢迎留言 xff0c 好的软件
  • Charles抓包遇到的坑,看这一篇就够了

    Android 7 0 xff08 API 24 xff09 以下 xff0c 你可以直接使用 Charles 安装相关证书配置好代理后实现抓包功能 xff0c 本文主要讲android7 0以后如何实现抓包功能 xff1a 首先下载安装C
  • Bad notification posted from: Couldn't expand RemoteViews for: StatusBarNotification

    自定义通知栏有些机型报下面的错 xff1a android app RemoteServiceException Bad notification posted from package xxx Couldn 39 t expand Rem
  • SCP不用密码传输文件

    SCP不用密码传输文件 方法一 xff1a 使用sshpass yum install sshpass sshpass p password scp data scripts data log root 64 X X X X data da
  • 新版Unity里面怎么使用Post Process

  • 在Unity中使用.NET 4.x和在Unity项目添加外部程序集

    2019版本已经没有4 x等效运行选项了 xff1a 应该是只能选择4 x等效Api 然后API等级选择有两种选择 xff1a NET Standard 2 0 此配置文件与 NET Foundation发布的 NET Standard 2
  • Unity使用.NET4.x新的语法和语言功能

  • UE4设置 只修改蓝图节点的语言为英文

    原因 因为在查找需要的节点时 xff0c 输入英文更符合命名法 xff0c 为了更好的查找节点 xff0c 把节点的名字改成英文当然更好 xff01 操作步骤 取消下面的勾选即可 结果
  • Unity的InputSystem使用实践

    如何使用 首先得有一个PlayerInput在场景中 xff0c 每一个PlayerInput表示一个玩家 在Actions里面选择自己的Actions xff0c 可以新建 xff1a 比如在PlayerMaps中的MoveActions
  • 虚幻4C++编程入门(搬运1)

    首先我们将使用虚幻编辑器中的类向导生成基础 C 43 43 类 xff0c 以便蓝图稍后进行延展 下图展示了向导的第一步 新建一个 Actor 这里根据对character pawn和actor的描述 xff0c 知道了pawn是actor
  • 虚幻4C++编程入门深入了解

    这部分我们将讨论基础构建块以及它们之间相互关联的方式 在此我们将了解虚幻引擎如何使用 继承和合成构建自定义游戏性功能 游戏性类 xff1a 对象 Actor 和组件 多数游戏性类派生自 4 个主要类型 它们是 UObject AActor
  • 窗口焦点丢失问题分析

    从slog中的systemlog可以看出如下信息 xff1a 01 01 08 29 03 732 633 936 I WindowManager Relayout invis Window 42244420 u0 Keyguard mEx
  • ubuntu不能挂载U盘问题

    插上U盘一个弹窗显示不能挂载系统 就不附图了 自己也是在到处找了问题 xff0c 试了很多教程 xff0c 要么看不懂要么没用要么瞎写的 最后实测一个有用的 因为ubuntu默认不能识别U盘 解决方法 xff1a 安装exfat磁盘格式工具
  • unity3d个人版怎么改变主题=>黑色

    是有很多文章都有介绍如何使用操作 xff0c 我就不再多说 xff0c 主要是那个软件的问题 xff08 在下面我会附上winhex的下载地址和详细教程 xff09 一般会出问题的地方是试用版会出很多问题 xff0c 比如什么200k以上不
  • 【正则表达式】基础应用(匹配matches(regex))(转)

    正则表达式基础应用 匹配 字符 x 字符 x 反斜线字符 字符类 abc a b 或 c xff08 简单类 xff09 abc 任何字符 xff0c 除了 a b 或 c xff08 否定 xff09 a zA Z a 到 z 或 A 到
  • 一种动态更新flink任务配置的方法

    文章目录 1 原理2 例 xff0c 整数过滤2 1 并行度为12 2 并行度大于12 3 完整代码 参考链接 1 原理 参考Flink Spark 如何实现动态更新作业配置 xff0c 讲得比较详细 xff0c 这篇的文章的参考参考文献也
  • 一种Hudi on Flink动态同步元数据变化的方法

    文章目录 一 背景二 官方Schema Evolution例子三 Flink 43 Hudi实现Schema Evolution四 96 HoodieFlinkStreamer 96 流程浅析及扩展方法4 1 FlinkKafkaConsu