如何将 COGROUP 用于大型数据集

2023-12-22

我有两个rdd's namely val tab_a: RDD[(String, String)] and val tab_b: RDD[(String, String)]我在用着cogroup对于那些数据集,例如:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  {
 //somecode
  }
}

我在用着tab_c地图函数的共同分组值,对于小型数据集它工作得很好,但在大型数据集的情况下它会抛出Out Of Memory exception.

我尝试将最终值转换为 RDD 但没有运气同样的错误

val newcos = spark.sparkContext.parallelize(tab_c)

1.如何使用Cogroup处理大型数据集?

2.我们可以坚持同组价值吗?

Code

 val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

  var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = Seq("")(0)
var destt: String = Seq("")(0)

val updated = cos.map { x =>
  {

    val key = x._1
    val value = x._2

    srcs = value._1.mkString(",")
    destt = value._2.mkString(",")

    if (srcs.equalsIgnoreCase(destt) == false && destt != "") {
      srcmis :+= srcs
      destmis :+= destt

    }

    if (srcs == "") {

      extraindest :+= destt.mkString("")
    }

    if (destt == "") {

      extrainsrc :+= srcs.mkString("")
    }

  }

}

代码更新:

  val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 =!= x => x._2._2)
 // tab_c = {1,Compactbuffer(1,john,US),Compactbuffer(1,john,UK)}
      {2,Compactbuffer(2,john,US),Compactbuffer(2,johnson,UK)}..

ERROR:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

谢谢


当你使用collect()您基本上是在告诉 Spark 将所有结果数据移回主节点,这很容易产生瓶颈。此时您不再使用 Spark,而只是单台计算机中的普通数组。

要触发计算,只需使用需要每个节点的数据的东西,这就是执行器位于分布式文件系统之上的原因。例如saveAsTextFile().

以下是一些基本示例。 https://spark.apache.org/examples.html

请记住,这里的整个目标(也就是说,如果您有大数据)是将代码移动到您的数据并在那里进行计算,而不是将所有数据都进行计算。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何将 COGROUP 用于大型数据集 的相关文章

