Hive深入浅出UDTF

2023-05-16

前面两篇文章我们分析了UDF和UDAF的原理以及实现思路,这一节我们介绍另外一种UDF: UDTF((User-Defined Table-Generating Functions),是用来解决输入一行输出多行的需求的,本节我们来详细分析下UDTF如何实现以及如何与lateral view一起结合使用。

概述

UDTF(User-Defined Table-Generating Functions)是用来解决输入一行输出多行的需求。

执行步骤

要实现UDTF,我们需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现initialize, process, close三个方法。

  1. UDTF首先会调用initialize方法,此方法返回UDTF的返回行的信息,返回个数,类型;
  2. 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数;
  3. 最后close()方法调用,对需要清理的方法进行清理。

explode分析

实现分析

explode是将hive一列中复杂的array或者map结构拆分成多行,只针对array和map两种数据结构有效。

私有变量

首先看下私有变量,inputOI是输入参数的类型,forwardListObj是当输入参数是List时候

@Description(name = "explode",
    value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
      + " or the elements of a map into multiple rows and columns ")
public class GenericUDTFExplode extends GenericUDTF {

  private transient ObjectInspector inputOI = null;
  private transient final Object[] forwardListObj = new Object[1];
  private transient final Object[] forwardMapObj = new Object[2];
}

initialize

initialize初始化函数会对输入参数进行检查,输入参数长度是1,然后初始化输入类型,输出名字以及类型:

  1. 如果category是List,则初始化inputOI类型,然后初始化结果的变量名字为col,并设置结果的类型;
  2. 如果category是Map,则初始化inputOI类型,然后初始化结果的变量名字为key, value,并设置结果的类型key类型和value类型.
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
  if (args.length != 1) {
    throw new UDFArgumentException("explode() takes only one argument");
  }

  ArrayList<String> fieldNames = new ArrayList<String>();
  ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();

  switch (args[0].getCategory()) {
    case LIST:
      inputOI = args[0];
      fieldNames.add("col");
      fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
      break;
    case MAP:
      inputOI = args[0];
      fieldNames.add("key");
      fieldNames.add("value");
      fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
      fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
      break;
    default:
      throw new UDFArgumentException("explode() takes an array or a map as a parameter");
  }

  return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
                                                                 fieldOIs);
}

process

我们接下来看下process函数,针对List和Map分别不同处理方法:

  1. List类别,首先判断list空时候直接返回,然后如果不空,则对于每个值,放入到forwardListObj中;
  2. Map类别,需要抽取出来key和value,然后放入forwardMapObj中;

最后调用父类的forward方法,进行collect。

@Override
public void process(Object[] o) throws HiveException {
  switch (inputOI.getCategory()) {
    case LIST:
      ListObjectInspector listOI = (ListObjectInspector)inputOI;
      List<?> list = listOI.getList(o[0]);
      if (list == null) {
        return;
      }
      for (Object r : list) {
        forwardListObj[0] = r;
        forward(forwardListObj);
      }
      break;
    case MAP:
      MapObjectInspector mapOI = (MapObjectInspector)inputOI;
      Map<?,?> map = mapOI.getMap(o[0]);
      if (map == null) {
        return;
      }
      for (Entry<?,?> r : map.entrySet()) {
        forwardMapObj[0] = r.getKey();
        forwardMapObj[1] = r.getValue();
        forward(forwardMapObj);
      }
      break;
    default:
      throw new TaskExecutionException("explode() can only operate on an array or a map");
  }
}

我们来看下父类中forward方法,主要是将输出传递给下游:

Collector collector = null;

protected final void forward(Object o) throws HiveException {
  collector.collect(o);
}

close

close没有进行任何操作。

@Override
public void close() throws HiveException {
}  

使用方法

explode使用

UDTF有两种使用方法,一种直接放到select后面,一种和lateral view一起使用,直接select中使用:

select explode_map(properties) as (col1,col2) from src;
select explode(arraycol) as newcol from tablename;

但是我们在使用这个函数时候,有以下限制:

  1. 不可以添加其他字段使用:select xx, explode_map(properties) as (col1, col2) from src
  2. 不可以嵌套调用:select explode(explode(properties)) from src
  3. 不可以和group by/cluster by/distribute by/sort by一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col

