When cache() is called on an RDD, is the state of that RDD changed (with the returned RDD being merely a convenience for chaining), or is a new RDD created that wraps the existing one?
The same RDD is returned; see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L165:
/**
 * Mark this RDD for persisting using the specified level.
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}
Caching does not allocate a new RDD and causes essentially no side effect to the RDD above. If it has already been marked for persistence, nothing happens. If it has not, the only side effects are setting its storageLevel flag and registering it with the SparkContext, and the latter side effect lives in the context, not in the RDD itself. Also note that this merely marks the RDD: nothing is actually materialized in the cache until an action first computes it.
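If you want to convince yourself, here is a minimal, self-contained sketch (the local-mode setup and names are mine, not from the question) that checks referential equality:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object CacheIdentityCheck {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("cache-identity"))

    val rdd = sc.parallelize(1 to 10)
    val cached = rdd.cache()

    // cache() hands back the very same object, not a wrapper.
    assert(cached eq rdd)
    // The only visible change is the storage level flag.
    assert(rdd.getStorageLevel == StorageLevel.MEMORY_ONLY)

    sc.stop()
  }
}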
Edit:
Looking at JavaRDD.cache (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala#L38), it seems the underlying call does cause the allocation of another JavaRDD:
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
Where wrapRDD calls JavaRDD.fromRDD:
object JavaRDD {
  implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
  implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}
This allocates a new JavaRDD. That said, the internal RDD[T] instance stays the same.
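And the same kind of identity check, sketched against the Java API (again, the setup and names are mine):

import org.apache.spark.api.java.JavaSparkContext

object JavaCacheWrapperCheck {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local[*]", "java-cache-wrapper")

    val jrdd = jsc.parallelize(java.util.Arrays.asList(1, 2, 3))
    val cached = jrdd.cache()

    // A fresh JavaRDD wrapper is allocated by every cache() call...
    assert(cached ne jrdd)
    // ...but both wrappers delegate to the same underlying RDD[T].
    assert(cached.rdd eq jrdd.rdd)

    jsc.stop()
  }
}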