DataX同步Hive数据丢失,源码修复

2023-10-27

DataX简介

在这里插入图片描述

DataX 是阿里云 DataWorks数据集成 的开源版本,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 等各种异构数据源之间高效的数据同步功能。

DataX 商业版本

阿里云DataWorks数据集成是DataX团队在阿里云上的商业化产品,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。2020年更新实时同步能力,2020年更新实时同步能力,支持10+种数据源的读写任意组合。提供MySQL,Oracle等多种数据源到阿里云MaxCompute,Hologres等大数据引擎的一键全增量同步解决方案。

商业版本参见: https://www.aliyun.com/product/bigdata/ide

DataX的特点

DataX本身作为数据同步框架,将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件,理论上DataX框架可以支持任意数据源类型的数据同步工作。同时DataX插件体系作为一套生态系统, 每接入一套新数据源该新加入的数据源即可实现和现有的数据源互通。

DataX同步Hive数据丢失

使用Datax进行两个集群间的数据同步,在读取HDFS大文件数据时,存在出现数据丢失问题。从上文我们知道DataX的数据同步原理,就是将不同数据源的同步抽象为从源头数据源读取数据的Reader插件,以及向目标端写入数据的Writer插件。为了适配各种异构的数据存储介质,DataX源码在设计的时候针对不同的数据源编写了相应的Reader插件和Writer插件。既然问题是在数据源读取就存在数据丢失的问题,我们不妨看看DataX得源码实现。

DataX的Hive数据源HdfsReader插件

