HIVE自定义函数的扩展

2023-05-16

作者简介

淳敏,物流架构师同时也是一位team leader,工作认真负责,曾在休假期间“面向大海编程”,不明觉厉

在Hive中,用户可以自定义一些函数,用于扩展HiveQL的功能。Hive 自定义函数主要包含以下三种:

  • UDF(user-defined function) 单独处理一行,输出也是以行输出。许多Hive内置字符串,数学函数,时间函数都是这种类型。大多数情况下编写对应功能的处理函数就能满足需求。如:concat, split, length ,rand等。这种UDF主要有两种写法:继承实现UDF类和继承GenericUDF类(通用UDF)。
  • UDAF(user-defined aggregate function) 用于处理多行数据并形成累加结果。一般配合group by使用。主要用于累加操作,常见的函数有max, min, count, sum,collect_set等。这种UDF主要有两种写法:继承实现 UDAF类和继承实现AbstractGenericUDAFResolver类。
  • UDTF(user-defined table function) 处理一行数据产生多行数据或者将一列打成多列。 如explode, 通常配合Lateral View使用,实现列转行的功能。parse_url_tuple将一列转为多列。

Hive的UDF机制是需要用户实现: ResolverEvaluator,其中Resolver就用来处理输入,调用EvaluatorEvaluator就是具体功能的实现。

自定义UDF实现和调用机制

Hadoop提供了一个基础类org.apache.hadoop.hive.ql.exec.UDF,在这个类中含有了一个UDFMethodResolver的接口实现类DefaultUDFMethodResolver的对象。

public class UDF {
  private UDFMethodResolver rslv;

  public UDF() {
    this.rslv = new DefaultUDFMethodResolver(this.getClass());
  }
	......
}
复制代码

DefaultUDFMethodResolver中,提供了一个getEvalMethod的方法,从切面调用UDFevaluate方法

public class DefaultUDFMethodResolver implements UDFMethodResolver {
  private final Class<? extends UDF> udfClass;

  public DefaultUDFMethodResolver(Class<? extends UDF> udfClass) {
    this.udfClass = udfClass;
  }

  public Method getEvalMethod(List<TypeInfo> argClasses) throws UDFArgumentException {
    return FunctionRegistry.getMethodInternal(this.udfClass, "evaluate", false, argClasses);
  }
}
复制代码

自定义UDF的实现上以继承org.apache.hadoop.hive.ql.exec.UDF为基础,然后实现一个evaluate方法,该方法会被DefaultUDFMethodResolver对象执行。

Case Study: 判断坐标点是不是在图形中

public class DAIsContainPoint extends UDF {

  public Boolean evaluate(Double longitude, Double latitude, String geojson) {

    Boolean isContained = false;
    try {
      Polygon polygon = JTSHelper.parse(geojson);
      Coordinate center = new Coordinate(longitude, latitude);
      GeometryFactory factory = new GeometryFactory();
      Point point = factory.createPoint(center);
      isContained = polygon.contains(point);
    }catch (Throwable e){
      isContained = false;
    }finally {
      return isContained;
    }
  }
}
复制代码

完成了代码定义之后需要对其进行打包,编译成一个jar,注意: 最终的jar中需要包含所有依赖的jarmaven编译上推荐使用maven-shade-plugin

