如何拆分一个巨大的rdd并轮流广播?

2024-02-14

描述:

我们的spark版本是1.4.1

我们想要连接两个巨大的 RDD,其中之一带有倾斜数据。所以spark rdd操作join可能会导致内存问题。我们尝试将较小的一个分割成多个片段,然后分批广播它们。在每个广播回合中,我们尝试将较小的rdd的一部分收集到驱动程序,然后将其保存到HashMap,然后广播HashMap。每个执行器使用广播值对较大的rdd进行map操作。我们通过这种方式实现倾斜数据连接。

但是当它每轮处理广播值时。我们发现处理后我们不能破坏我们的广播值。如果我们使用broadcast.destroy(),下一轮我们处理数据将 触发错误。像这样:

java.io.IOException: org.apache.spark.SparkException: Attempted to use Broadcast(6) after it was destroyed (destroy at xxx.java:369)

我们查看了spark的源码,发现这个问题是由rdd依赖关系导致的。 if rdd3 -> rdd2 -> rdd1 (箭头显示依赖关系)。 rdd1是使用名为b1的广播变量生成的,rdd2使用b2生成的。生成rdd3时,源代码显示需要序列化b1和b2。如果b1或b2在rdd3生产过程之前被破坏。它将导致我上面列出的失败。

Question:

是否存在一种方法可以让rdd3忘记其依赖关系,使其在生成过程中不需要b1、b2,只需要rdd2?

或者是否存在处理倾斜连接问题的方法?

顺便说一句,我们为每个回合设置了检查点。并将spark.cleaner.ttl设置为600。问题仍然存在。如果我们不销毁广播变量,执行器将在第五回合中失败。

我们的代码是这样的:

for (int i = 0; i < times; i++) {
    JavaPairRDD<Tuple2<String, String>, Double> prevItemPairRdd = curItemPairRdd;
    List<Tuple2<String, Double>> itemSplit = itemZippedRdd
            .filter(new FilterByHashFunction(times, i))
            .collect();

    Map<String, Double> itemSplitMap = new HashMap<String, Double>();
    for (Tuple2<String, Double> item : itemSplit) {
        itemSplitMap.put(item._1(), item._2());
    }
    Broadcast<Map<String, Double>> itemSplitBroadcast = jsc
            .broadcast(itemSplitMap);

    curItemPairRdd = prevItemPairRdd
            .mapToPair(new NormalizeScoreFunction(itemSplitBroadcast))
            .persist(StorageLevel.DISK_ONLY());
    curItemPairRdd.count();

    itemSplitBroadcast.destroy(true);
    itemSplit.clear();

}

就我个人而言,我会尝试一些不同的方法。让我们从一个小的模拟数据集开始

import scala.util.Random
Random.setSeed(1)

val left = sc.parallelize(
  Seq.fill(200)(("a", Random.nextInt(100))) ++ 
  Seq.fill(150)(("b",  Random.nextInt(100))) ++ 
  Seq.fill(100)(Random.nextPrintableChar.toString, Random.nextInt(100))
)

并按键计数:

val keysDistribution = left.countByKey

进一步假设第二个 RDD 是均匀分布的:

val right = sc.parallelize(
  keysDistribution.keys.flatMap(x => (1 to 5).map((x, _))).toSeq)

并将每个键可以处理的值数量阈值设置为 10:

val threshold = 10
  1. Use 代理键以增加粒度。

    想法很简单。而不是加入(k, v)成对使用((k, i), v) where i是一个整数,取决于给定的阈值和元素数量k.

    val buckets = keysDistribution.map{
      case (k, v) => (k -> (v / threshold + 1).toInt)
    }
    
    // Assign random i to each element in left
    val leftWithSurrogates = left.map{case (k, v) => {
      val i = Random.nextInt(buckets(k))
      ((k, i), v)
    }}
    
    // Replicate each value from right to i buckets
    val rightWithSurrogates = right.flatMap{case (k, v) => {
      (0 until buckets(k)).map(i => ((k, i), v))
    }}
    
    val resultViaSurrogates = leftWithSurrogates
      .join(rightWithSurrogates)
      .map{case ((k, _), v) => (k, v)}
    
  2. 分而治之- 频繁键和不频繁键的分开处理。

    首先让我们使用不常用的键加入:

    val infrequentLeft = left.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequentRight = right.filter{
      case (k, _) => keysDistribution(k) < threshold
    }
    
    val infrequent = infrequentLeft.join(infrequentRight)
    

    接下来让我们分别处理每个常用键:

    val frequentKeys = keysDistribution
      .filter{case (_, v) => v >= threshold}
      .keys
    
    val frequent = sc.union(frequentKeys.toSeq.map(k => {
      left.filter(_._1 == k)
        .cartesian(right.filter(_._1 == k))
        .map{case ((k, v1), (_, v2)) => (k, (v1, v2))}
    }))
    

    最后让两个子集合并:

    val resultViaUnion = infrequent.union(frequent)
    

快速健全性检查:

val resultViaJoin = left.join(right).sortBy(identity).collect.toList

require(resultViaUnion.sortBy(identity).collect.toList == resultViaJoin)
require(resultViaSurrogates.sortBy(identity).collect.toList == resultViaJoin)

显然,这更像是一个草图,而不是一个完整的解决方案,但应该可以让您了解如何继续。最大的优势在于broadcast它消除了驱动程序的瓶颈。

是否存在一种方法可以让rdd3忘记其依赖关系,使其在生成过程中不需要b1、b2,只需要rdd2?

您使用检查点和强制计算,但如果丢失任何分区,它仍然会失败。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

如何拆分一个巨大的rdd并轮流广播? 的相关文章

