如何从 Scala 方法创建 UDF(计算 md5)?

2024-01-07

我想从两个已经工作的函数构建一个 UDF。我正在尝试计算 md5 哈希作为现有 Spark Dataframe 的新列。

def md5(s: String): String = { toHex(MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")))}
def toHex(bytes: Array[Byte]): String = bytes.map("%02x".format(_)).mkString("")

结构(到目前为止我所拥有的)

val md5_hash: // UDF Implementation
val sqlfunc = udf(md5_hash)
val new_df = load_df.withColumn("New_MD5_Column", sqlfunc(col("Duration")))

不幸的是我不知道如何正确地实现该函数作为 UDF。


为什么不使用内置的md5 http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24@md5(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column功能?

md5(e: 列): 列计算二进制列的 MD5 摘要并以 32 个字符的十六进制字符串形式返回值。

然后您可以按如下方式使用它:

val new_df = load_df.withColumn("New_MD5_Column", md5($"Duration"))

您必须确保该列是二进制类型,因此如果它是 int,您可能会看到以下错误:

org.apache.spark.sql.AnalysisException:无法解析'md5(Duration)' 由于数据类型不匹配:参数 1 需要二进制类型,但是,'Duration' 是 int 类型。;;

然后您应该将类​​型更改为md5-兼容,即二进制类型,使用bin http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions%24@bin(e:org.apache.spark.sql.Column):org.apache.spark.sql.Column功能。

bin(e: 列): 列返回给定长列的二进制值的字符串表示形式的表达式。例如,bin("12")回报"1100".

那么解决方案可能如下:

val solution = load_df.
  withColumn("bin_duration", bin($"duration")).
  withColumn("md5", md5($"bin_duration"))
scala> solution.show(false)
+--------+------------+--------------------------------+
|Duration|bin_duration|md5                             |
+--------+------------+--------------------------------+
|1       |1           |c4ca4238a0b923820dcc509a6f75849b|
+--------+------------+--------------------------------+

您还可以将函数“链接”在一起,并在一个函数中进行转换和计算 MD5withColumn,但我更喜欢将步骤分开,以防出现需要解决的问题,并且中间步骤通常会有所帮助。

表现

您会考虑使用内置函数的原因bin and md5自定义用户定义函数 (UDF) 的优点是could由于 Spark SQL 处于完全控制状态,因此可以获得更好的性能would不添加额外的步骤来序列化和反序列化内部行表示。

这里的情况并非如此,但仍然需要较少的导入和使用。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何从 Scala 方法创建 UDF(计算 md5)? 的相关文章

随机推荐