Spark MLlib:为每个数据组构建分类器

2024-03-26

我已经标记了向量(LabeledPoint-staged 由一些组号组成。对于每个组,我需要创建独立逻辑回归分类器:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val modelByGroup = trainRDD.groupByKey().map({case (group, iter) => 
                           (group, new LogisticRegressionWithLBFGS().run(iter))})
  }

}

LogisticRegressionWithLBFGS().run(iter)无法编译,因为runRDD而不是迭代器groupBy返回。 请告知如何构建与输入数据中的组(标签)一样多的分类器。

Update- 演示嵌套 RDD 迭代不起作用:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.{Vector, Vectors}

object Scratch {

  val train = Seq(
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((1, 1.5), (2, 4.0))))),
    (1, LabeledPoint(0, Vectors.sparse(3, Seq((0, 2.0), (1, 1.0), (2, 3.5))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 3.0), (2, 7.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.0), (1, 3.0))))),
    (8, LabeledPoint(0, Vectors.sparse(3, Seq((0, 1.5), (2, 4.0)))))
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Scratch")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)

    val trainRDD = sc.parallelize(train)
    val keys : RDD[Int] = trainRDD.map({case (key,_) => key}).distinct
    for (key <- keys) {
    // key is Int here!
      // Get train data for the current group (key):
      val groupTrain = trainRDD.filter({case (x, _) => x == key }).cache()

      /**
       * Which results in org.apache.spark.SparkException:
       * RDD transformations and actions can only be invoked by the driver,
       * not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid
       * because the values transformation and count action cannot be performed inside of the rdd1.map transformation.
       * For more information, see SPARK-5063. at org.apache.spark.rdd.RDD.sc(RDD.scala:87)
       */
    }
  }
}

看起来无法在其他转换中使用转换,对吗?


如果您在每个组上使用分类器,则不需要 mllib。 Mllib 旨在与分布式集一起使用(您的集不是每个工作人员上都有大量本地集)。您可以使用一些本地机器学习库,例如weka https://weka在地图功能的每个组上。

EDIT:

val keys = wholeRDD.map(_._1).distinct.collect

var models = List()
for (key <- keys) {
  val valuesForKey = wholeRDD.filter(_._1 == key)
  // train model
  ...
  models = model::models
}
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Spark MLlib:为每个数据组构建分类器 的相关文章

随机推荐

  • Grails 全局约束

    在 1 2 版本中 Grails 引入了全局约束 我尝试将以下内容添加到 Config groovy grails gorm default constraints notBlank nullable false blank false 然
  • NHibernate ThenFetchMany 正在检索重复的子项

    我有一个父对象 其子集合包含一个元素 子集合包含一个包含 3 个元素的 孙子 集合 我使用 NHibernate 从数据库加载父对象 如下所示 Parent parentObject session Query
  • Start-Process 与 Start-Sleep 不能很好地配合

    我的目标是运行多个进程并保存它们ProcessName and Id供以后使用 这是我的代码 System Collections ArrayList startedProcesses processStatus Start Process
  • 简单/基本的隐写算法和方法

    最基本和最简单的是什么隐写算法和方法 我的意思是应用于图像的隐写术 将数据隐藏到图像的简单程序如何工作 主要使用了哪些技术 程序如何在没有源图像的情况下识别图像中的加密消息 基本且最简单的隐写算法和方法是什么 我的意思是应用于图像的隐写术
  • Three.js ShaderMaterial 灯光问题

    你好 这是我的代码的一部分 地球仪 function createGlobe var normalMap THREE ImageUtils loadTexture images earth normal 2048 jpg var surfa
  • 在 django admin 过滤器 list_filter 中选择多个选项?

    目前我通过 django 管理界面中的某些选项进行过滤 例如 假设我按 按状态 过滤 是否可以选择多个状态来过滤结果 这是过滤器的屏幕截图 我可以从此列表中选择多个项目吗 不在管理 UI 中 但如果修改 URL 则可以使过滤条件更加复杂 例
  • EF 4.3(代码优先)- 确定何时将项目添加到虚拟 ICollection 属性

    当从查询加载 ICollection 虚拟成员时 有什么方法可以确定实际项目何时添加到 ICollection 虚拟成员中 希望下面的代码能够证明我的观点 public class DbAppointment public DbAppoin
  • F# 类型和循环

    我正在编写一个创建一副纸牌的 F 教程 列出了类型 但我无法理解如何循环遍历类型来创建完整套牌的地图 我期望做类似的事情 Foreach rank in ranks Foreach suit in suits somehow combine
  • 通过 Azure KeyVault 禁止控制台日志记录

    我正在 Net Core 2 控制台应用程序中使用 C 访问 Azure 密钥保管库 每当应用程序运行时 当我从保管库检索机密时 控制台窗口都会收到如下消息 2017 12 26T18 03 49 8610049Z 29c98a86 9e1
  • 如何为 iOS 和 macOS 编译 libffi?

    我正在尝试使用libffi https github com atgreen libffi在我的一个项目中 但我似乎无法为 iOS 或 macOS 就此而言 进行编译 以下是我在构建 iOS 模拟器时遇到的各种错误之一 bash src a
  • Python 3 async for 循环中的类型错误

    我正在学习 Python 相对较新的异步功能 我发现这个在PEP 492 https www python org dev peps pep 0492 example 2 以下是一个实用程序类 它将常规迭代转换为 异步的 虽然这不是一件非常
  • 尝试运行不和谐机器人时出错(python)

    File dban py line 1 in
  • 更改消息名称

    这是我的 WSDL 的一部分 我正在使用代码优先的方法
  • 检查产品变体是否在 Woocommerce 的购物车中

    我正在尝试显示产品的变体是否已在购物车中 在单个产品页面中 产品 ID 与购物车对象中的产品的简单比较不适用于可变产品 因为变体 ID 是使用 ajax 加载的 这是我的代码 适用于产品类型不是变量的情况
  • Python“decimal”包给出错误的结果

    我尝试通过设置来计算以下内容getcontext prec 800 gt gt gt from decimal import gt gt gt getcontext prec 800 gt gt gt Decimal 22 0 Decima
  • 使用jquery和参数调用WCF服务

    好吧 这是这些基本问题之一 但我现在已经用谷歌搜索和调试了两个小时 但错误没有消失 简单场景 WCF 服务 其方法带有我想通过 jquery 调用的参数 我可以调用不带参数的方法 但是使用参数时 调用永远不会到达 NET 中的断点 服务器代
  • 执行 .bat 文件时启用按钮

    我有一个带有打开按钮和后退按钮的表单 我通过打开按钮打开批处理文件 在执行批处理文件时 其他按钮被禁用 我想启用这些按钮 请帮我 运行批处理文件代码 private void openActionPerformed java awt eve
  • NotificationCompat.Builder 缺少 build() 方法

    我想做的正是他所做的 Android 时间通知 https stackoverflow com questions 17053996 android notification at time但在我的 AlarmReceiver 类中 出现错
  • HttpInterceptor 根据其他可观察值的值更改响应主体

    有些我似乎无法根据另一个可观察值的值来更改响应主体 而我只能在检索响应后才能获得该值 更改请求非常简单 我不知道如何处理响应 Injectable export class MyHttpInterceptor implements Http
  • Spark MLlib:为每个数据组构建分类器

    我已经标记了向量 LabeledPoint staged 由一些组号组成 对于每个组 我需要创建独立逻辑回归分类器 import org apache log4j Level Logger import org apache spark m