本篇以WordCount为例,去分析RDD的依赖关系和任务切分机制,分析样例如下:
sc.textFile(“xx") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_)
.saveAsTextFile(“xx")
一、RDD的依赖关系
RDD的依赖分为两种:窄依赖、宽依赖
*
窄依赖指的是每一个父RDD的Partition最多被子RDD的一个Partition使用。
*
宽依赖指的是多个子RDD的Partition会依赖同一个父RDD的Partition,会引起shuffle。
RDD存在着依赖关系,这些依赖关系形成了有向无环图DAG,DAG通过DAGScheduler进行Stage的划分,并基于每个Stage生成了TaskSet,提交给TaskScheduler。
二、Spark任务提交过程
1、Driver程序的代码运行到action操作,触发了SparkContext的runJob方法。
2、SparkContext调用DAGScheduler的runJob函数。
3、DAGScheduler把Job划分stage,然后把stage转化为相应的Tasks,把Tasks交给TaskScheduler。
4、通过TaskScheduler把Tasks添加到任务队列当中,交给SchedulerBackend进行资源分配和任务调度。
5、调度器给Task分配执行Executor,ExecutorBackend负责执行Task。
Spark调度相关概念如下:
* Task(任务):单个分区数据及上的最小处理流程单元。
* TaskSet(任务集):由一组关联的,但互相之间没有Shuffle依赖关系的任务所组- 成的任务集。
* Stage(调度阶段):一个任务集对应的调度阶段。
* Job(作业):有一个RDD Action生成的一个或多个调度阶段所组成的一次计算作业。
* Application(应用程序):Spark应用程序,由一个或多个作业组成。
Spark的调度管理模块中,最重要的类是DAGScheduler和TaskScheduler,TaskScheduler负责每个具体任务的实际物理调度,DAGScheduler负责将作业拆分成不同阶段的具有依赖关系的多批任务,可以理解为DAGScheduler负责任务的逻辑调度。Spark调度管理示意图如下:
三、RDD运行规划
前面介绍了RDD之间的依赖关系划分和Spark任务提交的过程,以及Spark调度过程中的概念,那Spark调度阶段如何划分的,RDD运行是如何规划的呢,下面以WordCount为例详解:
sc.textFile(“xx") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_)
.saveAsTextFile(“xx")
一个Spark任务提交后,DAGScheduler从RDD依赖链末端的RDD出发,遍历整个RDD依赖链,将Job分解成具有前后依赖关系的多个stage。DAGScheduler是根据ShuffleDependency(宽依赖)划分stage的,也就是说当某个RDD的运算需要将数据进行shuffle操作时,这个包含了shuffle依赖关系的RDD将被用来作为输入信息,构建一个新的调度阶段。以此为依据划分调度阶段,可以确保有依赖关系的数据能够按照正确的顺序得到处理和运算。
可以从Spark的历史日志中去查看Spark对RDD的运行规划
热门工具 换一换