<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      <version>2.2</version>
      <executions>
        <execution>
          <phase>package</phase>
          <goals>
            <goal>shade</goal>
          </goals>
          <configuration>
            <filters>
              <filter>
                <artifact>*:*</artifact>
                <excludes>
                  <exclude>META-INF/*.SF</exclude>
                  <exclude>META-INF/*.DSA</exclude>
                  <exclude>META-INF/*.RSA</exclude>
                </excludes>
              </filter>
            </filters>
          </configuration>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
复制代码

最后产生的jar文件需要在HIVE SQL中被引用

add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function is_in_polygon as 'me.ele.breat.hive.udf.DAIsContainPoint';

select lat, lng, geojson, is_in_polygon(lat, lng, geojson) as is_in from example;
复制代码

自定义UDAF和MapReduce

在Hive的聚合计算中,采用MapReduce的方式来加快聚合的速度,而UDAF就是用来撰写聚合类自定义方法的扩展方式。关于MapReduce需要补充知识的请看这里,为了更好的说明白UDAF我们需要知道一下MapReduce的流程

回到Hive中来,在UDAF的实现中,首先需要继承org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver,并实现org.apache.hadoop.hive.ql.udf.generic.GenericUDAFResolver2接口。然后构造GenericUDAFEvaluator类,实现MapReduce的计算过程,其中有3个关键的方法

  • 方法iterate:获取mapper,输送去做merge
  • 方法merge:combiner合并mapper
  • 方法terminate:合并所有combiner返回结果

然后再实现一个继承AbstractGenericUDAFResolver的类,重载其getEvaluator的方法,返回一个GenericUDAFEvaluator的实例

Case Study:合并地理围栏

public class DAJoinV2 extends AbstractGenericUDAFResolver implements GenericUDAFResolver2 {

  @Override
  public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo genericUDAFParameterInfo)
      throws SemanticException {
    return new DAJoinStringEvaluator();
  }

  public GenericUDAFEvaluator getEvaluator(TypeInfo[] typeInfos) throws SemanticException {
    if (typeInfos.length != 1) {
      throw new UDFArgumentTypeException(typeInfos.length - 1,
          "Exactly one argument is expected.");
    }

    if (typeInfos[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
          "Only primitive type arguments are accepted but "
              + typeInfos[0].getTypeName() + " is passed.");
    }

    switch (((PrimitiveTypeInfo) typeInfos[0]).getPrimitiveCategory()) {
      case STRING:
        return new DAJoinStringEvaluator();
      default:
        throw new UDFArgumentTypeException(0,
            "Only numeric or string type arguments are accepted but "
                + typeInfos[0].getTypeName() + " is passed.");
    }
  }

  public static class DAJoinStringEvaluator extends GenericUDAFEvaluator {

    private PrimitiveObjectInspector mInput;
    private Text mResult;

    // 存储Geometry join的值的类
    static class PolygonAgg implements AggregationBuffer {
      Geometry geometry;
    }

    //定义:UDAF的返回类型,确定了DAJoin自定义UDF的返回类型是Text类型
    @Override
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
      assert (parameters.length == 1);
      super.init(m, parameters);
      mResult = new Text();
      mInput = (PrimitiveObjectInspector) parameters[0];
      return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
    }

    //内存创建,用来存储mapper,combiner,reducer运算过程中的相加总和。
    public AggregationBuffer getNewAggregationBuffer() throws HiveException {
      PolygonAgg polygonAgg = new PolygonAgg();
      reset(polygonAgg);
      return polygonAgg;
    }

    public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
      PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
      GeometryFactory factory = new GeometryFactory();
      polygonAgg.geometry = factory.createPolygon(new Coordinate[]{});
    }

    //map阶段:获取每个mapper,去进行merge
    public void iterate(AggregationBuffer aggregationBuffer, Object[] objects)
        throws HiveException {
      assert (objects.length == 1);
      merge(aggregationBuffer, objects[0]);
    }

    //在一个子的partial中combiner合并map返回结果
    public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
      return terminate(aggregationBuffer);
    }

    //combiner合并map返回结果
    public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
      if (partial != null) {
        try {
          PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
          String geoJson = PrimitiveObjectInspectorUtils.getString(partial, mInput);
          Polygon polygon = JTSHelper.parse(geoJson);
          polygonAgg.geometry = polygonAgg.geometry.union(polygon);
        } catch (Exception e){

        }
      }
    }

    //reducer合并所有combiner返回结果
    public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
      try {
        PolygonAgg polygonAgg = (PolygonAgg) aggregationBuffer;
        Geometry buffer = polygonAgg.geometry.buffer(0);
        mResult.set(JTSHelper.convert2String(buffer.convexHull()));
        return mResult;
      }catch (Exception e) {
        return "";
      }
    }

  }
}
复制代码

打包之后将其用在HIVE SQL中执行

add jar hdfs://xxx/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/udf.jar;
create temporary function da_join as 'me.ele.breat.hive.udf.DAJoinV2';

create table udaf_example as
select id, da_join(da_range) as da_union_polygon
  from example
group by id
复制代码

自定义UDTF

在UDTF的实现中,首先需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,实现process,initializeclose方法

  • initialize返回StructObjectInspector对象,决定最后输出的column的名称和类型
  • process是对每一个输入record进行处理,产生出一个新数组,传递到forward方法中进行处理
  • close关闭整个调用的回调处,清理内存

Case Study: 输入Polygon转成一组S2Cell

public class S2SimpleRegionCoverV2 extends GenericUDTF {

  private final static  int LEVEL = 16;

  @Override
  public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
    List<String> structFieldNames = Lists.newArrayList("s2cellid");
    List<ObjectInspector> structFieldObjectInspectors = Lists.<ObjectInspector>newArrayList(
        PrimitiveObjectInspectorFactory.javaLongObjectInspector);

    return ObjectInspectorFactory
        .getStandardStructObjectInspector(structFieldNames, structFieldObjectInspectors);
  }

  @Override
  public void process(Object[] objects) throws HiveException {
    String json = String.valueOf(objects[0]);

    List<Long> s2cellids = toS2CellIds(json);

    for (Long s2cellid: s2cellids){
      forward(new Long[]{s2cellid});
    }
  }

  public static List<Long> toS2CellIds(String json) {
    GeometryFactory factory = new GeometryFactory();
    GeoJsonReader reader = new GeoJsonReader();

    Geometry geometry = null;
    try {
      geometry = reader.read(json);
    } catch (ParseException e) {
      geometry = factory.createPolygon(new Coordinate[]{});
    }

    List<S2Point> polygonS2Point = new ArrayList<S2Point>();
    for (Coordinate coordinate : geometry.getCoordinates()) {
      S2LatLng s2LatLng = S2LatLng.fromDegrees(coordinate.y, coordinate.x);
      polygonS2Point.add(s2LatLng.toPoint());
    }

    List<S2Point> points = polygonS2Point;

    if (points.size() == 0) {
      return Lists.newArrayList();
    }

    ArrayList<S2CellId> result = new ArrayList<S2CellId>();
    S2RegionCoverer
        .getSimpleCovering(new S2Polygon(new S2Loop(points)), points.get(0), LEVEL, result);

    List<Long> output = new ArrayList<Long>();
    for (S2CellId s2CellId : result) {
      output.add(s2CellId.id());
    }

    return output;
  }

  @Override
  public void close() throws HiveException {

  }
}
复制代码

在使用的时候和lateral view连在一起用

add jar hdfs://bipcluster/data/upload/udf/ff8bd59f-d0a5-4b13-888b-5af239270869/google_s2_udf.jar;
create temporary function da_cover as 'me.ele.breat.hive.udf.S2SimpleRegionCoverV2';

drop table if exists temp.cm_s2_id_cover_list;

create table temp.cm_s2_id_cover_list as
select tb_s2cellid.s2cellid, source.shop_id
from (
select
  geometry,
  shop_id
from
  example) source
lateral view da_cover(geometry) tb_s2cellid as s2cellid;
复制代码

参考

  • [HivePlugins](cwiki.apache.org/confluence/…)
  • Hive系列之HSQL转换成MapReduce过程
  • Hive-UDAF
  • hive udaf开发入门和运行过程详解
  • DeveloperGuide UDTF




阅读博客还不过瘾?

欢迎大家扫二维码通过添加群助手,加入交流群,讨论和博客有关的技术问题,还可以和博主有更多互动

博客转载、线下活动及合作等问题请邮件至 shadowfly_zyl@hotmail.com 进行沟通

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

HIVE自定义函数的扩展 的相关文章

随机推荐

  • 能ping通,但是不能wget或curl

    2019独角兽企业重金招聘Python工程师标准 gt gt gt 当出现http接口请求超时时 xff0c 可以从以下几个方面排查问题 xff1a 检查接口服务本身是否正常 xff1b 检查接口所在服务器的防火墙是否开启 xff0c 尝试
  • R语言选择特定的行,对某一列排序

    R语言的数据框跟MySQL 中的表很像 根据某一列的特定值选择相应的行 d是个数据框 xff0c 有一列的名字是name d d name 61 61 34 95 34 这样就选中了 name为 95 的所有行 m 是个数据框 xff0c
  • excel表格公式无效、不生效的解决方案及常见问题、常用函数

    1 表格公式无效 不生效 使用公式时碰到了一个问题 xff0c 那就是公式明明已经编辑好了 xff0c 但是在单元格里不生效 xff0c 直接把公式显示出来了 xff0c 网上资料说有4种原因 xff0c 但是我4种都不是 xff0c 是第
  • JVM_栈详解一

    1 Java虚拟机栈 2 栈的存储单位 栈中存储什么 xff1f 每个线程都有自己的栈 xff0c 栈中的数据都是以栈帧 xff08 Stack Frame xff09 的格式存在 在这个线程上正在执行的每个方法都各自对应一个栈帧 xff0
  • EntLib 3.1学习笔记(6) : Security Application Block

    http www microsoft com china MSDN library enterprisedevelopment softwaredev dnpag2entlib mspx mfr 61 true http msdn2 mic
  • Delphi文件操作所涉及的一些函数 附例子

    判断文件是否存在 FileExists 判断文件夹是否存在 DirectoryExists 删除文件 DeleteFile Windows DeleteFile 删除文件夹 RemoveDir RemoveDirectory 获取当前文件夹
  • 排序算法

    include lt iostream gt include lt cstdlib gt include lt cstdio gt include lt time h gt using namespace std 插入排序 void Ins
  • C++应用中调用YOLOv3(darknet)进行目标检测

    YOLOv3论文 xff1a https pjreddie com media files papers YOLOv3 pdf 官网和代码 xff1a https pjreddie com darknet yolo属于one stage x
  • DJI开发之航线重叠率的计算

    介绍 无人机在规划一块区域的时候 xff0c 我们需要手动的给予一些参数来影响无人机飞行 xff0c 对于一块地表 xff0c 无人机每隔N秒在空中间隔的拍照地表的一块区域 xff0c 在整个任务执行结束后 xff0c 拍到的所有区域照片能
  • MODBUS MASTER RTU在STM32上的实现

    MODBUS MASTER RTU在STM32上的实现 1 概述 最近需要将几个信号采集模块通过总线串联起来 xff0c 这样便于系统模块化 故将目光关注到了工业上经常使用的modbus协议 modbus协议是一种一主多从的拓扑结构 xff
  • 基于HttpClient的HttpUtils(后台访问URL)

    最近做在线支付时遇到需要以后台方式访问URL并获取其返回的数据的问题 xff0c 在网络上g了一把 xff0c 发现在常用的还是Apache的HttpClient 因为以经常要用到的原故 xff0c 因此我对其进行了一些简单的封装 xff0
  • micropython安装ros_ROS2与STM32入门教程-microROS的freertos版本

    ROS2与STM32入门教程 micro ros的freertos版本 说明 xff1a 介绍如何安装使用micro ros 测试开发板 xff1a olimex stm32 e407 步骤 xff1a 安装ros2版本foxy xff0c
  • C#中通过com组件操作excel不能关闭的问题

    问题 xff1a 当用如下代码操作完Excel xff0c 虽然调用了Application的Quit 方法 xff0c 但发现Excel进程并没退出 object missing 61 System Reflection Missing
  • 交叉编译的概念及交叉编译工具的安装

    目录 一 什么是交叉编译 二 为什么要交叉编译 xff1f 三 交叉编译链的安装 四 相关使用方法 五 软连接 一 什么是交叉编译 交叉编译是指将一种编程语言编写的程序编译成另一种编程语言的程序 xff0c 通常是在不同的操作系统或硬件环境
  • .cn根服务器被攻击之后

    如果是互联网行业的人员应该知道 xff0c 8月25日凌晨 xff0c 大批的 cn 域名的网站都无法访问 xff0c 当然包括weibo cn等大型网站 个人比较奇怪的一件事情是 xff0c 微博PC网页版是 www weibo com
  • [UML]UML系列——包图Package

    系列文章 UML UML系列 用例图Use Case UML UML系列 用例图中的各种关系 xff08 include extend xff09 UML UML系列 类图Class UML UML系列 类图class的关联关系 xff08
  • VBA编程中的 sheet1 与 sheets(1)的区别

    自己理解 sheet1是一个专有名词 xff0c 不是任何对象的属性 xff0c 只能单独使用 xff0c 特指代码所在工作簿的那个sheet1 和顺序无关 xff0c 是固定的一个表 xff0c sheets 1 则和顺序有关 参考资料
  • python练习笔记——计算1/1-1/3+1/5-1/7……的和

    1 1 1 3 43 1 5 1 7 43 求100000个这样的分式计算之为是多少 xff1f 将此值乘以4后打印出来 xff0c 看看是什么 xff1f num list 61 count 61 1 i 61 1 while True
  • Django Model获取指定列的数据

    model一般都是有多个属性的 xff0c 但是很多时候我们又只需要查询特定的某一个 xff0c 这个时候可以用到values和values list 利用values查询 from attendence models import Emp
  • HIVE自定义函数的扩展

    作者简介 淳敏 xff0c 物流架构师同时也是一位team leader xff0c 工作认真负责 xff0c 曾在休假期间 面向大海编程 xff0c 不明觉厉 在Hive中 xff0c 用户可以自定义一些函数 xff0c 用于扩展Hive