无法从 Spark SQL 使用现有的 Hive 永久 UDF

2024-02-12

我之前已经向 hive 注册了一个 UDF。是永久的不是TEMPORARY。它直线工作。

CREATE FUNCTION normaliseURL AS 'com.example.hive.udfs.NormaliseURL' USING JAR 'hdfs://udfs/hive-udfs.jar';

我已将 Spark 配置为使用配置单元元存储。该配置正在运行,因为我可以查询配置单元表。我可以看到 UDF;

In [9]: spark.sql('describe function normaliseURL').show(truncate=False)
+-------------------------------------------+
|function_desc                              |
+-------------------------------------------+
|Function: default.normaliseURL             |
|Class: com.example.hive.udfs.NormaliseURL  |
|Usage: N/A.                                |
+-------------------------------------------+

但是我无法在 sql 语句中使用 UDF;

spark.sql('SELECT normaliseURL("value")')
AnalysisException: "Undefined function: 'default.normaliseURL'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7"

如果我尝试使用 Spark 注册 UDF(绕过元存储),则无法注册它,这表明它已经存在。

In [12]: spark.sql("create function normaliseURL as 'com.example.hive.udfs.NormaliseURL'")
AnalysisException: "Function 'default.normaliseURL' already exists in database 'default';"

我正在使用 Spark 2.0,hive 元存储 1.1.0。 UDF是scala,我的spark驱动程序代码是python。

我很困惑。

  • 我关于 Spark 可以利用元存储定义的永久 UDF 的假设是否正确?
  • 我是否在配置单元中正确创建了该函数?

问题是 Spark 2.0 无法执行 JAR 位于 HDFS 上的函数。

Spark SQL:Thriftserver 无法运行已注册的 Hive UDTF https://issues.apache.org/jira/browse/SPARK-18832

一种解决方法是将该函数定义为 Spark 作业中的临时函数,并将 jar 路径指向本地边缘节点路径。然后在同一个 Spark 作业中调用该函数。

CREATE TEMPORARY FUNCTION functionName as 'com.test.HiveUDF' USING JAR '/user/home/dir1/functions.jar'
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

