将 IndexToString 应用于 Spark 中的特征向量

2024-05-15

Context:我有一个数据框,其中所有分类值都已使用 StringIndexer 进行索引。

val categoricalColumns = df.schema.collect { case StructField(name, StringType, nullable, meta) => name }    

val categoryIndexers = categoricalColumns.map {
  col => new StringIndexer().setInputCol(col).setOutputCol(s"${col}Indexed") 
}

然后我使用 VectorAssembler 对所有特征列(包括索引的分类列)进行矢量化。

val assembler = new VectorAssembler()
    .setInputCols(dfIndexed.columns.diff(List("label") ++ categoricalColumns))
    .setOutputCol("features")

应用分类器和一些额外的步骤后,我最终得到一个包含标签、特征和预测的数据框。我想将我的特征向量扩展为单独的列,以便将索引值转换回原始字符串形式。

val categoryConverters = categoricalColumns.zip(categoryIndexers).map {
colAndIndexer => new IndexToString().setInputCol(s"${colAndIndexer._1}Indexed").setOutputCol(colAndIndexer._1).setLabels(colAndIndexer._2.fit(df).labels)
}

问题:有没有simple这样做的方法,或者是以某种方式将预测列附加到测试数据帧的最佳方法?

我尝试过的:

val featureSlicers = categoricalColumns.map {
  col => new VectorSlicer().setInputCol("features").setOutputCol(s"${col}Indexed").setNames(Array(s"${col}Indexed"))
}

应用这个给了我我想要的列,但它们是矢量形式(正如它的意思)而不是类型 Double。

Edit:所需的输出是原始数据框(即分类特征作为字符串而不是索引),并带有指示预测标签的附加列(在我的例子中为 0 或 1)。

例如,假设我的分类器的输出如下所示:

+-----+---------+----------+
|label| features|prediction|
+-----+---------+----------+
|  1.0|[0.0,3.0]|       1.0|
+-----+---------+----------+

通过在每个功能上应用 VectorSlicer 我会得到:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|        [0.0]|        [3.0]|
+-----+---------+----------+-------------+-------------+

这很棒,但我需要:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|statusIndexed|artistIndexed|
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|         0.0 |         3.0 |
+-----+---------+----------+-------------+-------------+

然后能够使用 IndexToString 并将其转换为:

+-----+---------+----------+-------------+-------------+
|label| features|prediction|    status   |    artist   |
+-----+---------+----------+-------------+-------------+
|  1.0|[0.0,3.0]|       1.0|        good |  Pink Floyd |
+-----+---------+----------+-------------+-------------+

or even:

+-----+----------+-------------+-------------+
|label|prediction|    status   |    artist   |
+-----+----------+-------------+-------------+
|  1.0|       1.0|        good |  Pink Floyd |
+-----+----------+-------------+-------------+

嗯,这不是一个非常有用的操作,但应该可以使用列元数据和简单的 UDF 来提取所需的信息。我假设您的数据已经创建了类似于此的管道:

import org.apache.spark.ml.feature.{VectorSlicer, VectorAssembler, StringIndexer}
import org.apache.spark.ml.Pipeline

val df = sc.parallelize(Seq(
  (1L, "a", "foo", 1.0), (2L, "b", "bar", 2.0), (3L, "a", "bar", 3.0)
)).toDF("id", "x1", "x2", "x3")

val featureCols = Array("x1", "x2", "x3")
val featureColsIdx = featureCols.map(c => s"${c}_i")

val indexers = featureCols.map(
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_i")
)

val assembler = new VectorAssembler()
  .setInputCols(featureColsIdx)
  .setOutputCol("features")

val slicer = new VectorSlicer()
  .setInputCol("features")
  .setOutputCol("string_features")
  .setNames(featureColsIdx.init)


val transformed = new Pipeline()
  .setStages(indexers :+ assembler :+ slicer)
  .fit(df)
  .transform(df)

首先我们可以从特征中提取所需的元数据:

val meta = transformed.select($"string_features")
  .schema.fields.head.metadata
  .getMetadata("ml_attr") 
  .getMetadata("attrs")
  .getMetadataArray("nominal")

并将其转换为更易于使用的东西

case class NominalMetadataWrapper(idx: Long, name: String, vals: Array[String])

// In general it could a good idea to make it a broadcast variable
val lookup = meta.map(m => NominalMetadataWrapper(
  m.getLong("idx"), m.getString("name"), m.getStringArray("vals")
))

最后是一个小的UDF:

import scala.util.Try

val transFeatures = udf((v: Vector) => lookup.map{
  m => Try(m.vals(v(m.idx.toInt).toInt)).toOption
})

transformed.select(transFeatures($"string_features")).
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

