开发出的 Spark 程序可能慢了,需要调优,或者挂了需要查错,或者要改进 Spark 框架本身,这时就需要了解 Spark 的内部实现,在了解 Spark 内部实现时,第一个想知道的就是一个 RDD 程序到底是怎么跑起来的,那么,它是怎么跑起来的呢?
概括的讲,Spark 程序是通过启动 driver 和 executor 跑起来的,driver 把 Spark 程序生成的 DAG 规划为 Stage 和 Task,然后通过 RPC 使 executor 跑 Task,从而完成 Spark 程序的执行,而 driver,executor 都是通过 spark-submit 往 Spark RestServer 提交任务,然后通过资源管理器(Yarn, Mesos, Spark standlone)跑起来的。
上面的话是什么意思?首先,我们先得理解 Spark 部署后的进程有哪些。
我们要把 spark 程序跑起来,得先把 spark 部署起来,部署有三种方式 Mesos,Yarn,Standalone。以 Standalone 为例(其他类似),我们准备三台机器,一台运行 ./sbin/start-master.sh
,另两台运行 ./sbin/start-slave.sh <master-spark-URL>
,这样,spark cluster 就跑起来了。
那么 spark cluster 有什么用?我们知道,spark 是分布式执行的,对于一个计算,会分解为很多 task,那么这些 task 会分配到多台机器去执行,那么这就涉及一些问题,例如把 task 分配到哪台机器去执行?每台机器上的 CPU,Memory 还剩多少,够不够执行这个 task?task 结束了 CPU 资源怎么回收以分配给别的 task?这些功能就是资源管理器需要提供的,以 Mesos 为例:
resourceOffer
和 statusUpdate
回调。resourceOffer
会定期被 Master 触发调用,告诉应用当前在哪台机器上有多少 CPU 等资源,应用判断当前资源够时可以调用 launchTasks
在某台 Slave
机器上启动 Executor
程序Executor
即和应用相关的,用户业务逻辑需要的程序,例如 Spark 就有自己的 Executor,Executor 在执行任务过程中通过调用自己的 statusUpdate
告诉 Mesos Task 的状态(Task 正在运行、已失败、已成功等),而 Mesos 会负责调用 Scheduler 的 statusUpdate,从而把消息传递给 Scheduler,方便 Scheduler 对 Task 进行管理。综上,资源管理器完成的任务是:
注意上文中可靠地
三个字,在分布式环境下,Mesos Master 可能挂,Slave 可能挂,Slave 所在节点和 Master 网络不通,消息可能丢,可能延迟。如何在这些恶劣的环境下,依然维护好各机器的状态,各 Task 的状态,以及各 Scheduler 和 Executor 的状态,如何避免这些失败影响应用程序(即容错),在 Task 多,应用程序多时,如何保证调度程序的性能等都是资源管理器需要解决的问题,对此有额外的兴趣的可以去研究 Mesos,Yarn,以及 Borg 之类的,这里就不再深究。
spark 部署好了,我们要提交 spark 程序给 spark cluster 去运行,一般来说,我们通过 spark-submit 提交我们的程序,例如:
bin/spark-submit --class org.apache.spark.examples.GroupByTest --master spark://master:6066 --deploy-mode cluster /vagrant_data/spark/examples/target/scala-2.11/jars/spark-examples_2.11-2.3.0-SNAPSHOT.jar
我们提交了 spark 源码中 examples.GroupByTest 程序,然后我们会在 UI 上看到一个运行的 App,一个 Driver,点 worker 的链接还会看到运行的 Executor,接着这些顺利结束,我们的 Spark 程序也运行完毕了,那么 spark-submit 到底做了什么事情呢?
这个跟 deploy-mode
有关,我们先看 deploy-mode=cluster
的情况。spark master 会启动一个 HTTP server,用于用户提交 spark 应用,spark-submit 会和 spark master 通信(要不然为什么要有 --master
参数),把 spark 应用运行所需要的信息传递过去,例如jar 文件,主类等等,spark master 记录下来,这时 spark-submit 执行完毕,而 spark master 会在合适时机启动主类,这个主类,就是 GroupByTest,也就是我们会谈到的 spark 应用的 driver。
如果 deploy-mode=cluster master=mesos
,我们只讨论Coarse-Grained
的情况。用户在部署 mesos 的时候同时需要部署 MesosClusterDispatcher
,它会顺带启动MesosClusterScheduler
和 MesosRestServer
,其中 MesosClusterScheduler 是一个 mesos scheduler,当用户 submit 时,会提交到 MesosRestServer,它负责把 driver 塞到 MesosClusterScheduler 的待启动 driver 列表,而他会把 driver 做为 task 给启动起来,启动命令重用了 ./bin/spark-submit
,只是 deploy-mode 变成了 client。
MesosClusterScheduler 只负责启动 driver,该 driver 会包含 MesosCoarseGrainedSchedulerBackend
,该对象也是一个 mesos scheduler,这意味着它也可以启动 task,它会把 executor 做为 task 启动。在静态分配的情况下(非 dynamic allocation 就叫静态)该对象在初始化时会通过 spark.cores.max
等参数知道该 driver 需要启动多少个 executor,从而在 mesos 提供资源给它时启动对应数量的 executor。而 DAG 相关的 task 所需要的资源其实就是 executor 得到的资源。
综上,mesos 给资源给 MesosClusterScheduler
用于启动 driver,给资源给 MesosCoarseGrainedSchedulerBackend
用于启动 executor,executor 用自己的资源来跑 task。
那么 driver 和 executor 具体是什么呢?如果我们在 LogQuery 应用启动后去 worker 机器上查看运行的进程,我们就能看到一个额外的 DriverWrapper
进程,它只是对 GroupByTest
主类的一层简单封装,而运行的 Executor 进程,则是 CoarseGrainedExecutorBackend
。
driver 是用户定义的,而 executor 则是标准的,与应用无关的程序,因为 executor 需要运行的 task 是由 spark 定义的,所以 executor 可以由 spark 框架提供。那么 executor 运行的 task 到底是什么呢?
应用程序已经提交了,其对应的 driver 和 executor 也跑起来了,程序的具体执行是通过 driver 把 Spark 程序的 action 都转化为 executor 可以执行的 task 来完成的。DAGScheduler 会负责把 DAG(Directed Acyclic Graph)拆分成 Task,而 DAG 是对 RDD 进行变换得到的,因此我们需要先了解 RDD。
RDD 是 spark 执行运算的主要数据结构,具有以下几个重要的属性:
compute(Partition): Iterator
函数,用来表明如何计算得到每个 partition例如 sc.parallelize(seq, numSlices)
这个是直接生成了 RDD,共有 numSlices
个 partition,compute
函数返回 seq
中对应一段数据的 Iterator
,没有对其他 RDD 的依赖。类似的,其他输入源如 HDFS 上的文件产生的 RDD 其 compute 函数内容就是读文件。那么
我们以 GroupByTest
中 sc.parallelize(0 until numMappers, numMappers).flatMap(f).groupByKey(numReducers).count
的计算为例。这里面生成了三个 RDD,parallelize
生成 ParallelCollectionRDD
A,flatMap
生成 MapPartitionsRDD
B,其中 A 是 B 的依赖,而 groupByKey
生成 ShuffledRDD
C,B 是 C 的依赖。
MapPartitionsRDD
的 compute 函数是如何的呢,感受下:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
firstParent[T].iterator(split, context).flatMap(f)
其中 f 就是 flatMap
接收的参数。即父 RDD 做 flatMap 得到,这里的重点是,.flatMap
并没有真正执行 flatMap
操作,而只是创建了一个 A->B 的依赖,并且记录了如何利用依赖计算出 B。
以此类推,以 RDD 为节点,依赖关系为边,最后会形成一个 DAG,那么最后需要计算的是叶子节点 RDD 的数据,而这需要计算父 RDD 的数据,这可能是同一个 Task,也可能不是,具体如何把 DAG 的计算拆解成一个个 Task 呢?
那么 DAG 是如何变成一个个可执行的 Task 的呢?我们可以设想一些例子。
rdd.map(f1).filter(f2).count
,假设 rdd 有 N 个 partition,那么 N 个 partition 可以分别做 map 然后做 filter,互相之间不会有影响,最后每个 partition 出一个 count,再把 N 个 count 求和就得到计算结果了。所以这里起 N 个 Task 就够了,每个 partition 起一个 Task,Task 做 map(f1).filter(f2).count
,N 个 Task 可以并行执行,不互相影响。rdd.map(f1).filter(f2).reduceByKey(f3).count
这里不一样的地方是 reduceByKey
,假设 rdd 有 N 个 partition,假设 reduceByKey 得到的 RDD 有 M 个 partition(由设置决定)那么,M 个 partition 每个 partition 的计算,都依赖于 N 个 partition,而不是像之前的 map 只依赖一个 partition,这种依赖,被称为 ShuffleDependency
。有 ShuffleDependency
的 DAG,需要在第一阶段执行 map,然后数据重新洗牌(shuffle),再执行第二阶段的各个 Task,因此就有了 Stage 的概念。接着上面 reduceByKey
的例子,分为两个 Stage,第一个 Stage 运行 N 个 Task,执行 rdd.map(f1).filter(f2)
,第二个 Stage 执行 reduceByKey(f3).count
,运行 M 个 Task。第二个 Stage 必须等第一个 Stage 完成后进行,第二个 Stage 的 M 个 Task,每一个 Task 都必须读 N 个 Task 的属于自己的那一份输出。
由上可见,Task 是执行的最小单位,Task 的输出需要保存到内存或硬盘上,这意味着:
那么什么是 Job 呢?一个 action 就是一个 Job,一个 Job 包含多个 Stage。例如上文中的 count
,会生成一个 Job,一个应用可能有多个 action,因此就有 Job 的概念,Job 之间独立执行的,不会互相共享数据。
DAGScheduler
划分好 Job/Stage/Task 后,会把 Task 通过 RPC 传给 executor 去执行,而由 executor 把 Task 运行状态不断地同步给 DAGScheduler
,DAGScheduler
会进行相应的处理,例如重试失败的 Stage,把宕机的 worker 上的任务重新调度等容错处理。
当 Task 成功运行完毕时,具体处理逻辑有:
CoarseGrainedSchedulerBackend.receive StatusUpdate
会回收对应 executor 上该 task 占据的资源TaskSetManager.handleSuccessfulTask
会取消该 task 的其他 attemptDAGScheduler.handleTaskCompletion
会判断 task 对应的 stage (通过 shuffleStage.pendingPartitions.isEmpty
或 resultStage.activeJob.numFinished=job.numPartitions
)是否完毕,如果完毕会触发依赖该 stage 的 stage 执行。当该 task 为 ShuffleMapTask 时,会通过 mapOutputTracker.registerMapOutput
注册其结果所在的位置,从而让后一 Stage 的 task 能够读到其输出。