Datax-HdfsWriter如何实现支持decimal类型数据写入

2023-11-10

一、问题背景
之前在做Datax数据同步时,发现源端binary、decimal等类型的数据无法写入hive字段。看了一下官网文档,DataX HdfsWriter 插件文档,是1-2年前的,当初看过部分源码其实底层hadoop是支持这些类型写入的,后来随着工作变动也忘了记录下来,借着近期datax群里又有人问起,勾起了回忆,索性改一下源码记录一下。

很重要的一点:我们其实要知道,datax只不过是个集成了异构数据源同步的框架,真正的读取和写入都是数据源底层本身支持功能才能用,所以要想知道某个功能支不支持,首先得去看底层的数据源支不支持。

注:binary类型写入之后读取又会有坑,将另外开启一篇单独介绍Hdfs如何实现支持binary类型数据读写,改动部分代码已提交。

欢迎自取:github地址

分支:feature_hdfs_writer_decimal_binary_support

二. 环境准备
Datax版本:3.0

Hadoop版本:2.7.3

Hive版本:2.3.2

三. Datax 源码
       首先从hdfswriter的startwrite方法入手,根据配置job文件的filetype类型区分写入hdfs的存储格式:

HdfsWriter:
 
public void startWrite(RecordReceiver lineReceiver) {
            LOG.info("begin do write...");
            LOG.info(String.format("write to file : [%s]", this.fileName));
            if(fileType.equalsIgnoreCase("TEXT")){
                //写TEXT FILE
                hdfsHelper.textFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            }else if(fileType.equalsIgnoreCase("ORC")){
                //写ORC FILE
                hdfsHelper.orcFileStartWrite(lineReceiver,this.writerSliceConfig, this.fileName,
                        this.getTaskPluginCollector());
            }
 
            LOG.info("end do write");
}

进入hdfsHelper查看具体的写入逻辑:

HdfsHelper:
 
// TEXT
public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                   TaskPluginCollector taskPluginCollector){
...
            RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector);
                if (!transportResult.getRight()) {
                    writer.write(NullWritable.get(),transportResult.getLeft());
                }
            }
            writer.close(Reporter.NULL);
...
}
 
// ORC
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                  TaskPluginCollector taskPluginCollector){
...
        List<String> columnNames = getColumnNames(columns);
        // 获取字段类型序列化器,这个方法很关键,后续对于decimal类型字段的改造需要用到
        List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
        StructObjectInspector inspector = (StructObjectInspector)ObjectInspectorFactory
                .getStandardStructObjectInspector(columnNames, columnTypeInspectors);
...
            RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<List<Object>, Boolean> transportResult =  transportOneRecord(record,columns,taskPluginCollector);
                if (!transportResult.getRight()) {
// orc 格式的需要对应类型序列化器才能写入到hdfs
                    writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
                }
            }
            writer.close(Reporter.NULL);
...
}
 