随机推荐

  • 在mysql中存储mp3文件

    我发现很多人建议不要将 mp3 文件存储为 blob 我没有得到明确的解释为什么会这样 我的网站流量较低 每分钟 10 次点击 到目前为止 我已将图像文件作为 blob 存储在数据库表中 我对图像表所做的唯一查找是基于单个主键 图像渲染速度
  • Open Feignclient 与 Netflix Ribbon 集成在 springboot 中不起作用

    Feing客户端服务器 RestController public class FeinApiCall Autowired CustomFeignClient customFeignClient GetMapping getinfothro
  • C 中的冲突类型

    我试图用 C 语言构建一个非常简单的程序 该程序从函数返回浮点值 但由于某种原因我收到了错误 include
  • 一般来说,在 ExtJS 应用程序上转义 HTML 的最佳方法是什么?

    我正在使用 ExtJS 开发一个 Web 应用程序来构建 GUI 并通过 RESTful Web 服务与服务器通信 返回的数据格式化为 JSON 对象 现在我在处理包含 HTML 标签 Javascript 代码的数据时遇到问题 因为当我将
  • 使用 ffmpeg 在音频文件中的单词之间添加静音

    我想做的是concat wav包含短音频的文件 我能够concat它们放入一个文件中 但我试图在特定时间设置每个文件 目前 我可以concat这些文件 但我无法将每个文件放置在需要的特定时间 我想也许我可以在他们之间添加适当的沉默 这样就可
  • 在多个存储库上重用 Github Actions 自托管运行器

    我有一个自托管的 Github Actions 运行器在我的服务器上运行 用于我的 1 个存储库 但现在我想将同一个运行程序用于另一个存储库 有没有一种方法可以让我将同一个 Github Actions 运行器重复用于其他存储库 而无需创建
  • 有没有办法在 Ruby 中全局目录但排除某些目录?

    我想全局一个目录来后处理头文件 但我想排除项目中的一些目录 现在默认的方式是 Dir h each header puts header 如果每个标头条目位于排除目录中 则手动检查它似乎效率低下 我知道这已经晚了 4 年 但对于可能遇到这个
  • 多租户:使用 Spring Data JPA 管理多个数据源

    我需要创建一个可以管理多个数据源的服务 当应用程序首次运行应用程序时 这些数据源不一定存在 实际上端点将创建新的数据库 我希望能够切换到它们并创建数据 例如 假设我有 3 个数据库 A B 和 C 然后我启动应用程序 我使用创建 D 的端点
  • Electron 应用程序:无法加载预加载脚本

    每次我启动电子应用程序时都会遇到此错误 electron js2c renderer init js 91 Unable to load preload script C Users Desktop Projects Electron Ap
  • ng-repeat过滤器空值不显示

    为什么当我应用时角度不会显示为空的值 ng repeat p in foo filter filter2 其中过滤器2是 scope filter2 function p if p state null return p state els
  • 使用 Moq 对工作单元和通用存储库模式框架进行单元测试

    我正在对一个服务进行单元测试 该服务使用工作单元和通用存储库 并使用起订量 问题是 当我在调试模式下运行测试时 在服务类中 subsiteRepository 始终为 null 我正在嘲笑的服务类的设置 private readonly I
  • 我应该如何在msbuild脚本中引用sn.exe?

    我需要在构建完成后重新签署我的程序集 并且我已经对其做了一些其他事情 所以我开始添加一个
  • 在单个 SQL 查询中,查询可以使用单个表中的多少个索引?

    在 Oracle 中 如果您有一个表 在 3 个不同的列上有 3 个索引 单个 SQL 语句可以从一个表中使用多少个索引 可能这三个都可以使用 Oracle 不太可能使用所有这三个 或者使用所有这三个将是有益的 但这是可能的 Oracle
  • Javascript insideHTML 没有更新

    您好 我正在尝试更新以下脚本的innerHTML div class layout wrapper div class alert success animate in Your submission was successful div
  • Directx 11,将多个纹理发送到着色器

    使用此代码我可以将一个纹理发送到着色器 devcon gt PSSetShaderResources 0 1 pTexture 当然 我通过以下方式制作了 pTexture D3DX11CreateShaderResourceViewFro
  • 迭代 NSSet - SwiftUI 2.0

    我开始查看 CoreData 并有两个实体 这些在 Player 之间形成多对多关系 和团队 我试图在列表中使用 ForEach 来迭代团队的球员 但我似乎无法让它工作 我一直在尝试的代码如下 我尝试的任何其他解决方案都会收到错误 例如 N
  • 在远程仓库上删除本地 Git 分支后删除它们

    我希望本地和远程存储库在分支方面始终保持同步 在 GitHub 上进行拉取请求审查后 我合并并删除了那里的分支 远程 我如何在本地存储库中获取此信息并让 Git 也删除我的本地版本的分支 快速的方法 git branch merged gr
  • 从另一个类文件访问 MainForm

    我有这个 MainForm 类 namespace homework 001 public partial class MainForm Form public MainForm InitializeComponent public str
  • 在组中创建带有条件的 random.randint?

    我有一个专栏叫做 cars并想创建另一个名为persons using random randint 我有 dat persons np random randint 1 5 len dat 这样我就可以输入使用这些的人数 但我会 想知道如
  • 如何拆分一个巨大的rdd并轮流广播?

    描述 我们的spark版本是1 4 1 我们想要连接两个巨大的 RDD 其中之一带有倾斜数据 所以spark rdd操作join可能会导致内存问题 我们尝试将较小的一个分割成多个片段 然后分批广播它们 在每个广播回合中 我们尝试将较小的rd