Apache Spark 中的递归方法调用

2024-01-10

我正在从 Apache Spark 上的数据库构建一个家谱,使用递归搜索来查找数据库中每个人的最终父级(即家谱顶部的人)。

假设搜索 id 时返回的第一个人是正确的父母

val peopleById = peopleRDD.keyBy(f => f.id)
def findUltimateParentId(personId: String) : String = {

    if((personId == null) || (personId.length() == 0))
        return "-1"

    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if(person.personId == "0 "|| person.id == person.parentId) {

        return person.id

    }
    else {

        return findUltimateParentId(person.parentId)

    }

}

val ultimateParentIds = peopleRDD.foreach(f => f.findUltimateParentId(f.parentId))

它给出以下错误

“由 org.apache.spark.SparkException 引起:RDD 转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1.map(x => rdd2.values.count() * x)无效,因为值转换和计数操作无法在rdd1.map转型。有关更多信息,请参阅 SPARK-5063。”

我通过阅读其他类似的问题了解到问题是我正在打电话findUltimateParentId从 foreach 循环中,如果我使用一个人的 id 从 shell 调用该方法,它会返回正确的最终结果parent id

但是,其他建议的解决方案都不适合我,或者至少我看不到如何在我的程序中实现它们,有人可以帮忙吗?


如果我理解正确的话 - 这是一个适用于任何大小输入的解决方案(尽管性能可能不是很好) - 它在 RDD 上执行 N 次迭代,其中 N 是“最深的家族”(从祖先到孩子的最大距离)输入:

// representation of input: each person has an ID and an optional parent ID
case class Person(id: Int, parentId: Option[Int])

// representation of result: each person is optionally attached its "ultimate" ancestor,
// or none if it had no parent id in the first place
case class WithAncestor(person: Person, ancestor: Option[Person]) {
  def hasGrandparent: Boolean = ancestor.exists(_.parentId.isDefined)
}

object RecursiveParentLookup {
  // requested method
  def findUltimateParent(rdd: RDD[Person]): RDD[WithAncestor] = {

    // all persons keyed by id
    def byId = rdd.keyBy(_.id).cache()

    // recursive function that "climbs" one generation at each iteration
    def climbOneGeneration(persons: RDD[WithAncestor]): RDD[WithAncestor] = {
      val cached = persons.cache()
      // find which persons can climb further up family tree
      val haveGrandparents = cached.filter(_.hasGrandparent)

      if (haveGrandparents.isEmpty()) {
        cached // we're done, return result
      } else {
        val done = cached.filter(!_.hasGrandparent) // these are done, we'll return them as-is
        // for those who can - join with persons to find the grandparent and attach it instead of parent
        val withGrandparents = haveGrandparents
          .keyBy(_.ancestor.get.parentId.get) // grandparent id
          .join(byId)
          .values
          .map({ case (withAncestor, grandparent) => WithAncestor(withAncestor.person, Some(grandparent)) })
        // call this method recursively on the result
        done ++ climbOneGeneration(withGrandparents)
      }
    }

    // call recursive method - start by assuming each person is its own parent, if it has one:
    climbOneGeneration(rdd.map(p => WithAncestor(p, p.parentId.map(i => p))))
  }

}

这是一个测试,可以更好地理解它是如何工作的:

/**
  *     Example input tree:
  *
  *            1             5
  *            |             |
  *      ----- 2 -----       6
  *      |           |
  *      3           4
  *
  */

val person1 = Person(1, None)
val person2 = Person(2, Some(1))
val person3 = Person(3, Some(2))
val person4 = Person(4, Some(2))
val person5 = Person(5, None)
val person6 = Person(6, Some(5))

test("find ultimate parent") {
  val input = sc.parallelize(Seq(person1, person2, person3, person4, person5, person6))
  val result = RecursiveParentLookup.findUltimateParent(input).collect()
  result should contain theSameElementsAs Seq(
    WithAncestor(person1, None),
    WithAncestor(person2, Some(person1)),
    WithAncestor(person3, Some(person1)),
    WithAncestor(person4, Some(person1)),
    WithAncestor(person5, None),
    WithAncestor(person6, Some(person5))
  )
}

将您的输入映射到这些应该很容易Person对象,并映射输出WithAncestor对象变成你需要的任何东西。请注意,此代码假设如果任何人有parentId X - 输入中实际上存在另一个具有该id的人

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Apache Spark 中的递归方法调用 的相关文章