lateral view

使用介绍

为了解决UDTF在select中无法跟其他字段一起使用这个问题,我们就需要引入 LATERAL VIEWLATERAL VIEW 对遇到的每一行,首先会按UDTF表达式进行处理,展开成若干行(可能是零行),然后将这些输出行与输入行INNER JOIN。如果要保留输出为零的行,则需使用 LATERAL VIEW OUTER 执行 OUTER JOIN

A lateral view first applies the UDTF to each row of base table and then joins resulting output rows to the input rows to form a virtual table having the supplied table alias.

在正常解析一个有值的Array时,用lateral view explode是完全ok的,但是,当遇到该Array为空时,如果在使用该函数,就会导致该条记录消失,这样的结果就会导致我们漏掉一部分信息。这时,就要用到lateral view outer explode

select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;

实现分析

Lateral view与UDTF函数一起使用,UDTF对每个输入行产生0或者多个输出行。Lateral view首先在基表的每个输入行应用UDTF,然后连接结果输出行与输入行组成拥有指定表别名的虚拟表。那么这个过程会产生shuffle吗?为什么会,或者不会?我们接下来来分析下,首先看下下面这个sql的执行计划,然后分析这个过程:

hive> explain select a, b from t lateral view explode(b_array) b_array as b;
OK
STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-0 depends on stages: Stage-1

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: t
            Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
            Lateral View Forward
              Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
              Select Operator
                expressions: a (type: string)
                outputColumnNames: a
                Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
                Lateral View Join Operator
                  outputColumnNames: _col48, _col57
                  Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                  Select Operator
                    expressions: _col48 (type: string), _col57 (type: string)
                    outputColumnNames: _col0, _col1
                    Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                    File Output Operator
                      compressed: false
                      Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                      table:
                          input format: org.apache.hadoop.mapred.TextInputFormat
                          output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                          serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
              Select Operator
                expressions: b_array (type: array<string>)
                outputColumnNames: _col0
                Statistics: Num rows: 373 Data size: 753 Basic stats: COMPLETE Column stats: NONE
                UDTF Operator
                  Statistics: Num rows: 373 Data size: 753 Basic stats: COMPLETE Column stats: NONE
                  function name: explode
                  Lateral View Join Operator
                    outputColumnNames: _col48, _col57
                    Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                    Select Operator
                      expressions: _col48 (type: string), _col57 (type: string)
                      outputColumnNames: _col0, _col1
                      Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                      File Output Operator
                        compressed: false
                        Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
                        table:
                            input format: org.apache.hadoop.mapred.TextInputFormat
                            output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                            serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

我们来将这个执行计划图形化:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d6AdjEcv-1607418438935)(/Users/lidongmeng/Library/Application Support/typora-user-images/image-20201208162319137.png)]

可以看出来,这个sql 经历了两条线,一条线只进行了选择,一条线进行了udtf,然后进行Lateral view joinTableScanSelect operator都比较简单了,我们来重点看下Lateral view的两个类以及UDTFOperator

LateralViewForwardOperator

这个Operator几乎没做什么,只是将数据输送出去。

// org.apache.hadoop.hive.ql.exec.LateralViewForwardOperator
@Override
public void processOp(Object row, int tag) throws HiveException {
  forward(row, inputObjInspectors[tag]);
}

UDTFOperator

UDTFOperator主要是进行UDTF函数处理,当UDTF不产生任何行时,比如explode()函数的输入列为空,LATERAL VIEW就不会生成任何输出行。在这种情况下原有行永远不会出现在结果中。OUTRE可被用于阻止这种情况,输出行中来自UDTF的列将被设置为NULL。

// org.apache.hadoop.hive.ql.exec.UDTFOperator
// 如果设置了out,则如果UDTF为空时候设置默认值。
if (conf.isOuterLV()) {
  outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
}

@Override
public void processOp(Object row, int tag) throws HiveException {
  // The UDTF expects arguments in an object[]
  StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
  List<? extends StructField> fields = soi.getAllStructFieldRefs();

  for (int i = 0; i < fields.size(); i++) {
    objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
  }

  //真正处理数据的是 genericUDTF的某个实现类,比如,explode,那就是GenericUDTFExplode.java 的process
  genericUDTF.process(objToSendToUDTF);
  //这里判断一下有没有outer关键字,有out关键字如果UDTF输出为空,也需要添加,避免漏行
  if (conf.isOuterLV() && collector.getCounter() == 0) {
    collector.collect(outerObj);
  }
  collector.reset();
}

