火花 >= 2.2
有一个DataFrame
base ml
提供的APIAssociationRules
:
from pyspark.ml.fpm import FPGrowth
data = ...
fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.
火花
目前PySpark不支持提取关联规则(DataFrame
based FPGrowth
支持 Python 的 API 正在进行中SPARK-1450)但我们可以轻松解决这个问题。
首先你必须安装 SBT(只需下载页面)并按照适用于您的操作系统的说明进行操作。
接下来,您必须创建一个仅包含两个文件的简单 Scala 项目:
.
├── AssociationRulesExtractor.scala
└── build.sbt
您可以稍后调整以遵循建立的目录结构.
接下来将以下内容添加到build.sbt
(调整 Scala 版本和 Spark 版本以匹配您使用的版本):
name := "fpm"
version := "1.0"
scalaVersion := "2.10.6"
val sparkVersion = "1.6.2"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
并遵循AssociationRulesExtractor.scala
:
package com.example.fpm
import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD
object AssociationRulesExtractor {
def apply(rdd: RDD[Rule[String]]) = {
rdd.map(rule => Array(
rule.confidence, rule.javaAntecedent, rule.javaConsequent
))
}
}
打开您选择的终端模拟器,转到项目的根目录并调用:
sbt package
它会在目标目录中生成一个jar文件。例如在 Scala 2.10 中它将是:
target/scala-2.10/fpm_2.10-1.0.jar
启动 PySpark shell 或使用spark-submit
并将路径传递给生成的 jar 文件--driver-class-path
:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
非本地模式下:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
在集群模式下,jar 应该存在于所有节点上。
添加一些方便的包装:
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple
rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])
def generateAssociationRules(model, minConfidence):
# Get active context
sc = SparkContext.getOrCreate()
# Retrieve extractor object
extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor
# Compute rules
java_rules = model._java_model.generateAssociationRules(minConfidence)
# Convert rules to Python RDD
return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))
最后,您可以将这些助手用作函数:
generateAssociationRules(model, 0.9)
或作为一种方法:
FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
此解决方案依赖于内部 PySpark 方法,因此不能保证它在版本之间可移植。