随机推荐

  • 过期时间@cacheable spring boot

    我已经实现了缓存 现在我想添加过期时间 如何在 Spring Boot 中设置过期时间 Cacheable 这是一个代码片段 Cacheable value forecast unless result null 我这样使用生活黑客 Con
  • AutoFac - 为某些开放通用注册装饰器

    我正在尝试设置一个似乎有复杂要求的 Autofac 模块 开始 我有一个通用界面 public interface IMyInterface
  • MySql 选择行数作为额外列?

    我需要一个 MySql 语句来选择所有行以及总共有多少行 我正在使用 mysql query SELECT FROM posts LIMIT 0 5 尝试添加计数 mysql query SELECT COUNT AS total FROM
  • 特定节点上的 AST 匹配器

    我编写了一个 AST 匹配器来查找特定类型语句 在匹配的节点中 我计算了该节点的邻居兄弟节点 现在我需要在邻居节点上运行匹配器来验证它们是否满足我的条件 clang AST 匹配器将整个树节点一一匹配 我想针对特定节点运行匹配器 如果该节点
  • 快速 UITableView 设置 rowHeight

    我正在尝试设置每行的高度tableView使用以下代码到相应单元格的高度 override func tableView tableView UITableView heightForRowAtIndexPath indexPath NSI
  • 如何仅禁用/删除输入字段上的第一个空格?

    大家早上好 我有一个案例 当我应该阻止用户在输入字段中输入空格作为第一个字符时 我这里有一个演示 http jsbin com foyetolo 2 edit http jsbin com foyetolo 2 edit 仅当您第一次按空格
  • “this._events || (this._events = {});”是什么意思意思是?

    我已经开始学习 Backbone js 目前我的 JavaScript 技能还不太好 我已经开始检查backbone js文件并遇到了一行奇怪的代码 我无法弄清楚其用途 代码示例 如果您需要更多上下文 请手动下载主干 js http bac
  • JFrame 调整大小以适应可见组件

    我有一个应用程序 我希望用户能够在正常设置和高级设置之间进行选择 现在 如果用户检查 JCheckBox 并且高级设置应该消失 问题就会开始 我的想法是将所有不必要的摆动组件 JScrollPane JLabel 设置为不可见 然后找到一种
  • Rails:如何解压缩压缩的 xml 请求正文?

    我有一个 Rails 3 站点 它从 iPhone 应用程序获取 xml 请求 包括文件提交 iPhone 应用程序压缩其 xml 请求的正文 这些请求像这样传递到我的控制器 这是一个简单的请求 仅更新一些详细信息而不是发送任何文件 par
  • 是什么决定了验证器的触发顺序?

    我有一个带有两个自定义验证器的网络表单 用于验证字符串是否为日期 我不在乎什么格式 只要它是可解析的 另一种方法是确保一个日期等于或大于另一个日期 我只是无法让比较验证器能够很好地处理任何日期格式
  • Windows 上有类似于supervisord 的吗?

    我需要运行 python 脚本并确保它在终止后重新启动 我知道有一个名为supervisord 的UNIX 解决方案 但不幸的是 我的脚本必须运行的服务器是在 Windows 上 你知道什么工具有用吗 谢谢 尽管有很大的免责声明here h
  • “ndarray”类型的对象不可 JSON 序列化

    我是 python 和机器学习的新手 我有一个线性回归模型 它能够根据我转储用于 Web 服务的输入来预测输出 请参阅下面的代码 X train X test y train y test train test split X y test
  • 如何让 Perl::Critic 在其输出中显示违规策略?

    有人告诉我有可能Perl Critic http search cpan org perldoc Perl 3a 3aCritic显示在其输出中存在问题的策略名称 但我不记得我必须做什么才能打开它 如何在perlcritic rc The
  • 将自定义AuthenticationProvider添加到Spring Boot + oauth +oidc

    我使用 SpringBoot 2 1 7 和 Okta 提供身份验证服务开发了一个基本的 oauth oidc 示例 这是我的 Gradle 依赖设置供参考 plugins id org springframework boot versi
  • 如何解决“无法为连接 URL 创建类 'com.mysql.jdbc.Driver' 的 JDBC 驱动程序”

    首先我想说我检查了 stackoverflow 上的所有答案 但我无法修复这个错误 请帮帮我 我花了很多时间 却没有任何结果 我正在尝试使用 Tomcat8 创建连接池 我有一个例外 java sql SQLException 无法创建类的
  • SSH 指纹与 Gitlab.com 不匹配

    我向我的帐户添加了 ED25519 公钥gitlab com 然后我设置了我的 ssh config使用私钥Host gitlab com Host gitlab com gitlab com User git IdentityFile s
  • 将鼠标悬停在 Jquery SlideDown 菜单上时保持该菜单打开吗?

    我的 Jquery SlideDown 菜单遇到一些困难 当我将鼠标悬停在触发slideDown事件的按钮上时 它工作得很好 但是当我将鼠标悬停在向下滑动的子菜单上时 它会触发slideUp事件并关闭菜单 我正在寻找一种方法来更改我的代码
  • 如何在 DIV 中的长单词中强制换行?

    好吧 这真的让我很困惑 我在 div 中有一些内容 如下所示 div style background color green width 200px height 300px Thisisatest Thisisatest Thisisa
  • FFmpeg - 从传输流文件(.ts)中提取视频和音频

    我希望提取传输流文件中某个节目的音视频 ts 方法是在不损失质量的情况下指定其 PID 并在结果文件中使用相同的编解码器 输出文件是 MPEG 是这样吗possible与 FFmpeg 如果是这样 我该怎么办 到目前为止 我已经收到了这个命
  • 如何将 COGROUP 用于大型数据集

    我有两个rdd s namely val tab a RDD String String and val tab b RDD String String 我在用着cogroup对于那些数据集 例如 val tab c tab a cogro