本文介绍 spark (版本 2.3)里一些重要重要模块的内部实现。
RDD 的所有 action 都会触发 SparkContext.runJob
,该函数会调用 DAGScheduler.runJob(resultHandler)
,最后的实际工作会到 DAGScheduler.handleJobSubmitted
,该函数就会生成和执行 Stage,具体过程如下:
ResultStage
A,调用 submitStage(A)
getMissingParentStages
计算 A 的依赖(这里就开始形成了 Stage 之间的 DAG),如果 A 有依赖 B,则 A 暂缓执行,先执行依赖 submitStage(B)
ShuffleMapTask
结束时,会检测 ShuffleMapStage
是否完成(所有 ShuffleMapTask 完成时,ShuffleMapStage 就完成了),如果完成了,会调用 submitWaitingChildStages
,来触发 A 的提交。runJob
时传过来的回调函数 resultHandler
,从而把结果传回给 RDD 模块的 action,输出给用户。具体时机可以在 DAGScheduler
文件搜索 taskSucceeded
。我们看下 DAG 是如何形成的,即 getMissingParentStages
的内容:
rdd.dependencies
,碰到 ShuffleDependency
时生成新的 ShuffleMapStage
,否则继续往上遍历dependency.rdd.dependencies
。由此可知,最开始生成了一个 ResultStage,然后每碰到一个 ShuffleDependency
就生成一个新的 stage。ShuffleMapTask
通过 ShuffleWriter
输出到文件,而在 Stage 的分界处的 rdd,例如 ShuffledRDD
,其 compute
是读文件得到 rdd 的数据,通过这种方式,完成了两个 Stage 间数据的接力,而同一 Stage 内是不需要数据交换的,每个 partition 切分为一个 ShuffleMapTask,各 ShuffleMapTask 间不存在数据上的依赖关系。上面讲到 Stage 提交前,会先提交 Stage 所依赖的 Stage,在这过程中形成了 DAG,那么当 Stage 没有依赖时,就会执行该 Stage。在 submitStage
中会调用 submitMissingTasks(stage)
生成具体的 Task,会生成 stage.rdd.partitions.length
个 task,ResultStage 生成 ResultTask,ShuffleMapStage 生成 ShuffleMapTask,生成 task 后通过 taskScheduler.submitTasks(TaskSet)
提交 task,一个 Stage 里的所有 Task 被分组为一个 TaskSet
进行提交。
从这里开始就有点绕了,做好准备。
TaskSet
被封装成 TaskSetManager
,它提供回调(给 TaskSchedulerImpl)来完成调度 TaskSet 内的 tasks 以及 task 失败时重试的逻辑。SchedulerBackend.reviveOffers
,让它负责分配资源并启动 task。CoarseGrainedSchedulerBackend
收集 executor 的 CPU 等资源信息(这些资源是 executor 启动时就从 mesos 拿到的),调用 TaskSchedulerImpl.resourceOffers
得到 task 列表,该函数做了以下事:
spark.scheduler.mode
决定 TaskSet 的优先顺序TaskLocality
决定 task 适合在哪个 executor 上运行LaunchTask
指令。StatusUpdate
消息 -> TaskSchedulerImpl.statusUpdate
-> TaskResultGetter.enqueueSuccessfulTask
-> TaskSchedulerImpl.handleSuccessfulTask
-> TaskSetManager.handleSuccessfulTask
-> DAGScheduler.taskEnded
worker 上运行的 Executor 的最外层是 CoarseGrainedExecutorBackend
,实际执行任务的是 Executor
,而 ExecutorBackend
负责与 driver
的通信。
一个 RpcEnv
对应一个 Server,即只监听一个端口,多个 RpcEndpoint
共享一个端口。这个 Server 即 NettyRpcEnv.startServer
启动的 Server。通过本地的 Endpoint 注册表 netty.Dispatcher
,能够由 RpcEndpoint 得到 RpcEndpointRef
。假设本地 A 为了远端 B 能够访问本 RpcEndpoint,一般会传序列化的 RpcEndpointRef
给 B,B 会在收到该消息时保存传递该消息的那条连接,从而当需要返回响应时,会通过那条连接回复(该方法行得通是因为A 一直是保持着该连接的,这个通过调查 NettyRpcEnv.send
最后可以看到该连接是位于 Outbox.client
),B 保存连接的细节可以从 NettyRpcEndpointRef.readObject
去追溯。
从上面的分析可以得出:
RpcEndpointRef.name
(所有消息都会带 RpcEndpointRef)分发到不同的 Endpoint 上去。TransportResponseHandler
一个 ShuffleDependency 对应一个 shuffleID。
ShuffleMapTask 通过 ShuffleWriter
将一个 rdd 输出到 disk store,以 SortShuffleWriter
为例,它通过 ExternalSorter
将一个 ShuffleMapTask 的输出输出到一个 data 文件(文件名为 shuffle_$shuffleId_$mapId_0.data
)和一个 index 文件,index 文件存每个 partition 在 data 文件的索引。
由于 ShuffleMapTask 的输出时文件名是从 blockManager.diskBlockManager
拿到的,所以是在 blockManger 系统管理中的,可以通过 getBlockData(ShuffleBlockId)
得到其数据。
ShuffleMapTask 在运行完成时会返回 MapStatus(blockManager.shuffleServerId)
,表示该 task 对应的 BlockMangerId,该返回值会通过 Executor 的 statusUpdate 层层传递到 DAGScheduler.handleTaskCompletion,该函数会调用 mapOutputTracker.registerMapOutput(shuffleId, partitionId, status)
,这个信息会在 取数据时用到。
在取数据方面,例如 ShuffledRDD.compute
,最后会生成 ShuffleBlockFetcherIterator(blocksByAddress)
来取数据,而 blocksByAddress
就是通过 mapOutputTracker.getMapSizesByExecutorId
来得到的。注意,map output 注册是在 DAGScheduler 里,即 driver 里,而获取是在 executor 里,因此需要通过 RPC 和 driver 的 mapOutputTracker 通信得到。想了解 fetch 时如何控制并发的等细节,去看 ShuffleBlockFetcherIterator 吧。
这里有个额外的概念是 ExternalShuffleClient
,这个是什么呢?Shuffle 数据是由 blockManager 来负责提供给别人的,而 blockManager 运行在 executor 进程内,如果 executor 退出了(有正常退出的情况,例如使用 dynamic allocation 特性时),shuffle 数据就相当于丢了,虽然数据还在磁盘上。为了防止这种情况,可以启动一个 外部 shuffle service ,而 ExternalShuffleClient 就是和外部 shuffle service 通信用的,与内部 shuffle service (即 blockManager)通信的叫做 BlockTransferService
,只要 blockManager 初始化时告诉 外部 shuffle service 自己管理的目录名等信息,外部 shuffle service 就可以为 ExternalShuffleClient 提供服务了。
ShuffleMapTask 完成时会执行 shuffleStage.pendingPartitions -= task.partitionId
,当没有待计算的 partition 时该 ShuffleMapStage 就完成了。
以 TorrentBroadcast
为例。broadcast 是存在 blockManager 里的。driver 把序列化后的对象分为一块块,然后存到自己的 blockManager 里,executor 取 broadcast 变量时(调用 broadcast.value()
),先尝试从本地 blockManager 取,取不到会问 driver 这些块都存在哪(每块都可能存在 driver,也可能同时存在其他 executor 中,以防止 driver 成瓶颈),然后远程去取,取到后存本地 blockManager,以供别的 executor 享用。