DataX读取Hive Orc格式表丢失数据处理记录

2023-11-12

问题

问题概述

DataX读取Hive Orc存储格式表数据丢失

问题详细描述

同步Hive表将数据发送到Kafka,Hive表A数据总量如下

SQL:select count(1) from A;
数量:19397281

使用DataX将表A数据发送到Kafka,最终打印读取数据量为12649450

任务总计耗时                    :               1273s
任务平均流量                    :            2.51MB/s
记录写入速度                    :           9960rec/s
读出记录总数                    :            12649450
读写失败总数                    :                   0

在kafka中查询发送的数据为12649449(有一条垃圾数据被我写自定义KafkaWriter过滤掉了,这里忽略即可)

在这里插入图片描述

原因

DataX读取HDFS Orce文件,代码有bug,当读取Hive文件大于BlockSize时会丢失数据,问题代码如下:

InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);

代码位置:

hdfsreader模块,com.alibaba.datax.plugin.reader.hdfsreader.DFSUtil类334行代码(源码为v202210版本)

这里当文件大于BlockSize大小会将文件分为多个,但是下面只取了第一个文件splits[0],其他数据就会丢失

我们发现问题后,去验证一下,hive表存储目录查询文件存储大小如下

$ hdfs dfs -du -h  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05
....
518.4 K   1.5 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000171_0
669.7 M   2.0 G    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0
205.6 K   616.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000173_0
264.6 K   793.9 K  /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000174_0
1.4 M     4.3 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000175_0
1.5 M     4.6 M    /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000176_0
....

发现果然有文件669.7 M 2.0 G /usr/hive/warehouse/dwd.db/A/dt=2022-05-05/000172_0大于BlockSize大小

解决方法

修改源码

修改后方法源码如下,直接替换DFSUtil.javaorcFileStartRead方法即可

