UDF、UDAF和UDTF开发模板

2023-10-26

0.背景

Hive是一种构建在Hadoop上的数据仓库,Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业,是MapReduce更高层次的抽象,不用编写具体的MapReduce方法。Hive将数据组织为表,这就使得HDFS上的数据有了结构,元数据即表的模式,都存储在名为metastore的数据库中。

1.UDF是什么?

hive的类SQL预发给数据挖掘工作者带来了很多便利,海量数据通过简单的sql就可以完成分析,有时候hive提供的函数功能满足不了业务需要,就需要我们自己来写UDF函数来辅助完成,下面用一个简单的例子说明过程,以及注意事项。

UDF函数其实就是一个简单的函数,执行过程就是在Hive转换成mapreduce程序后,执行java方法,类似于像Mapreduce执行过程中加入一个插件,方便扩展. UDF只能实现一进一出的操作,如果需要实现多进一出,则需要实现UDAF .

Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。

2.UDF类型


Hive中有3种UDF:
UDF:操作单个数据行,产生单个数据行;
UDAF:操作多个数据行,产生一个数据行。
UDTF:操作一个数据行,产生多个数据行一个表作为输出。

3.如何构建UDF

用户构建的UDF使用过程如下:
第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。
第二步:将写好的类打包为jar。如hivefirst.jar.
第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件
第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。
第五步:在select中使用mylength();

4.UDF_Demo

package whut;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
//UDF是作用于单个数据行,产生一个数据行
//用户必须要继承UDF,且必须至少实现一个evalute方法,该方法并不在UDF中
//但是Hive会检查用户的UDF是否拥有一个evalute方法
public class Strip extends UDF{
    private Text result=new Text();
    //自定义方法
    public Text evaluate(Text str)
    {
      if(str==null)
        return null;
        result.set(StringUtils.strip(str.toString()));
        return result;
    }
    public Text evaluate(Text str,String stripChars)
    {
        if(str==null)
            return null;
        result.set(StringUtils.strip(str.toString(),stripChars));
        return result;
    }
}

 

注意事项:
   1,一个用户UDF必须继承org.apache.hadoop.hive.ql.exec.UDF;
   2,一个UDF必须要包含有evaluate()方法,但是该方法并不存在于UDF中。evaluate的参数个数以及类型都是用户自己定义  的。在使用的时候,Hive会调用UDF的evaluate()方法。

5.UDTF_Demo

public class NameParserGenericUDTF extends GenericUDTF {
 
	  private PrimitiveObjectInspector stringOI = null;
 
	  @Override
	  public StructObjectInspector initialize(ObjectInspector[] args) UDFArgumentException {
 
	    if (args.length != 1) {
	      throw new UDFArgumentException("NameParserGenericUDTF() takes exactly one argument");
	    }
 
	    if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
	        && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
	      throw new UDFArgumentException("NameParserGenericUDTF() takes a string as a parameter");
	    }
        
	    // 输入格式(inspectors)
	    stringOI = (PrimitiveObjectInspector) args[0];
 
	    // 输出格式(inspectors) -- 有两个属性的对象
	    List<String> fieldNames = new ArrayList<String>(2);
	    List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);
	    fieldNames.add("name");
	    fieldNames.add("surname");
	    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
	    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
	    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
	  }
	  	  
	  public ArrayList<Object[]> processInputRecord(String name){
		    ArrayList<Object[]> result = new ArrayList<Object[]>();
		  
		    // 忽略null值与空值
		    if (name == null || name.isEmpty()) {
		      return result;
		    }
		    
		    String[] tokens = name.split("\\s+");
		    
		    if (tokens.length == 2){
		    	result.add(new Object[] { tokens[0], tokens[1] });
		    }else if (tokens.length == 4 && tokens[1].equals("and")){
		    	result.add(new Object[] { tokens[0], tokens[3] });
		    	result.add(new Object[] { tokens[2], tokens[3] });
		    }
		    
		    return result;
	  }
	  
	  @Override
	  public void process(Object[] record) throws HiveException {
 
	    final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();
 
	    ArrayList<Object[]> results = processInputRecord(name);
 
	    Iterator<Object[]> it = results.iterator();
	    
	    while (it.hasNext()){
	    	Object[] r = it.next();
	    	forward(r);
	    }
	  }
 
	  @Override
	  public void close() throws HiveException {
	    // do nothing
	  }
}

分开的UDTF参考:https://www.jianshu.com/p/ac352ceab9cd 

6.UDAF_Demo