HdfsReader实现了从Hadoop分布式文件系统Hdfs中读取文件数据并转为DataX协议的功能。textfile是Hive建表时默认使用的存储格式,数据不做压缩,本质上textfile就是以文本的形式将数据存放在hdfs中,对于DataX而言,HdfsReader实现上类比TxtFileReader,有诸多相似之处。orcfile,它的全名是Optimized Row Columnar file,是对RCFile做了优化。据官方文档介绍,这种文件格式可以提供一种高效的方法来存储Hive数据。HdfsReader利用Hive提供的OrcSerde类,读取解析orcfile文件的数据。目前HdfsReader支持的功能如下:

  1. 支持textfile、orcfile、rcfile、sequence file和csv格式的文件,且要求文件内容存放的是一张逻辑意义上的二维表。
  2. 支持多种类型数据读取(使用String表示),支持列裁剪,支持列常量
  3. 支持递归读取、支持正则表达式("*“和”?")。
  4. 支持orcfile数据压缩,目前支持SNAPPY,ZLIB两种压缩方式。
  5. 多个File可以支持并发读取。
  6. 支持sequence file数据压缩,目前支持lzo压缩方式。
  7. csv类型支持压缩格式有:gzip、bz2、zip、lzo、lzo_deflate、snappy。
  8. 目前插件中Hive版本为1.1.1,Hadoop版本为2.7.1(Apache[为适配JDK1.7],在Hadoop 2.5.0, Hadoop 2.6.0 和Hive 1.2.0测试环境中写入正常;其它版本需后期进一步测试;
  9. 支持kerberos认证(注意:如果用户需要进行kerberos认证,那么用户使用的Hadoop集群版本需要和hdfsreader的Hadoop版本保持一致,如果高于hdfsreader的Hadoop版本,不保证kerberos认证有效)

源码暂时未实现的点:

  1. 单个File支持多线程并发读取,这里涉及到单个File内部切分算法。二期考虑支持。
  2. 目前还不支持hdfs HA;

HdfsReader核心实现DFSUtil源码读取orc格式的文件方法 :

public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 获取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            } catch (Exception e) {
                String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                        , sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

对于Hdfs大文件在读取数据的时候会对大文件进行分片/区块的读取,正如上述代码片段:

 								//Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

从代码实现可以很容易发现在读取文件的时候只取了分片后的第一个区块的数据,也尚未开启多线程消费多分片的数据,这样就会导致在大文件读取时,存在多分片情况丢失数据的现象。

问题发现后对上述代码进行完善,完善后的代码如下:

public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
                                 RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column = UnstructuredStorageReaderUtil
                .getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector = (StructObjectInspector)                 serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
                //Each file as a split
                //TODO multy threads
                InputSplit[] splits = in.getSplits(conf, 1);

                RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
                Object key = reader.createKey();
                Object value = reader.createValue();
                // 获取列信息
                List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                List<Object> recordFields;
                while (reader.next(key, value)) {
                    recordFields = new ArrayList<Object>();

                    for (int i = 0; i <= columnIndexMax; i++) {
                        Object field = inspector.getStructFieldData(value, fields.get(i));
                        recordFields.add(field);
                    }
                    transportOneRecord(column, recordFields, recordSender,
                            taskPluginCollector, isReadAllColumns, nullFormat);
                }
                reader.close();
            } catch (Exception e) {
                String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
                        , sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

在对原始DataX源码进行调整后,重新对HdfsReader工程模块进行打jar,并覆盖DataX部署的libs目录下的HdfsReader的jar,重启DataX应用后问题得到解决。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DataX同步Hive数据丢失,源码修复 的相关文章

随机推荐

  • 原理图改动后更新PCB报错的解决方法

    今天我在用Altium Designer 20绘制PCB时遇到了一个情况 在更改了原理图以后再进行Update PCB Document的时候软件会提示错误 截图如下 遇到这种情况时大家往往束手无策而选择重新建立一个空白的PCB板再去生成一
  • Spring MVC传递List类型参数报错:No primary or default constructor found for interface java.util.List]使用两种注解解决

    在测试GenericConverter传递List数组的时候 出现No primary or default constructor found for interface java util List with r这个错误 解决 因为报错
  • 单图像超分辨率重建总结

    单图像超分辨率重建总结 定义 单图像超分辨率重建 Single Image Super resolution Reconstruction SISR 旨在从给定的低分辨率 LR 图像中 重建含有清晰细节特征的高分辨率 HR 图像 是计算机视
  • vue和elementui实现多表格选择和查看已选的功能

    目录 功能描述 数据 HTML js CSS 功能描述 经常碰到需要写多个表格选择 并且可以查看已选项目的功能 实现功能大致如此 左侧是可选择的表 点击表展示每个表里可选字段 然后点击查看已选 可以展示以及选择的表格数据 如下图 涉及敏感字
  • BeanUtils应用,tojavabean

    package com bochy tojavabean import java util HashMap import java util Map import org apache commons beanutils locale co
  • 高清壁纸wallhaven.cc下载图片

    建议不要用多线程 设置延迟 不然会被封IP import requests import os import re import time from concurrent futures import ThreadPoolExecutor
  • GCN代码详解(SEMI-SUPERVISED CLASSIFICATION WITH GRAPH CONVOLUTIONAL NETWORKS(2017ICLR))

    不足之处请大家多多指点 文章目录 链接 代码详解 链接 论文题目 用图卷积网络进行自监督分类 GCN 2017ICLR 论文链接 1609 02907v3 pdf arxiv org 代码链接 tkipf pygcn Graph Convo
  • 总链接地址

    链接 达内博客 链接地址 程序媛泡泡 https blog csdn net weixin 43884234 cgblpx 皮皮虾 https blog csdn net u012932876 闪耀太阳 https blog csdn ne
  • python环境离线打包环境、离线环境迁移、离线环境复制、离线环境克隆

    我们经常使用conda来管理自己环境 时间久了 环境中的依赖错综复杂 有的通过pip或者conda下载 有的是离线安装 有的是通过魔法下载 这时候如果要将本机环境复制到另一台计算机中就会非常麻烦 甚至另一台计算机无法联网 这时候可以使用co
  • uni-app简单介绍

    uni app简单介绍 https uniapp dcloud io 一 什么是uni app uni app 是一个使用 Vue js 开发所有前端应用的框架 开发者编写一套代码 可发布到iOS Android Web 响应式 以及各种小
  • 转:Vue项目中的token验证登录(前端部分)

    声明 此文章是转载 2 问题 做一个登录界面 我选择的是用token进行验证登录 我用的前端框架是Vue js 和 element ui 如何在vue 中使用token进行验证登录 3 思考 1 利用token进行验证登录 用户进行登录操作
  • TF卡目录显示文件夹变0字节的方法

    关于电脑上的目录打不开是什么原因 电脑目录打不开是什么原因这个许多人还不清楚 今天小编来为大家解答目录打不开状况 此刻让好多人一起来瞧瞧吧 TF卡目录显示文件夹变0字节的方法 工具 软件 sayRecy 步骤1 先百度搜索并下载程序打开后
  • Python:赋值,copy和deepcopy区别

    参考 Python赋值 copy deepcopy区别 结论 copy 与deepcopy 之间的主要区别是python对数据的存储方式 python2中 需要import copy模块 python3中 直接可以使用copy 方法 但de
  • Postman —— postman实现参数化

    什么时候会用到参数化 比如 一个模块要用多组不同数据进行测试 验证业务的正确性 Login模块 正确的用户名 密码 成功 错误的用户名 正确的密码 失败 postman实现参数化 在实际的接口测试中 部分参数每次发送请求时都要唯一 比如注册
  • jacoco检测功能或自动化测试覆盖率

    参考文档 http t csdn cn QqCSh http t csdn cn HonVL 目录 下载jacoco 启动jacocoagent监控被测项目 执行手工测试 生成exec文件 生成report报告 jacoco代码覆盖率报告分
  • 【Hexo博客搭建】将其部署到GitHub Pages(二):如何初始化并部署?

    系列目录 Hexo博客搭建 将其部署到 GitHub Pages 一 前期要做哪些准备 Hexo博客搭建 将其部署到 GitHub Pages 二 如何初始化并部署 Hexo博客搭建 将其部署到 GitHub Pages 三 怎么写作以及更
  • awr 自动 mail 发送设置

    1 各节点 AWR 自动生成 script 设置 awrrun autoawr sql cat data run awrrun bin sh cd data awrrpt ORACLE HOME u01 product oracle exp
  • Ubuntu 配置 C/C++ 开发环境

    文章目录 0 更新和升级系统软件 1 安装 build essential 软件包 2 安装 gdb 3 安装 cmake 0 更新和升级系统软件 在终端中输入以下命令对软件进行刷新 保证后续安装的软件都是最新的 sudo apt upda
  • 解决微信小程序安卓手机访问不到图片,无法显示图片

    关于微信小程序不显示图片 通病可能有以下几个可能性 非本地图片 确定图片资源存在 copy 图片url再浏览器打开 确定图片资源存在且能正常访问 本地图片 确定相对路径或者绝对路径正确 微信小程序图片路径 不可以存在中文 使用英文做路径和文
  • DataX同步Hive数据丢失,源码修复

    文章目录 DataX简介 DataX 商业版本 DataX的特点 DataX同步Hive数据丢失 DataX的Hive数据源HdfsReader插件 DataX简介 DataX 是阿里云 DataWorks数据集成 的开源版本 在阿里巴巴集