为什么在 Spark 数据集上调用缓存需要很长时间?

2023-12-21

我正在加载大型数据集,然后缓存它们以供我的代码中参考。代码看起来像这样:

val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials","true")
  .option("query", "SELECT * FROM my_table "+
                   "WHERE date <= '2017-06-03' "+
                   "AND date >= '2017-03-06' ")
  .load()
  .cache()

如果我离开缓存,代码会快速执行,因为数据集是延迟计算的。但是如果我使用cache(),该块需要很长时间才能运行。

从在线 Spark UI 的事件时间轴来看,SQL 表正在传输到工作节点,然后缓存在工作节点上。

为什么缓存会立即执行?源代码似乎仅在计算数据时将其标记为缓存:

The 数据集的源代码 https://github.com/apache/spark/blob/258bff2c3f54490ddca898e276029db9adf575d9/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala调用此代码在 CacheManager.scala 中 https://github.com/apache/spark/blob/258bff2c3f54490ddca898e276029db9adf575d9/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala当调用缓存或持久时:

  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }

这似乎只是标记为缓存而不是实际缓存数据。我希望缓存能够根据 Stack Overflow 上的其他答案立即返回。

有没有其他人看到缓存发生在action https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions是在数据集上执行的?为什么会发生这种情况?


cache是导致数据集执行的运算符之一。 Spark 会将整个数据集具体化到内存中。如果您在相当大的中间数据集上调用缓存,这可能需要很长时间。

可能存在的问题是缓存的数据集仅存储在内存中。当它不再适合时,数据集的分区将被逐出并根据需要重新计算(请参阅https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-persistence)。如果内存太少,您的程序可能会花费大量时间来重新计算。

为了加快缓存速度,您可以为应用程序提供更多内存,或者您可以尝试使用persist(MEMORY_AND_DISK)代替cache.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

为什么在 Spark 数据集上调用缓存需要很长时间? 的相关文章

随机推荐

  • 从 Jenkinsfile 中的共享库导入类

    我有一个具有以下结构的共享库存储库 root src com company DeploySteps groovy vars MainDeploySteps groovy 该库通过 Jenkinsfile 导入到作业中 如下所示 libra
  • 保存后显示相同页面

    我想显示一个带有某些字段 示例中的一个 的表单 提交它 保存并显示同一页面并重置所有字段 当我提交问题时 我执行 保存 操作 但是当我显示视图时 表单仍然被填写 该模型 public class TestingModel public st
  • 多个已编译的 linq 查询可以链接在一起吗?

    我正在尝试将多个已编译的 linq 查询链接在一起 我已经成功地将两个查询链接在一起 但我无法让三个查询链正常工作 因此 这里减少了我的代码以重现问题 我的两个问题是 为什么这不起作用 以及 是否有更好的方法来保持编译查询的性能优势并避免重
  • 如何在 PHP Web 应用程序中实现 Mozilla DeepSpeech 以将语音转换为文本?

    我有一个 PHP Web 应用程序 正在寻找一种开源 高精度的语音到文本识别实现 该实现将采用语音命令来打开用户的网页 例子 进行销售 这将打开创建销售 PHP 页面 下达采购订单 打开日终报告 etc 我的问题 我想知道我们是否可以使用M
  • 如何使用代表点对复杂形状进行建模?

    我想将该图像中的白色像素数量减少到输出图像中的一些候选点或代表点 目标是对不同类型的形状进行建模 如果您只是将输出图像中的灰点连接在一起 您将拥有相同的路径 但白色像素较少 这条路径应该只有一个起点和一个终点 并且涵盖从起点到终点的所有路径
  • GDB未知目标异常(在64位环境下调试32位目标时)

    我想使用 64 位版本的 MinGW w64 x86 64 w64 mingw32 提供的 GDB 调试器来调试使用 32 位版本的 MinGW w64 x86 64 w64 mingw32 创建的 32 位目标 根据在 64 位环境中使用
  • GSON 未以 UTF-8 发送

    以下方法发送 JSON 回复 然而 在接收端 我不断收到无效字符 并且 UTF 8 无法解码数据 我究竟做错了什么 对客户端的响应 数据输出流 Get the client request clientRequest new Buffere
  • Android oreo 在白色圆形内显示图标

    我的 Android 应用程序在 Android oreo 版本上显示白色圆圈内的启动器图标 我想显示启动器图标 因为默认情况下它在奥利奥设备上是方形的 浏览了各种博客 发现 1 移动用户可以从主屏幕设置屏幕更改图标类型 但我不想要 我想默
  • getElementById 的 Javascript 简写

    JavaScript document getElementById 是否有简写 或者有什么方法可以定义一个吗 它会重复重新输入over and over var function id return document getElement
  • C# 中的自动属性是什么?它们的用途是什么?

    有人可以对 C 中的自动属性及其用途提供一个非常简单的解释吗 也许还可以提供一些示例 请尽量用通俗易懂的语言来表达 拜托 当属性访问器中不需要附加逻辑时 将使用自动属性 该声明看起来像这样 public int SomeProperty g
  • Jersey 不支持嵌套资源中的声明性超链接,这是否正确?

    public class Widget Ref resource WidgetResource class URI url public URI getUrl return url public Wonkle getWonkle retur
  • 匹配函数来匹配多个值

    我有两列 Column A Column B Apple A banana B Grape C Apple D Banana F 现在我想找到有数据的行的行号Apple D 他们的使用方式是Match函数获取行号 你可以使用这个 LOOKU
  • Jqgrid 数据类型从数组本地加载

    在我询问如何在从本地 js var 刷新下划线数据后重新应用工具栏过滤器时在这里问 https stackoverflow com questions 12983215 alternative to jqgrid triggertoolba
  • 如果您已经拥有资源,为什么要将 ETag 设置为必须要求?

    为什么要将 ETag 设置为 必须要求级别 您在 ETag 返回之前获取资源 我正在开发一个项目 其中我是向服务器发送 HTTP 请求的客户端 该服务器返回带有 ETag 的 HTTP Cache Control 标头来缓存响应 其中在每个
  • 插入具有多对一关系的 Hibernate 实体

    我对 Hibernate 相当陌生 并且有一个关于当实体具有 FK 时如何处理插入的问题 具体来说如何创建要插入的实体 我的实体简化了以下结构 Entity Table name event public class Event imple
  • 访问修饰符有什么用

    编程语言中需要使用访问修饰符吗 如果我们选择所有成员和方法作为私有 那么输出会是什么 See 封装 http en wikipedia org wiki Encapsulation object oriented programming 维
  • java 8 嵌套流

    假设您有这样的结构类 public class Review private Integer idReview private String description private ArrayList
  • Apache 2.4 with mod_wsgi: 403 Forbidden, 无权访问此服务器上的 /calbase

    所以我尝试使用 apache 2 4 和 mod wsgi 和 pythong 3 4 在 Windows 服务器上部署我的 django 项目 在我配置 httpd conf 并尝试启动安装了 mod wsgi 的 apache 之前 它
  • Java 进程中的 Sudo

    我正在开发一个终端应用程序 它允许人们从 Swing GUI 执行 bash 命令 尝试使用 sudo 执行命令时遇到以下问题 sudo cd Users myname Desktop sudo 不存在 tty 且未指定 Askpass 程
  • 为什么在 Spark 数据集上调用缓存需要很长时间?

    我正在加载大型数据集 然后缓存它们以供我的代码中参考 代码看起来像这样 val conversations sqlContext read format com databricks spark redshift option url jd