LateralViewJoinOperator

我们来重点看下LateralViewJoinOperator这个实现,步骤如下:

  1. 首先判断标识是我们上面画的图的左半部分还是有半部分;
  2. 左半部分,只是将数据保存起来,左半部分只有一条数据;
  3. 右半部分,则需要跟左半部分进行合并,右半部分有多行。

所以可以看出来这里的join并不需要Shuffle操作,只是连接一起即可。

// org.apache.hadoop.hive.ql.exec.LateralViewJoinOperator
// acc is short for accumulator. It's used to build the row before forwarding
ArrayList<Object> acc = new ArrayList<Object>();
// selectObjs hold the row from the select op, until receiving a row from
// the udtf op
ArrayList<Object> selectObjs = new ArrayList<Object>();

@Override
public void processOp(Object row, int tag) throws HiveException {
  StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
  if (tag == SELECT_TAG) { // select部分
    selectObjs.clear();
    selectObjs.addAll(soi.getStructFieldsDataAsList(row));
  } else if (tag == UDTF_TAG) { // udtf产生部分
    acc.clear();
    acc.addAll(selectObjs);
    acc.addAll(soi.getStructFieldsDataAsList(row)); // 添加到里面而已
    forward(acc, outputObjInspector);
  } else {
    throw new HiveException("Invalid tag");
  }
}

总结

所以经过上述分析,我们知道了Lateral view 是不会发生Shuffle的,从执行计划中也可以看出来没有reduce任务,这里的LateralViewJoinOperator代表的是两份数据联接到一起的意思,并不是真正的意义上的join;另外如果我们对与数组或者map是NULL的行也要统计处理的话,一定要记住使用out关键词。

Hive内部UDTF

最后我们看一下Hive内部自带的UDTF。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-spkA5QDN-1607418438939)(/Users/lidongmeng/Library/Application Support/typora-user-images/image-20201208120405862.png)]

参考

  1. https://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888819.html
  2. https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
  3. https://www.jianshu.com/p/97164a56a19c
  4. https://zhuanlan.zhihu.com/p/137482744
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Hive深入浅出UDTF 的相关文章

