搞了这么久的spark,多次用了,cache、persist、checkpoint ,这次总结一下他们之间的区别。
1.cache与persist的区别
源码
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. This can only be used to assign a new storage level if the RDD does not
* have a storage level set yet. Local checkpointing is an exception.
*/
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// This means the user previously called localCheckpoint(), which should have already
// marked this RDD for persisting. Here we should override the old storage level with
// one that is explicitly requested by the user (after adapting it to use disk).
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
从源码中可以看出,cache 底层调用的是 persist 方法,存储等级为: memory only,persist 的默认存储级别也是 memory only,persist 与 cache 的主要区别是 persist 可以自定义存储级别。哪些 RDD 需要 cache ? 会被重复使用的(但是)不能太大的RDD需要cache,cache 只使用 memory。
2.cache与checkpoint的区别
Tathagata Das 有一段回答:
There is a significant difference between cache and checkpoint. Cache materializes the RDD and keeps it in memory and/or disk. But the lineage of RDD (that is, seq of operations that generated the RDD) will be remembered, so that if there are node failures and parts of the cached RDDs are lost, they can be regenerated. However, checkpoint saves the RDD to an HDFS file and actually forgets the lineage completely. This is allows long lineages to be truncated and the data to be saved reliably in HDFS (which is naturally fault tolerant by replication).
上面这段话的大概意思就是:cache 和 checkpoint 之间有一个重大的区别,cache 将 RDD 以及 RDD 的血统(记录了这个RDD如何产生)缓存到内存中,当缓存的 RDD 失效的时候(如内存损坏),它们可以通过血统重新计算来进行恢复。但是 checkpoint 将 RDD 缓存到了 HDFS 中,同时忽略了它的血统(也就是RDD之前的那些依赖)。为什么要丢掉依赖?因为可以利用 HDFS 多副本特性保证容错!
/**
* Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
* directory set with `SparkContext#setCheckpointDir` and all references to its parent
* RDDs will be removed. This function must be called before any job has been
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint(): Unit = RDDCheckpointData.synchronized {
// NOTE: we use a global lock here due to complexities downstream with ensuring
// children RDD partitions point to the correct parent partitions. In the future
// we should revisit this consideration.
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
从上面的源码中可以看出,在进行checkpoint之前要设置 sc.setCheckpointDir("...")。除此之外,在进行 checkpoint 前,要先对 RDD 进行 cache。Why??? checkpoint 会等到 job 结束后另外启动专门的 job 去完成 checkpoint,也就是说需要 checkpoint 的 RDD 会被计算两次。
3.persist与checkpoint的区别
rdd.persist(StorageLevel.DISK_ONLY) 与 checkpoint 也有区别。前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。