无法从 Spark SQL 使用现有的 Hive 永久 UDF 的相关文章

  • 如何在不从 DataFrame 转换并访问它的情况下向数据集添加列?

    我知道使用以下方法将新列添加到 Spark 数据集的方法 withColumn and a UDF 它返回一个 DataFrame 我还知道 我们可以将生成的 DataFrame 转换为 DataSet 我的问题是 如果我们仍然遵循传统的
  • 了解 Spark 中的 DAG

    问题是我有以下 DAG 我认为当需要洗牌时 火花将工作划分为不同的阶段 考虑阶段 0 和阶段 1 有些操作不需要洗牌 那么为什么 Spark 将它们分成不同的阶段呢 我认为跨分区的实际数据移动应该发生在第 2 阶段 因为这里我们需要cogr
  • 如何使用 Scala 从 Spark 更新 ORC Hive 表

    我想更新 orc 格式的 hive 表 我可以从 ambari hive 视图进行更新 但无法从 sacla spark shell 运行相同的更新语句 objHiveContext sql select from table name 能
  • pyspark加入多个条件

    我如何指定很多条件 当我使用pyspark时 join 例子 与蜂巢 query select a NUMCNT b NUMCNT as RNUMCNT a POLE b POLE as RPOLE a ACTIVITE b ACTIVIT
  • 如何创建 HIVE 表来读取分号分隔值

    我想创建一个 HIVE 表 该表将以分号分隔的值读取 但我的代码不断给出错误 有没有人有什么建议 CREATE TABLE test details Time STRING Vital STRING sID STRING PARTITION
  • InvalidRequestException(为什么:empid 如果包含 Equal,则不能被多个关系限制)

    这是关于我从 Apache Spark 查询 Cassandra 时遇到的问题 Spark 的正常查询工作正常 没有任何问题 但是当我使用关键条件进行查询时 出现以下错误 最初 我尝试查询复合键列族 它也给出了与下面相同的问题 由以下原因引
  • Spark SQL sql("").first().getDouble(0) 给我不一致的结果

    我有下面的查询 它应该找到列值的平均值并返回一个数字的结果 val avgVal hiveContext sql select round avg amount 4 from users payment where dt between 2
  • 更改 Spark Streaming 中的输出文件名

    我正在运行一个 Spark 作业 就逻辑而言 它的性能非常好 但是 当我使用 saveAsTextFile 将文件保存在 s3 存储桶中时 输出文件的名称格式为 part 00000 part 00001 等 有没有办法更改输出文件名 谢谢
  • Spark-1.6.1 上的 DMLC 的 XGBoost-4j

    我正在尝试在 Spark 1 6 1 上使用 DMLC 的 XGBoost 实现 我能够使用 XGBoost 训练我的数据 但在预测方面面临困难 我实际上想以在 Apache Spark mllib 库中完成的方式进行预测 这有助于计算训练
  • 使用已知模式保存空 DataFrame (Spark 2.2.1)

    是否可以使用已知模式保存一个空的 DataFrame 以便将该模式写入文件 即使它有 0 条记录 def example spark SparkSession path String schema StructType val datafr
  • 如何在不使用 .toPandas() hack 的情况下提取 PySpark 中对长度敏感的特征?

    我是 PySpark 的新手 我想翻译特征提取 FE 将 pythonic 部分脚本放入 PySpark 中 首先 我有所谓的 Spark 数据框sdf包括 2 列 A 和 B 下面是示例 data A B https example1 o
  • “为 Apache Hadoop 2.7 及更高版本预构建”是什么意思?

    Apache Spark 下载页面上的 pre built for Apache Hadoop 2 7 and later 是什么意思 这是否意味着spark中HDFS必须有库 如果是这样 其他存储系统 例如 Cassandra s3 HB
  • 缩放数据框的每一列

    我正在尝试缩放数据框的每一列 首先 我将每一列转换为向量 然后使用 ml MinMax Scaler 除了简单地重复它之外 是否有更好 更优雅的方法将相同的函数应用于每一列 import org apache spark ml linalg
  • 如何删除spark输出中的compactbuffer

    下面是我在spark shell中运行的程序 但是当我将输出保存在HDFS中时 我得到带有compactbuffer的输出 如何删除spark输出中的compactbuffer Program val a sc textFile datag
  • Spark 对 RDD 中按值排序

    我有一个火花对 RDD 键 计数 如下 Array String Int Array a 1 b 2 c 1 d 3 使用spark scala API如何获取按值排序的新RDD对 所需结果 Array d 3 b 2 a 1 c 1 这应
  • 在 Jupyter 笔记本中使用 PySpark 读取 XML

    我正在尝试读取 XML 文件 df spark read format com databricks spark xml load path to my xml 并收到以下错误 java lang ClassNotFoundExceptio
  • collect_list() 是否保持行的相对顺序?

    想象一下我有以下 DataFrame df id featureName featureValue id1 a 3 id1 b 4 id2 a 2 id2 c 5 id3 d 9 想象一下我运行 df groupBy id agg coll
  • Spark Shuffle 写入超慢

    为什么对于 1 6MB shuffle 写入和 2 4MB 输入 spark shuffle 阶段如此缓慢 为什么 shuffle 写入仅发生在一个执行器上 我正在运行一个 3 节点集群 每个集群有 8 个核心 火花用户界面 Code Ja
  • hive 添加分区语句忽略前导零

    我在 hdfs 上有文件夹 user test year 2016 month 04 dt 25 000000 0 需要将上面的分区路径添加到test table 命令 ALTER TABLE test ADD IF NOT EXISTS
  • Spark shuffle 溢出指标

    在 Spark 2 3 集群上运行作业时 我在 Spark WebUI 中注意到某些任务发生了溢出 据我所知 在reduce端 reducer获取所需的分区 随机读取 然后使用执行器的执行内存执行reduce计算 由于没有足够的执行内存 一

随机推荐