前面两篇文章我们分析了UDF和UDAF的原理以及实现思路,这一节我们介绍另外一种UDF: UDTF((User-Defined Table-Generating Functions),是用来解决输入一行输出多行的需求的,本节我们来详细分析下UDTF如何实现以及如何与lateral view一起结合使用。
概述
UDTF(User-Defined Table-Generating Functions)是用来解决输入一行输出多行的需求。
执行步骤
要实现UDTF,我们需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
,实现initialize
, process
, close
三个方法。
- UDTF首先会调用
initialize
方法,此方法返回UDTF的返回行的信息,返回个数,类型; - 初始化完成后,会调用process方法,真正的处理过程在process函数中,在process中,每一次forward()调用产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到forward()函数;
- 最后close()方法调用,对需要清理的方法进行清理。
explode分析
实现分析
explode
是将hive一列中复杂的array或者map结构拆分成多行,只针对array和map两种数据结构有效。
私有变量
首先看下私有变量,inputOI
是输入参数的类型,forwardListObj
是当输入参数是List时候
@Description(name = "explode",
value = "_FUNC_(a) - separates the elements of array a into multiple rows,"
+ " or the elements of a map into multiple rows and columns ")
public class GenericUDTFExplode extends GenericUDTF {
private transient ObjectInspector inputOI = null;
private transient final Object[] forwardListObj = new Object[1];
private transient final Object[] forwardMapObj = new Object[2];
}
initialize
initialize
初始化函数会对输入参数进行检查,输入参数长度是1,然后初始化输入类型,输出名字以及类型:
- 如果category是List,则初始化
inputOI
类型,然后初始化结果的变量名字为col,并设置结果的类型; - 如果category是Map,则初始化
inputOI
类型,然后初始化结果的变量名字为key, value,并设置结果的类型key类型和value类型.
@Override
public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
if (args.length != 1) {
throw new UDFArgumentException("explode() takes only one argument");
}
ArrayList<String> fieldNames = new ArrayList<String>();
ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
switch (args[0].getCategory()) {
case LIST:
inputOI = args[0];
fieldNames.add("col");
fieldOIs.add(((ListObjectInspector)inputOI).getListElementObjectInspector());
break;
case MAP:
inputOI = args[0];
fieldNames.add("key");
fieldNames.add("value");
fieldOIs.add(((MapObjectInspector)inputOI).getMapKeyObjectInspector());
fieldOIs.add(((MapObjectInspector)inputOI).getMapValueObjectInspector());
break;
default:
throw new UDFArgumentException("explode() takes an array or a map as a parameter");
}
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,
fieldOIs);
}
process
我们接下来看下process
函数,针对List和Map分别不同处理方法:
- List类别,首先判断list空时候直接返回,然后如果不空,则对于每个值,放入到
forwardListObj
中; - Map类别,需要抽取出来key和value,然后放入
forwardMapObj
中;
最后调用父类的forward方法,进行collect。
@Override
public void process(Object[] o) throws HiveException {
switch (inputOI.getCategory()) {
case LIST:
ListObjectInspector listOI = (ListObjectInspector)inputOI;
List<?> list = listOI.getList(o[0]);
if (list == null) {
return;
}
for (Object r : list) {
forwardListObj[0] = r;
forward(forwardListObj);
}
break;
case MAP:
MapObjectInspector mapOI = (MapObjectInspector)inputOI;
Map<?,?> map = mapOI.getMap(o[0]);
if (map == null) {
return;
}
for (Entry<?,?> r : map.entrySet()) {
forwardMapObj[0] = r.getKey();
forwardMapObj[1] = r.getValue();
forward(forwardMapObj);
}
break;
default:
throw new TaskExecutionException("explode() can only operate on an array or a map");
}
}
我们来看下父类中forward
方法,主要是将输出传递给下游:
Collector collector = null;
protected final void forward(Object o) throws HiveException {
collector.collect(o);
}
close
close没有进行任何操作。
@Override
public void close() throws HiveException {
}
使用方法
explode使用
UDTF有两种使用方法,一种直接放到select后面,一种和lateral view
一起使用,直接select中使用:
select explode_map(properties) as (col1,col2) from src;
select explode(arraycol) as newcol from tablename;
但是我们在使用这个函数时候,有以下限制:
- 不可以添加其他字段使用:
select xx, explode_map(properties) as (col1, col2) from src
- 不可以嵌套调用:
select explode(explode(properties)) from src
- 不可以和
group by/cluster by/distribute by/sort by
一起使用:select explode_map(properties) as (col1,col2) from src group by col1, col
lateral view
使用介绍
为了解决UDTF在select中无法跟其他字段一起使用这个问题,我们就需要引入 LATERAL VIEW
。LATERAL VIEW
对遇到的每一行,首先会按UDTF表达式进行处理,展开成若干行(可能是零行),然后将这些输出行与输入行INNER JOIN
。如果要保留输出为零的行,则需使用 LATERAL VIEW OUTER
执行 OUTER JOIN
A lateral view first applies the UDTF to each row of base table and then joins resulting output rows to the input rows to form a virtual table having the supplied table alias.
在正常解析一个有值的Array时,用lateral view explode
是完全ok的,但是,当遇到该Array为空时,如果在使用该函数,就会导致该条记录消失,这样的结果就会导致我们漏掉一部分信息。这时,就要用到lateral view outer explode
。
select src.id, mytable.col1, mytable.col2 from src lateral view explode_map(properties) mytable as col1, col2;
实现分析
Lateral view
与UDTF函数一起使用,UDTF对每个输入行产生0或者多个输出行。Lateral view
首先在基表的每个输入行应用UDTF,然后连接结果输出行与输入行组成拥有指定表别名的虚拟表。那么这个过程会产生shuffle吗?为什么会,或者不会?我们接下来来分析下,首先看下下面这个sql的执行计划,然后分析这个过程:
hive> explain select a, b from t lateral view explode(b_array) b_array as b;
OK
STAGE DEPENDENCIES:
Stage-1 is a root stage
Stage-0 depends on stages: Stage-1
STAGE PLANS:
Stage: Stage-1
Map Reduce
Map Operator Tree:
TableScan
alias: t
Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
Lateral View Forward
Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: a (type: string)
outputColumnNames: a
Statistics: Num rows: 1000 Data size: 7534 Basic stats: COMPLETE Column stats: NONE
Lateral View Join Operator
outputColumnNames: _col48, _col57
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col48 (type: string), _col57 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Select Operator
expressions: b_array (type: array<string>)
outputColumnNames: _col0
Statistics: Num rows: 373 Data size: 753 Basic stats: COMPLETE Column stats: NONE
UDTF Operator
Statistics: Num rows: 373 Data size: 753 Basic stats: COMPLETE Column stats: NONE
function name: explode
Lateral View Join Operator
outputColumnNames: _col48, _col57
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
Select Operator
expressions: _col48 (type: string), _col57 (type: string)
outputColumnNames: _col0, _col1
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
File Output Operator
compressed: false
Statistics: Num rows: 746 Data size: 1506 Basic stats: COMPLETE Column stats: NONE
table:
input format: org.apache.hadoop.mapred.TextInputFormat
output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
Stage: Stage-0
Fetch Operator
limit: -1
Processor Tree:
ListSink
我们来将这个执行计划图形化:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-d6AdjEcv-1607418438935)(/Users/lidongmeng/Library/Application Support/typora-user-images/image-20201208162319137.png)]
可以看出来,这个sql 经历了两条线,一条线只进行了选择,一条线进行了udtf,然后进行Lateral view join
,TableScan
和Select operator
都比较简单了,我们来重点看下Lateral view
的两个类以及UDTFOperator
。
LateralViewForwardOperator
这个Operator几乎没做什么,只是将数据输送出去。
@Override
public void processOp(Object row, int tag) throws HiveException {
forward(row, inputObjInspectors[tag]);
}
UDTFOperator
UDTFOperator
主要是进行UDTF函数处理,当UDTF不产生任何行时,比如explode()
函数的输入列为空,LATERAL VIEW就不会生成任何输出行。在这种情况下原有行永远不会出现在结果中。OUTRE可被用于阻止这种情况,输出行中来自UDTF的列将被设置为NULL。
if (conf.isOuterLV()) {
outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
}
@Override
public void processOp(Object row, int tag) throws HiveException {
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
List<? extends StructField> fields = soi.getAllStructFieldRefs();
for (int i = 0; i < fields.size(); i++) {
objToSendToUDTF[i] = soi.getStructFieldData(row, fields.get(i));
}
genericUDTF.process(objToSendToUDTF);
if (conf.isOuterLV() && collector.getCounter() == 0) {
collector.collect(outerObj);
}
collector.reset();
}
LateralViewJoinOperator
我们来重点看下LateralViewJoinOperator
这个实现,步骤如下:
- 首先判断标识是我们上面画的图的左半部分还是有半部分;
- 左半部分,只是将数据保存起来,左半部分只有一条数据;
- 右半部分,则需要跟左半部分进行合并,右半部分有多行。
所以可以看出来这里的join并不需要Shuffle操作,只是连接一起即可。
ArrayList<Object> acc = new ArrayList<Object>();
ArrayList<Object> selectObjs = new ArrayList<Object>();
@Override
public void processOp(Object row, int tag) throws HiveException {
StructObjectInspector soi = (StructObjectInspector) inputObjInspectors[tag];
if (tag == SELECT_TAG) {
selectObjs.clear();
selectObjs.addAll(soi.getStructFieldsDataAsList(row));
} else if (tag == UDTF_TAG) {
acc.clear();
acc.addAll(selectObjs);
acc.addAll(soi.getStructFieldsDataAsList(row));
forward(acc, outputObjInspector);
} else {
throw new HiveException("Invalid tag");
}
}
总结
所以经过上述分析,我们知道了Lateral view
是不会发生Shuffle的,从执行计划中也可以看出来没有reduce任务,这里的LateralViewJoinOperator
代表的是两份数据联接到一起的意思,并不是真正的意义上的join;另外如果我们对与数组或者map是NULL的行也要统计处理的话,一定要记住使用out关键词。
Hive内部UDTF
最后我们看一下Hive内部自带的UDTF。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-spkA5QDN-1607418438939)(/Users/lidongmeng/Library/Application Support/typora-user-images/image-20201208120405862.png)]
参考
- https://www.cnblogs.com/ggjucheng/archive/2013/02/01/2888819.html
- https://cwiki.apache.org/confluence/display/Hive/LanguageManual+UDF#LanguageManualUDF-Built-inAggregateFunctions(UDAF)
- https://www.jianshu.com/p/97164a56a19c
- https://zhuanlan.zhihu.com/p/137482744
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)