入口
// 最后,针对stage的task,创建TaskSet对象,调用taskScheduler的submitTasks()方法,提交taskSet //
默认情况下,我们的standalone模式,是使用的TaskSchedulerImpl,TaskScheduler只是一个trait
taskScheduler.submitTasks( new TaskSet(tasks.toArray, stage.id,
stage.newAttemptId(), stage.jobId, properties))
看看taskScheduler.submitTasks()方法,TaskSchedulerImpl的submitTasks()方法
/** * TaskScheduler提交任务的入口 * @param taskSet */ override def
submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task
set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { //
给每一个taskSet,都会创建一个TaskSetManager //
TaskSetManager实际上,在后面,会负责他的那个TaskSet的任务执行状况的监视和管理 val manager =
createTaskSetManager(taskSet, maxTaskFailures) // 加入内存缓存中
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if
(!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new
TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial
job has not accepted any resources; " + "check your cluster UI to ensure that
workers are registered " + "and have sufficient resources") } else {
this.cancel() } } }, STARVATION_TIMEOUT, STARVATION_TIMEOUT) } hasReceivedTask
= true } //
sparkContext原理剖析的时候,创建TaskScheduler的时候,一件非常重要的事情,就是为TaskSchedulerImpl创建 //
一个SparkDeploySchedulerBackend,这里的backend,指的就是之前创建好的SparkDeploySchedulerBackend,而且这个
// backend是负责创建AppClient,向Master注册Application的 backend.reviveOffers() }
看看TaskSetManager这个类
/** * 在TaskSchedulerImpl中,对一个单独的TaskSet的任务进行调度,这个类负责追踪每一个task,如果task失败的话, *
会负责重试task,直到超过重试的次数限制,并且会通过延迟调度,为这个TaskSet处理本地化调度机制。它的主要接口是resourceOffer, *
在这个接口中,TaskSet会希望在一个节点上运行一个任务,并且接受任务的状态改变消息,来知道它负责的task的状态改变了 */ private[spark]
class TaskSetManager( sched: TaskSchedulerImpl, val taskSet: TaskSet, val
maxTaskFailures: Int, clock: Clock = new SystemClock()) extends Schedulable
with Logging {
看看backend.reviveOffers()方法,CoarseGrainedSchedulerBackend的reviveOffers()方法
override def reviveOffers() { driverActor ! ReviveOffers }
CoarseGrainedSchedulerBackend这个类的,DriverActor这个类的ReviveOffers
case ReviveOffers => makeOffers()
看makeOffers()方法
// Make fake resource offers on all executors def makeOffers() { //
第一步,调用TaskSchedulerImpl的resourceOffers()方法,执行任务分配算法,将各个task分配到executor上去 //
第二步,分配好task到Executor之后,执行自己的的launchTasks()方法,将分配的task发送launchTask消息到对应的Executor上去,由Executor启动并执行task
//
给resourceOffers方法传入的是这个Application所有可用的Executor,并且将其封装成了WorkerOffer,每个WorkerOffer代表了每个Executor可用的cpu资源数量
launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id,
executorData) => new WorkerOffer(id, executorData.executorHost,
executorData.freeCores) }.toSeq)) }
首先看scheduler.resourceOffers()
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] =
synchronized { // Mark each slave as alive and remember its hostname // Also
track if new executor is added var newExecAvail = false for (o <- offers) {
executorIdToHost(o.executorId) = o.host activeExecutorIds += o.executorId if
(!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new
HashSet[String]() executorAdded(o.executorId, o.host) newExecAvail = true } for
(rack <- getRackForHost(o.host)) { hostsByRack.getOrElseUpdate(rack, new
HashSet[String]()) += o.host } } //
首先,将可用的executor进行shuffle,也就是说,进行打散,从而做到,尽可能可以进行负载均衡 // Randomly shuffle offers
to avoid always placing tasks on the same set of workers. val shuffledOffers =
Random.shuffle(offers) // Build a list of tasks to assign to each worker. //
然后针对WorkerOffer,创建一堆需要用的东西 //
比如tasks,它可以理解为一个二维数组,即ArrayBuffer的元素又是一个ArrayBuffer,并且每个子ArrayBuffer的数量是固定的,也就是这个Executor可用的cpu数量
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray //
这个很重要,从rootPool中取出了排序的TaskSet,之前讲解TaskScheduler初始化的时候,创建完TaskSchedulerImpl、SparkDeploySchedulerBackend之后,执行一个initialize()
//
方法,在这个方法中,其实会创建一个调度池,这里,相当于是说,所有提交的taskSet,首先呢,会放入这个调度池,然后再执行task分配算法的时候,会从这个调度池中,取出排好队的TaskSet
val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <-
sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks)) if (newExecAvail) {
taskSet.executorAdded() } } // Take each TaskSet in our scheduling order, and
then offer it each node in increasing order // of locality levels so that it
gets a chance to launch local tasks on all of them. // NOTE: the
preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY //
这里,是任务分配算法的核心,双重for循环,遍历所有的taskSet,以及每一种本地化级别 // 本地化级别有 //
PROCESS_LOCAL,进程本地化,rdd的partition和task,进入一个Executor内,速度当然快 //
NODE_LOCAL,dd的partition和task,不在一个Executor重,不在一个进程,但是在一个worker节点上 //
NO_PREF,无,没有所谓的本地化级别 // RACK_LOCAL,机架本地化,至少rdd的partition和task,在一个机架上 //
ANY,任意的本地化级别 // 这几种本地化级别 是从小到大排列的 var launchedTask = false //
对每一个taskSet,从最好的一种本地化级别,开始遍历 for (taskSet <- sortedTaskSets; maxLocality <-
taskSet.myLocalityLevels) { do { //
对当前taskSet,尝试优先使用最小的本地化级别,将taskset的task,在Executor上进行启动 // 如果启动不了,那么就跳出这个do
while循环,进入下一种本地化级别,也就是放大本地化级别 // 以此类推,直到尝试将taskset在某些本地化级别下,在task在Executor上全部启动
launchedTask = resourceOfferSingleTaskSet( taskSet, maxLocality,
shuffledOffers, availableCpus, tasks) } while (launchedTask) } if (tasks.size >
0) { hasLaunchedTask = true } return tasks }
继续看resourceOfferSingleTaskSet()方法
private def resourceOfferSingleTaskSet( taskSet: TaskSetManager, maxLocality:
TaskLocality, shuffledOffers: Seq[WorkerOffer], availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = { var launchedTask =
false // 遍历所有Executor for (i <- 0 until shuffledOffers.size) { val execId =
shuffledOffers(i).executorId val host = shuffledOffers(i).host //
如果当前Executor的cpu数量大于每个task要使用的cpu数量,默认是1 if (availableCpus(i) >= CPUS_PER_TASK)
{ try { //
调用taskSetManager的resourceOffer方法,去找到,在这个Executor,用这种本地化级别,taskset的哪些task可以启动 //
resourceOffer()方法,就是说,会去判断这个task在这个这个本地化级别,之前的等待时间是多少,如果说,本地化级别的等待时间在一定范围内 //
那么就认为task使用本地化级别可以在executor上启动 for (task <- taskSet.resourceOffer(execId, host,
maxLocality)) { tasks(i) += task val tid = task.taskId taskIdToTaskSetId(tid) =
taskSet.taskSet.id taskIdToExecutorId(tid) = execId executorsByHost(host) +=
execId availableCpus(i) -= CPUS_PER_TASK assert(availableCpus(i) >= 0)
launchedTask = true } } catch { case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not
serializable") // Do not offer resources for this task, but don't throw an
error to allow other // task sets to be submitted. return launchedTask } } }
return launchedTask }
接下来看launchTasks()方法
// Launch tasks returned by a set of resource offers //
根据分配好的情况,去Executor上启动相应的task def launchTasks(tasks: Seq[Seq[TaskDescription]])
{ for (task <- tasks.flatten) { // 首先将每个Executor要执行的task信息,统一进行序列化操作 val ser =
SparkEnv.get.closureSerializer.newInstance() val serializedTask =
ser.serialize(task) if (serializedTask.limit >= akkaFrameSize -
AkkaUtils.reservedSizeBytes) { val taskSetId =
scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet => try { var msg =
"Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values." msg =
msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
AkkaUtils.reservedSizeBytes) taskSet.abort(msg) } catch { case e: Exception =>
logError("Exception in error callback", e) } } } else { // 找到对应的executor val
executorData = executorDataMap(task.executorId) // 给executor上的资源,减去要使用的cpu资源
executorData.freeCores -= scheduler.CPUS_PER_TASK //
向executor发送LaunchTask消息,来在executor上启动task executorData.executorActor !
LaunchTask(new SerializableBuffer(serializedTask)) } } }

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信