创建一个映射来调用 Spark Dataframe 的每一行的 POJO

2023-12-03

我在 R 中构建了一个 H2O 模型并保存了 POJO 代码。我想使用 POJO 在 hdfs 中对 parquet 文件进行评分,但我不知道如何去做。我计划将 parquet 文件读入 Spark (scala/SparkR/PySpark) 并在那里对其进行评分。下面是我在上面找到的摘录H2O 的文档页面。

“如何在 Spark 集群上运行 POJO?

POJO 仅提供进行预测的数学逻辑,因此您不会在那里找到任何 Spark(甚至 H2O)特定的代码。如果您想使用 POJO 对 Spark 中的数据集进行预测,请创建一个映射来为每一行调用 POJO,并将结果逐行保存到新列中”

有人有一些示例代码说明我如何做到这一点吗?我将非常感谢任何帮助。我主要在 R 和 SparkR 中编写代码,我不确定如何将 POJO“映射”到每一行。

提前致谢。


我刚刚发布了一个solution实际上使用 DataFrame/Dataset。该帖子使用星球大战数据集在 R 中构建模型,然后在 Spark 测试集上对 MOJO 进行评分。我将在这里粘贴唯一相关的部分:

使用 Spark(和 Scala)评分

您可以使用spark-submit 或spark-shell。如果您使用spark-submit,则需要将h2o-genmodel.jar放在spark应用程序根目录的lib文件夹下,以便在编译期间将其添加为依赖项。以下代码假设您正在运行 Spark-shell。为了使用 h2o-genmodel.jar,您需要在启动 Spark-shell 时通过提供 --jar 标志来附加 jar 文件。例如:

/usr/lib/spark/bin/spark-shell \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.driver.memory="3g" \
--conf spark.executor.memory="10g" \
--conf spark.executor.instances=10 \
--conf spark.executor.cores=4 \
--jars /path/to/h2o-genmodel.jar

现在在 Spark shell 中,导入依赖项

import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.MojoModel

使用数据框

val modelPath = "/path/to/zip/file"
val dataPath = "/path/to/test/data"

// Import data
val dfStarWars = spark.read.option("header", "true").csv(dataPath)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(mojo)

// score
val dfScore = dfStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.getAs[String](1))
    r.put("mass", x.getAs[String](2))
    val score = easyModel.predictBinomial(r).classProbabilities
    (x.getAs[String](0), score(1))
}.toDF("name", "isHumanScore")

变量 Score 是级别 0 和 1 的两个分数的列表。score(1) 是级别 1 的分数,即“人类”。默认情况下,map 函数返回一个带有未指定列名“_1”、“_2”等的 DataFrame。您可以通过调用 toDF 重命名列。

使用数据集

要使用 Dataset API,我们只需要创建两个案例类,一类用于输入数据,一类用于输出。

case class StarWars (
  name: String,
  height: String,
  mass: String,
  is_human: String
)

case class Score (
  name: String,
  isHumanScore: Double
)


// Dataset
val dtStarWars = dfStarWars.as[StarWars]
val dtScore = dtStarWars.map {
  x =>
    val r = new RowData
    r.put("height", x.height)
    r.put("mass", x.mass)
    val score = easyModel.predictBinomial(r).classProbabilities
    Score(x.name, score(1))
}

使用Dataset,您可以通过直接调用x.columnName 来获取列的值。请注意,列值的类型必须是 String,因此如果它们是案例类中定义的其他类型,则可能需要手动转换它们。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

创建一个映射来调用 Spark Dataframe 的每一行的 POJO 的相关文章

随机推荐