1.5 一致性
让批处理数据处理程序在由廉价机器组成的集群上可靠地运行不是一件容易的事情,流处理程序则更难。可靠运行的核心问题是如何保证分布式系统有状态计算的一致性。本节将分析在Flink架构中容错与一致性的实现方案,即异步屏障快照技术。
1.5.1 有状态计算
在Example1.1中,聚合结果与本窗口内的所有记录有关,由于每个记录都是一个独立事件,窗口需要缓存这些独立事件或由这些独立事件产生的中间结果。这类聚合运算被称为有状态计算,而基于单个事件的过滤处理则被称为无状态计算,如图1-33所示。
图1-33 有状态计算与无状态计算
Flink有以下两类状态。
(1)数据处理应用程序自定义的状态,这类状态由应用程序创建维护。
(2)引擎定义的状态,这类状态由引擎负责管理,如窗口缓存的事件及中间聚合结果。
1.5.2 exactly-once语义
在分布式系统中,所有数据备份在同一时刻的值是相同的,或者说所有客户端读取的值是相同的,这就是一致性的含义。根据正确性级别的不同,一致性有以下三种形式。
(1)at-most-once:尽可能正确,但不保证一定正确。对应Example1.1,在系统发生故障恢复后,聚合结果可能会出错。
(2)at-least-once:对应Example1.1,在系统发生故障恢复后,聚合计算不会漏掉故障恢复之前窗口内的事件,但可能会重复计算某些事件,这通常用于实时性较高但准确性要求不高的场合。例如,Lambda架构将强实时性的Storm和强一致性的 Hadoop 批处理系统融合在一起,Storm 负责实时生成近似结果,Hadoop负责计算最终精准结果。
(3)exactly-once:对应Example1.1,在系统发生故障恢复后,聚合结果与假定没有发生故障情况时一致。这种语义加大了高吞吐和低延迟的实现难度,异步屏障快照技术是Flink提供这种语义的理论基础。
1.5.3 异步屏障快照
为了更好地理解异步屏障快照(ABS,Asynchronous Barrier Snapshot)理论,我们首先介绍几个相关概念。
(1)检查点(Checkpoint):关系型数据库并不会立即将提交的事物写回磁盘,而是先写入缓存(Buffer Cache)和重做日志(Redo Log),这种技术能够在保证数据一致性的同时提高数据访问效率。为了提高故障恢复(Crash Recovery)的速度,数据库仅需要回滚某个时间点之后的未写入磁盘的事物,这个时间点就是检查点。
(2)快照(Snapshot):数据的一个拷贝,有两种实现方式,分别为写时拷贝(COW,Copy On Write)和写重定向(ROW,Redirect On Write),其中COW用于读密集型系统,ROW用于写密集型系统。
(3)消息队列pull模式:在分布式消息系统中消费者(Consumer)主动连接缓存代理(Broker)获取消息的一种消息消费模式。相应地,在push模式中系统将消息主动推送给消费者。
流式数据处理引擎用计算图的形式编译数据处理应用程序,其中计算图用有向无环图(DAG,Directed Acyclic Execution Graph)的形式描述。它有两种表示形式,即逻辑形式和物理部署形式。逻辑形式的计算图由一系列计算节点的单实例组成,而物理部署形式则由计算节点的多个并行实例组成,其中并行实例的含义是在分布式环境中同一计算节点有多个功能相同的物理部署实例,如图1-34所示,逻辑形式中的map()节点会有两个部署实例map()[1]和map()[2]。
图1-34 计算图的逻辑形式与物理部署形式
流式数据处理计算图中的节点可分为三类:Source(负责数据输入)、Sink(负责结果输出)和算子(图1-34中的map、keyBy和window),它们之间由数据传输通道连接。此外,计算节点的每个部署实例也被称为任务(task)。
以T表示计算节点的集合,E表示边(数据传输通道)的集合,则计算图可表示为,其抽象形式如图1-35所示。
图1-35 复杂计算图的抽象形式
以M表示E中传输数据的集合,则对于任意一个计算节点:
(1)具有输入输出数据集。
(2)具有状态。
(3)功能由函数[这里指用户自定义函数(UDF,User Defined Function)]定义。节点拉取数据,由函数更新状态至,并生成输出数据,即
一个很自然的想法是对计算图在某些时间点上做快照,这样在故障发生后整个数据处理系统可以恢复到某个快照时间点的状态,以保证exactly-once语义。
定义快照为
其中,是所有节点状态的集合,即
是所有传输通道状态的集合,即。为了保证exactly-once语义,快照需要具备以下两个约束条件。
(1)快照必须在有限时间内完成。
(2)快照必须包含所有信息(包括在通道上传输的数据)及这些信息的因果关系,这涉及与的关系。为了实现这个看似很自然的想法,先驱者展开了开创性的研究。
(1)同步快照(Global Synchronous Snapshot)。同步快照分为三个步骤:第一步是暂停整个数据处理引擎;第二步是执行快照操作;第三步是继续执行。同步快照包括此刻仍在所有传输通道中的数据和所有节点的状态,因此快照的容量较大。这种同步机制会严重影响系统的吞吐量,增加引擎运行时的系统开销。(2)异步快照(Asynchronous Snapshot)。为了规避快照同步造成系统吞吐量降低的缺陷,研究人员提出异步机制,即在引擎执行计算任务的同时执行快照操作,且不需要所有节点和传输通道同时执行快照操作。这种机制并没有解决快照容量大的问题,也没有提升故障恢复效率。
(3)异步屏障快照。这是一种轻量级异步快照,不仅适用于DAG,而且适用于有环图,本节以DAG为例分析快照算法和故障恢复机制。
ABS的前置条件如下。
(1)传输通道提供阻塞(Block)和非阻塞(Unblock)操作,数据以先进先出(FIFO,First Input First Output)的方式传输。当传输通道处于阻塞状态时,所有数据将被缓存。
(2)计算节点可以阻塞或和与之连接的通道解除阻塞,并能在通道上传输控制消息,还可以在其输出通道上广播消息(Broadcasting Messages)。
(3)控制消息流不参与任何节点函数的计算。快照算法的步骤如下。
(1)引擎定期向 Source 节点插入检查点屏障(Barrier)。在收到作为控制消息的检查点屏障后,Source 节点对自己的状态做快照,并在其输出通道上广播此检查点屏障消息。此外,不同的检查点屏障可以通过id区分。
(2)当其从任意一个输入通道收到检查点屏障消息时,算子或Sink节点阻塞此输入通道,直至本节点从所有输入通道收到检查点屏障。
(3)在其从所有输入通道收到检查点屏障后,算子或Sink节点对自己的状态做快照,然后对其所有输入通道解除阻塞。
于是,对于同一检查点屏障,ABS会产生在下面条件时的快照,
ABS算法中条件式(1.1)表明快照仅包括节点的状态,不包括仍在传输通道中的数据,因为这会降低快照的容量。同时,ABS算法很好地兼顾了低延迟和高吞吐。Flink采用ABS算法实现一致性,图1-36描述了Flink检查点屏障的流转过程。
图1-36 Flink检查点屏障的流转过程
ABS是全局的,这样可以通过快照id计算出同一时刻计算图的状态,引擎可恢复到这些时间点重启计算任务,进而保证 exactly-once 语义。那么,如何进行故障恢复呢?故障恢复分为以下两种情况。
(1)计算图的拓扑结构不变的情况。引擎从持久化后端中读入上一个可用快照,以此重新初始化所有物理计算任务,即恢复计算图;然后计算任务继续运行,就如同没有发生故障一样。Flink窗口的状态不仅包括事件时间推进情况,还包括处理时间推进情况,对于故障恢复这一点异常重要,也从另一个侧面说明引入时间与窗口机制加大了架构流式数据处理引擎的难度。
(2)计算图的拓扑结构发生变化的情况,如图1-37所示。
这需要引擎的任务管理器(Job Managers/Task Managers)根据快照重新编排计算任务,这也是弹性(Resilient)计算的要求之一。
图1-37 计算图的拓扑结构发生变化的情况
1.5.4 保存点
检查点屏障由引擎负责实现,不需要数据处理应用程序编程。保存点(Savepoint)则由应用程序借助检查点底层机制实现一致性的应用层机制,广泛应用于数据处理程序平滑升级中。检查点的目标是轻量,而保存点的目标是实现应用层的一致性功能,例如,Flink 可配置 RocksDB 作为存储后端以实现增量式状态,而保存点则不需要这种优化配置。
保存点可用于应用程序平滑升级、引擎升级、A/B 测试等场景。在平滑升级任务中,我们首先保存旧版本程序(如V1.0)在升级前(如时间点t1)的运行状态,然后用保存的状态初始化新版本程序(如 V2.0),这种版本状态管理是由保存点实现的。此外,为了保证平滑升级和升级失败回退,在新版本正常提供服务之前,旧版本仍需继续运行,保存点实现版本状态管理的过程,如图1-38所示。
图1-38 保存点实现版本状态管理的过程