Spark 容错机制

June 17, 2017

本文介绍 Spark 对各个层面的失败的容错机制。这里只讨论 Spark 本身组件的容错,而不讨论资源管理器的容错。(例如 mesos master/slave 的崩溃等异常)

Task

我们先考虑最底层的失败,即某一个 Task 执行失败了。

先来看应该如何处理:

上面是我能想到的最优的处理,我们来看 spark 是如何做的。

Task 失败由 executor 感知到,通过 statusUpdate 层层传递到 driver 端的 TaskSetManager.handleFailedTask,其基本逻辑是:

FetchFailed

我们来考虑 FetchFailed 的处理,值得注意的是 FetchFailed 的处理会在未来有比较大的变化,见 SPARK-20178

FetchFailed 会在 ShuffleReader 取数据失败 N 次后抛出,然后由 executor 通过 statusUpdate 传到 driver 端,实际的处理会在 DAGScheduler.handleTaskCompletion,它会重新提交该 Stage 和该 Stage 对应的 ShuffleMapStage,重试次数超过 spark.stage.maxConsecutiveAttempts 时会退出。

Speculative task

有时候 task 不是失败了,而是太慢了,这时 spark 会多启动几个实例(spark 中一个实例叫 attempt),有任何一个实例成功了该 task 就成功了。spark 会周期性检查是否有 task 符合 speculation 的条件,检查逻辑在 TaskSetManager.checkSpeculatableTasks

那么多个实例如何防止其输出不干扰呢?例如一个实例 A成功了,输出,另一个实例 B成功了,其输出覆盖了 A 的输出,甚至更糟糕的是两者同时写一个文件,导致输出错乱。

有几个层面:

Executor

Executor crash

executor crash 是指 executor 异常退出了,比如 executor JVM 崩溃了。

先来看应该如何处理。

我们来看 spark 是如何做的。在 deploy mode 为 spark standalone 情况下,系统实际运行的进程是 CoarseGrainedExecutorBackend,所以 Eexecutor crash 就等于该进程 crash。有几个检测 crash 的方式:

如果 deploy mode 为 mesos,如果运行模式是 Coarse-Grained(Fine-Grained 已经是 deprecated 的了),实际运行的也是 CoarseGrainedExecutorBackend。在该情况下 executor crash 会通过 MesosCoarseGrainedSchedulerBackend.statusUpdate 处理,它会从totalCoresAcquired 中减去已使用的 cpu,从而在下次调度时,由于 totalCoresAcquired < spark.cores.max,会启动一个新的 executor,没有重启次数限制。

executor 挂了,task 会在 TaskSchedulerImpl.removeExecutor 里被标记为失败,而 task 失败的处理上一节介绍过了。

Executor 网络分区

Executor 网络分区指的是 executor 没有挂,但是和 driver 的网络连接断了,之后可能会恢复。

先来应该如何处理。系统是无法区分网络分区和 crash 的,因此,系统会把网络分区当做 Crash 处理,关键是在 executor 重新连上时该如何做。重新连上时,driver 端应该忽略, executor 发出的任何消息,并且通知 executor 自杀。

我们来看 spark 如何做的。executor 是如何自杀的呢:

driver 如何忽略旧 executor 发出的消息:

Driver

Driver crash

还是先看应该如何做。

如果想尽量保留 driver 已完成的工作,driver 挂了,应该依赖持久化存储和与 executor 的通信,恢复 driver 的状态,重现建立对正在运行的 executor,task 等的管理。而 executor 与 driver 通信失败时应该有重试机制,并且当重试失败时不应该退出,而应该等待 driver 恢复后重连,并把自己的状态告诉 driver,只有在超过 driver 恢复时间时才自行退出。

spark 的实现。不管什么模式,driver 挂了,所有相关的 executor 和 task 都会被清理,不尝试恢复,在 yarn-cluster 模式下还会重启 driver,重跑所有 task。这种方式浪费了已有的工作,但实现起来是最简单的。

Driver 网络分区

应该忽略旧 driver 的所有消息,并且让旧 driver 退出。

在 spark 的实现下,例如在 yarn-cluster 模式,如果 driver 网络分区,yarn 会重启 driver,旧 driver 重连时,可能会有点问题,因为两个 driver 的 ID 是一样的,spark 分不清两者。

主机

主机 Crash

主机 Crash,只需要把主机上运行的 driver,executor,task 都按失败处理,同时上面的 shuffle 数据对应的 task 也得重跑。driver 等失败的处理已经介绍过了。

主机网络分区

按主机 Crash 处理,上面的旧 driver 的处理也按 driver 网络分区一样处理,其他对象以此类推。

总结

任何容错机制的设计都是先考虑正常情况下是如何处理的,然后去考虑各种失败场景,失败场景可分 Crash(kill -9,掉电等),正常退出(例如抛异常,程序可以做善后处理),网络分区。本文介绍了 spark 的 driver,executor,task 失败时的处理,同时也对各种情况应该如何处理表达了一些看法,希望有所帮助。