Spark源码-Core-05-persist&&checkpoint&&blockManager
- cache 方法实际上调用的是 MEMORY_ONLY level 的 Persist
- 设置过 persist 的 rdd 在调用时会在实际计算时调用 getOrCompute,如果存在直接从缓存中拿结果,否则重新计算并交给 blockManager 缓存
- checkpoint 使用之前需要先调用 sc.setCheckpointDir() 设置缓存目录.目录最好设置在分布式文件系统上
- checkpoint 执行的时机是在任务 rdd 执行结束之后在 sc.runJob 中调用最后一个 rdd 的 doCheckpoint 方法
- doCheckpoint 方法会重新提交 job 运行以存储 rdd,因为 rdd 的计算逻辑时如果存在 persist 数据直接从缓存中取,所以最好在想要 checkpoint 的地方先调用 persist, 否则会重新计算这个 rdd 的数据
- checkpoint 会改变 rdd 的 lineage
- Broadcast Variable 可以使得需要共享的变量在每个节点保存一份而不是在每个 task 上,减少了网络 io 和内存消耗。
cache()
Persist this RDD with the default storage level (`MEMORY_ONLY`)
persist()
def persist(newLevel: StorageLevel): this.type = {
if (isLocallyCheckpointed) {
// 已经缓存过,transformStorageLevel 会给缓存级别上加一个磁盘
persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
} else {
persist(newLevel, allowOverride = false)
}
}
在 iterator 会检查缓存级别,如果缓存了,先去缓存中拿。
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
}
最终会调用 blockManager 的 getOrElseUpdate 方法
def getOrElseUpdate[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
// 从本地或者远程 blockmanager 读
get[T](blockId)(classTag) match {
case Some(block) =>
return Left(block)
case _ =>
// Need to compute the block.
}
// 重新计算,重新缓存
// makeIterator 是重新计算的函数
// 形式
// () => {
// readCachedBlock = false
// // 重新计算
// computeOrReadCheckpoint(partition, context)
// }
// doPutIterator 略长,就不贴了主要做了如下事情
// 1. 根据设置的内存磁盘缓存级别,还有是否序列化做处理
// 2. 报告 blockManagerMaster 状态变化
doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
case None =>
// doPut() 啥都没做,可能多线程?别的线程做完了?
val blockResult = getLocalValues(blockId).getOrElse {
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
// 释放锁,返回结果
releaseLock(blockId)
Left(blockResult)
case Some(iter) =>
// 数据太大内存放不下, dropped 到磁盘,通过迭代器返回
Right(iter)
}
}
下面是 get 方法
def get[T: ClassTag](blockId: BlockId): Option[BlockResult] = {
val local = getLocalValues(blockId)
if (local.isDefined) {
logInfo(s"Found block $blockId locally")
return local
}
// 远程的要连接远程节点的 blockManager 获取
// 要通过 blockManagerMaster 获取 block 在哪些节点上
// 然后再从远程节点的 blockManager 上获取
val remote = getRemoteValues[T](blockId)
if (remote.isDefined) {
logInfo(s"Found block $blockId remotely")
return remote
}
None
}
checkpoint()
def checkpoint(): Unit = RDDCheckpointData.synchronized {
if (context.checkpointDir.isEmpty) {
throw new SparkException("Checkpoint directory has not been set in the SparkContext")
} else if (checkpointData.isEmpty) {
// 初始化
checkpointData = Some(new ReliableRDDCheckpointData(this))
}
}
然后在 sparkContext 的 runJob 方法,注意最后一行,在 job 执行完成之后调用 rdd 的 doCheckpoint 方法。
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
再看 doCheckpoint 方法
private[spark] def doCheckpoint(): Unit = {
RDDOperationScope.withScope(sc, "checkpoint", allowNesting = false, ignoreParent = true) {
if (!doCheckpointCalled) {
doCheckpointCalled = true
// 递归的往前找,直到有一个定义了 checkpointData
if (checkpointData.isDefined) {
if (checkpointAllMarkedAncestors) {
// 继续往下走
// 从第一个 checkpoint 的地方开始调用 checkpointData.get.checkpoint()
dependencies.foreach(_.rdd.doCheckpoint())
}
checkpointData.get.checkpoint()
} else {
dependencies.foreach(_.rdd.doCheckpoint())
}
}
}
}
下面是 RDDCheckpointData#checkpoint 方法
final def checkpoint(): Unit = {
// Guard against multiple threads checkpointing the same RDD by
// atomically flipping the state of this RDDCheckpointData
RDDCheckpointData.synchronized {
if (cpState == Initialized) {
// 标记状态为 CheckpointingInProgress
cpState = CheckpointingInProgress
} else {
return
}
}
// 新建一个 RDD
val newRDD = doCheckpoint()
// 更改状态为 Checkpointed,并且改变依赖关系(lineage)
RDDCheckpointData.synchronized {
cpRDD = Some(newRDD)
cpState = Checkpointed
// 清理 rdd 原有的依赖
rdd.markCheckpointed()
}
}
这个是 doCheckpoint 方法创建 ReliableCheckpointRDD 的代码,在这里面完成了 checkpoint 文件存储操作。
def writeRDDToCheckpointDirectory[T: ClassTag](
originalRDD: RDD[T],
checkpointDir: String,
blockSize: Int = -1): ReliableCheckpointRDD[T] = {
val sc = originalRDD.sparkContext
// Create the output path for the checkpoint
val checkpointDirPath = new Path(checkpointDir)
val fs = checkpointDirPath.getFileSystem(sc.hadoopConfiguration)
if (!fs.mkdirs(checkpointDirPath)) {
throw new SparkException(s"Failed to create checkpoint path $checkpointDirPath")
}
// Save to file, and reload it as an RDD
val broadcastedConf = sc.broadcast(
new SerializableConfiguration(sc.hadoopConfiguration))
// 划重点 -- writePartitionToCheckpointFile 就是把数据写的持久化目录中的方法
// 比如写到 hdfs 的某个目录
// 这个 runjob 是重新提交 job
// 这也是为什么想要 checkpoint 的节点最好先 persist 一下的原因。
// 如果不 persist 那么需要 checkpoint 的数据需要从头跑一遍。
// writePartitionToCheckpointFile 的方法就不贴了, 没什么意思
sc.runJob(originalRDD,
writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
if (originalRDD.partitioner.nonEmpty) {
writePartitionerToCheckpointDir(sc, originalRDD.partitioner.get, checkpointDirPath)
}
val newRDD = new ReliableCheckpointRDD[T](
sc, checkpointDirPath.toString, originalRDD.partitioner)
if (newRDD.partitions.length != originalRDD.partitions.length) {
throw new SparkException(
s"Checkpoint RDD $newRDD(${newRDD.partitions.length}) has different " +
s"number of partitions from original RDD $originalRDD(${originalRDD.partitions.length})")
}
newRDD
}
下面看一看 checkpoint rdd 是怎么起作用的:
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
// isCheckpointedAndMaterialized 在checkpoint 过的节点返回为真
if (isCheckpointedAndMaterialized) {
// 比较关键的是 firstParent 怎么的到的,具体看下面的方法
firstParent[T].iterator(split, context)
} else {
compute(split, context)
}
}
在 firstParent 方法中会去拿依赖的 head, 依赖获取的方法如下:
final def dependencies: Seq[Dependency[_]] = {
// 如果 checkpoint 有值直接返回
checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
if (dependencies_ == null) {
dependencies_ = getDependencies
}
dependencies_
}
}
到这里 checkpoint 的原理机制就介绍完了
闭包处理
如果看过 spark 算子实现的代码你肯定会疑惑在每个 spark 算子中都会出现的下面这句话到底做了什么
val cleanF = sc.clean(f)
这里辗转调用最终调用到一个非常长的方法 ClosureCleaner#clean, clean 方法内部使用了 asm 代码库直接分析字节码去确定闭包引用,然后在每个 task 上都会 copy 一份闭包引用,也就是说每个 task 有一份变量实例。因为 asm 库不太熟悉,等以后再补上代码分析..
Broadcast Variable && Accumulator
就不看代码了,看吐了,简单介绍下特点
一个算子的函数中使用到了某个外部的变量,那么这个变量的值会被拷贝到每个task中,此时每个task只能操作自己的那份变量副本,也就是闭包清理步骤做的事情。如果想要在多个 task 间共享一份数据这种方法是不恰当的,应该使用 Broadcast Variable(广播变量)或者 Accumulator(累加变量),Broadcast Variable会将使用到的变量仅在每个节点拷贝一份,而不是在每个 task 上(底层使用 blockManager 实现),减少网络传输以及内存消耗。
Accumulator 提供了累加的功能,但是task只能对Accumulator进行累加操作,不能读取它的值,只有Driver程序可以读取Accumulator的值。
blockManager
这是 spark 中非常底层的部分,shuffle、persist、Broadcast 等都要用到它,blockMangerMaster 在 Driver 上启动,负责保存各个节点上 block 的状态每个 executor 上有 blockManager 负责管理本节点上的数据缓存,以及状态到 BlockManagerMaster 的报备。
… 有空再补上吧, 找实习烦死了