Spark 重要模块的内部实现

June 14, 2017

本文介绍 spark (版本 2.3)里一些重要重要模块的内部实现。

Core

Scheduler

DAG 的生成

RDD 的所有 action 都会触发 SparkContext.runJob,该函数会调用 DAGScheduler.runJob(resultHandler),最后的实际工作会到 DAGScheduler.handleJobSubmitted,该函数就会生成和执行 Stage,具体过程如下:

我们看下 DAG 是如何形成的,即 getMissingParentStages 的内容:

任务的执行

上面讲到 Stage 提交前,会先提交 Stage 所依赖的 Stage,在这过程中形成了 DAG,那么当 Stage 没有依赖时,就会执行该 Stage。在 submitStage 中会调用 submitMissingTasks(stage) 生成具体的 Task,会生成 stage.rdd.partitions.length 个 task,ResultStage 生成 ResultTask,ShuffleMapStage 生成 ShuffleMapTask,生成 task 后通过 taskScheduler.submitTasks(TaskSet) 提交 task,一个 Stage 里的所有 Task 被分组为一个 TaskSet 进行提交。

从这里开始就有点绕了,做好准备。

Executor

worker 上运行的 Executor 的最外层是 CoarseGrainedExecutorBackend,实际执行任务的是 Executor,而 ExecutorBackend 负责与 driver 的通信。

RPC

一个 RpcEnv 对应一个 Server,即只监听一个端口,多个 RpcEndpoint 共享一个端口。这个 Server 即 NettyRpcEnv.startServer 启动的 Server。通过本地的 Endpoint 注册表 netty.Dispatcher,能够由 RpcEndpoint 得到 RpcEndpointRef。假设本地 A 为了远端 B 能够访问本 RpcEndpoint,一般会传序列化的 RpcEndpointRef 给 B,B 会在收到该消息时保存传递该消息的那条连接,从而当需要返回响应时,会通过那条连接回复(该方法行得通是因为A 一直是保持着该连接的,这个通过调查 NettyRpcEnv.send最后可以看到该连接是位于 Outbox.client),B 保存连接的细节可以从 NettyRpcEndpointRef.readObject 去追溯。

从上面的分析可以得出:

这篇文章 spark 网络讲的也不错

Shuffle

一个 ShuffleDependency 对应一个 shuffleID。

ShuffleMapTask 通过 ShuffleWriter将一个 rdd 输出到 disk store,以 SortShuffleWriter 为例,它通过 ExternalSorter 将一个 ShuffleMapTask 的输出输出到一个 data 文件(文件名为 shuffle_$shuffleId_$mapId_0.data)和一个 index 文件,index 文件存每个 partition 在 data 文件的索引。

由于 ShuffleMapTask 的输出时文件名是从 blockManager.diskBlockManager 拿到的,所以是在 blockManger 系统管理中的,可以通过 getBlockData(ShuffleBlockId) 得到其数据。

ShuffleMapTask 在运行完成时会返回 MapStatus(blockManager.shuffleServerId),表示该 task 对应的 BlockMangerId,该返回值会通过 Executor 的 statusUpdate 层层传递到 DAGScheduler.handleTaskCompletion,该函数会调用 mapOutputTracker.registerMapOutput(shuffleId, partitionId, status),这个信息会在 取数据时用到。

在取数据方面,例如 ShuffledRDD.compute,最后会生成 ShuffleBlockFetcherIterator(blocksByAddress) 来取数据,而 blocksByAddress 就是通过 mapOutputTracker.getMapSizesByExecutorId 来得到的。注意,map output 注册是在 DAGScheduler 里,即 driver 里,而获取是在 executor 里,因此需要通过 RPC 和 driver 的 mapOutputTracker 通信得到。想了解 fetch 时如何控制并发的等细节,去看 ShuffleBlockFetcherIterator 吧。

这里有个额外的概念是 ExternalShuffleClient,这个是什么呢?Shuffle 数据是由 blockManager 来负责提供给别人的,而 blockManager 运行在 executor 进程内,如果 executor 退出了(有正常退出的情况,例如使用 dynamic allocation 特性时),shuffle 数据就相当于丢了,虽然数据还在磁盘上。为了防止这种情况,可以启动一个 外部 shuffle service ,而 ExternalShuffleClient 就是和外部 shuffle service 通信用的,与内部 shuffle service (即 blockManager)通信的叫做 BlockTransferService,只要 blockManager 初始化时告诉 外部 shuffle service 自己管理的目录名等信息,外部 shuffle service 就可以为 ExternalShuffleClient 提供服务了。

ShuffleMapTask 完成时会执行 shuffleStage.pendingPartitions -= task.partitionId,当没有待计算的 partition 时该 ShuffleMapStage 就完成了。

Broadcast

TorrentBroadcast 为例。broadcast 是存在 blockManager 里的。driver 把序列化后的对象分为一块块,然后存到自己的 blockManager 里,executor 取 broadcast 变量时(调用 broadcast.value()),先尝试从本地 blockManager 取,取不到会问 driver 这些块都存在哪(每块都可能存在 driver,也可能同时存在其他 executor 中,以防止 driver 成瓶颈),然后远程去取,取到后存本地 blockManager,以供别的 executor 享用。

Storage

Deploy

RDD

Serializer

UI

Misc