Datax插件二次开发之HdfsWriter支持parquet

2023-11-06

Datax插件二次开发之HdfsWriter支持parquet

Date: December 24, 2021

1. 背景

目前,公司的OLAP和AD-HOC组件主要使用impala,而当前我们的impala版本支持parquet\textfile格式,却不支持ORC格式,因此会有同步数据时,进行parquet格式写入的需求。

在网上查了下资料,只找到一个支持parquet的hdfswriter插件,但是有网友乐(tian)于(bu)分(zhi)享(chi)的贴了一个版本,没有源码不说,下载下来还各种坑(只支持单表,不支持分区表),实则是明里埋坑暗里收钱,让人无语。

因此,决定自己开发个插件,开源出来,供大家一起使用。

2. 操作步骤

2.1 代码开发

通过拉取datax源码,对hdfswriter模块进行修改,主要修改HdfsHelper.java和HdfsWriter.java 两个类文件,模仿模块中orc的代码块,结合parquet API 进行开发,主要是在HdfsHelper.java中添加parquetFileStartWrite 方法,具体代码如下:

/**
     * 写parquetfile类型文件
     * @param lineReceiver
     * @param config
     * @param fileName
     * @param taskPluginCollector
     */
    public void parquetFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
                                      TaskPluginCollector taskPluginCollector){
        List<Configuration>  columns = config.getListConfiguration(Key.COLUMN);
        String compress = config.getString(Key.COMPRESS, null);
        List<String> columnNames = getColumnNames(columns);
        List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
        StructObjectInspector inspector = (StructObjectInspector)ObjectInspectorFactory
                .getStandardStructObjectInspector(columnNames, columnTypeInspectors);

        ParquetHiveSerDe parquetHiveSerDe = new ParquetHiveSerDe ();

        MapredParquetOutputFormat outFormat = new MapredParquetOutputFormat();
        if(!"NONE".equalsIgnoreCase(compress) && null != compress ) {
            Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
            if (null != codecClass) {
                outFormat.setOutputCompressorClass(conf, codecClass);
            }
        }
        try {
            Properties colProp= new Properties();
            colProp.setProperty("columns",String.join(",",columnNames));
            List<String> colTypes = new ArrayList<>();
            columns.forEach(col ->colTypes.add(col.getString(Key.TYPE)));
            colProp.setProperty("columns.types",String.join(",",colTypes));
            RecordWriter writer = (RecordWriter) outFormat.getHiveRecordWriter(conf,new Path(fileName), ObjectWritable.class,true,colProp,Reporter.NULL);
            Record record = null;
            while ((record = lineReceiver.getFromReader()) != null) {
                MutablePair<List<Object>, Boolean> transportResult =  transportOneRecord(record,columns,taskPluginCollector);
                if (!transportResult.getRight()) {
                    writer.write(null, parquetHiveSerDe.serialize(transportResult.getLeft(), inspector));
                }
            }
            writer.close(Reporter.NULL);
        } catch (Exception e) {
            String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
            LOG.error(message);
            Path path = new Path(fileName);
            deleteDir(path.getParent());
            throw DataXException.asDataXException(HdfsWriterErrorCode.Write_FILE_IO_ERROR, e);
        }
    }

2.2 相关说明

虽然在hdfswriter模块中加入 parquet 代码块中后,能够在hdfs中进行parquet格式写入。但是测试的时候,却发现原来支持的ORC格式写入却发生异常了,几经debug,也没发现问题出在哪里,因此怀疑问题出现在包的版本上。

由于DATAX的hdfswriter中hive和hadoop版本为:

<properties>
    <hive.version>1.1.1</hive.version>
    <hadoop.version>2.7.1</hadoop.version>
</properties>

需要适配我们的CDH集群,在引入parquet时,我们更改了DATAX的hdfswriter中hive和hadoop版本:

<properties>
    <hive.version>1.1.0-cdh5.12.1</hive.version>
    <hadoop.version>2.6.0-cdh5.12.1</hadoop.version>
    <!--<hive.version>1.1.1</hive.version>-->
    <!--<hadoop.version>2.7.1</hadoop.version>-->
</properties>

所以,目测当时阿里开发人员在做文件类型支持时,肯定也是考虑过parquet了的,但是确实存在版本兼容问题,因此舍弃了parquet文件格式,而选择了新型的ORC。大家在使用时,如果出现版本问题,请调整hive和hadoop版本进行重新编译。

