About the author
Chun Min is a logistics architect and team leader, serious and responsible about his work, who once spent a vacation "programming while facing the sea", which sounds impressive even if we are not quite sure why.
In Hive, users can define their own functions to extend what HiveQL can do. Hive custom functions come in three kinds (a combined query example follows the list):

- UDF (user-defined function): processes a single row and emits a single row. Many of Hive's built-in string, math, and date functions are of this kind, e.g. concat, split, length, and rand; in most cases, writing a function of this type is enough. There are two ways to write one: extend the UDF class, or extend the GenericUDF class (the "generic" UDF).
- UDAF (user-defined aggregate function): processes multiple rows and accumulates them into a single result, typically used together with group by. Common built-ins include max, min, count, sum, and collect_set. Again there are two ways to write one: extend the UDAF class, or extend AbstractGenericUDAFResolver.
- UDTF (user-defined table function): produces multiple rows from one row, or splits one column into several. explode, usually combined with Lateral View, turns a column into rows; parse_url_tuple turns one column into several columns.
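To see all three kinds side by side, here is a short HiveQL sketch using only built-in functions (the table and column names are invented for illustration):

-- UDF: one row in, one row out
select concat(first_name, ' ', last_name) as full_name from users;

-- UDAF: many rows in, one row out per group
select city, count(*) as shop_count from shops group by city;

-- UDTF: one row in, many rows out
select u.id, t.tag
from users u
lateral view explode(split(u.tags, ',')) t as tag;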
Hive's UDF machinery asks the user to provide a Resolver and an Evaluator: the Resolver handles the input and dispatches to the Evaluator, and the Evaluator implements the actual functionality.
Custom UDF implementation and invocation mechanism
Hive provides a base class, org.apache.hadoop.hive.ql.exec.UDF, which holds an object of DefaultUDFMethodResolver, an implementation class of the UDFMethodResolver interface.
public class UDF {
    private UDFMethodResolver rslv;

    public UDF() {
        this.rslv = new DefaultUDFMethodResolver(this.getClass());
    }
    ......
}
DefaultUDFMethodResolver provides a getEvalMethod method, which uses reflection (through FunctionRegistry) to look up the UDF's evaluate method:
public class DefaultUDFMethodResolver implements UDFMethodResolver {
    private final Class<? extends UDF> udfClass;

    public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
        this.udfClass = udfClass;
    }

    public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
        return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
    }
}
A custom UDF is therefore implemented by extending org.apache.hadoop.hive.ql.exec.UDF and providing an evaluate method, which the DefaultUDFMethodResolver object will resolve and have executed.
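Before the full case study, here is a minimal sketch of the bare mechanics (the class name and behavior are hypothetical, not from the original post):

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

// Minimal UDF sketch: upper-cases a string. Note there is no @Override:
// evaluate is found by name through DefaultUDFMethodResolver.
public class ToUpperUDF extends UDF {
    public Text evaluate(Text input) {
        if (input == null) {
            return null;  // pass SQL NULL through
        }
        return new Text(input.toString().toUpperCase());
    }
}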
Case Study: determining whether a point lies inside a polygon
public class DAIsContainPoint extends UDF {
    public Boolean evaluate(Double longitude, Double latitude, String geojson) {
        try {
            // Parse the GeoJSON polygon and test whether it contains the point
            Polygon polygon = JTSHelper.parse(geojson);
            Coordinate center = new Coordinate(longitude, latitude);
            GeometryFactory factory = new GeometryFactory();
            Point point = factory.createPoint(center);
            return polygon.contains(point);
        } catch (Throwable e) {
            // On any parse or geometry failure, report "not contained"
            // rather than failing the whole query
            return false;
        }
    }
}
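JTSHelper is the author's own utility and its source is not shown in the post. Assuming it wraps the LocationTech JTS GeoJSON I/O classes, it might look roughly like this (a hypothetical reconstruction inferred from the call sites):

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.geom.Polygon;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.geojson.GeoJsonReader;
import org.locationtech.jts.io.geojson.GeoJsonWriter;

// Hypothetical reconstruction of the post's JTSHelper, built on JTS's
// GeoJsonReader/GeoJsonWriter (from the jts-io-common module).
public class JTSHelper {

    public static Polygon parse(String geojson) throws ParseException {
        return (Polygon) new GeoJsonReader().read(geojson);
    }

    public static String convert2String(Geometry geometry) {
        return new GeoJsonWriter().write(geometry);
    }
}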
Once the code is written it needs to be packaged and compiled into a jar. Note: the final jar must contain all of its dependency jars; for maven builds, the maven-shade-plugin is recommended:
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
The resulting jar file then needs to be registered and referenced in Hive SQL:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';
select lat, lng, geojson, is_in_polygon(lat, lng, geojson) as is_in from example;
Custom UDAFs and MapReduce
Hive carries out aggregation as MapReduce jobs to speed it up, and the UDAF is the extension mechanism for writing custom aggregate logic. (Readers who need background on MapReduce itself can consult the references at the end of this post.) To explain UDAFs clearly, we first need a picture of the MapReduce dataflow.
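As a quick orientation (a simplified summary, not a figure from the original post):

// Simplified MapReduce dataflow that Hive aggregation rides on:
//
//   input splits -> map() -> [optional combine() on the map side]
//                -> shuffle/sort by key -> reduce() -> output
//
// A UDAF supplies the user logic that Hive plugs into these stages.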
Back in Hive: a UDAF implementation starts by extending org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver and implementing the org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2 interface. You then build a GenericUDAFEvaluator class that carries out the MapReduce computation, in which three methods are key (their stage-by-stage scheduling is sketched after this list):

- iterate: consumes raw input rows on the map side and folds them into the aggregation buffer, feeding the merge step
- merge: combines partial aggregations, as the combiner does with mapper output
- terminate: merges all partial results and returns the final value
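How Hive schedules these calls across the job follows the standard GenericUDAFEvaluator.Mode contract, summarized here for orientation:

// GenericUDAFEvaluator.Mode tells the evaluator which stage it is running in:
//
//   PARTIAL1  map phase:       iterate()  then terminatePartial()
//   PARTIAL2  combine phase:   merge()    then terminatePartial()
//   FINAL     reduce phase:    merge()    then terminate()
//   COMPLETE  map-only query:  iterate()  then terminate()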
Then implement a class extending AbstractGenericUDAFResolver and override its getEvaluator method to return an instance of your GenericUDAFEvaluator.
Case Study: merging geofences
public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

    @Override
    public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo)
            throws SemanticException {
        return new DAJoinStringEvaluator();
    }

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
        if (typeInfos.length != 1) {
            throw new UDFArgumentTypeException(typeInfos.length - 1,
                    "Exactly one argument is expected.");
        }
        if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Only primitive type arguments are accepted but "
                            + typeInfos[0].getTypeName() + " is passed.");
        }
        switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
            case STRING:
                return new DAJoinStringEvaluator();
            default:
                throw new UDFArgumentTypeException(0,
                        "Only numeric or string type arguments are accepted but "
                                + typeInfos[0].getTypeName() + " is passed.");
        }
    }

    public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {

        private PrimitiveObjectInspector mInput;
        private Text mResult;

        // Aggregation buffer: the running union of all polygons seen so far
        static class PolygonAgg implements AggregationBuffer {
            Geometry geometry;
        }

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
            assert (parameters.length == 1);
            super.init(m, parameters);
            mResult = new Text();
            mInput = (PrimitiveObjectInspector) parameters[0];
            return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
        }

        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            PolygonAgg polygonAgg = new PolygonAgg();
            reset(polygonAgg);
            return polygonAgg;
        }

        @Override
        public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
            // Start each group from an empty polygon
            PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
            GeometryFactory factory = new GeometryFactory();
            polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
        }

        @Override
        public void iterate(AggregationBuffer aggregationBuffer, Object[] objects)
                throws HiveException {
            assert (objects.length == 1);
            // Raw input rows are folded in exactly like partial results
            merge(aggregationBuffer, objects[0]);
        }

        @Override
        public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
            return terminate(aggregationBuffer);
        }

        @Override
        public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
            if (partial != null) {
                try {
                    PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                    String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
                    Polygon polygon = JTSHelper.parse(geoJson);
                    polygonAgg.geometry = polygonAgg.geometry.union(polygon);
                } catch (Exception e) {
                    // Skip partials that fail to parse rather than failing the job
                }
            }
        }

        @Override
        public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
            try {
                PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
                // buffer(0) repairs invalid geometry before taking the convex hull
                Geometry buffer = polygonAgg.geometry.buffer(0);
                mResult.set(JTSHelper.convert2String(buffer.convexHull()));
            } catch (Exception e) {
                // Keep the declared writable return type even on failure
                mResult.set("");
            }
            return mResult;
        }
    }
}
After packaging, it is used from Hive SQL like this:
add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2';
create table udaf_example as
select id, da_join(da_range) as da_union_polygon
from example
group by id;
Custom UDTFs
To implement a UDTF, extend org.apache.hadoop.hive.ql.udf.generic.GenericUDTF and implement the process, initialize, and close methods (a minimal sketch follows the list):

- initialize: returns a StructObjectInspector, which determines the names and types of the output columns
- process: handles each input record and produces new rows, which are emitted through the forward method
- close: the callback at the end of the whole invocation, for releasing resources
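Here is that minimal sketch (the class and function names are hypothetical, not from the original post): a UDTF that splits a comma-delimited string into one row per token, in the spirit of the built-in explode:

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

public class SplitToRowsUDTF extends GenericUDTF {

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        if (argOIs.length != 1) {
            throw new UDFArgumentException("split_to_rows takes exactly one argument");
        }
        // One output column named "token", of type string
        List<String> fieldNames = Collections.singletonList("token");
        List<ObjectInspector> fieldOIs = Collections.<ObjectInspector>singletonList(
                PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] args) throws HiveException {
        // Each forward() call emits one output row, so a single input
        // record can fan out into many rows
        for (String token : String.valueOf(args[0]).split(",")) {
            forward(new Object[]{token});
        }
    }

    @Override
    public void close() throws HiveException {
        // nothing to release
    }
}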
Case Study: turning an input Polygon into a set of S2Cells
public class S2SimpleRegionCoverV2 extends GenericUDTF {

    private final static int LEVEL = 16;

    @Override
    public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
        // Single output column "s2cellid" of type bigint
        List<String> structFieldNames = Lists.newArrayList("s2cellid");
        List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
                PrimitiveObjectInspectorFactory.javaLongObjectInspector);
        return ObjectInspectorFactory
                .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        String json = String.valueOf(objects[0]);
        List<Long> s2cellids = toS2CellIds(json);
        // One output row per covering cell
        for (Long s2cellid : s2cellids) {
            forward(new Long[]{s2cellid});
        }
    }

    public static List<Long> toS2CellIds(String json) {
        GeometryFactory factory = new GeometryFactory();
        GeoJsonReader reader = new GeoJsonReader();
        Geometry geometry = null;
        try {
            geometry = reader.read(json);
        } catch (ParseException e) {
            geometry = factory.createPolygon(new Coordinate[]{});
        }
        // Convert the polygon's vertices into S2 points
        List<S2Point> points = new ArrayList<S2Point>();
        for (Coordinate coordinate : geometry.getCoordinates()) {
            S2LatLng s2LatLng = S2LatLng.fromDegrees(coordinate.y, coordinate.x);
            points.add(s2LatLng.toPoint());
        }
        if (points.size() == 0) {
            return Lists.newArrayList();
        }
        // Cover the loop with cells at the configured level
        ArrayList<S2CellId> result = new ArrayList<S2CellId>();
        S2RegionCoverer
                .getSimpleCovering(new S2Polygon(new S2Loop(points)), points.get(0), LEVEL, result);
        List<Long> output = new ArrayList<Long>();
        for (S2CellId s2CellId : result) {
            output.add(s2CellId.id());
        }
        return output;
    }

    @Override
    public void close() throws HiveException {
        // nothing to release
    }
}
In use, it is paired with lateral view:
add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar;
create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2';
drop table if exists temp.cm_s2_id_cover_list;
create table temp.cm_s2_id_cover_list as
select tb_s2cellid.s2cellid, source.shop_id
from (
  select
    geometry,
    shop_id
  from
    example) source
lateral view da_cover(geometry) tb_s2cellid as s2cellid;
References

- [HivePlugins](cwiki.apache.org/confluence/…)
- Hive系列之HSQL转换成MapReduce过程
- Hive-UDAF
- hive udaf开发入门和运行过程详解
- DeveloperGuide UDTF