I'm loading large datasets and then caching them so I can reference them throughout my code. The code looks something like this:
val conversations = sqlContext.read
  .format("com.databricks.spark.redshift")
  .option("url", jdbcUrl)
  .option("tempdir", tempDir)
  .option("forward_spark_s3_credentials", "true")
  .option("query", "SELECT * FROM my_table " +
    "WHERE date <= '2017-06-03' " +
    "AND date >= '2017-03-06' ")
  .load()
  .cache()
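If I leave off the cache(), the code executes quickly because Datasets are evaluated lazily. But if I add cache(), the block takes a long time to run.
From the Event Timeline in the Spark web UI, it appears that the SQL table is being transferred to the worker nodes and then cached on the worker nodes.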
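Why is cache() executing immediately? The source code appears to only mark the data for caching, with the actual caching happening when the data is computed:
The source code for Dataset (https://github.com/apache/spark/blob/258bff2c3f54490ddca898e276029db9adf575d9/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala) calls through to this code in CacheManager.scala (https://github.com/apache/spark/blob/258bff2c3f54490ddca898e276029db9adf575d9/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala) when either cache or persist is called: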
  /**
   * Caches the data produced by the logical representation of the given [[Dataset]].
   * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
   * recomputing the in-memory columnar representation of the underlying table is expensive.
   */
  def cacheQuery(
      query: Dataset[_],
      tableName: Option[String] = None,
      storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
    val planToCache = query.logicalPlan
    if (lookupCachedData(planToCache).nonEmpty) {
      logWarning("Asked to cache already cached data.")
    } else {
      val sparkSession = query.sparkSession
      cachedData.add(CachedData(
        planToCache,
        InMemoryRelation(
          sparkSession.sessionState.conf.useCompression,
          sparkSession.sessionState.conf.columnBatchSize,
          storageLevel,
          sparkSession.sessionState.executePlan(planToCache).executedPlan,
          tableName)))
    }
  }
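This only appears to mark the data for caching rather than actually caching it. Based on other answers on Stack Overflow, I would also expect cache() to return immediately.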
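To narrow down where the time goes, one option is to time the cache() call and the first action separately on a source that does not involve Redshift. The following is only a rough sketch under several assumptions: it runs in local mode, uses spark.range as a stand-in dataset, assumes Spark 2.1 or later (for Dataset.storageLevel), and the names CacheTiming and timeMs are made up for illustration. It does not reproduce the Redshift setup above.

```scala
import org.apache.spark.sql.SparkSession

object CacheTiming {
  // Hypothetical helper: runs `body` once and returns its result
  // together with the elapsed wall-clock time in milliseconds.
  def timeMs[A](body: => A): (A, Long) = {
    val start = System.nanoTime()
    val result = body
    (result, (System.nanoTime() - start) / 1000000L)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with an in-memory source, not the Redshift connector.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("cache-timing")
      .getOrCreate()

    val df = spark.range(0L, 10000000L).toDF("id")

    // Time the cache() call on its own; no action has been run yet.
    val (cached, markMs) = timeMs(df.cache())

    // The storage level is already set here, before any action.
    println(s"storage level after cache(): ${cached.storageLevel}")

    // Time the first action and then a second one over the same Dataset.
    val (rows, firstMs) = timeMs(cached.count())
    val (_, secondMs) = timeMs(cached.count())

    println(s"rows=$rows cache()=${markMs}ms first count=${firstMs}ms second count=${secondMs}ms")

    spark.stop()
  }
}
```

If cache() comes back quickly here but stays slow with the Redshift read, that would suggest the time is being spent in source-specific work triggered around cache() rather than in caching itself. Note that cacheQuery above does call executePlan(planToCache).executedPlan, so at least query planning runs at that point.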
Has anyone else seen caching happen immediately, before an action (https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) is performed on the dataset? Why does this happen?