将 IndexToString 应用于 Spark 中的特征向量 的相关文章

  • Spark 输出:日志式与进度式

    spark submit两个不同集群 都运行 Spark 1 2 上的输出看起来不同 一个是 日志式 即大量消息流 例如 15 04 06 14 53 13 INFO TaskSetManager Starting task 262 0 i
  • 帮助我理解这段 Scala 代码:scalaz IO Monad 和隐式

    这是后续this https stackoverflow com questions 7404495 help me understand this scala code scalaz io monad问题 这是我试图理解的代码 它来自ht
  • Spark 在执行 jdbc 保存时给出空指针异常

    您好 当我执行以下代码行时 我得到以下堆栈跟踪 transactionDF write format jdbc option url SqlServerUri option driver driver option dbtable full
  • Pyspark dataframe:如何按组应用 scipy.optimize 函数

    我有一段运行良好的代码 但使用 pandas 数据帧 groupby 处理 但是 由于文件很大 gt 7000 万组 我需要转换代码以使用 PYSPARK 数据框架 这是使用 pandas dataframe 和小示例数据的原始代码 imp
  • 有没有好的 Clojure 基准测试?

    Edit Clojure 基准测试已达到基准游戏 http benchmarksgame alioth debian org u64q clojure html 我已经制作了这个问题社区维基并邀请其他人保持更新 有人知道 Clojure 性
  • scala 中的模拟案例类:Mockito

    在我的游戏应用程序中 我打算模拟一个案例类 我可以这样做 但它创建了一个所有成员变量都为空的对象 有没有办法创建案例类的模拟对象 以便该对象可以初始化一些成员 case class User name String address Stri
  • 如何使用 `ProjectRef` 来引用 sbt 1.x 中的本地项目?

    其他答案中有很多含糊不清的内容 或者涉及到更旧版本的 sbt 即 0 12 x 但似乎没有人真正回答这个问题 鉴于我有一个文件夹 并且我已经运行 sbt new scala scala seed g8 name Scala Seed Pro
  • Spark如何选择节点来运行执行器?(spark on YARN)

    Spark如何选择节点来运行执行器 spark on YARN 我们使用 Spark on Yarn 模式 集群有 120 个节点 昨天 一个 Spark 作业创建了 200 个执行程序 而节点 1 上有 11 个执行程序 Node2上有1
  • 将额外的参数传递给多态函数?

    我有一个多态函数 可以将列表转换为集合 import shapeless PolyDefns gt import shapeless val lists List 1 2 List A B List 1 1 2 2 HNil object
  • 内存泄漏在哪里?

    我使用 InetAddress 来解析 IP 地址 但现在如果 IP 不可用 则需要存储主机名 所以我介绍了一个班级Host case class Host name String ip InetAddress import Host ad
  • Scala 'null' 是否算作另一种类型的实例?

    我有这个代码 class MyLinkedList T h T tail MyLinkedList T def prepend v T MyLinkedList T new MyLinkedList v this 我想知道我如何可以将第二个
  • 如何使用 with open 在 pySpark 中打开存储在 HDFS 中的文件

    如何打开存储在 HDFS 中的文件 这里输入文件来自 HDFS 如果我按如下方式提供文件 我将无法打开 它将显示为找不到文件 from pyspark import SparkConf SparkContext conf SparkConf
  • 在 Spark 中将流式 XML 转换为 JSON

    我是 Spark 新手 正在开发一个简单的应用程序 将从 Kafka 接收的 XML 流转换为 JSON 格式 Using 火花2 4 5 斯卡拉 2 11 12 在我的用例中 kafka 流采用 xml 格式 以下是我尝试过的代码 val
  • 如何列出所有 sbt 依赖项?

    我需要列出所有 sbt 依赖项 以便检查是否已存在 debian 软件包 我还注意到有一个 DEB 包 http www scala sbt org 0 13 tutorial Installing sbt on Linux html但似乎
  • Scala 中的 Apply 和 lambda

    我有下面的代码 scala gt val builder new StringBuilder foo bar baz builder StringBuilder foo bar baz scala gt 0 until 5 foreach
  • [json4s]:提取不同对象的数组

    我正在使用 facebook graph API 响应看起来与此类似 data id 311620272349920 311718615673419 from id 1456046457993048 name Richard Ettinso
  • Spark Streaming 中是否需要检查点

    我注意到 Spark 流示例也有检查点代码 我的问题是检查点有多重要 如果是为了容错 那么在此类流应用程序中发生故障的频率是多少 这一切都取决于您的用例 假设您正在运行一个流作业 它仅从 Kafka 读取数据并计算记录数 如果您的应用程序在
  • Spark、pyspark中从TF-IDF到LDA聚类

    我正在尝试对存储在格式键 listofwords 中的推文进行聚类 我的第一步是使用 dataframe 提取单词列表的 TF IDF 值 dbURL hdfs pathtodir file sc textFile dbURL Define
  • Scala 警告、IntelliJ 和编译器标志

    我目前正在试用 IntelliJ Scala 插件 有件事让我有点烦恼 编译时我收到 3 个警告 Warning scala Recompiling 4 files Warning scala Warning scala there wer
  • 在 Spark 中将多行汇总为单行和单列

    我有一个如下的火花 DF 我需要汇总具有与单行相同 ID 的多行 但值应该不同 id values 1 hello 1 hello Sam 1 hello Tom 2 hello 2 hello Tom 预期输出 id values 1 h

随机推荐