随机推荐

  • 按下按钮时使用 JavaScript 显示/隐藏 div(并且首先隐藏所有 div)

    我是 javascript 新手 我无法从缝合隐藏 div 开始 我可以让 div 在彼此之间切换 任何帮助都会很棒
  • Java:按子值作为值分组

    假设我有一个对象 Person 其字段类型为 FirstName 和 LastName 现在我也有一个List
  • 带有 WebBrowser 控件的 Excel CustomTaskPane - 键盘/焦点问题

    我遇到了这个问题https social msdn microsoft com Forums vstudio en US e417e686 032c 4324 b778 fef66c7687cd excel customtaskpane w
  • CSS 重新排列特定元素并排,但在使用移动设备时堆叠/宽度太小?

    我一直在研究这个营养计算器 但在格式化 CSS 来优化数据可视化时遇到了麻烦 我尝试过调整 div 并添加容器 但由于某种原因 它只会导致重叠 我不知道如何修复 我的代码 https jsfiddle net q024go3v https
  • C# 如何将字符串转换为时间和日期格式?

    我有一个简短的程序 可以将字符串从简单的字符串转换为日期和时间格式 然而 由于字符串的顺序 系统似乎无法识别该字符串并将其转换为日期时间格式 应转换的字符串示例如下 Thu Dec 9 05 12 42 2010 方法为Convert To
  • 哪些对象保证具有不同的身份?

    原问题 我的问题适用于 Python 3 2 但我怀疑自 Python 2 7 以来这已经发生了变化 假设我使用我们通常期望创建对象的表达式 例子 1 2 3 42 abc range 10 True open readme txt MyC
  • 如何使用Python下载股票价格数据?

    我已经安装了pandas datareader但我想知道是否还有其他选择 到目前为止 我正在使用这个 import pandas datareader data as web start date 2018 01 01 end date 2
  • 签名扫描

    许多防病毒程序使用基于签名的恶意软件检测 这是为 ClamAV 创建签名 http www clamav net doc webinars Webinar Alain 2009 03 04 pdf 考虑到整个文件是恶意软件 我可以理解他们如
  • 如何加载用户.bashrc的RVM部分以在Apache下运行Ruby CGI脚本?

    我在 Ubuntu 12 04 上配置了一个新服务器 并开始使用 RVM 我已经按照我的用户 作为我自己 而不是使用 sudo 作为 root 安装了 RVM瑞安 比格的指南 http ryanbigg com 2010 12 ubuntu
  • R/Stringr 在第 n 次出现“_”后提取字符串,并以第一次出现“_”结束

    使用 R 和 stringr 包 或任何其他与此相关的包 我想在第 n 次出现 后提取字符串 并以第一次出现 结束 例如 df lt c J J HERE jfdkaldjhieuwui blahblah ffd THIS fjdkalfj
  • 如何生成新的GUID?

    我正在开发一个网络服务 需要一个新的GUID 作为对服务内方法的引用传递 我不熟悉C or the GUID object 但需要类似的东西PHP 因此创建一个新对象 根据我的理解 它返回一个empty blank GUID 有任何想法吗
  • 如何避免谷歌条形图中的条形标签重叠?

    我正在创建一个堆积条形图 需要显示堆栈内的标签 但很少有标签被重叠 以供参考image https i stack imgur com gGKG9 jpg 你能帮助我如何避免使用谷歌图表重叠吗 div div
  • if else key 分割 JSON

    我有这个功能 let input Apples Apples 501 82 Apples pos2 502 61 Apples pos3 502 61 Apples 502 16 let output Object keys input r
  • 拖动项目时触发的 click 事件 (Firefox)

    当我单击一个项目时 我可以编辑该字段 这要归功于引导程序可编辑 http vitalets github com bootstrap editable 当我拖放项目时 我可以更改项目的位置 这要归功于jquery ui sortable h
  • PhpStorm 中文件名旁边的神秘数字图标[关闭]

    Closed 这个问题不符合堆栈溢出指南 help closed questions 目前不接受答案 我不小心按下了 PhpStorm 中的热键 现在我的一个文件名旁边有一个小数字图标 这是什么意思 我该如何删除它 这是一个屏幕截图 这是什
  • 查询在 while 循环中无法正常工作

    我有一个 While 循环 我试图插入 DECLARE CurrentOffer int 121 DECLARE OldestOffer int 115 DECLARE MinClubcardID bigint 0 DECLARE MaxC
  • PHP脚本在特定时间执行

    有没有一种简单的方法可以让 php 脚本在一天中的特定时间执行一些 html 例如 我的主页上有一个标题 有时我希望能够在标题下方添加一些内容 在本例中是一个 iframe 我知道每个人都提到了 cron 作业 但是这将如何工作呢 还有替代
  • 使用 CSS3 过渡的动画 jQuery UI 可排序

    我如何使用 CSS3 过渡 或任何其他方式 来制作jQuery 可排序 http jqueryui com sortable 其行为更像是 iOS 中的列表重新排序 其中列表项在拖动时会平滑地进行动画处理 因此当您拖动时 项会迅速移开 编辑
  • dc.js barChart 第一个和最后一个栏未完全显示

    我有一个条形图d3 time scalex 轴 我每小时显示一些数据 但使用时第一个和最后一个数据点条总是被切成两半centerBar true 当使用centerBar false 最后一个栏完全消失 时间窗口基于数据本身 计算如下 va
  • Apache Spark 中的递归方法调用

    我正在从 Apache Spark 上的数据库构建一个家谱 使用递归搜索来查找数据库中每个人的最终父级 即家谱顶部的人 假设搜索 id 时返回的第一个人是正确的父母 val peopleById peopleRDD keyBy f gt f