public void orcFileStartRead(
            String sourceOrcFilePath,
            Configuration readerSliceConfig,
            RecordSender recordSender,
            TaskPluginCollector taskPluginCollector) {
        LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
        List<ColumnEntry> column =
                UnstructuredStorageReaderUtil.getListColumnEntry(
                        readerSliceConfig,
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
        String nullFormat =
                readerSliceConfig.getString(
                        com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
        StringBuilder allColumns = new StringBuilder();
        StringBuilder allColumnTypes = new StringBuilder();
        boolean isReadAllColumns = false;
        int columnIndexMax = -1;
        // 判断是否读取所有列
        if (null == column || column.size() == 0) {
            int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
            columnIndexMax = allColumnsCount - 1;
            isReadAllColumns = true;
        } else {
            columnIndexMax = getMaxIndex(column);
        }
        for (int i = 0; i <= columnIndexMax; i++) {
            allColumns.append("col");
            allColumnTypes.append("string");
            if (i != columnIndexMax) {
                allColumns.append(",");
                allColumnTypes.append(":");
            }
        }
        if (columnIndexMax >= 0) {
            JobConf conf = new JobConf(hadoopConf);
            Path orcFilePath = new Path(sourceOrcFilePath);
            Properties p = new Properties();
            p.setProperty("columns", allColumns.toString());
            p.setProperty("columns.types", allColumnTypes.toString());
            try {
                OrcSerde serde = new OrcSerde();
                serde.initialize(conf, p);
                StructObjectInspector inspector =
                        (StructObjectInspector) serde.getObjectInspector();
                InputFormat<?, ?> in = new OrcInputFormat();
                FileInputFormat.setInputPaths(conf, orcFilePath.toString());

                // If the network disconnected, will retry 45 times, each time the retry interval
                // for 20 seconds
                // Each file as a split
                InputSplit[] splits = in.getSplits(conf, -1);
                for (InputSplit split : splits) {
                    {
                        RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
                        Object key = reader.createKey();
                        Object value = reader.createValue();
                        // 获取列信息
                        List<? extends StructField> fields = inspector.getAllStructFieldRefs();

                        List<Object> recordFields;
                        while (reader.next(key, value)) {
                            recordFields = new ArrayList<Object>();

                            for (int i = 0; i <= columnIndexMax; i++) {
                                Object field = inspector.getStructFieldData(value, fields.get(i));
                                recordFields.add(field);
                            }
                            transportOneRecord(
                                    column,
                                    recordFields,
                                    recordSender,
                                    taskPluginCollector,
                                    isReadAllColumns,
                                    nullFormat);
                        }
                        reader.close();
                    }
                    // transportOneRecord(column, recordFields, recordSender,
                    //        taskPluginCollector, isReadAllColumns, nullFormat);
                }
                // reader.close();
            } catch (Exception e) {
                String message =
                        String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。", sourceOrcFilePath);
                LOG.error(message);
                throw DataXException.asDataXException(HdfsReaderErrorCode.READ_FILE_ERROR, message);
            }
        } else {
            String message =
                    String.format(
                            "请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s",
                            JSON.toJSONString(column));
            throw DataXException.asDataXException(HdfsReaderErrorCode.BAD_CONFIG_VALUE, message);
        }
    }

修改完成后将源码打包,打包后在hdfsreader模块下target目录有target/hdfsreader-0.0.1-SNAPSHOT.jar文件,将文件上传到部署服务器上的目录datax/plugin/reader/hdfsreader下,替换之前的包.


[hadoop@10 /datax/plugin/reader/hdfsreader]$ pwd
/datax/plugin/reader/hdfsreader
[hadoop@10 /datax/plugin/reader/hdfsreader]$ ll
total 52
-rw-rw-r-- 1 hadoop hadoop 28828 May 11 14:08 hdfsreader-0.0.1-SNAPSHOT.jar
drwxrwxr-x 2 hadoop hadoop  8192 Dec  9 15:06 libs
-rw-rw-r-- 1 hadoop hadoop   217 Oct 26  2022 plugin_job_template.json
-rw-rw-r-- 1 hadoop hadoop   302 Oct 26  2022 plugin.json

验证

重新启动DataX同步脚本,发现同步数据与Hive表保存数据一致。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

DataX读取Hive Orc格式表丢失数据处理记录 的相关文章

  • Hive(查找连续 n 列中的最小值)

    我在 Hive 中有一个表 有 5 列 即电子邮件 a first date b first date c first date d first date a b c d 是用户可以执行的 4 个不同操作 上表中的 4 列表示用户执行第一个
  • Hive 中字符串数据类型是否有最大大小?

    谷歌了很多 但没有在任何地方找到它 或者这是否意味着只要允许集群 Hive 就可以支持任意大字符串数据类型 如果是这样 我在哪里可以找到我的集群可以支持的最大字符串数据类型大小 提前致谢 Hive 列表的当前文档STRING作为有效的数据类
  • 使用 UDF 添加文件读取添加到 Hive 资源的文件

    我想知道如何读取使用添加的 Hive 资源ADD FILE来自乌德夫 例如 Hive gt add file users temp key jks Java中的UDF可以读取这个文件吗 在 Udf 中获取此文件的路径是什么 谢谢 大卫 一旦
  • Hive查询快速查找表大小(行数)

    是否有 Hive 查询可以快速查找表大小 即行数 而无需启动耗时的 MapReduce 作业 这就是为什么我想避免COUNT I tried DESCRIBE EXTENDED 但这产生了numRows 0这显然是不正确的 对新手问题表示歉
  • 在 Hadoop 中按文件中的值排序

    我有一个文件 其中每行包含一个字符串 然后是一个空格 然后是一个数字 例子 Line1 Word 2 Line2 Word1 8 Line3 Word2 1 我需要按降序对数字进行排序 然后将结果放入文件中 为数字分配排名 所以我的输出应该
  • AWS Athena 扁平化来自嵌套 JSON 源的数据

    我想从 Athena 中的嵌套 JSON 创建一个表 描述的解决方案here http docs aws amazon com athena latest ug json html使用 hive Openx JsonSerDe 等工具尝试在
  • 使用 Hadoop 映射两个数据集

    假设我有两个键值数据集 数据集A和B 我们称它们为数据集A和B 我想用 B 组的数据更新 A 组中的所有数据 其中两者在键上匹配 因为我要处理如此大量的数据 所以我使用 Hadoop 进行 MapReduce 我担心的是 为了在 A 和 B
  • 在 Hadoop 中处理带标头的文件

    我想在 Hadoop 中处理很多文件 每个文件都有一些头信息 后面跟着很多记录 每个记录都存储在固定数量的字节中 对此有何建议 我认为最好的解决方案是编写一个自定义的InputFormat http hadoop apache org co
  • 使用 python 从 hive 读取数据时的性能问题

    我在 hive 中有一个表 其中包含 351 837 110 MB 大小 记录 我正在使用 python 读取该表并写入 sql server 在此过程中 从 hive 读取数据到 pandas dataframe 需要很长时间 当我加载整
  • 获取 emr-ddb-hadoop.jar 将 DynamoDB 与 EMR Spark 连接

    我有一个 DynamoDB 表 需要将其连接到 EMR Spark SQL 才能对该表运行查询 我获得了带有发行标签 emr 4 6 0 和 Spark 1 6 1 的 EMR Spark Cluster 我指的是文档 使用 Spark 分
  • 这个 Java 语法是什么意思? [复制]

    这个问题在这里已经有答案了 可能的重复 java中的是什么意思 https stackoverflow com questions 12649572 what does the type in java mean 在下面的代码中 Itera
  • 当气流 initdb 时,导入错误:无法导入名称 HiveOperator

    我最近安装了airflow对于我的工作流程 在创建项目时 我执行了以下命令 airflow initdb 返回以下错误 2016 08 15 11 17 00 314 init py 36 INFO Using executor Seque
  • Namenode高可用客户端请求

    谁能告诉我 如果我使用java应用程序请求一些文件上传 下载操作到带有Namenode HA设置的HDFS 这个请求首先去哪里 我的意思是客户端如何知道哪个名称节点处于活动状态 如果您提供一些工作流程类型图或详细解释请求步骤 从开始到结束
  • 获取行 HBase 的特定列族中的列

    我正在编写一个应用程序 通过 JSP 显示 HBase 中特定表中的数据 我想获取一行的特定列族中的所有列 有什么办法可以做到这一点吗 public String getColumnsInColumnFamily Result r Stri
  • 在 Amazon EMR 上使用 java 中的 hbase 时遇到问题

    因此 我尝试使用作为 MapReduce 步骤启动的自定义 jar 来查询 Amazon ec2 上的 hbase 集群 我的 jar 在地图函数内 我这样调用 Hbase public void map Text key BytesWri
  • 如何通过Python访问Hive?

    https cwiki apache org confluence display Hive HiveClient HiveClient Python https cwiki apache org confluence display Hi
  • 在 Hive 中获取数据的交集

    我在配置单元中有以下数据 userid cityid 1 15 2 15 1 7 3 15 2 8 3 9 3 7 我只想保留具有 cityid 15 和 cityid 7 的用户 ID 在我的示例中 它将是用户 ID 1 和 3 我试过
  • 为 Presto 和 AWS S3 设置独立 Hive Metastore 服务

    我工作的环境中使用 S3 服务作为数据湖 但没有 AWS Athena 我正在尝试设置 Presto 以便能够查询 S3 中的数据 并且我知道我需要通过 Hive Metastore 服务将数据结构定义为 Hive 表 我正在 Docker
  • 如何在蜂巢中的每个组中按计数 desc 进行排序?

    这是 HQL select A B count as cnt from test table group by A B order by cnt desc 示例输出如下 a1 b1 5 a2 b1 3 a1 b2 2 a2 b2 1 但我想
  • hive sql查找最新记录

    该表是 create table test id string name string age string modified string 像这样的数据 id name age modifed 1 a 10 2011 11 11 11 1

随机推荐

  • 解决MyBatis-Plus分页查询

    在使用Spring Boot或者Spring Cloud开发业务时 经常会需要查数据库 本文以MySQL数据库为例 这时候通常会用到MyBatis 数据量比较多页面展示就会要求分页 接下来正式开始 1 Spring工程创建和添加Maven依
  • HDU - 1272 小希的迷宫之独木桥(并查集的简单应用)

    小希的迷宫 Time Limit 2000 1000 MS Java Others Memory Limit 65536 32768 K Java Others Total Submission s 51951 Accepted Submi
  • 作为一个面试官,我是怎么来面试测试人员的?

    其实之前关于面试也说了好多 知乎上我也开过一个面试的Live 也有幸被选进了知乎2016精选 不过今天我想说的是在实际过程中如果我去面试了 我会怎么进行面试 会问什么问题 会遵照哪些原则 我本身的行事风格就是比较特殊的 希望对广大应聘者和面
  • pragma once

    在C C 中 pragma once是一个非标准但是被广泛支持的方式 pragma once方式产生于 ifndef之后 ifndef方式受C C 语言标准的支持 不受编译器的任何限制 而 pragma once方式有些编译器不支持 较老编
  • 计算机显卡和cpu的关系,cpu和显卡的关系

    大家好 我是时间财富网智能客服时间君 上述问题将由我为大家进行解答 cpu和显卡的关系是都是计算机重要的硬件 CPU就是中央处理器 电脑中的所有命令几乎都要通过处理器来处理 可以将他简单理解为对数据初加工 而显卡主要是对图形进行处理 它能根
  • 机器学习---算法基础(八)SVM

    参考文献 机器学习数学 拉格朗日对偶问题 拉格朗日对偶问题 为什么支持向量机要用拉格朗日对偶算法来解最大化间隔问题 零基础学SVM Support Vector Machine 一 1 SVM概念 支持向量机 英语 support vect
  • SpringBoot实验合集(持续更新中...)

    实验一 使用Spring Boot构建应用程序 一 实验目的 1 掌握使用IntelliJ IDEA创建Spring Boot应用程序的方法 2 了解spring boot starter parent的配置内容 3 掌握如何利用Start
  • 如何用递归解决逆波兰表达式问题?

    描述 逆波兰表达式是一种把运算符前置的算术表达式 例如普通的表达式2 3的逆波兰表示法为 2 3 逆波兰表达式的优点是运算符之间不必有优先级关系 也不必用括号改变运算次序 例如 2 3 4的逆波兰表示法为 2 3 4 本题求解逆波兰表达式的
  • 蓝桥杯2023年真题 python B组

    第十四届蓝桥杯大赛软件赛省赛 Python 大学 B 组 Python 大学 B 组 试题 A 2023 本题总分 5 分 问题描述 请求出在 12345678 至 98765432 中 有多少个数中完全不包含 2023 完全不包含 202
  • 微服务搭建后端项目

    1 搭建分析 2 开始搭建父项目 父项目选SpringBoot项目 如果使用的idea社区版的话 那就创建maven项目导入如下依赖
  • 索引,元素下标,Java ListIterator 中的 nextIndex() 和 next();

    索引 元素下标 Java ListIterator 中的 nextIndex 和 next 问题 previousIndex 输出前一个元素的下标 索引 nextIndex 输出下一个元素的下标 索引 public static void
  • 使用css动画实现网易云音乐播放界面波浪动画效果

    通过实现CSS实现仿网易云音乐播放界面动画效果 最终的效果如下 界面布局 图片也是实现滚动效果的 使用四个div 来标识每一帧波动的效果 div class container wrap div class container div cl
  • 基于开路电压测量(OCV)的电量计获取锂离子(Li+)电池参数

    获取li 电池参数的步骤 确定满电量和空电量点 提取Li 电池参数的最佳方法是创建一个尽可能与实际应用接近的环境 其中包括保护电路 放电曲线 包括实际应用中有效电流和待机电流的典型值 以及充电曲线 因此需要模拟电池充电和放电过程 并监控和记
  • 地图切片的概念与原理

    为什么80 的码农都做不了架构师 gt gt gt 定义 地图切片 采用预生成的方法存放在服务器端 然后根据用户提交的不同请求 把相应的地图瓦片发送给客户端的过程 它是一种多分辨率层次模型 从瓦片金字塔底层到顶层 分辨率越来越低 但表示的地
  • Redis实现限流的三种方式

    一 固定窗口 所谓固定窗口限流即时间窗口的起始和结束时间是固定的 在固定时间段内允许要求的请求数量访问 超过则拒绝 当固定时间段结束后 再重新开始下一个时间段进行计数 我们可以根据当前的时间 以分钟为时间段 每分钟都生成一个key 用来in
  • Elasticlunr.js 支持其他语言 V0.9.5

    之前一直没有处理其他的语言的需求 所以没有测试elasticlunr js对于其他的语言的支持 多亏了Github的网友 帮忙发现了elasticlunr js对于其他语言支持的问题 昨天 elasticlunr js发布了V0 9 5版本
  • 苹果手机各种尺寸详细表以及iPhoneX、iPhoneXS、iPhoneXR、iPhoneXSMax、iPhone 11、iPhone 12、屏幕适配

    iPhone设备 物理分辨率是硬件所支持的 逻辑分辨率是软件可以达到的 代数 设备 操作系统 逻辑分辨率 point 物理分辨率 pixel 屏幕尺寸 对角线长度 缩放因子 iPhone 第一代 iPhone 2G iOS 1 320 x
  • Centos7 安装Hadoop3 单机版本(伪分布式版本)

    环境版本 CentOS 7 JDK 8 Hadoop 3 CentOS 7 服务器设置 设置静态IP 查看IP配置在 etc sysconfig network scripts 目录下的ifcfg ens33文件中 root Hadoop3
  • requests设置代理ip------验证代理ip是否可用

    文章目录 1 代理ip设置 1 0 透明 普匿 高匿ip区别 1 1 代理设置格式 1 2 详细测试 1 3 报错407 2 验证ip是否可用demo 遇到不可用ip程序会停止 2 1 验证网站 2 2 代码及结果 2 2 1 http h
  • DataX读取Hive Orc格式表丢失数据处理记录

    文章目录 问题 问题概述 问题详细描述 原因 解决方法 修改源码 验证 问题 问题概述 DataX读取Hive Orc存储格式表数据丢失 问题详细描述 同步Hive表将数据发送到Kafka Hive表A数据总量如下 SQL select c