本文介绍 Spark 对各个层面的失败的容错机制。这里只讨论 Spark 本身组件的容错,而不讨论资源管理器的容错。(例如 mesos master/slave 的崩溃等异常)
我们先考虑最底层的失败,即某一个 Task 执行失败了。
先来看应该如何处理:
上面是我能想到的最优的处理,我们来看 spark 是如何做的。
Task 失败由 executor 感知到,通过 statusUpdate 层层传递到 driver 端的 TaskSetManager.handleFailedTask
,其基本逻辑是:
spark.task.maxFailures
,taskSet 会失败,这意味着一个 stage 失败了。stage 失败整个任务就失败了,spark 会取消该 stage 对应的 job 包含的所有 task,并返回用户任务执行失败。我们来考虑 FetchFailed 的处理,值得注意的是 FetchFailed 的处理会在未来有比较大的变化,见 SPARK-20178。
FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion
,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts
时会退出。
有时候 task 不是失败了,而是太慢了,这时 spark 会多启动几个实例(spark 中一个实例叫 attempt),有任何一个实例成功了该 task 就成功了。spark 会周期性检查是否有 task 符合 speculation 的条件,检查逻辑在 TaskSetManager.checkSpeculatableTasks
。
那么多个实例如何防止其输出不干扰呢?例如一个实例 A成功了,输出,另一个实例 B成功了,其输出覆盖了 A 的输出,甚至更糟糕的是两者同时写一个文件,导致输出错乱。
有几个层面:
首先,输出到文件是通过 ShuffleWriter.write
写的。以 SortShuffleWriter
为例,最后在 IndexShuffleBlockResolver.writeIndexFileAndCommit
里会先写数据到临时文件,然后检查文件是否存在,如果文件不存在,就重命名临时文件为正式文件。
TaskSetManager.dequeueSpeculativeTask
(IndexShuffleBlockResolver
中,检查文件是否存在和文件重命名是在一个 synchronized
块里的,这个 synchronized
看起来没什么必要)一个 attempt 成功了,会在 TaskSetManager.handleSuccessfulTask
中取消其他正在运行的 task 实例。如果在取消前,有另一个 attempt 成功了,那么另一个 attempt 的输出会注册到 mapOutputTracker 里,从而覆盖之前的实例的注册,这会导致不同 task 会读不同位置的输出,可能会有问题。
executor crash 是指 executor 异常退出了,比如 executor JVM 崩溃了。
先来看应该如何处理。
我们来看 spark 是如何做的。在 deploy mode 为 spark standalone 情况下,系统实际运行的进程是 CoarseGrainedExecutorBackend
,所以 Eexecutor crash 就等于该进程 crash。有几个检测 crash 的方式:
CoarseGrainedSchedulerBackend.onDisconnected
这是 RPC 系统发现 executor 连不上了,该函数会调用 removeExecutor
,进而调用 TaskSchedulerImpl.removeExecutor
。ExecutorRunner
会监控 executor 进程,其结束时会发送 ExecutorStateChanged
给 Worker,从而到达 Master,最后到达 CoarseGrainedSchedulerBackend.removeExecutor
。Master 会重启 executor,一个 app 的 executor 重启超过 spark.deploy.maxExecutorRetries
,app 会被终止。HeartbeatReceiver
,如果一段时间内没收到 executor 的心跳,会触发 TaskSchedulerImpl.executorLost
。该函数也会在 CoarseGrainedSchedulerBackend.removeExecutor
中被调用。如果 deploy mode 为 mesos,如果运行模式是 Coarse-Grained(Fine-Grained 已经是 deprecated 的了),实际运行的也是 CoarseGrainedExecutorBackend
。在该情况下 executor crash 会通过 MesosCoarseGrainedSchedulerBackend.statusUpdate
处理,它会从totalCoresAcquired
中减去已使用的 cpu,从而在下次调度时,由于 totalCoresAcquired < spark.cores.max
,会启动一个新的 executor,没有重启次数限制。
executor 挂了,task 会在 TaskSchedulerImpl.removeExecutor 里被标记为失败,而 task 失败的处理上一节介绍过了。
Executor 网络分区指的是 executor 没有挂,但是和 driver 的网络连接断了,之后可能会恢复。
先来应该如何处理。系统是无法区分网络分区和 crash 的,因此,系统会把网络分区当做 Crash 处理,关键是在 executor 重新连上时该如何做。重新连上时,driver 端应该忽略, executor 发出的任何消息,并且通知 executor 自杀。
我们来看 spark 如何做的。executor 是如何自杀的呢:
spark.executor.heartbeat.maxFailures
,executor 会自杀,默认设置下,10 分钟连不上,executor 就自杀了,所以这个机制起的是保底的作用,防止在任何情况下 executor 的泄露。CoarseGrainedExecutorBackend.onDisconnected
会杀掉 executorCoarseGrainedSchedulerBackend
在清理 executor 状态时会向 executor 发送 StopExecutor
driver 如何忽略旧 executor 发出的消息:
CoarseGainedSchedulerBackend.receive StatusUpdate
,并没有看 executorId 是否依旧在 driver 的表中。还是先看应该如何做。
如果想尽量保留 driver 已完成的工作,driver 挂了,应该依赖持久化存储和与 executor 的通信,恢复 driver 的状态,重现建立对正在运行的 executor,task 等的管理。而 executor 与 driver 通信失败时应该有重试机制,并且当重试失败时不应该退出,而应该等待 driver 恢复后重连,并把自己的状态告诉 driver,只有在超过 driver 恢复时间时才自行退出。
spark 的实现。不管什么模式,driver 挂了,所有相关的 executor 和 task 都会被清理,不尝试恢复,在 yarn-cluster 模式下还会重启 driver,重跑所有 task。这种方式浪费了已有的工作,但实现起来是最简单的。
应该忽略旧 driver 的所有消息,并且让旧 driver 退出。
在 spark 的实现下,例如在 yarn-cluster 模式,如果 driver 网络分区,yarn 会重启 driver,旧 driver 重连时,可能会有点问题,因为两个 driver 的 ID 是一样的,spark 分不清两者。
主机 Crash,只需要把主机上运行的 driver,executor,task 都按失败处理,同时上面的 shuffle 数据对应的 task 也得重跑。driver 等失败的处理已经介绍过了。
按主机 Crash 处理,上面的旧 driver 的处理也按 driver 网络分区一样处理,其他对象以此类推。
任何容错机制的设计都是先考虑正常情况下是如何处理的,然后去考虑各种失败场景,失败场景可分 Crash(kill -9,掉电等),正常退出(例如抛异常,程序可以做善后处理),网络分区。本文介绍了 spark 的 driver,executor,task 失败时的处理,同时也对各种情况应该如何处理表达了一些看法,希望有所帮助。