文章目录

* 绪论 <https://blog.csdn.net/qq_33247435/article/details/83960291#_1>
* 1、伪代码 <https://blog.csdn.net/qq_33247435/article/details/83960291#1_3>
* 2、小知识点普及 <https://blog.csdn.net/qq_33247435/article/details/83960291#2_32>
* 3、图解 <https://blog.csdn.net/qq_33247435/article/details/83960291#3_47>
* 4、流程介绍 <https://blog.csdn.net/qq_33247435/article/details/83960291#4_50>
* 5、Spark更多内容
<https://blog.csdn.net/qq_33247435/article/details/83960291#5Spark_68>


<>绪论

  阅读前请参考《Spark的任务调度
<https://blog.csdn.net/qq_33247435/article/details/83687361>》和《Spark的资源调度
<https://blog.csdn.net/qq_33247435/article/details/83894967>》,以便您更好的理解本文内容(
有自信直接看这篇博客也没问题的)。

<>1、伪代码

  下面这段伪代码就是用Scala语言写的一个小的Spark应用程序。如对代码有疑惑请查阅《Scala快速学习
<https://blog.csdn.net/qq_33247435/article/details/83473879>》
main(){ //声明配置对象 val conf = new SparkConf() //设置这个Application的运行模式
conf.setMaster() //设置这个Application的名称 conf.setAppName() //这个选项可以配置一些配置参数
conf.set("","") //声明应用上下文 val sc = new SparkContext(conf) //得到第一个RDD val rdd =
sc.textFile(hdfs://) //下面是应用算子对RDD的操作 val mapRDD = rdd.map() val filterRDD =
mapRDD.filter() filterRDD.cont() val flatMapRDD = filterRDD.flatMap()
flatMapRDD.foreach(println) //释放资源 sc.stop() }
<>2、小知识点普及

  Driver进程是怎么启动起来的?
  只是简单的看上面的伪代码的话,想看出来在哪启动的Driver进程还是有点困难的。通过查看源码我们可以发现在声明应用上下文的时候,即执行val sc =
new SparkContext(conf)命令的时候执行了一系列的操作,其中就包括启动Driver进程(或者为Driver进程申请资源并启动)。

  什么是挣扎的task???
  鉴定一个task有三个指标:75%,100s,1.5。

  当所有的task中,75%以上的task都运行成功了,就会每隔一百秒计算一次,计算出目前所有的未成功执行的任务的已执行时间的中位数,用这个中位数乘以1.5得到一个时间值,凡是比这个时间长的task都是挣扎的task。


  为什么spark使用粗粒度的资源调度???

  在任务执行之前把资源申请完毕。每一个task执行时就不用自己去申请资源了,直接使用已经申请好的资源就行。可以使task启动时间边快,进而导致stage、job、application的执行效率都变高了。


  启动Executor的时候根本没有考虑数据的位置,有利于数据的本地化。为什么要这样设计???
  启动Executor的时候使用的是轮询的方式。至于为什么会有利于数据的本地化,《Spark的资源调度
<https://blog.csdn.net/qq_33247435/article/details/83894967>
》里面有介绍。下面我们讨论为什么要这样设计。讨论过程从两个方面入手:1、多个job。2、task的执行效率及时间。

  如果我们有多个job,每个job需要的数据存储位置可能都不一样,如果要考虑数据位置的话,要启动很多的executor。而使用轮询的方式启动,不需要考虑这些位置信息。只是在每一个task存在的节点启动一个Executor就可以了,而且还有利于数据的本地化。
  从task执行效率来说,使用粗粒度的资源调度,在task执行之前会申请好资源,以后的所有task都不需要申请资源了,可以提高执行效率,缩短执行时间。



<>3、图解


  提交Application的方式有两种:cluster方式和client方式。这里我们以cluster方式来举例(client方式只是少了为Driver申请资源的操作)。
  

<>4、流程介绍



=======================================资源调度=========================================

  1、启动Master和备用Master(如果是高可用集群需要启动备用Master,否则没有备用Master)。
  2、启动Worker节点。Worker节点启动成功后会向Master注册。在works集合中添加自身信息。
  3、在客户端提交Application,启动spark-submit进程。伪代码:spark-submit --master
--deploy-mode cluster --class jarPath

  4、Client向Master为Driver申请资源。申请信息到达Master后在Master的waitingDrivers集合中添加该Driver的申请信息。

  5、当waitingDrivers集合不为空,调用schedule()方法,Master查找works集合,在符合条件的Work节点启动Driver。启动Driver成功后,waitingDrivers集合中的该条申请信息移除。Client客户端的spark-submit进程关闭。
  (Driver启动成功后,会创建DAGScheduler对象和TaskSchedule对象)

  6、当TaskScheduler创建成功后,会向Master会Application申请资源。申请请求发送到Master端后会在waitingApps集合中添加该申请信息。

  7、当waitingApps集合中的元素发生改变,会调用schedule()方法。查找works集合,在符合要求的worker节点启动Executor进程。

  8、当Executor进程启动成功后会将waitingApps集合中的该申请信息移除。并且向TaskSchedule反向注册。此时TaskSchedule就有一批Executor的列表信息。


=======================================任务调度=========================================

  9、根据RDD的宽窄依赖,切割job,划分stage。每一个stage是由一组task组成的。每一个task是一个pipleline计算模式。

  10、TaskScheduler会根据数据位置分发task。(taskScheduler是如何拿到数据位置的???TaskSchedule调用HDFS的api,拿到数据的block块以及block块的位置信息)
  11、TaskSchedule分发task并且监控task的执行情况。
  12、若task执行失败或者挣扎。会重试这个task。默认会重试三次。

  13、若重试三次依旧失败。会把这个task返回给DAGScheduler,DAGScheduler会重试这个失败的stage(只重试失败的这个stage)。默认重试四次。
  14、告诉master,将集群中的executor杀死,释放资源。

<>5、Spark更多内容

  如果想了解Spark更多内容,推荐阅读《Spark学习总结
<https://blog.csdn.net/qq_33247435/article/details/83653584#8Spark_71>》。

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信