我刚刚发布了一个solution实际上使用 DataFrame/Dataset。该帖子使用星球大战数据集在 R 中构建模型,然后在 Spark 测试集上对 MOJO 进行评分。我将在这里粘贴唯一相关的部分:
使用 Spark(和 Scala)评分
您可以使用spark-submit 或spark-shell。如果您使用spark-submit,则需要将h2o-genmodel.jar放在spark应用程序根目录的lib文件夹下,以便在编译期间将其添加为依赖项。以下代码假设您正在运行 Spark-shell。为了使用 h2o-genmodel.jar,您需要在启动 Spark-shell 时通过提供 --jar 标志来附加 jar 文件。例如:
/usr/lib/spark/bin/spark-shell \
--conf spark.serializer="org.apache.spark.serializer.KryoSerializer" \
--conf spark.driver.memory="3g" \
--conf spark.executor.memory="10g" \
--conf spark.executor.instances=10 \
--conf spark.executor.cores=4 \
--jars /path/to/h2o-genmodel.jar
现在在 Spark shell 中,导入依赖项
import _root_.hex.genmodel.easy.{EasyPredictModelWrapper, RowData}
import _root_.hex.genmodel.MojoModel
使用数据框
val modelPath = "/path/to/zip/file"
val dataPath = "/path/to/test/data"
// Import data
val dfStarWars = spark.read.option("header", "true").csv(dataPath)
// Import MOJO model
val mojo = MojoModel.load(modelPath)
val easyModel = new EasyPredictModelWrapper(mojo)
// score
val dfScore = dfStarWars.map {
x =>
val r = new RowData
r.put("height", x.getAs[String](1))
r.put("mass", x.getAs[String](2))
val score = easyModel.predictBinomial(r).classProbabilities
(x.getAs[String](0), score(1))
}.toDF("name", "isHumanScore")
变量 Score 是级别 0 和 1 的两个分数的列表。score(1) 是级别 1 的分数,即“人类”。默认情况下,map 函数返回一个带有未指定列名“_1”、“_2”等的 DataFrame。您可以通过调用 toDF 重命名列。
使用数据集
要使用 Dataset API,我们只需要创建两个案例类,一类用于输入数据,一类用于输出。
case class StarWars (
name: String,
height: String,
mass: String,
is_human: String
)
case class Score (
name: String,
isHumanScore: Double
)
// Dataset
val dtStarWars = dfStarWars.as[StarWars]
val dtScore = dtStarWars.map {
x =>
val r = new RowData
r.put("height", x.height)
r.put("mass", x.mass)
val score = easyModel.predictBinomial(r).classProbabilities
Score(x.name, score(1))
}
使用Dataset,您可以通过直接调用x.columnName 来获取列的值。请注意,列值的类型必须是 String,因此如果它们是案例类中定义的其他类型,则可能需要手动转换它们。