Flowchart



[Figure: CacheManager原理剖析.png (how CacheManager works)]

Source code

The entry point is the RDD's iterator() method.

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    // CacheManager path.
    // A storageLevel other than NONE means this RDD was marked as persisted, so do not go
    // straight to the parent RDD and run the operator to compute the partition again.
    // Try the CacheManager first to fetch the persisted data.
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    // Compute the RDD partition
    computeOrReadCheckpoint(split, context)
  }
}
Next, look at the cacheManager.getOrCompute() method.

def getOrCompute[T](
    rdd: RDD[T],
    partition: Partition,
    context: TaskContext,
    storageLevel: StorageLevel): Iterator[T] = {

  val key = RDDBlockId(rdd.id, partition.index)
  logDebug(s"Looking for partition $key")
  // Ask the BlockManager for the data directly; if it is found, just return it
  blockManager.get(key) match {
    case Some(blockResult) =>
      // Partition is already materialized, so just return its values
      val inputMetrics = blockResult.inputMetrics
      val existingMetrics = context.taskMetrics
        .getInputMetricsForReadMethod(inputMetrics.readMethod)
      existingMetrics.incBytesRead(inputMetrics.bytesRead)

      val iter = blockResult.data.asInstanceOf[Iterator[T]]
      new InterruptibleIterator[T](context, iter) {
        override def next(): T = {
          existingMetrics.incRecordsRead(1)
          delegate.next()
        }
      }

    // The BlockManager returned nothing, so further handling is needed.
    // The RDD was persisted, but for some unknown reason the data is neither in local
    // memory or disk nor in the memory or disk of a remote BlockManager.
    case None =>
      // Acquire a lock for loading this partition
      // If another thread already holds the lock, wait for it to finish and return its results
      // This calls BlockManager.get() once more: if the data is found now, return it
      // directly; if it is still missing, continue below.
      val storedValues = acquireLockForPartition[T](key)
      if (storedValues.isDefined) {
        return new InterruptibleIterator[T](context, storedValues.get)
      }

      // Otherwise, we have to load the partition ourselves
      try {
        logInfo(s"Partition $key not found, computing it")
        // Call computeOrReadCheckpoint().
        // If the RDD was checkpointed, try to read its checkpoint. If it was not, the only
        // option is to take the parent RDD's data and run the operator again.
        val computedValues = rdd.computeOrReadCheckpoint(partition, context)

        // If the task is running locally, do not persist the result
        if (context.isRunningLocally) {
          return computedValues
        }

        // Otherwise, cache the values and keep track of any updates in block statuses
        val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
        // Going through the CacheManager means a storage level was set on this RDD; we only
        // got here because the persisted data could not be found for some reason.
        // So after reading the checkpoint or recomputing the data, call putInBlockManager()
        // to persist a copy in the BlockManager again.
        val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
        val metrics = context.taskMetrics
        val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
        metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
        new InterruptibleIterator(context, cachedValues)

      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
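The control flow of getOrCompute() can be hard to see through the metrics bookkeeping. The following is a minimal sketch of the same get-or-compute idea, not Spark's implementation: TinyCache is a hypothetical class, a plain in-memory map stands in for the BlockManager, and the computed values are materialized eagerly instead of being unrolled with memory accounting. Only the `loading` set and the wait/notifyAll handshake mirror the original.

```scala
import scala.collection.mutable

// Hypothetical stand-in for CacheManager: a map replaces the BlockManager.
final class TinyCache[K, V] {
  private val store = mutable.Map.empty[K, Vector[V]]
  // Keys currently being computed by some thread; also used as the lock object.
  private val loading = mutable.Set.empty[K]

  /** Returns the cached values for `key`, computing and caching them on a miss. */
  def getOrCompute(key: K)(compute: => Iterator[V]): Iterator[V] = {
    val cached: Option[Vector[V]] = loading.synchronized {
      // If another thread is already loading this key, wait for it to finish.
      while (loading.contains(key)) loading.wait()
      val hit = store.get(key)
      // On a miss, register ourselves as the loader before releasing the lock.
      if (hit.isEmpty) loading += key
      hit
    }
    cached match {
      case Some(values) => values.iterator // cache hit
      case None =>
        try {
          val values = compute.toVector // recompute the partition
          loading.synchronized { store(key) = values }
          values.iterator
        } finally {
          // Always unregister and wake up waiters, even if compute failed.
          loading.synchronized {
            loading -= key
            loading.notifyAll()
          }
        }
    }
  }
}
```

Calling getOrCompute twice with the same key runs the compute block only once; the second call is served from the map.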
The three calls that matter here are blockManager.get(), rdd.computeOrReadCheckpoint() and putInBlockManager().

First, blockManager.get().

// Entry point for fetching data through the BlockManager: try the local store first and,
// if the block is not there, fetch it from a remote BlockManager
def get(blockId: BlockId): Option[BlockResult] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo(s"Found block $blockId locally")
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo(s"Found block $blockId remotely")
    return remote
  }
  None
}
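The local-then-remote order above is an Option fallback chain. A minimal sketch, assuming hypothetical map-backed stores keyed by a block name (real Spark uses BlockId and BlockResult, and the remote lookup goes over the network):

```scala
object BlockLookupSketch {
  // Hypothetical stores: block name -> values. The remote map is only consulted
  // when the local map has no entry, because orElse takes its argument by name.
  def get(
      blockId: String,
      local: Map[String, Seq[Int]],
      remote: Map[String, Seq[Int]]): Option[Seq[Int]] =
    local.get(blockId).orElse(remote.get(blockId))
}
```

A block present in both stores is always served from the local one, which matches the early return in get().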
Next, rdd.computeOrReadCheckpoint().

private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  // Ignore the checkpoint branch for now
  if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}
Next, putInBlockManager().

private def putInBlockManager[T](
    key: BlockId,
    values: Iterator[T],
    level: StorageLevel,
    updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
    effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {

  val putLevel = effectiveStorageLevel.getOrElse(level)
  // The storage level does not use memory, i.e. it is a pure disk level
  if (!putLevel.useMemory) {
    /*
     * This RDD is not to be cached in memory, so we can just pass the computed values as an
     * iterator directly to the BlockManager rather than first fully unrolling it in memory.
     */
    // Call blockManager.putIterator() directly to write the data to disk
    updatedBlocks ++=
      blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
    blockManager.get(key) match {
      case Some(v) => v.data.asInstanceOf[Iterator[T]]
      case None =>
        logInfo(s"Failure to store $key")
        throw new BlockException(key, s"Block manager failed to return cached value for $key!")
    }
  } else {
    // The storage level uses memory; read on
    /*
     * This RDD is to be cached in memory. In this case we cannot pass the computed values
     * to the BlockManager as an iterator and expect to read it back later. This is because
     * we may end up dropping a partition from memory store before getting it back.
     *
     * In addition, we must be careful to not unroll the entire partition in memory at once.
     * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
     * single partition. Instead, we unroll the values cautiously, potentially aborting and
     * dropping the partition to disk if applicable.
     */
    // Call unrollSafely() on blockManager.memoryStore to try to put the data in memory.
    // If unrollSafely() decides the data fits in memory, it is stored in memory.
    // If unrollSafely() decides some of the data cannot fit, it can only go to a disk file.
    blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
      case Left(arr) =>
        // We have successfully unrolled the entire partition, so cache it in memory
        updatedBlocks ++=
          blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
        arr.iterator.asInstanceOf[Iterator[T]]
      case Right(it) =>
        // There is not enough space to cache this partition in memory
        val returnValues = it.asInstanceOf[Iterator[T]]
        // If the data really cannot fit in memory, check whether the level also allows disk.
        // If it does, fall back to a disk-only level and write the data to a disk file.
        if (putLevel.useDisk) {
          logWarning(s"Persisting partition $key to disk instead.")
          val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
            useOffHeap = false, deserialized = false, putLevel.replication)
          putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
        } else {
          returnValues
        }
    }
  }
}
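Stripped of the BlockManager calls, putInBlockManager() is a small decision tree over the storage level and the outcome of unrolling. A minimal sketch of that decision, under stated assumptions: Level and Placement are hypothetical types, `fitsInMemory` stands in for unrollSafely() returning Left, and a level without memory is assumed to be a disk level.

```scala
object PutDecisionSketch {
  // Hypothetical, simplified storage level: only the two flags this decision needs.
  final case class Level(useMemory: Boolean, useDisk: Boolean)

  sealed trait Placement
  case object InMemory extends Placement
  case object OnDisk extends Placement
  case object NotCached extends Placement

  /** Decides where a freshly computed partition ends up. */
  def place(level: Level, fitsInMemory: Boolean): Placement =
    if (!level.useMemory) OnDisk      // no memory requested: hand the iterator to disk
    else if (fitsInMemory) InMemory   // unrolling succeeded: cache the array in memory
    else if (level.useDisk) OnDisk    // did not fit, but the level allows disk: fall back
    else NotCached                    // did not fit and no disk: return values uncached
}
```

The last case is the one that is easy to miss: with a memory-only level and too little memory, the partition is returned to the caller but not stored, so the next access recomputes it.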
Finally, the unrollSafely() method.

def unrollSafely(
    blockId: BlockId,
    values: Iterator[Any],
    droppedBlocks: ArrayBuffer[(BlockId, BlockStatus)])
  : Either[Array[Any], Iterator[Any]] = {

  // Number of elements unrolled so far
  var elementsUnrolled = 0
  // Whether there is still enough memory for us to continue unrolling this block
  var keepUnrolling = true
  // Initial per-thread memory to request for unrolling blocks (bytes). Exposed for testing.
  val initialMemoryThreshold = unrollMemoryThreshold
  // How often to check whether we need to request more memory
  val memoryCheckPeriod = 16
  // Memory currently reserved by this thread for this particular unrolling operation
  var memoryThreshold = initialMemoryThreshold
  // Memory to request as a multiple of current vector size
  val memoryGrowthFactor = 1.5
  // Previous unroll memory held by this thread, for releasing later (only at the very end)
  val previousMemoryReserved = currentUnrollMemoryForThisThread
  // Underlying vector for unrolling the block
  var vector = new SizeTrackingVector[Any]

  // Request enough memory to begin unrolling
  keepUnrolling = reserveUnrollMemoryForThisThread(initialMemoryThreshold)

  if (!keepUnrolling) {
    logWarning(s"Failed to reserve initial memory threshold of " +
      s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
  }

  // Unroll this block safely, checking whether we have exceeded our threshold periodically
  try {
    while (values.hasNext && keepUnrolling) {
      vector += values.next()
      if (elementsUnrolled % memoryCheckPeriod == 0) {
        // If our vector's size has exceeded the threshold, request more memory
        val currentSize = vector.estimateSize()
        if (currentSize >= memoryThreshold) {
          val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
          // Hold the accounting lock, in case another thread concurrently puts a block that
          // takes up the unrolling space we just ensured here
          accountingLock.synchronized {
            if (!reserveUnrollMemoryForThisThread(amountToRequest)) {
              // If the first request is not granted, try again after ensuring free space
              // If there is still not enough space, give up and drop the partition
              val spaceToEnsure = maxUnrollMemory - currentUnrollMemory
              // The loop keeps going as long as there is data left to unroll and we may
              // still try to put it in memory. When memory is too small for the data,
              // call ensureFreeSpace() to try to evict other blocks and free some space.
              if (spaceToEnsure > 0) {
                val result = ensureFreeSpace(blockId, spaceToEnsure)
                droppedBlocks ++= result.droppedBlocks
              }
              keepUnrolling = reserveUnrollMemoryForThisThread(amountToRequest)
            }
          }
          // New threshold is currentSize * memoryGrowthFactor
          memoryThreshold += amountToRequest
        }
      }
      elementsUnrolled += 1
    }

    if (keepUnrolling) {
      // We successfully unrolled the entirety of this block
      Left(vector.toArray)
    } else {
      // We ran out of space while unrolling the values for this block
      logUnrollFailureMessage(blockId, vector.estimateSize())
      Right(vector.iterator ++ values)
    }

  } finally {
    // If we return an array, the values returned do not depend on the underlying vector and
    // we can immediately free up space for other threads. Otherwise, if we return an iterator,
    // we release the memory claimed by this thread later on when the task finishes.
    if (keepUnrolling) {
      val amountToRelease = currentUnrollMemoryForThisThread - previousMemoryReserved
      releaseUnrollMemoryForThisThread(amountToRelease)
    }
  }
}
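The core of unrollSafely() is "buffer elements, check the size periodically, and hand back either the full array or an iterator over everything". The sketch below illustrates only that shape. It is a simplification under stated assumptions: UnrollSketch, `maxBytes` and `sizeOf` are hypothetical, the budget is fixed rather than grown by a factor of 1.5, and nothing is evicted to make room.

```scala
import scala.collection.mutable.ArrayBuffer

object UnrollSketch {
  /**
   * Buffers `values` in memory while the estimated size stays within `maxBytes`.
   * `sizeOf` is a hypothetical per-element size estimator.
   * Returns Left(all values) on success, or Right(iterator over all values) when
   * the budget is exceeded, so the caller can still consume every element.
   */
  def unroll[A](values: Iterator[A], maxBytes: Long, checkPeriod: Int = 16)(
      sizeOf: A => Long): Either[Vector[A], Iterator[A]] = {
    val buffer = ArrayBuffer.empty[A]
    var bytes = 0L
    var elementsUnrolled = 0
    var keepUnrolling = true
    while (values.hasNext && keepUnrolling) {
      val v = values.next()
      buffer += v
      bytes += sizeOf(v)
      // Check the budget only every `checkPeriod` elements, like memoryCheckPeriod.
      if (elementsUnrolled % checkPeriod == 0 && bytes > maxBytes) {
        keepUnrolling = false
      }
      elementsUnrolled += 1
    }
    if (keepUnrolling) Left(buffer.toVector)
    // Already-buffered elements first, then whatever is left in the source iterator.
    else Right(buffer.iterator ++ values)
  }
}
```

Returning the buffered prefix chained with the rest of the source iterator is what lets the caller fall back to disk without losing or recomputing the elements already read.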