// 将从channel中收到的record字符串按照对应的字段类型进行转换
public static MutablePair<List<Object>, Boolean> transportOneRecord(
            Record record,List<Configuration> columnsConfiguration,
            TaskPluginCollector taskPluginCollector){
...
 for (int i = 0; i < recordLength; i++) {
      column = record.getColumn(i);
      //todo as method
      if (null != column.getRawData()) {
      String rowData = column.getRawData().toString();
      // datax定义的hive支持类型枚举类
      SupportHiveDataType columnType = SupportHiveDataType.valueOf(columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
      //根据writer端类型配置做类型转换
      switch (columnType) {
          case TINYINT:
               recordList.add(Byte.valueOf(rowData));
               break;
...
}

从上述代码中可以得知,text类型文件写入,不需要做特殊的序列化处理,因此对于text类型的文本写入,只要在transportOneRecord中添加缺少的类型转换就能实现对应类型字段的写入,而对于ORC类型的文件写入则需要对应的类型序列化器才能做到。至此我们重点应该放在验证hadoop底层是否真的没有binary以及decimal等类型的序列化器。

      上述代码中,我也标记出了ORC中获取字段序列化器的入口位置[HdfsHelper.getColumnTypeInspectors]方法内部。
 

HdfsHelper:
 
// 根据writer配置的字段类型,构建序列化器
public List<ObjectInspector>  getColumnTypeInspectors(List<Configuration> columns){
        List<ObjectInspector>  columnTypeInspectors = Lists.newArrayList();
        for (Configuration eachColumnConf : columns) {
            SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
            ObjectInspector objectInspector = null;
            switch (columnType) {
                case TINYINT:
                    objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
                    break;
...
}

  看到这里就知道下一步就是需要到ObjectInspectorFactory中去看对应类型的ObjectInspector类是什么,接下来就是到hive的底层源码了。

ObjectInspectorFactory:
 
public static ObjectInspector getReflectionObjectInspector(Type t, ObjectInspectorFactory.ObjectInspectorOptions options) {
        // 优先从缓存中获取
        ObjectInspector oi = (ObjectInspector)objectInspectorCache.get(t);
        if (oi == null) {
        // 缓存中不存在,获取实际类,并添加到缓存中
            oi = getReflectionObjectInspectorNoCache(t, options);
            objectInspectorCache.put(t, oi);
        }
...
        return oi;
    }
 
 
 
private static ObjectInspector getReflectionObjectInspectorNoCache(Type t, ObjectInspectorFactory.ObjectInspectorOptions options) {
// 开头就验证Map,Array类型的复合字段类型,这就说明了其实hive提供的sdk本身也是支持这些字段类型写入的
        if (t instanceof GenericArrayType) {
            GenericArrayType at = (GenericArrayType)t;
            return getStandardListObjectInspector(getReflectionObjectInspector(at.getGenericComponentType(), options));
        } else {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType)t;
                if (List.class.isAssignableFrom((Class)pt.getRawType()) || Set.class.isAssignableFrom((Class)pt.getRawType())) {
                    return getStandardListObjectInspector(getReflectionObjectInspector(pt.getActualTypeArguments()[0], options));
                }
 
                if (Map.class.isAssignableFrom((Class)pt.getRawType())) {
                    return getStandardMapObjectInspector(getReflectionObjectInspector(pt.getActualTypeArguments()[0], options), getReflectionObjectInspector(pt.getActualTypeArguments()[1], options));
                }
 
                t = pt.getRawType();
            }
        if (!(t instanceof Class)) {
                throw new RuntimeException(ObjectInspectorFactory.class.getName() + " internal error:" + t);
            } else {
                Class<?> c = (Class)t;
// 根据传入的不同类去不同的缓存中获取class对象
                if (PrimitiveObjectInspectorUtils.isPrimitiveJavaType(c)) {
                    return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveJavaType(c).primitiveCategory);
                } else if (PrimitiveObjectInspectorUtils.isPrimitiveJavaClass(c)) {
                    return PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveJavaClass(c).primitiveCategory);
                } else if (PrimitiveObjectInspectorUtils.isPrimitiveWritableClass(c)) {
                    return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(PrimitiveObjectInspectorUtils.getTypeEntryFromPrimitiveWritableClass(c).primitiveCategory);
                }
...
}
}

代码很清晰,直接看对应的缓存class是怎么初始化进去的就可以知道,我们一会需要用什么类型去做代码改造

PrimitiveObjectInspectorUtils:
 
// 缓存中注册类型
static void registerType(PrimitiveObjectInspectorUtils.PrimitiveTypeEntry t) {
...
        if (t.primitiveJavaType != null) {
            primitiveJavaTypeToTypeEntry.put(t.primitiveJavaType, t);
        }
 
        if (t.primitiveJavaClass != null) {
            primitiveJavaClassToTypeEntry.put(t.primitiveJavaClass, t);
        }
 
        if (t.primitiveWritableClass != null) {
            primitiveWritableClassToTypeEntry.put(t.primitiveWritableClass, t);
        }
...
}
 
// 静态代码块初始化
static {
        binaryTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BINARY, "binary", byte[].class, byte[].class, BytesWritable.class);
        stringTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.STRING, "string", (Class)null, String.class, Text.class);
        booleanTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BOOLEAN, "boolean", Boolean.TYPE, Boolean.class, BooleanWritable.class);
        intTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.INT, "int", Integer.TYPE, Integer.class, IntWritable.class);
        longTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.LONG, "bigint", Long.TYPE, Long.class, LongWritable.class);
        floatTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.FLOAT, "float", Float.TYPE, Float.class, FloatWritable.class);
        voidTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.VOID, "void", Void.TYPE, Void.class, NullWritable.class);
        doubleTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DOUBLE, "double", Double.TYPE, Double.class, DoubleWritable.class);
        byteTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.BYTE, "tinyint", Byte.TYPE, Byte.class, ByteWritable.class);
        shortTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.SHORT, "smallint", Short.TYPE, Short.class, ShortWritable.class);
        dateTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DATE, "date", (Class)null, Date.class, DateWritable.class);
        timestampTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.TIMESTAMP, "timestamp", (Class)null, Timestamp.class, TimestampWritable.class);
        decimalTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.DECIMAL, "decimal", (Class)null, HiveDecimal.class, HiveDecimalWritable.class);
        varcharTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.VARCHAR, "varchar", (Class)null, HiveVarchar.class, HiveVarcharWritable.class);
        charTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.CHAR, "char", (Class)null, HiveChar.class, HiveCharWritable.class);
        unknownTypeEntry = new PrimitiveObjectInspectorUtils.PrimitiveTypeEntry(PrimitiveCategory.UNKNOWN, "unknown", (Class)null, Object.class, (Class)null);
        registerType(binaryTypeEntry);
        registerType(stringTypeEntry);
        registerType(charTypeEntry);
        registerType(varcharTypeEntry);
        registerType(booleanTypeEntry);
        registerType(intTypeEntry);
        registerType(longTypeEntry);
        registerType(floatTypeEntry);
        registerType(voidTypeEntry);
        registerType(doubleTypeEntry);
        registerType(byteTypeEntry);
        registerType(shortTypeEntry);
        registerType(dateTypeEntry);
        registerType(timestampTypeEntry);
        registerType(decimalTypeEntry);
        registerType(unknownTypeEntry);
    }
 

  看到这里,就很明白了,hive底层是支持binary,decimal这些类型的字段写入的,所以我们只需要拿到入参的class类。这里用decimal拿来举例子,选择有2个,一个是HiveDecimal.class, HiveDecimalWritable.class,因此回到HdfsHelper中,添加decimal类型,并在枚举类中新增DECIMAL即可
 

