1.4 根据事件时间开滚动窗口
假定某个无界数据集在事件时间区间[12:00,12:08)内有10条记录,每条记录的值都是整数。以事件时间为横轴,以处理时间(观察时间)为纵轴,记录以圆点表示,则所有记录在空间中的位置如图1-24所示。为了便于后续引用,这个例子被命名为Example1.1。
图1-24 Example1.1数据分布情况
其中,由于传输通道延迟,处理时间落后于事件时间 5分钟,即纵坐标轴的零点代表处理时间12:05。
根据事件时间开滚动窗口,窗口大小为 2分钟,本节的任务是计算每个窗口内记录值的和。
1.4.1 what:转换/where:窗口
Example1.1中的窗口为事件时间窗口,分别为[12:00,12:02)、[12:02,12:04)、[12:04,12:06)和[12:06,12:08)。转换操作为求和(聚合),聚合结果如图 1-25所示。
图1-25 根据事件时间开滚动窗口的聚合结果
聚合操作发生在什么处理时间点上呢?图1-25中假定所有聚合操作发生在处理时间点12:10,水印是解决这个问题的方案。
1.4.2 when:水印
由于事先并不知道横轴的每个窗格里有多少条记录,我们并不能确定何时触发聚合操作,水印则能标记出这个时间点。
水印能够标记出某个事件时间点以前的所有记录均已到达引擎。例如,在窗口[12:00,12:02)内观察到记录9的同时也观察到代表水印的时间戳12:02,则我们可以在记录9被观察到的时间点上正确地触发聚合操作。
水印可抽象地表示成函数f(P)=E,其中F代表处理时间,E代表事件时间,即我们能够在处理时间点P判定事件时间推进到了E。
图1-26描绘了水印、事件时间和处理时间的关系。
图1-26 水印、事件时间和处理时间的关系
考虑到系统开销,水印是离散的,即只有部分记录后附有水印。为了便于分析,水印通常以连续曲线绘制。此外,作为事件时间的推进器,水印曲线是单调递增的。
有以下两类水印。
(1)完美水印(Perfect Watermark):完美水印表示早于水印标记事件时间戳的所有记录均已到达,非乱序的无界数据集中最近一条记录的事件时间就是完美水印。
(2)启发式水印(Heuristic Watermark):启发式水印是尽可能地确定时间戳的一种估计,可能出现某些事件晚于水印到达的情况。在分布式系统中,定义完美水印往往是非常困难的,定义启发式水印的代价则相对较低。
分别嵌入完美水印和启发式水印时窗口的聚合情况,如图1-27所示。
图1-27 分别嵌入完美水印和启发式水印时窗口的聚合情况
可以看出,在嵌入启发式水印时,记录 9由于水印迟到而没有计入对应事件时间窗口的聚合结果内。
在解决这个问题时,这两类水印都存在缺陷。
(1)水印迟到:在嵌入完美水印时,由于记录9在处理时间轴上推进得太慢,事件时间窗口[12:02,12:04)和[12:04,12:06)的聚合操作被推迟到处理时间点12:08之后,这与低延迟计算的目标相悖;同时,会拉长这两个窗口的生存期,即这两个窗口所占用的资源不能及时释放。
(2)水印早到:在嵌入启发式水印时,事件时间窗口[12:00,12:02)在处理时间轴上推进得太快,导致记录 9没能计入本窗口的聚合结果内,这与精准计算的目标相悖,引擎应提供事后更正机制。
1.4.3 when:触发器
作为定义转换操作时间点的另一类方案,触发器解决了水印有缺陷的问题。
(1)在嵌入完美水印时,事件时间窗口[12:02,12:04)的聚合操作被推迟到处理时间点 12:08 之后,这与低延迟计算的目标相悖。为此,类比将事件时间轴划分为长度为 2 分钟的窗格(pane)而得到事件时间窗口,我们将处理时间轴划分为长度为1分钟的窗格,如[12:05,12:06)、[12:06,12:07)、[12:07,12:08)、[12:08,12:09)、[12:09,12:10),然后在每个窗格边界处触发一次聚合计算,这样先后得到实时(图 1-28(a)中以“早到”标注)聚合结果 7、14 和 22。由于在得到聚合结果22时水印还没有被观察到,这个窗口仍需保留至处理时间点12:09。
(2)在嵌入完美水印时,事件时间窗口[12:04,12:06)的聚合操作也被推迟到处理时间点 12:08 之后,按照上述方案需要分别在处理时间窗格[12:05,12:06)、[12:06,12:07)、[12:07,12:08)、[12:08,12:09)内触发聚合计算。由于这个事件时间窗口内只有处理时间窗格[12:06,12:07)内有记录,因此在其他窗格内触发聚合计算没有意义,可以通过定义事件数量触发器解决这个问题。在本例中可以定义事件数量为1。
(3)在嵌入启发式水印时,在事件时间窗口[12:02,12:04)的处理时间窗格[12:07,12:08)内观察到水印,因此我们不是在处理时间窗格[12:07,12:08)的边界处触发聚合计算,而是按照水印的推进时间触发聚合计算,这会按时(on-time)聚合出结果22。
(4)在嵌入启发式水印的事件时间窗口[12:00,12:02)内,水印跟随记录5到达,我们会按时得到聚合结果 5。为了得到精准的聚合结果,我们必须延长这个时间窗口的生存期,但是由于并不能确切地获悉还有多少迟到的记录,如何确定这个时间窗口的生存期是个问题。
因此,可以根据水印、处理时间轴窗格和事件数量确定在处理时间轴的什么地方触发聚合计算,如图1-28所示。
图1-28 根据事件时间开滚动窗口解决水印有缺陷的问题
截至目前,这个数据处理设计还存在两个需要解决的问题。
(1)在启发式水印早到时,为了确保精准计算,引擎必须延长对应事件时间窗口的生存期,这会加大引擎的内存消耗。
(2)同一个事件时间窗口的多个处理时间窗格会输出多个聚合结果,引擎需要提供定义这些结果之间关系的机制。
可以利用迟到生存期(Allowed Lateness)解决第一个问题,利用累加(Accumulation)模式解决第二个问题。
1.4.4 when:迟到生存期
在嵌入完美水印时,事件不会迟到,窗口能够及时销毁;在启发式水印早到时,为了确保精准计算,引擎必须延长对应事件时间窗口的生存期,所以迟到生存期只会发生在嵌入启发式水印时。
假定迟到生存期为1分钟,下面以事件窗口[12:00,12:02)为例进行分析。
(1)根据水印函数曲线计算出本窗口生存期结束的事件时间点所对应的处理时间点。本窗口生存期结束于事件时间 12:03(12:02+0:01),从水印曲线上找到这个事件时间点对应于处理时间轴上的12:07~12:08之间的某个时间点,记为A。
这里需要再次强调生存期是事件时间,但是决定销毁窗口的时间点是处理时间。在图1-24中,我们均匀地标注了事件时间坐标点,如12:01、12:02等,但实际上事件时间的推进并不是均匀的,所以我们不能通过处理时间的推进(间隔)推断事件时间的推进,这也是为什么要从水印曲线上找到窗口的处理时间结束点(A)的原因。
(2)如果在处理时间结束点之前观察到事件,则应再次触发聚合计算;在处理时间结束点之后,本窗口被销毁。因此,可以看出迟到生存期和水印一样都是聚合计算的触发信号。基于这种定义,记录9不会被丢弃。
我们设定迟到生存期为1分钟的聚合情况,如图1-29所示。
图1-29 根据事件时间开滚动窗口加入迟到生存期的结果
在图1-29中,平行于事件时间轴的虚线标记本窗口的处理时间结束点。
1.4.5 how:累加模式
处理时间轴窗格会多次触发聚合计算,累加模式定义同一个事件时间窗口的多个聚合结果之间的关系。有以下三种累加模式。
● 丢弃(discarding):启发式水印的事件时间窗口[12:02,12:04)将产生三个聚合结果,在丢弃的模式下分别为7、7和8,即每个窗格内的聚合结果和其他窗格无关。
● 累加(accumulating):每个窗格会累加前一个相邻窗格的聚合结果。
● 撤回(retracting):这种模式是在累加模式的基础上增加一个撤回结果。启发式水印的事件时间窗口[12:02,12:04)的第二个窗格聚合结果为-7和14,-7代表撤回,14代表截至当前处理时间总的聚合结果。
这三种模式的聚合情况,分别如图1-30~图1-32所示。
图1-30 根据事件时间开滚动窗口丢弃模式的结果
图1-31 根据事件时间开滚动窗口累加模式的结果
图1-32 根据事件时间开滚动窗口撤回模式的结果