首先判断,master状态不是ALIVE的话,直接返回
也就是说,standby master是不会进行Application等资源调度的
首先调度Driver
只有用yarn-cluster模式提交的时候,才会注册driver,因为standalone和yarn-client模式,都会在本地直接启动driver,而
不会来注册driver,就更不可能让master来调度driver了
Application的调度机制
首先,Application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps
默认是spreadOutApps
通过spreadOutApps这种算法,其实会将每个Application,要启动的Executor,都平均分布到各个worker上去
比如有20个cpu
core要分配,有10个worker,那么实际上会循环两遍worker,每次循环,给每个worker分配一个core,最后每个worker分配了两个core

所以,比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个core,但这种算法下,其实总共只会启动2个executor,每个有10个core
非spreadOutApps调度算法,将每一个application,尽可能少的分配到Worker上去
这种算法和spreadOutApps算法正好相反,每个application都尽可能分配到尽量少的worker上去
比如总共有10个worker,每个有10个core,Application总共要分配20个core
那么其实只会分配到两个worker上,每个worker都占满10个core,那么其余的application,就只能分配到下一个worker了

源码剖析
private def schedule() { // 首先判断,master状态不是ALIVE的话,直接返回 // 也就是说,standby
master是不会进行Application等资源调度的 if (state != RecoveryState.ALIVE) { return } //
First schedule drivers, they take strict precedence over applications //
Randomization helps balance drivers // Random.shuffle的原理,就是对传入的集合的元素进行随机的打乱 //
取出了Workers中所有之前注册上来的worker,进行过滤,必须状态位ALIVE的worker //
对状态为ALIVE的worker,调用Random.shuffle方法进行随机的打乱 val shuffledAliveWorkers =
Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) //
拿到worker数量 val numWorkersAlive = shuffledAliveWorkers.size var curPos = 0 //
首先调度Driver //
只有用yarn-cluster模式提交的时候,才会注册driver,因为standalone和yarn-client模式,都会在本地直接启动driver,而
// 不会来注册driver,就更不可能让master来调度driver了 // driver调度机制 // 遍历waitingDrivers
ArrayBuffer for (driver <- waitingDrivers.toList) { // iterate over a copy of
waitingDrivers // We assign workers to each waiting driver in a round-robin
fashion. For each driver, we // start from the last worker that was assigned a
driver, and continue onwards until we have // explored all alive workers. var
launched = false var numWorkersVisited = 0 // while的条件
numWorkersVisited小于numWorkersAlive 只要还有活着的worker没有遍历到,就继续遍历 //
而且当前这个driver还没有被启动,也就是launched为false while (numWorkersVisited < numWorkersAlive
&& !launched) { val worker = shuffledAliveWorkers(curPos) numWorkersVisited +=
1 // 如果当前这个worker的空闲内存量大于等于driver需要的内存 // 并且worker的空闲cpu数量大于等于driver所需要的CPU数量
if (worker.memoryFree >= driver.desc.mem && worker.coresFree >=
driver.desc.cores) { // 启动driver launchDriver(worker, driver) //
将driver从waitingDrivers队列中移除 waitingDrivers -= driver // launched设置为true
launched = true } // 将指针指向下一个worker curPos = (curPos + 1) % numWorkersAlive } }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the
first app // in the queue, then the second app, etc. // Application的调度机制 //
首先,Application的调度算法有两种,一种是spreadOutApps,另一种是非spreadOutApps // 默认是spreadOutApps
if (spreadOutApps) { // Try to spread out each app among all the nodes, until
it has all its cores //
首先,遍历waitingApps中的ApplicationInfo,并且过滤出Application还有需要调度的core的Application for
(app <- waitingApps if app.coresLeft > 0) { // 从worker中过滤出状态为ALIVE的Worker //
再次过滤出可以被Application使用的Worker,Worker剩余内存数量大于等于Application的每一个Actor需要的内存数量,而且该Worker没有运行过该Application对应的Executor
// 将Worker按照剩余cpu数量倒序排序 val usableWorkers = workers.toArray.filter(_.state ==
WorkerState.ALIVE) .filter(canUse(app, _)).sortBy(_.coresFree).reverse val
numUsable = usableWorkers.length // 创建一个空数组,存储要分配给每个worker的cpu数量 val assigned =
new Array[Int](numUsable) // Number of cores to give on each node //
获取到底要分配多少cpu,取app剩余要分配的cpu的数量和worker总共可用cpu数量的最小值 var toAssign =
math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) //
通过这种算法,其实会将每个Application,要启动的Executor,都平均分布到各个worker上去 // 比如有20个cpu
core要分配,有10个worker,那么实际上会循环两遍worker,每次循环,给每个worker分配一个core,最后每个worker分配了两个core
//
所以,比如,spark-submit里,配置的是要10个executor,每个要2个core,那么总共是20个core,但这种算法下,其实总共只会启动2个executor,每个有10个core
// while条件,只要 要分配的cpu,还未分配完,就继续循环 var pos = 0 while (toAssign > 0) { //
每一个Worker,如果空闲的cpu数量大于已经分配出去的cpu数量,也就是说worker还有可分配的cpu if
(usableWorkers(pos).coresFree - assigned(pos) > 0) { //
将总共要分配的cpu数量-1,因为这里已经决定在这个worker上分配一个cpu了 toAssign -= 1 // 给这个worker分配的cpu数量,加1
assigned(pos) += 1 } // 指针移动到下一个worker pos = (pos + 1) % numUsable } // Now
that we've decided how many cores to give on each node, let's actually give
them // 给每个worker分配完Application要求的cpu core之后 遍历worker for (pos <- 0 until
numUsable) { // 只要判断之前给这个worker分配到了core if (assigned(pos) > 0) { //
那么就在worker上启动Executor //
首先,在Application内部缓存结构中,添加Executor,并且创建ExecutorDesc对象,其中封装了,给这个Executor分配多少个cpu
core // 这里,spark 1.3.0版本的Executor启动的内部机制 //
在spark-submit脚本中,可以指定要多少个Executor,每个Executor需要多少个cpu,多少内存 //
那么基于spreadOutApps机制,实际上,最终,Executor的实际数量,以及每个Executor的cpu,可能与配置是不一样的 //
因为我们这里是基于总的cpu来分配的,就是说,比如要求3个Executor,每个要三个cpu,有9个worker,每个有1个cpu //
那么根据这种算法,会给每个worker分配一个core,然后给每个worker启动一个Executor //
最后会启动9个Executor,每个Executor有一个cpu core val exec =
app.addExecutor(usableWorkers(pos), assigned(pos)) // 在worker上启动Executor
launchExecutor(usableWorkers(pos), exec) // 将application的状态设置为RUNNING app.state
= ApplicationState.RUNNING } } } } else { // Pack each app into as few nodes as
possible until we've assigned all its cores //
非spreadOutApps调度算法,将每一个application,尽可能少的分配到Worker上去 //
这种算法和spreadOutApps算法正好相反,每个application都尽可能分配到尽量少的worker上去 //
比如总共有10个worker,每个有10个core,Application总共要分配20个core //
那么其实只会分配到两个worker上,每个worker都占满10个core,那么其余的application,就只能分配到下一个worker了 //
遍历worker,并且状态为ALIVE。还有空闲空间的worker for (worker <- workers if worker.coresFree >
0 && worker.state == WorkerState.ALIVE) { //
遍历application,并且是还有需要分配的core的application for (app <- waitingApps if
app.coresLeft > 0) { // 判断,如果当前这个worker可以被application使用 if (canUse(app,
worker)) { // 取worker剩余cpu数量,与application要分配的cpu数量的最小值 val coresToUse =
math.min(worker.coresFree, app.coresLeft) // 如果worker剩余cpu为0,那么就不分配了 if
(coresToUse > 0) { // 给application添加一个executor val exec =
app.addExecutor(worker, coresToUse) // 在worker上启动executor
launchExecutor(worker, exec) // 将application的状态设置为RUNNING app.state =
ApplicationState.RUNNING } } } } } }
看看launchDriver()方法
// 在某个Worker上,启动driver def launchDriver(worker: WorkerInfo, driver:
DriverInfo) { logInfo("Launching driver " + driver.id + " on worker " +
worker.id) // 将driver加入worker内存缓存结构 // 将worker内使用的内存和cpu数量,都加上driver需要内存和cpu数量
worker.addDriver(driver) // 同时把worker也加入到driver内部的缓存结构中 driver.worker =
Some(worker) // 调用worker的actor,给他发送LaunchDriver消息,让worker来启动Driver worker.actor
! LaunchDriver(driver.id, driver.desc) // 将driver的状态设置为RUNNING driver.state =
DriverState.RUNNING }
看看launchExecutor()方法
def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) //
将Executor加入worker内部的缓存 worker.addExecutor(exec) //
向worker的actor发送LaunchExecutor消息 worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
// 向Executor对应的application对应的driver,发送ExecutorAdded消息 exec.application.driver !
ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) }
看看canUse()方法
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) }

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信