Let's first fix the imports to remove the ambiguity:
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{StringIndexer, VectorIndexer}
import org.apache.spark.ml.{Pipeline, PipelineStage}
// Spark 2.x and later; on Spark 1.x the class is org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.ml.linalg.Vectors
I'll use the same data you are using:
val training = sqlContext.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
Then create the pipeline stages:
val stages = new scala.collection.mutable.ArrayBuffer[PipelineStage]()
- For classification, re-index the classes:
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(training)
- Identify categorical features using VectorIndexer:
val featuresIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(10).fit(training)
stages += featuresIndexer
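// Apply both fitted indexers up front so the data used for fitting carries the indexedLabel column
val tmp = featuresIndexer.transform(labelIndexer.transform(training))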
- Learn the random forest:
val rf = new RandomForestClassifier().setFeaturesCol(featuresIndexer.getOutputCol).setLabelCol(labelIndexer.getOutputCol)
stages += rf
val pipeline = new Pipeline().setStages(stages.toArray)
// Fit the Pipeline
val pipelineModel = pipeline.fit(tmp)
val results = pipelineModel.transform(training)
results.show
//+-----+--------------+---------------+-------------+-----------+----------+
//|label| features|indexedFeatures|rawPrediction|probability|prediction|
//+-----+--------------+---------------+-------------+-----------+----------+
//| 1.0| [0.0,1.1,0.1]| [0.0,1.0,2.0]| [1.0,19.0]|[0.05,0.95]| 1.0|
//| 0.0|[2.0,1.0,-1.0]| [1.0,0.0,0.0]| [17.0,3.0]|[0.85,0.15]| 0.0|
//| 0.0| [2.0,1.3,1.0]| [1.0,3.0,3.0]| [14.0,6.0]| [0.7,0.3]| 0.0|
//| 1.0|[0.0,1.2,-0.5]| [0.0,2.0,1.0]| [1.0,19.0]|[0.05,0.95]| 1.0|
//+-----+--------------+---------------+-------------+-----------+----------+
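A note on the indexedFeatures column in the output above: with only four rows and setMaxCategories(10), every feature has at most four distinct values, so VectorIndexer treats all three features as categorical and replaces each value with a category index. The plain Scala sketch below (no Spark required) mimics that behaviour, assuming categories are numbered in ascending order of value, which reproduces the column shown above. VectorIndexerSketch, categoryMaps and index are hypothetical names for illustration, not Spark APIs, and Spark's real implementation may handle some cases differently.

```scala
object VectorIndexerSketch {
  // The same four feature vectors as in the training data above.
  val rows: Seq[Array[Double]] = Seq(
    Array(0.0, 1.1, 0.1),
    Array(2.0, 1.0, -1.0),
    Array(2.0, 1.3, 1.0),
    Array(0.0, 1.2, -0.5)
  )

  /** For every feature with at most maxCategories distinct values,
    * build a map from feature value to category index. */
  def categoryMaps(data: Seq[Array[Double]], maxCategories: Int): Map[Int, Map[Double, Int]] = {
    val numFeatures = data.head.length
    (0 until numFeatures)
      // Assumption: categories are numbered in ascending order of value.
      .map(j => j -> data.map(_(j)).distinct.sorted)
      .filter { case (_, values) => values.size <= maxCategories }
      .map { case (j, values) => j -> values.zipWithIndex.toMap }
      .toMap
  }

  /** Replace categorical values by their index; leave other features untouched. */
  def index(data: Seq[Array[Double]], maxCategories: Int): Seq[Seq[Double]] = {
    val maps = categoryMaps(data, maxCategories)
    data.map { row =>
      row.toSeq.zipWithIndex.map { case (value, j) =>
        maps.get(j).map(m => m(value).toDouble).getOrElse(value)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints one indexed vector per line, e.g. [0.0,1.0,2.0] for the first row.
    index(rows, maxCategories = 10).foreach(r => println(r.mkString("[", ",", "]")))
  }
}
```

In this sketch, calling index(rows, maxCategories = 2) indexes only the first feature (values 0.0 and 2.0) and leaves the other two as continuous values, which may be closer to what you want when the features are real-valued.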
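Reference: for more details on the first two steps (the StringIndexer and VectorIndexer feature transformers), I recommend reading the official documentation here: https://spark.apache.org/docs/1.5.1/ml-features.html#feature-transformers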
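To make the label re-indexing step more concrete: StringIndexer casts a numeric label column to strings and assigns indices by descending frequency, so the most frequent label gets index 0. Here is a minimal plain Scala sketch of that idea (no Spark required). StringIndexerSketch and fitLabels are hypothetical names, and the alphabetical tie-breaking is an assumption, since how Spark breaks ties may depend on the version.

```scala
object StringIndexerSketch {
  /** Map each distinct label to an index, most frequent label first. */
  def fitLabels(labels: Seq[String]): Map[String, Double] = {
    labels
      .groupBy(identity)
      .map { case (label, occurrences) => label -> occurrences.size }
      .toSeq
      // Assumption: equal counts are ordered alphabetically.
      .sortBy { case (label, count) => (-count, label) }
      .zipWithIndex
      .map { case ((label, _), idx) => label -> idx.toDouble }
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // The label column of the training data above, read as strings.
    val mapping = fitLabels(Seq("1.0", "0.0", "0.0", "1.0"))
    mapping.toSeq.sortBy(_._2).foreach { case (label, idx) => println(s"$label -> $idx") }
  }
}
```

Both labels occur twice in the example data, so the resulting order here comes purely from the tie-breaking rule; under the assumption above, "0.0" maps to 0.0 and "1.0" maps to 1.0.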