package whut;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
import org.apache.hadoop.io.IntWritable;
//UDAF是输入多个数据行,产生一个数据行
//用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
public class MaxiNumber extends UDAF{
    public static class MaxiNumberIntUDAFEvaluator implements UDAFEvaluator{
        //最终结果
        private IntWritable result;
        //负责初始化计算函数并设置它的内部状态,result是存放最终结果的
        @Override
        public void init() {
            result=null;
        }
        //每次对一个新值进行聚集计算都会调用iterate方法
        public boolean iterate(IntWritable value)
        {
            if(value==null)
                return false;
            if(result==null)
              result=new IntWritable(value.get());
            else
              result.set(Math.max(result.get(), value.get()));
            return true;
        }
                                                                                                                                  
        //Hive需要部分聚集结果的时候会调用该方法
        //会返回一个封装了聚集计算当前状态的对象
        public IntWritable terminatePartial()
        {
            return result;
        }
        //合并两个部分聚集值会调用这个方法
        public boolean merge(IntWritable other)
        {
            return iterate(other);
        }
        //Hive需要最终聚集结果时候会调用该方法
        public IntWritable terminate()
        {
            return result;
        }
    }
}

注意事项:
1.用户的UDAF必须继承了org.apache.hadoop.hive.ql.exec.UDAF;
2.用户的UDAF必须包含至少一个实现了org.apache.hadoop.hive.ql.exec的静态类,诸如常见的实现了 UDAFEvaluator。
3.一个计算函数必须实现的5个方法的具体含义如下:

  • init():主要是负责初始化计算函数并且重设其内部状态,一般就是重设其内部字段。一般在静态类中定义一个内部字段来存放最终的结果。

  • iterate():每一次对一个新值进行聚集计算时候都会调用该方法,计算函数会根据聚集计算结果更新内部状态。当输入值合法或者正确计算了,则就返回true。

  • terminatePartial():Hive需要部分聚集结果的时候会调用该方法,必须要返回一个封装了聚集计算当前状态的对象。

  • merge():Hive进行合并一个部分聚集和另一个部分聚集的时候会调用该方法。

  • terminate():Hive最终聚集结果的时候就会调用该方法。计算函数需要把状态作为一个值返回给用户。

4.部分聚集结果的数据类型和最终结果的数据类型可以不同。

参考:

https://blog.csdn.net/WYpersist/article/details/80314352 写的很好 很详细

https://blog.csdn.net/kent7306/article/details/50200061

https://www.jianshu.com/p/ac352ceab9cd

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