case DECIMAL:
     objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(HiveDecimal.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;

但是实际还有个坑,没注意,因为我先测试的text类型的文件写入,在transportOneRecord中用java的decimal去做类型转换操作了

transportOneRecord方法:
 
case DECIMAL:
recordList.add(new BigDecimal(rowData));
break;

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Datax-HdfsWriter如何实现支持decimal类型数据写入 的相关文章

随机推荐

  • Sqlserver 监控使用磁盘空间情况

    最近遇到一个小问题 为了保存以往的一些数据 间了大量临时表 导致SQLserver 数据增长过快 不得不想个办法监控磁盘空间使用情况 网上一般有几种办法 一是使用 dm os volume stats函数 缺点是 无法获取非数据库所在的磁盘
  • Service Bus Namespace 和 Access Control

    Service Bus Namespace 和 Access Control Service Bus Namespace简述 https yourapp servicebus windows net foo bar baz 就是一个name
  • 【必看】时序逻辑仿真成组合逻辑?你知道原因吗?

    对于初学者 一般会遇到这种情况 明明写的时序逻辑 结果仿真结果却是组合逻辑 然后看遍设计代码 始终找不到原因 交流群 知乎这种问题随处可见 但不要怀疑软件问题 modelsim这些专用软件基本不会遇见软件自身问题 原因其实很简单 因为多数人
  • 常用内存数据库介绍(四)

    4 5 H2 Database h2是Thomas Mueller提供的一个开源的 纯java实现的关系数据库 官方网站 http www h2database com html main html 它的主要特性是 非常速的数据库引擎 开源
  • 《算法图解》总结第 7 章:狄克斯特拉算法

    仅用于记录学习 欢迎批评指正 大神勿喷 系列文章目录 算法图解 总结第 1 章 二分查找 大O表示法 算法图解 总结第 2 章 数组和链表 选择排序 算法图解 总结第 3 章 while循环 递归 栈 算法图解 总结第 4 章 分而治之 快
  • mac safari无法打开网页_Safari浏览器无法打开网页,因为您的iphone尚未接入互联网...

    原因如下 1 移动数据没打开 如果苹果手机出现游览器无法打开网页 我们专首先要查看手机上面网络属数据是否开启 如果忘记开启网络数据的话 那么没有网络也就无法打开访问网页 这个时候 打开系统设置将蜂窝移动数据按钮打开 即可解决这个问题 2 检
  • FastCGI sent in stderr: “Primary script unknown“ while reading response header from upstream问题解决

    error 1439 1439 5 FastCGI sent in stderr Primary script unknown while reading response header from upstream php对接nginx的配
  • wifi 概念

    wifi 的一些概念 转载 http blog csdn net eager7 article details 8117600 python view plain copy 1 什么是WIFI Wi Fi 原先是无线保真的缩写 Wi Fi
  • html中哪些是行内元素,html行内元素有哪些

    html行内元素有 a b u span img input strong select sub sup label em button textarea tt var samp br cite code font strike等等 本教程
  • layui文件上传后台(带自定参数)

    记录layui文件上传方法 前端页面直接看layui文件上传相关文档就行 主要是记录后端Java接收上传流并保存的方法 layui文档 https www layui com doc modules upload html 因为该方法使用M
  • [BSidesSF2019]goodluks

    BSidesSF2019 goodluks 考点 题解过程 flag 考点 1 EFF 骰子密码 2 Linux删除的文件恢复 3 LUKS加密 题解过程 开局给了一张图片和一个img的文件 首先使用查看镜像的文件内容 是一个MBR的启动项
  • matlab非线性规划

    1 非线性规划matlab函数 非线性规划函数的约束函数和目标函数至少有一个是非线性函数 而对比于线性规划的区别也就一眼识别了 MATLAB中用于求解非线性规划的函数为fmincon 其调用格式如下 x fmincon f x0 A b x
  • java调用webservice接口 几种方法

    webservice的 发布一般都是使用WSDL web service descriptive language 文件的样式来发布的 在WSDL文件里面 包含这个webservice暴露在外面可供使用的接口 今天搜索到了非常好的 webs
  • python数据预处理之缺失值的各种填补方式

    如果你觉得文字看着枯燥 可以看配套讲解视频 讲解视频 对于数据挖掘的缺失值的处理 应该是在数据预处理阶段应该首先完成的事 缺失值的处理一般情况下有三种方式 1 删掉缺失值数据 2 不对其进行处理 3 利用插补法对数据进行补充 第一种方式是极
  • 修改 bootargs 方式增加分区(mtd分区和blkdevparts分区)

    1 Linux内核设置分区的两种方式 1 1 内核代码中写死 在内核的平台代码中写死 然后在初始化NandFlash的时候设置 1 2 uboot通过bootargs传递分区表 1 u boot将分区信息 形如 mtdparts xxx b
  • 机器学习之逻辑回归,代码实现(附带sklearn代码,小白版)

    文章目录 前言 一 逻辑回归能够解决什么 二 公式 三 激活函数 四 如何求得w 六 逻辑回归代码实现 五 sklearn demo 总结 前言 虽然名字带有回归 但实际上是一个常用的二分类算法 并且在预测的时候能够提供预测类别的概率 一
  • antd中Form.useForm()使用方式

    这里写自定义目录标题 onRow 表单Form useForm onRow table table record 点击后获取的数据对象 onRow record gt return event获取当前列元素节点 可用 event targe
  • 聚类算法4——DBSCAN密度聚类(算法步骤及matlab代码)

    看了西关书的聚类算法 算法原理很容易明白 接下来就是整理成自己的理解思路 然后一步一步来实现算法 那么就来做吧 DensityClustering算法 概念 从样本密度的角度考察样本之间的可连接性 样本分布的紧密程度刻画聚类结构 术语 核心
  • el-table实现指定列合并

    table传入span method方法可以实现合并行或列 方法的参数是一个对象 里面包含当前行row 当前列column 当前行号rowIndex 当前列号columnIndex四个属性 该函数可以返回一个包含两个元素的数组 第一个元素代
  • Datax-HdfsWriter如何实现支持decimal类型数据写入

    一 问题背景 之前在做Datax数据同步时 发现源端binary decimal等类型的数据无法写入hive字段 看了一下官网文档 DataX HdfsWriter 插件文档 是1 2年前的 当初看过部分源码其实底层hadoop是支持这些类