随机推荐

  • 【可解释AI】图神经网络的可解释性方法及GNNexplainer代码示例

    图神经网络的可解释性方法及GNNexplainer代码示例 GNNExplainerIntroductionModelSingle instance explanations xff08 Explanation via Structural
  • 文本编辑器VI命令详解

    目录 一 xff1a 文本编辑器概述 1 文本编辑器含义 2 文本编辑器的作用 3 Linux中最常见的文本编辑器 二 vi编辑器的工作模式 1 vi编辑器的工作模式 2 各模式之间的切换 三 xff1a 命令模式概述 1 命令模式常用操作
  • Linux中与“内核安全”相关的数据结构

    五 内核安全相关数据结构 5 1 security operations结构体 这是一个钩子函数的指针数组 xff0c 其中每一个数组元素都是一个SELINUX安全钩子函数 xff0c 在2 6以上的内核中 xff0c 大部分涉及安全控制的
  • 洛谷 P3366 【模板】最小生成树

    题目描述 如题 xff0c 给出一个无向图 xff0c 求出最小生成树 xff0c 如果该图不连通 xff0c 则输出orz 输入输出格式 输入格式 xff1a 第一行包含两个整数N M xff0c 表示该图共有N个结点和M条无向边 xff
  • 关于网站最近出现504错误的总结,too open many files in system

    如果你有耐心看完这篇文章 xff0c 也许会给你带来真正的益处 网站出现504错误 xff0c 如果你用阿里云CDN的话还会报 504 Gateway Time out The gateway did not receive a timel
  • Manjaro21安装VNC,Win10远程连接manjaro桌面

    manjaro安装tigervnc xff0c win10使用VNC viewer TigerVNC 简体中文 ArchWiki archlinux org https wiki archlinux org title TigerVNC E
  • Proxmox虚拟环境搭建

    一 Proxmox VE简介 ProxmoxVE 是一个完整的 开源的企业虚拟化服务器管理平台 它在单个平台上紧密集成了 KVM 管理程序和 Linux 容器 LXC 软件定义的存储和网络功能 通过集成的基于 web 的用户界面 xff0c
  • HEX2DEC存储过程实现

    数据库当前有十进制转换为十六进制的函数hex 函数 xff0c 却没有十六进制转换为十进制的函数 xff0c 只能自己定义一个hex2dec xff0c 存储过程如下 xff1a span class token keyword drop
  • SQLite数据类型引起的问题——全数字字符串使用varchar出现错误

    问题 xff1a 项目中需要把某些数据保存到Android的数据库中 xff0c 因为保存的字符串全部为数字形式 xff0c SQLite把部分字符串自动转化为了科学技术法导致数据显示异常 xff0c 同时还把一些开头为0的字符串自动去掉了
  • IOS 自定义UIAlertController

    自定义UIAlertController xff1a 首先展示效果图 1 创建一个新的类来管理弹出的视图 继承于UIView 2 传建一个xib文件来自定义弹出视图 xff08 注意创建过后一定要将xib的class关联 xff09 3 在
  • python把txt文件里重复数据去重代码

    有时候会发现txt文件里有很多重复数据 xff0c 这里自写了一个去重的python程序 xff0c 供学习使用 xff01 def quchong print 39 39 50 print 39 导入txt文件中 39 num 61 0
  • ERROR 1064 (42000): You have an error in your SQL

    对于新手来说 xff0c MySQL数据库 xff0c 在命令行使用sql语句进行建库 xff0c 查库 xff0c 建表 xff0c 查表 时 xff0c MySQL 报错 xff1a ERROR 1064 42000 You have
  • 【图神经网络】GNNExplainer代码解读及其PyG实现

    GNNExplainer代码解读及其PyG实现 使用GNNExplainerGNNExplainer源码速读前向传播损失函数 基于GNNExplainer图分类解释的PyG代码示例参考资料 接上一篇博客图神经网络的可解释性方法及GNNexp
  • python之自动发送微信消息

    这篇文章主要是总结最近写自动发送微信消息的python代码时所接触的两个库 pyautogui和pyperclip的用法 在网上找了很多能实现发送微信消息的方法 xff0c 其中有使用itchat和wxpy库来实现的 xff0c 尝试过后发
  • CSU1646: HearthStone(DP)

    Description Henry十分钟爱炉石传说 Heart Stone 这款有趣的桌面卡牌游戏 我们简化的游戏规则如下 xff1a 游戏由两人对战 xff0c 出牌并尽量对对方造成最大的伤害 xff0c 一共进行 r轮 前10轮 xff
  • System.DllNotFoundException:“无法加载 DLL“XXX.dll”: 找不到指定的模块。 (异常来自 HRESULT:0x8007007E)。”

    System DllNotFoundException 无法加载 DLL XXX dll 找不到指定的模块 异常来自 HRESULT 0x8007007E 一般这种情况需要按是否安装完整了vc的运行时 xff0c 可以尝试安装 VC运行库合
  • Arch-004ArchLinux搜狗输入法安装

    搜狗输入法 1 sudo pacman Rsn fcitx im fcitx configtool 2 sudo pacman S fcitx lilydjwg git fcitx sogoupinyin 3 sudo pacman S f
  • AArch64中va_list/va_start/va_arg/...的实现

    版权声明 xff1a 本文为笔者本人 ashimida 64 的原创文章 xff0c 遵循CC 4 0 BY SA版权协议 xff0c 转载请附上原文出处链接及本声明 原文链接 xff1a https blog csdn net lidan
  • 51单片机外部中断示例

    void Usart INT0 init TMOD 61 0X21 TH1 61 0XFD TL1 61 0XFD SM0 61 0 SM1 61 1 REN 61 1 TR1 61 1 ES 61 1 串口中断影响外部中断0 这句话会让程
  • Hive深入浅出UDTF

    前面两篇文章我们分析了UDF和UDAF的原理以及实现思路 xff0c 这一节我们介绍另外一种UDF UDTF User Defined Table Generating Functions xff0c 是用来解决输入一行输出多行的需求的 x