作业出现异常,产生了重启或 failover 的情况,我们可能都已经习以为常,甚至对于绝大多数开发者来说,主动重启是遇到问题的第一反应。如果想深层次地定位问题,以及更深程度知道 Flink 的限制,那么了解 Failover 会非常重要。
如果大家对实时计算感兴趣,欢迎阅读其他文章:
- 实时计算系列(1) - Why not RocksDB in Streaming State
- 实时计算系列(2) - Flink 的容错机制以及弱一致性快照
- 实时计算系列(3) - 基于 Flink CEP 的规则系统
- 实时计算系列(4) - Failover in Flink
前言
本文并不是介绍 Failover 后作业的容错机制(关于这部分内容,网上文章其实很多),而是介绍一下 Failover 的来龙去脉,以及提出一些问题。可能有同学会问,Failover 不是一件非常普通且平常的事情吗?不就是作业抛出异常然后自动恢复的一个过程吗?这在大部分情况下确实是事实,但我们思考一些其他的场景:
- 用户使用 Flink 搭建了一个实时数据产出的任务,类似之前提到的 CEP 用于线上营销的推送,结果 Flink 作业的每次 Failover 都会导致线上的营销效果时效性变差,从而影响营收;
- 线上 0 点大促,在大促开始后所有人都会盯着实时的大盘指标,在关键时刻如果作业发生 Failover,大盘指标会停滞几分钟,还可能因为 Checkpoint 回退的原因导致数据出现诡异的变化;
上面两种场景其实都脱离了 Flink 的实时 BI 看板的主流用法,作业对 Failover 很敏感,但似乎 Flink 开发者又无能为力,这是很多企业实践中的常态。
如何看待 Failover?
「如何看待 Failover」这件事情,会决定 Flink 的应用场景的范围,如果我们将 Failover 认为是理所当然,业务损失不可避免的话,那么上面提到的实时场景可能就没有办法用 Flink,或者需要搭配一系列的热备、降级等预案。
当然,我这里更想将这种 Failover 带来的业务损失,认作是流计算引擎设计不合理导致的。所以,这种分布式流式计算场景中的 Failover 到底能不能被完全克服,有没有一个完美的解决方案来应对这个难题。解决这个问题会有两类思路:
- 接受 Failover 带来的影响,通过 HA 来保证下游消费无损;
- 降低 Failover 带来的影响,使其断流时间不断趋近于 0;
HA
HA 的方案很好理解,通过热备的方式实现,当一个任务出现 Failover 时,让另一个任务接管,保证下游不出现长时间断流。
方案设想很直接,看起来也能解决问题,但在实际应用场景我们会发现这个方案很难落地,比如:
- Source 位点如何控制?
- 如何快速判断任务发生 Failover?
- 如何控制切换过程中数据不丢不重?
- …
当然,现实中更加实用的方法是将整条链路解耦,比如通过相互隔离的两个 Flink 任务进行数据计算,并写入不同的存储表,在上游服务时,同时访问两个存储表的数据来获取容灾的效果(比如根据更新时间和数值),这个方式在落地上也有一些问题,在这里就不展开了。
降低 Failover 开销
另一思路是通过改进架构来降低 Failover 的开销,比如对于 map-only 的任务,我们倒也非常容易想到一个方案:拆分任务。比如 1w 并行度的任务,我们拆成 10 个 1000 并行度的任务,那么 Failover 的概率就小了好几个量级,并且每次 Failover 的恢复时间也减少了很多。
除特殊场景之外,我们能否有一个更加通用化的方案呢?
原理
在了解降低 Failover cost 方案之前,我们需要先来了解一下 Failover 在 Flink 内部的过程。
一个完整的 Flink 拓扑
上图是一个典型的窗口聚合任务,共有 5 个 Task,分配在 3 个 TaskManager 上。这 3 个 TaskManager 之间是 ALL-to-ALL 的全连接,很显然,任意一个 TaskManager 挂掉都会导致全局 Failover。
全局 Failover 会造成所有 Task 的重新部署,造成快照的重新拉取和恢复,都是非常耗时的操作。从 Task 发生异常到 JobManager 发起重新部署的过程中,通常会发生两种情况:
- TaskManager 感知到上下游异常,主动通知 JobManager 发生 Failover,也就是我们常见的 Connection Lost 相关错误;
- Task 异常,主动通知 JobManager 发生 Failover,这时我们通常能在 JobManager 找到对应 Task 报错的堆栈信息;
想要了解如何降低 Failover 开销,我们先来看看 Flink 不同组件间的通信机制,了解一次 Failover 是如何发生的。
TaskManager 之间
从上图可以看到(这里简略了网络传输机制,详细内容可参考A Deep-Dive into Flink’s Network Stack):
- Netty Server:Server 端的 Task 将数据放入 Buffer 队列后,由 Netty Server 从 Buffer 队列拉取数据向 Netty Client 发送;
- Netty Client:接收 Netty Server 发送的数据,并放入各个 Task 独占的 Buffer 队列中;
TaskManager 之间使用 TCP 协议进行数据通信,一般情况下通信出现异常,通常是 Full GC、节点无响应、进程异常等单点问题导致,这些错误可以分为两类:
- 传输层:TCP 层的连接类错误,比如 TCP 连接断开,我们常见的 Connection Refused 就可能是这类情况导致;Netty 对此做了封装,可以将错误透传到应用层;
- 应用层:Flink 内部的 TaskManager 之间(上下游)也会发送一些消息,比如
PartiionRequest
请求,用于提供给上游 TaskManager 发送数据所需的元数据,我们常见的PartitionNotFoundException
就属于这一类型;
这里有一个问题,所有的传输层错误都能被正确感知到吗? 未必,在真实场景中,大部分情况如进程异常、负载过高、FGC 等原因导致的进程关闭,都会进行系统资源的有序释放,其中就包括 TCP 连接资源,所以 Netty 可以将 TCP 层的 Error 透传到应用层让用户感知到。
但在机器断电等未正常释放资源的情况下,TCP 的 keep-alive 机制 会导致 TCP 的另一端持续保持连接资源,从而出现 hang 死的假象。那么这个时候,就需要 TaskManager 和 JobManager 的通信流程来解决这一问题了。
TaskManager 和 JobManager
JobManager 和 TaskManager 之间关于 Failover 的消息也可以分为两类:
- 心跳消息同步
- Task 状态同步
心跳消息是 JobManager 和每一个 TaskManager 都会保持一份定时的心跳传输机制,这里需要注意,由于 Flink 中使用 Akka 作为 Rpc 通信框架,且 JobManager 中大量行为使用单线程处理(类似 Spark 中的 EventLoop 机制),所以当 JobManager 在进行逻辑较重的操作时(比如 Failover 等),Akka 中存放 Rpc 请求的消息队列会出现消息堆积,可能会造成 TaskManager 和 JobManager 的心跳超时。
Task 状态的同步就相对复杂,我们可以参考 Jobs and Scheduling 中对 Task 生命周期的描述:
从 JobManager 和 TaskManager 交互的视角上来看,当某个 TaskManager 中的 Task 从 RUNNING 变成 FAILED 之后,TaskManager 通知 JobManager 并发送对应的 Exception,JobManager 根据用户配置的 Failover 策略找到相关的 Task 进行 CANCELING 操作,直至所有 Failover 涉及到的 Task 的状态变为 FAILED 和 CANCELLED,再开始进行新的一轮 SCHEDULE 和 DEPLOY 操作。
Failover 策略
Flink 默认使用 Region Failover 进行 failover 处理,将 Tasks 根据拓扑连接划分为若干个完全独立的 Region。在每个 Region 内部,任意 Task 出现异常都会将 Region 中全部 Tasks 进行重新部署。
这是一种不会出错的做法,但同时有些偷懒的意思,Region 中所有 Task 重新部署并从上一次 checkpoint 进行消费,对于拓扑中有 N 个 Task 的 Region 而言,因为 1 个 Task 的失败却要让其它 N-1 个 Task 进行数据的重复消费和计算。
对于规模较大的场景,Failover 多多少少会成为作业横向拓展的瓶颈,尤其是资源云化的趋势下,如果作业不能有稳定的拓展能力,将会引入很多人工运维的成本。
业界其他方案
分布式组件通常都需要考虑 Failover 带来的开销和影响,不少优秀的框架也确实在这块做了一些事情,我们以 MillWheel、Ray、RisingWave 等框架为例:
MillWheel: Fault-Tolerant Stream Processing at
Internet Scale
MillWheel 是早年在 Google 被广泛使用的数据处理框架,和 Flink 的 Region 级别容错不同,MillWheel 能够做到 Task 甚至 Record 级别的容错粒度,几乎保证了 Failover 的发生不会产生冗余的重复计算等操作。
以一条数据进入 MillWheel 的算子为例,会经过以下步骤:
- 数据进入新的算子,通过数据的 uniqueID 进行重复检查,如果发现重复则直接丢弃数据;
- 调用用户的计算逻辑进行运算,可能会包括 state、timer、发送到下游等操作;
- 将用户的 state 操作和待下发的 records 进行持久化到 state;
- 下游发送数据后回发 ACK,清理待下发的 records 记录;
可以看到,每一条数据带来的 change,MillWheel 都会将其应用在 state store 中,以保证 Exactly-Once 的语义和细粒度的 Failover 策略。当然,实际上它的 checkpoint 过程是以秒级来异步执行,并非每一次 state change 都会持久化到远端存储。
毫无疑问,MillWheel 的上述频繁对 state store 的变更对于进程性能的影响是巨大的,所以它也提出了 Weak Productions 的概念,来满足非 Exactly-Once 的场景,此时用户需要在逻辑中自行保证处理的幂等问题。
蚂蚁基于 Ray 构建了 Mobius AI 计算引擎,在其 Key Features 介绍中也提到了:
Single Node Failover. We designed a special failover mechanism that only
needs to rollback the failed node it's own, in most cases, to recover the
job. This will be a huge benefit if your job is sensitive about failure
recovery time. In other frameworks like Flink, instead, the entire job
should be restarted once a node has failure.
其采用的方式也和 MillWheel 理念一致,在 sender 端采用 ack 机制,在 receiver 端采用 state 缓存数据,通过上下游的冗余存储来实现单点的 Failover 能力。
其他:
其他组件如 RisingWave 和 Flink 类似,基于 Chandy-Lamport 分布式快照机制来完成,也会出现单 Task 失败导致的整个作业重启的问题。
曾经基于 Flink 做过一个单点恢复的功能,整体实现思路就是参考上述的 Flink 原理进行了一一修改,做好各个阶段的兜底和容错,局部调整了非常多的细节,最后也能在单点容错上有比较好的表现。但随着应用时间的变长,越来越多的边界 case 出现,比如
- TaskManagerA 和 TaskManagerB 断联,但是 TaskManagerA 和 JobManager 还保持正常的连接;
- 比如一次出现过多 Task 失败会导致新部署的 Task 由于无法和上游建立连接导致新的失败,产生“雪崩”;
对于这些边界 case 倒是可以不断对原有的架构进行修补,但与此同时又会出现新的边界 case,让整个调度和 Failover 流程显得更加混乱。这种混乱的现象,其根本原因是没有一个“裁判员”,来对全局的情况做判断,而是依赖上下游的 TCP 连接让 TaskManager 利用自己的“感应”能力进行自主报错和恢复。当然这里可以调整 Flink 的 JobManager,不过需要对 Failover 的整条流程进行重构,对 Flink 架构上的破坏有些太大了。
No Silver Bullet
显然如今不存在一个 silver bullet 来解决性能和容错的 trade-off 的问题,像 MillWheel 里强依赖 state 和 backend store 固然是一种听起来很靠谱的想法,但实际上频繁地读写 state 又是如何保证性能和低延迟,又是如何保证横向的可拓展性?这些问题在论文中没有提到,但相信同样需要花费不少精力去思考和解决,甚至在真实应用场景也并非应用自如。
结论
本文提出了 Failover 能力某种程度上限制了 Flink 的应用场景,并结合实际 case 讲解了 Flink Failover 过程中,TaskManager 和 JobManager 的通信流程,并对业界其他方案进行了简要阐述和自身实践的分享。