UDF、UDAF和UDTF开发模板 的相关文章

  • hive自定义函数UDF

    Hive自定义函数 UDF xff0c 可以帮助用户轻松实现在hql语句中展现自定义查询结果 这里以一个简单的连接函数来实现用户自定义函数 xff0c 假设表结构如下 xff1a 表中只有两个简单的字段 xff0c id和name 这里实现
  • Hive深入浅出UDTF

    前面两篇文章我们分析了UDF和UDAF的原理以及实现思路 xff0c 这一节我们介绍另外一种UDF UDTF User Defined Table Generating Functions xff0c 是用来解决输入一行输出多行的需求的 x
  • Apache IoTDB’s UDF源码分析(1)

    目录 前言 命令行注册UDF函数 Create Function xxx as 34 全限定类名 34 语法分析 生成物理计划 执行物理计划进行函数注册 Select带有UDF函数的查询 前言 继上个月开始了Apache IoTDB的源码贡
  • UDF、UDAF和UDTF开发模板

    0 背景 Hive是一种构建在Hadoop上的数据仓库 Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业 是MapReduce更高层次的抽象 不用编写具体的MapReduce方法 Hive将数据组织为表 这就
  • hive 分区表和数据产生关联三种方式

    写在前面 想要从hive数据库里面查询到数据就要求hive的元数据必须存在且元数据指向的的HDFS路径中也必须要存在实际的数据 1 方式一 上传数据后修复 使用的场景是历史数据积累了很多分区数据 推荐使用该方式 该方法将HDFS上的数据方向
  • JAVA maven 编写UDF适用于hive和impala

    hive 内置函数很少 我们可以通过自定义的方式添加新的UDF上去 来增强hive的处理能力 比如hive没有字符串包含的UDF 我们通过Java maven的方式来编写一个字符串包含的UDF 1 新建maven工程 2 修改pom xml
  • hive 表中常用的 增加/修改/替换列操作

    1 语法 更新列 ALTER TABLE table name CHANGE COLUMN col old name col new name column type COMMENT col comment FIRST AFTER colu
  • 如何处理原始可空类型的 Spark UDF 输入/输出

    问题 1 如果输入是包含以下内容的原始类型列 Spark 不会调用 UDFnull inputDF show x null 1 0 inputDF withColumn y udf x Double gt 2 0 apply x will
  • 在 BigQuery 中将 API 调用作为 UDF 的一部分 - 可能吗?

    我想知道是否可以在 BigQuery 中的 UDF 内对 google 地图地理编码 api 进行 api 调用 我有 Google Analytics 地理字段 例如 geoNetwork continent Europe geoNetw
  • 不支持 Any 类型的架构

    我正在尝试创建一个 Spark UDF 以从用户定义的案例类中提取 键 值 对的映射 scala 函数似乎工作正常 但是当我尝试将其转换为 Spark2 0 中的 UDF 时 我遇到了 不支持 Any 类型的架构 错误 case class
  • BigQuery UDF 内部错误

    我们在 BigQuery 中有一个简单的 UDF 它会以某种方式抛出一个不断返回的错误 Query Failed Error An internal error occurred and the request could not be c
  • 如何使用 mysql udf json_extract 0.4.0 从 json 数组中提取行?

    我有一些 sql 想要传递到 mysql 存储过程中 我正在使用 mysql json udfs 0 4 0 labs json udfs linux glibc2 5 x86 64 中的 json 函数 我们正在运行 mysql 5 5
  • 将元组列表作为参数传递给 scala 中的 Spark udf

    我正在尝试将元组列表传递给 scala 中的 udf 我不确定如何为此准确定义数据类型 我试图将其作为整行传递 但它无法真正解决它 我需要根据元组的第一个元素对列表进行排序 然后发回 n 个元素 我已经尝试过以下 udf 定义 def ud
  • 如何从 Scala 方法创建 UDF(计算 md5)?

    我想从两个已经工作的函数构建一个 UDF 我正在尝试计算 md5 哈希作为现有 Spark Dataframe 的新列 def md5 s String String toHex MessageDigest getInstance MD5
  • 如何使用 Spark UDF 返回复杂类型

    您好 提前谢谢您 我的程序是用java编写的 我无法转移到scala 我目前正在使用以下行处理从 json 文件中提取的 Spark DataFrame DataFrame dff sqlContext read json filePath
  • Hive UDF 用于选择除某些列之外的所有列

    HiveQL 以及一般的 SQL 中常见的查询构建模式是选择所有列 SELECT 或明确指定的一组列 SELECT A B C SQL 没有内置机制来选择除指定的一组列之外的所有列 有多种机制可以排除某些列 如中所述这个问题 https s
  • 无法从 Spark SQL 使用现有的 Hive 永久 UDF

    我之前已经向 hive 注册了一个 UDF 是永久的不是TEMPORARY 它直线工作 CREATE FUNCTION normaliseURL AS com example hive udfs NormaliseURL USING JAR
  • scala Spark udf 中没有可用的 typeTag 错误

    在编译以下代码时 我没有找到 Seq String 的类型标签 val post event list evar lookup String gt Seq String pel String gt pel split filterNot c
  • Spark - Java UDF 返回多列

    我正在使用 SparkSql 1 6 2 Java API 我必须处理以下 DataFrame 该 DataFrame 在 2 列中具有值列表 ID AttributeName AttributeValue 0 an1 an2 an3 av
  • 当用户输入的参数多于预期时,强制 VBA 中的 UDF 显示 MsgBox?

    当用户输入太多参数时COUNTBLANK函数 该函数显示此错误消息 并返回到编辑模式 您为此函数输入了太多参数 如何使任何 UDF 都像这样工作 例如 Function COUNT2 c As Range COUNT2 c Count En

