Flowchart



[Figure: Executor原理剖析.png — flowchart of Executor registration and task launch]

Source code

When a worker launches an executor for an application, what it actually starts is a CoarseGrainedExecutorBackend process.
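To make the forking concrete, here is a minimal sketch (not Spark's actual ExecutorRunner code) of how a worker can assemble a command line and fork a separate JVM whose main class is the executor backend; the memory option and driver URL below are purely illustrative.

import java.io.File
import scala.jdk.CollectionConverters._

object LaunchBackendSketch {
  def main(args: Array[String]): Unit = {
    // Assemble a java command line for the child JVM (options are illustrative only)
    val command = Seq(
      "java",
      "-cp", System.getProperty("java.class.path"),
      "-Xmx1g", // hypothetical executor memory setting
      "org.apache.spark.executor.CoarseGrainedExecutorBackend",
      "akka.tcp://sparkDriver@driver-host:7077/user/CoarseGrainedScheduler" // hypothetical driver URL
    )
    val process = new ProcessBuilder(command.asJava)
      .directory(new File("."))
      .inheritIO() // forward the child's stdout/stderr to this process
      .start()
    println(s"Forked executor backend process, alive = ${process.isAlive}")
  }
}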

Executor registration mechanism
/**
 * In the actor's initialization method
 */
override def preStart() {
  logInfo("Connecting to driver: " + driverUrl)
  // Obtain a reference (ActorSelection) to the driver actor
  driver = context.actorSelection(driverUrl)
  // Send a RegisterExecutor message to the driver; "driver" here is DriverActor,
  // an inner class of CoarseGrainedSchedulerBackend.
  // Once the driver has registered the executor successfully, it replies with a
  // RegisteredExecutor message.
  driver ! RegisterExecutor(executorId, hostPort, cores, extractLogUrls)
  context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
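For reference, the handshake above boils down to three messages. A minimal sketch of their shapes, with simplified fields (the real definitions live in Spark's CoarseGrainedClusterMessages):

// Executor backend -> driver: ask to be registered
case class RegisterExecutor(
    executorId: String,
    hostPort: String,
    cores: Int,
    logUrls: Map[String, String])

// Driver -> executor backend: registration succeeded
case object RegisteredExecutor

// Driver -> executor backend: registration was rejected
case class RegisterExecutorFailed(message: String)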
driver ! RegisterExecutor()
class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor with ActorLogReceive {

  override protected def log = CoarseGrainedSchedulerBackend.this.log

  private val addressToExecutorId = new HashMap[Address, String]

  override def preStart() {
    // Listen for remote client disconnection events, since they don't go through Akka's watch()
    context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

    // Periodically revive offers to allow delay scheduling to work
    val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
    import context.dispatcher
    context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers)
  }

  def receiveWithLogging = {
    case RegisterExecutor(executorId, hostPort, cores, logUrls) =>
      Utils.checkHostPort(hostPort, "Host port expected " + hostPort)
      if (executorDataMap.contains(executorId)) {
        sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
      } else {
        logInfo("Registered executor: " + sender + " with ID " + executorId)
        sender ! RegisteredExecutor
        addressToExecutorId(sender.path.address) = executorId
        totalCoreCount.addAndGet(cores)
        totalRegisteredExecutors.addAndGet(1)
        val (host, _) = Utils.parseHostPort(hostPort)
        val data = new ExecutorData(sender, sender.path.address, host, cores, cores, logUrls)
        // This must be synchronized because variables mutated
        // in this block are read when requesting executors
        CoarseGrainedSchedulerBackend.this.synchronized {
          executorDataMap.put(executorId, data)
          if (numPendingExecutors > 0) {
            numPendingExecutors -= 1
            logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
          }
        }
        listenerBus.post(
          SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
        makeOffers()
      }
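The bookkeeping in the RegisterExecutor handler is easy to miss inside the Akka plumbing. Below is a minimal, self-contained sketch of just that logic, reusing the simplified messages from the sketch above (RegistrySketch and its fields are illustrative, not Spark types):

import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

class RegistrySketch {
  private val executorCores = new mutable.HashMap[String, Int]
  private val totalCoreCount = new AtomicInteger(0)
  private var numPendingExecutors = 1 // pretend one executor was requested

  /** Returns the reply the driver would send back to the executor backend. */
  def register(executorId: String, cores: Int): Any = synchronized {
    if (executorCores.contains(executorId)) {
      // Same duplicate-ID check as the real handler
      RegisterExecutorFailed("Duplicate executor ID: " + executorId)
    } else {
      executorCores.put(executorId, cores)
      totalCoreCount.addAndGet(cores)
      if (numPendingExecutors > 0) numPendingExecutors -= 1
      // The real handler now calls makeOffers(), handing resource offers to the
      // TaskScheduler so tasks can be assigned to the newly registered executor
      RegisteredExecutor
    }
  }
}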
Receiving the RegisteredExecutor message
override def receiveWithLogging = {

  // After the driver has registered the executor successfully, it sends back the
  // RegisteredExecutor message.
  // At this point, CoarseGrainedExecutorBackend creates an Executor object as its
  // execution handle; most of its functionality is actually implemented through
  // that Executor.
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    val (hostname, _) = Utils.parseHostPort(hostPort)
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
Task launch mechanism
  // Launch a task
  case LaunchTask(data) =>
    if (executor == null) {
      logError("Received LaunchTask command but executor was null")
      System.exit(1)
    } else {
      // Deserialize the task
      val ser = env.closureSerializer.newInstance()
      val taskDesc = ser.deserialize[TaskDescription](data.value)
      logInfo("Got assigned task " + taskDesc.taskId)
      // Use the internal execution handle: Executor's launchTask() method starts the task
      executor.launchTask(this, taskId = taskDesc.taskId,
        attemptNumber = taskDesc.attemptNumber, taskDesc.name, taskDesc.serializedTask)
    }
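The deserialization step above is just a byte-buffer round trip. Here is a minimal sketch of the same round trip using plain JDK serialization in place of Spark's closureSerializer (TaskDescriptionSketch is an illustrative stand-in, not Spark's TaskDescription):

import java.io._
import java.nio.ByteBuffer

case class TaskDescriptionSketch(taskId: Long, attemptNumber: Int, name: String)

object SerdeSketch {
  def serialize(t: TaskDescriptionSketch): ByteBuffer = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(t)
    out.close()
    ByteBuffer.wrap(bytes.toByteArray)
  }

  def deserialize(buf: ByteBuffer): TaskDescriptionSketch = {
    val in = new ObjectInputStream(new ByteArrayInputStream(
      buf.array(), buf.arrayOffset() + buf.position(), buf.remaining()))
    in.readObject().asInstanceOf[TaskDescriptionSketch]
  }

  def main(args: Array[String]): Unit = {
    val desc = deserialize(serialize(TaskDescriptionSketch(42L, 0, "sample task")))
    println("Got assigned task " + desc.taskId) // mirrors the log line above
  }
}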
executor.launchTask()
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // A TaskRunner is created for every task; TaskRunner implements the Runnable
  // interface from Java's concurrency API
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber,
    taskName, serializedTask)
  // Put the TaskRunner into the in-memory cache of running tasks
  runningTasks.put(taskId, tr)
  // Executor holds a Java thread pool internally; the task, wrapped in a TaskRunner
  // (a Runnable), is handed straight to the thread pool for execution.
  // The thread pool queues work automatically: if no thread is idle at the moment,
  // submitted tasks wait in line until one frees up.
  threadPool.execute(tr)
}
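To see the queuing behavior that last comment describes, here is a minimal, self-contained sketch with a fixed-size pool (pool size and task count are arbitrary, and Spark creates its own pool differently, but the queuing principle is the same):

import java.util.concurrent.Executors

object ThreadPoolSketch {
  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newFixedThreadPool(2) // at most 2 Runnables run at once

    for (taskId <- 1 to 5) {
      threadPool.execute(new Runnable {
        override def run(): Unit = {
          // Tasks 3-5 wait in the pool's queue until a worker thread frees up
          println(s"task $taskId running on ${Thread.currentThread().getName}")
          Thread.sleep(500) // simulate work
        }
      })
    }

    threadPool.shutdown() // let queued tasks finish, then exit
  }
}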