那么,我选新建插件模块,来绕开这个问题,于是新建了一个模块: hdfsparquetwriter 用来在支持parquet写入,具体代码仓库为:https://gitee.com/jackielee4cn/DataX.git

欢迎进行指正和Star 。

3.使用样例

3.1 编译安装

下载源码,编译打包,找到模块文件 的 target/datax/plugin/writer/hdfswriter.zip 文件。将文件解压到datax安装目录的${DATAX_HOME}/plugin/writer/ 下 。

3.2 配置datax job

配置方式与官网的orc根式hdfswriter方式一致,只是这里的fileType,只支持text、csv、rcfile、parquet ,不支持ORC (orc相关方法被注释掉了 ) ,并且wtier.name为【hdfsparquetwriter】,相当于是hdfswriter插件的一个补丁插件。

具体样例内容如下:

test_hdfswriter_parquet.job

{
  "core": {
    "transport": {
      "channel": {
        "speed": {
          "record": "100000"
        }
      }
    }
  },
  "job": {
    "setting": {
      "speed": {
        "channel": 2
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "write_user111",
            "password": "aaa@123",
            "column": [
				"`f_id`",
				"`f_order_id`",
				"`f_is_refund`",
				"`f_amount`",
				"`f_time`"
            ],
            "splitPk": "f_id",
			"where": "f_time<='2021-12-01 01:00:00' ", 
            "connection": [
              {
                "table": [
                  "test_datax_parquet"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://test02:3306/test?useUnicode=true&characterEncoding=utf-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&dontTrackOpenResources=true"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "hdfsparquetwriter",
          "parameter": {
            "defaultFS": "hdfs://test01:8020",
            "fileType": "parquet",
            "path": "/user/hive/warehouse/test.db/test_datax_parquet/date_id=20211201",
            "fileName": "test_datax_text",
            "writeMode": "append",
            "fieldDelimiter": "\u0001",
            "column": [
              {
                "name": "f_id",
                "type": "bigint"
              },
              {
                "name": "f_order_id",
                "type": "string"
              },
              {
                "name": "f_is_refund",
                "type": "int"
              },
              {
                "name": "f_amount",
                "type": "double"
              },
              {
                "name": "f_time",
                "type": "string"
              }
            ]
          }
        }
      }
    ]
  }
}

3.3 执行job

python ${DATAX_HOME}/bin/datax.py test_hdfswriter_parquet.job

查看控制台日志,执行正常。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Datax插件二次开发之HdfsWriter支持parquet 的相关文章

随机推荐

  • 性能测试 —— 什么是全链路压测?

    随着互联网技术的发展和普及 越来越多的互联网公司开始重视性能压测 并将其纳入软件开发和测试的流程中 阿里巴巴在2014 年双11 大促活动保障背景下提出了全链路压测技术 能更好的保障系统可用性和稳定性 什么是全链路压测 全链路压测是一种全面
  • 4.7 期货每日早盘操作建议

    期货期权日评 静待反抽 PMI数据显示国内疫情基本控制后复工已较明显 经济数据将在二季度逐步改善 同时近期高层在贷款 汽车消费方面政策频出 有望支持实体经济复苏 当前A股已处于低位 期指继续做空的风险收益比在下降 因此建议可在股指期权上轻仓
  • Failed to execute ‘addColorStop‘ on ‘CanvasGradient‘: The value provided (‘undefined‘) could not be

    在echarts使用属性visualMap对折线图进行区间的变色设置 结果写完直接报错 Uncaught DOMException Failed to execute addColorStop on CanvasGradient The v
  • springboottest注解

    SpringBoot test 好习惯要坚持下去 CSDN博客 springboot test springboot使用 SpringBootTest注解进行单元测试 灰太狼 CSDN博客 springboot test
  • 如何为模型不同层设置不同的学习率?

    在模型调参中常用的一种方法是针对不同层设置不同的学习率 以此避免因难易程度不一致引起的过拟合等问题 一 模型举例 class Model nn Module def init self input size hidden size outp
  • 【cfengDB】自己实现数据库第0节 ---整体介绍及事务管理层实现

    LearnProj 内容管理 MySQL系统结构 一条SQL执行流程 cfengDB整体结构 事务管理TM模块 TID文件规则定义 文件读写 NIO RandomAccessFile FileChannel ByteBuffer 接口实现
  • 【单调栈】找到左右两边的最近小于元素

    基本概念 从一个问题引出单调栈的这个概念 给定一个数组 对于数组中的每一个元素 分别找到它左边和右边最近的小于它的元素 无重复数组 默认该数组中的元素是无重复的 我们可以维护一个栈 从栈的下方到上方 元素的大小从小到大 对于数组中的每一个元
  • OSI参考模型与TCP/IP参考模型(计算机网络)

    一 1 OSI参考模型有7层 从上到下为 应用层 表示层 会话层 传输层 网络层 数据链路层 物理层 如下图1 2 TCP IP参考模型有4层 自上到下分别为 应用层 传输层 网际层 网络接口层 如下图2 3 常考的5层参考模型是这样的 自
  • 微信小程序生成分享带参数二维码图片 并添加文字功能

    笔者最近接到一个新的任务 不是很难的功能 就是之前没有接触过 后端生成带参数的小程序二维码图片 并在图片下面添加一些文字 想在将代码分享给大家 期望可以给大家提供帮助 一 首先生成小程序的分享二维码有三种方式 接口 A 适用于需要的码数量较
  • 编程报错和问题解决办法【总结篇】

    目录 1 VMware开启虚拟机失败 模块 Disk 启动失败 2 vim 输入时光标键会变成a b c d 3 vim中delete backspace 键不能向左删除 4 conda command not found解决办法 5 进入
  • Leaflet的Vue组件 — Vue2Leaflet

    原文地址 Leaflet的Vue组件 Vue2Leaflet 这两天折腾Vue 在GitHub上发现了一个开源项目Vue2Leaflet Vue2Leaflet是一个Vue框架的JavaScript库 封装了Leaflet 它使构建地图变得
  • element Tree树形控件使用记录

    需求为使用弹窗选择区域 弹窗左侧为待选区 右侧会展示当前已选中项 也是树形控件展示 如果打开弹窗时上次有选中数据 需要展示出来并勾选相应树形节点 1 html及配置项 数据源部分 由于需求中有需要主动设置选中项 所以需要设置node key
  • 【Linux】线程池

    目录 前言 线程池概念 线程池的实现 前言 这篇文章来实现一个线程池 线程池概念 线程池 一种线程使用模式 线程过多会带来调度开销 进而影响缓存局部性和整体性能 而线程池维护着多个线程 等待着监督管理者分配可并发执行的任务 这避免了在处理短
  • Linux_基础知识笔记4

    基础知识 一 Linux目录结构 二 cat 查看文件内容 三 more 查看文件内容 内容多 四 less 查看文件内容 五 head tail 查看文件内容 六 wc 统计文件内容 七 grep 检索和过滤文件内容 八 gzip bzi
  • 利用闭包,在不设置全局变量的情况下,完成再次点击退出功能

    做APP经常会用到的功能就是 第一次点击弹出退出提示 再次点击退出app 以前常规做法 就是立flag 代码如下 var ableToOut null api addEventListener name keyback function r
  • 关于Undefined symbols for architecture x86_64这个错的总结

    最近在Mac上做一个程序 需要调用动态链接库 出现两次Undefined symbols for architecture x86 64的错误 所以总结下 第一个是 Undefined symbols for architecture x8
  • C++ 复制(拷贝)构造函数

    复制构造函数的定义 复制构造函数是一种特殊的构造函数 其形参为本类的对象引用 作用是用一个已经存在的对象去初始化同类型的新对象 复制构造函数被调用的三种情况 1 定义了一个对象 以本类另一个对象作为初始值 发生复制构造 2 如果函数的形参是
  • matlab中input输入多个数_基于MATLAB的PID控制算法仿真

    一 初学者学习目的 1 利用Matlab Simulink实现PID控制算法 2 观察不同PID参数对控制性能的影响 3 掌握PID参数整定的方法 二 实践内容 1 以二阶系统 为被控对象 K 135 在阶跃输入信号的作用下 用simuli
  • keil添加了头文件仍然报找不到头文件的原因

    1 如果工程中有中文路径 keil是无法识别中文路径的 需要修改为英文路径 2 如果工程中的路径存在数字开头 则keil无法识别该路径 需要修改为以英文字符开头 3 如果修改了工程中的文件夹名 则需要重新将文件夹包含到工程中 4 点击魔术棒
  • Datax插件二次开发之HdfsWriter支持parquet

    Datax插件二次开发之HdfsWriter支持parquet Date December 24 2021 1 背景 目前 公司的OLAP和AD HOC组件主要使用impala 而当前我们的impala版本支持parquet textfil