随机推荐

  • Flutter组件 - Expanded

    Row Column Flex会被Expanded撑开 充满主轴可用空间 使用方式 Row children
  • C# 获得配置文件存储目录

    在C 中 不同工程为了读取自己的配置文件 由于系统当前目录的问题 往往在不同情况下 使用不同的方法 下面对在什么时候 使用什么方法 做一个整理 一下方法很多是引用别人信息 情况1 如果是一个标准的Win独立应用 或者一个标准的WEB独立应用
  • [Ubuntu]深度学习环境安装NVIDIA-1080+CUDA9.0+cuDnn+Tensorflow-gpu-1.6.0+conda

    1 安装Miniconda wget https mirrors tuna tsinghua edu cn anaconda miniconda Miniconda 1 6 0 Linux x86 64 sh bash Miniconda
  • AngularJS2.0 开发指南

    经过前面的学习 基本了解了Angular2 0的使用 所有的Module都是一个Component 甚至一个事件响应也是一个Component 或者表单验证也可以是一个Component Angular的运作机制图 Angular2 0 A
  • 面向对象高级3-内部类&枚举&泛型

    1 内部类 回顾 之前学了类的四个成员 分别是成员变量 成员方法 代码块 构造器 现在这是第五个成员 内部类 前三个作了解 第四个重点学习 内部类的应用场景 场景 当一个类的内部 包含了一个完整的事物 且这个事物没有必要单独设计时 就可以把
  • 路由中的mata

    一 定义 meta简单来说就是路由元信息 也就是每个路由身上携带的信息 二 使用 1 面包屑 path index name index meta keepAlive true 需要缓存 title 首页 components gt imp
  • linux 一个用户进入另外一个用户的家目录

    a userb use b 用户 cd home a 只有查看权限chmod 755 home a 转载于 https blog 51cto com wsxxsl 2096507
  • Python之区块链简单记账本实现

    在上一篇 Python之区块链入门 中讲述了区块链的基础知识 并用Python实现了区块和区块链的结构 在本篇中 将基于上面的内容实现一个简单的记账本功能 记账本的功能如下 实现基本的收支记录 计算当前余额 对收支情况做简单统计分析 账单记
  • android studio 导入module作为lib使用

    android studio 导入module作为lib使用 1 将 android module导入 android project 中 2 在要作为lib导入的module 的build gradle文件中添加一行 apply plug
  • 若依前后端分离版3、用户角色权限和动态菜单

    文章目录 一 用户角色和权限 1 前端 2 后端 一 用户角色和权限 1 前端 我们通过登陆 F12进行查看发现还有getinfo和getRouters方法 我们发现若依在页面跳转的时候都会出现这两个方法 这其实就是我们在路由里边配置的东西
  • 汽车智能座椅系统

    概述 自动驾驶领域日渐成熟 将催生一些新应用场景 如休闲 娱乐 社交和健康等 传统的座椅控制系统无法满足人们新的需求 更安全 更舒适 智能化及健康化体验将成为未来智能座椅的方向 恒润凭借汽车电子技术的积累 能够提供智能汽车座椅的解决方案 为
  • 笔记

    零散个人笔记 书籍已出版 完整版 淘宝 京东 当当有售 1 tensorflow源码完整下载方法 git clone recurse submodules https github com tensorflow tensorflow git
  • 作业 从外到内:一次完整的渗透测试!作业

    9th 一 环境准备 Windows10 1709地址 WindowsServer2016 x64 修改了密码 原密码 lonelyor org UbuntuServer2004 x64 UbuntuServer1604 x64 pfsen
  • Qt实现coturn穿透客户端,coturn服务器搭建

    目录 coturn简介 coturn服务器搭建 coturn服务验证 qt实现coturn穿透 NAT类型是否可以穿透 coturn简介 Coturn集成了stun turn协议 实现NAT检测 穿透就需要通过stun协议 NAT检测无法进
  • 渗透测试核心思路-边界突破

    概述 渗透测试的目标可以是单个主机 也可以是整个内网 在实战中 比如最近如火如荼的HW行动 更多的是对一个目标的内网进行渗透 争取获得所有有价值的资产 完整的内网渗透涉及的步骤如下图所示 我们总是先通过对外提供服务的 防守最薄弱的主机打进去
  • c++:继承(超详解)

    目录 一 什么是继承 二 继承的格式 继承的总结 二 子类和父类 基类和派生类 1 子类和父类的相互赋值 2 同名的成员变量 3 同名成员函数 三 子类中默认的成员函数 1 构造函数 2 析构函数 3 拷贝构造 4 赋值运算符重载 四 单继
  • 数组中和为0的三个数

    给你一个整数数组 nums 判断是否存在三元组 nums i nums j nums k 满足 i j i k 且 j k 同时还满足 nums i nums j nums k 0 请你返回所有和为 0 且不重复的三元组 注意 答案中不可以
  • 正六边形旋转实现

    1 行内样式 div style background none div
  • Jenkins :添加node权限获取凭据、执行命令

    拥有Jenkins agent权限的账号可以对node节点进行操作 通过添加不同的node可以让流水线项目在不同的节点上运行 安装Jenkins的主机默认作为master节点 1 Jenkins 添加node获取明文凭据 通过添加node节
  • UDF、UDAF和UDTF开发模板

    0 背景 Hive是一种构建在Hadoop上的数据仓库 Hive把SQL查询转换为一系列在Hadoop集群中运行的MapReduce作业 是MapReduce更高层次的抽象 不用编写具体的MapReduce方法 Hive将数据组织为表 这就