Flink技术内幕:架构设计与实现原理
上QQ阅读APP看书,第一时间看更新

2.2 算子

上一节我们提到DataStream的相互转换会生成算子,本节我们来看下Flink中DataStream的转换有哪些,会生成哪些算子。篇幅所限,这里只选择一些有代表性的转换进行解释说明。

1. flatMap

作用:循环遍历Map中的元素并用相应的函数进行处理。

使用方式:

dataStream.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public void flatMap(String value, Collector<String> out)
            throws Exception {
        for(String word: value.split(" ")){
            out.collect(word);
        }
    }
});

该方法会生成算子StreamFlatMap。

2. keyBy

作用:按一定规则分区,比如常用的根据某个字段进行keyBy操作,Flink会根据该字段值的hashCode进行分区。分区计算方式为:

public static int computeOperatorIndexForKeyGroup(int maxParallelism,
        int parallelism, int keyGroupId) {
    return keyGroupId * parallelism / maxParallelism;
}

这里的keyGroupId就是根据字段的hashCode和Flink的最大并行度计算出来的。

使用方式:

dataStream.keyBy("someKey")
dataStream.keyBy(0)

该方法并不会生成一个算子,也就是说keyBy并没有生成运算拓扑的节点;但是keyBy依然生成了Transformation,也就是说它规定了上下两个节点数据的分区方式。

Flink还有其他几种分区方式。

  • rebalance:重新平衡分区,用于均衡数据,保证下游每个分区(在流系统中基本可以认为分区和并发是一个概念)的负载相同。
  • broadcast:广播分区,将输出的每条数据都发送到下游所有分区。
  • shuffle:随机分区,将输出的数据随机分发到下游分区。
  • forward:本地分区,将输出的数据分发到本地分区。
  • rescale:重新缩放分区,上下游根据分区数量分配对应的分配方式,然后循环发送。比如,如果上游分区为2,而下游分区为4,那么一个上游分区会把数据分发给两个下游分区,而另一个上游分区则把数据分发给其他两个下游分区,分区方式是循环分发。反之,如果下游操作的分区为2,而上游操作的分区为4,那么两个上游分区会把数据分发给一个下游分区,而另两个上游分区则把数据分发给另一个下游分区。
  • global:全局分区,所有数据进入下游第一个分区。

在Flink实现中,StreamPartitioner是分区接口类,每种分区对应一个StreamPartitioner的实现类。我们来看下rebalance对应的RebalancePartitioner。

在DataStream中的接口如下:

public DataStream<T> rebalance() {
    return setConnectionType(new RebalancePartitioner<T>());
}

可以看到设置了分区方式,分区方式(要注意的是StreamPartitioner并不是算子)就是RebalancePartitioner。

public class RebalancePartitioner<T> extends StreamPartitioner<T> {
    private static final long serialVersionUID = 1L;

    private int nextChannelToSendTo;

    @Override
    public void setup(int numberOfChannels) {
        super.setup(numberOfChannels);

        nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
    }
    @Override
    public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
        nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
        return nextChannelToSendTo;
    }

}

主要方法selectChannel的源代码实现比较简单,就是随机选择下发的分区。

3. aggregation

作用:聚合计算。

使用方式:

keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");

该方法会生成算子StreamGroupedReduce,包括fold、reduce及aggregation,只能作用于KeyedStream。需要注意的一点是,这些聚合计算都是针对某个键(Key)的,如果要求全局的最大值、最小值,该方法是无法做到的。

4. window及window apply

作用:根据窗口聚合计算数据。

使用方式:

dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)));
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
    public void apply (Tuple tuple,
            Window window,
            Iterable<Tuple2<String, Integer>> values,
            Collector<Integer> out) throws Exception {
        int sum = 0;
        for (value t: values) {
            sum += t.f1;
        }
        out.collect (new Integer(sum));
    }
});

该方法会生成窗口。窗口分为Keyed Window和Non-Keyed Window(用WindowAll转换得到),二者的区别在于使用window转换之前是否进行keyBy操作。窗口将会在2.3节详细介绍。

5. union

作用:合并多个DataStream。

使用方式:

dataStream.union(otherStream1, otherStream2, ...);

该方法有个有意思的使用方式是可以合并数据本身,这样就可以得到一个两倍数据的流。该方法同样不会产生算子。

6. window join

作用:通过给定的键和窗口关联两个DataStream。

使用方式:

dataStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

这里我们通过一个例子来看下Flink中window join是怎么实现的。假如我们有streamA(图2-2中用深灰色元素表示)和streamB(图2-2中用浅灰色元素表示)经过window join处理,伪代码如下(这段代码的主要内容就是两个流进行window join处理的用法示例):

DataStream<Integer> streamA = ...
DataStream<Integer> streamB = ...

streamA.join(streamB)
        .where(<KeySelector>)
        .equalTo(<KeySelector>)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
        .apply(new JoinFunction<Integer, Integer, String> (){
    @Override
    public String join(Integer first, Integer second) {
        return first + "," + second;
    }
});
034-1

图2-2 window join

图2-2中圆圈内的数字表示数据元素本身及事件时间,经过处理之后得到图2-2下方给出的数字组合(这里假设图2-2中给定的同一个窗口内数据的键是一样的,即每个窗口内的数据都满足join条件)。

可以看出这里join的行为和普通的inner join非常类似。为了更好地理解join的结果,我们来看下其源代码实现。window join实现可以从JoinedStreams的apply方法着手。

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function,
        TypeInformation<T> resultType) {
    // 清除闭包
    function = input1.getExecutionEnvironment().clean(function);

    coGroupedWindowedStream = input1.coGroup(input2)
            .where(keySelector1)
            .equalTo(keySelector2)
            .window(windowAssigner)
            .trigger(trigger)
            .evictor(evictor)
            .allowedLateness(allowedLateness);

    return coGroupedWindowedStream.apply(
        new JoinCoGroupFunction<>(function), resultType);
}

这里可以看到,window join是通过coGroup来实现的,生成一个CoGroupedStreams,然后应用JoinCoGroupFunction。那么coGroup又是怎么实现window join的呢?我们继续来看CoGroupedStreams的apply方法(略去了无关代码):

public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function,
        TypeInformation<T> resultType) {

    DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1
            .map(new Input1Tagger<T1, T2>())
            .setParallelism(input1.getParallelism())
            .returns(unionType);
    DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2
            .map(new Input2Tagger<T1, T2>())
            .setParallelism(input2.getParallelism())
            .returns(unionType);
    // 1
    DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2);
    // 2
    windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>(
            unionStream, unionKeySelector, keyType).window(windowAssigner);
    // 3
    return windowedStream.apply(
            new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType);
}

这部分代码非常清楚地展示了其实现过程:代码1调用union把两个DataStream联合在一起,代码2生成一个WindowedStream,代码3对WindowedStream执行窗口函数。window join本质上还是通过union和window等更基础的算子实现的。我们再来看一下这个过程中传入的JoinCoGroupFunction:

private static class JoinCoGroupFunction<T1, T2, T>
        extends WrappingFunction<JoinFunction<T1, T2, T>>
        implements CoGroupFunction<T1, T2, T> {

    @Override
    public void coGroup(Iterable<T1> first, Iterable<T2> second,
            Collector<T> out) throws Exception {
        for (T1 val1: first) {
            for (T2 val2: second) {
                out.collect(wrappedFunction.join(val1, val2));
            }
        }
    }
}

这个函数就是图2-2描述的不同数据相互连接配对的实现逻辑。window join的窗口还可以是滑动窗口、会话窗口,这里不再详细讲解,实现原理基本一样。

7. interval join

作用:通过给定的键和时间范围连接两个DataStream。假如有数据e1和e2分别来自两个DataStream,那么要让两个数据可以连接输出,需要

e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound

interval join只支持基于事件时间的范围。

使用方式:

keyedStream.intervalJoin(otherKeyedStream)
        .between(Time.milliseconds(-2), Time.milliseconds(2)) // 下界和上界
        .upperBoundExclusive(true) // 可选
        .lowerBoundExclusive(true) // 可选
        .process(new IntervalJoinFunction() {...});

Flink API中实现了两种join:一种是window join,另一种就是interval join。两种join最大的不同在于join的数据分组不一样:window join是在同一个时间窗口内连接;interval join是每个数据元素根据自己的时间都有一个join取值范围,这个范围是由lowerBound和upperBound决定的。我们通过一个例子来直观地看下interval join的过程,然后分析其实现。

如图2-3所示,我们有两个流streamA和streamB,其数据分别用深灰色和浅灰色的圆圈表示,圆圈中的数字代表数据元素本身及事件时间。interval join的伪代码如下:

DataStream<Integer> streamA = ...
DataStream<Integer> streamB = ...

streamA.keyBy(<KeySelector>)
        .intervalJoin(streamB.keyBy(<KeySelector>))
        .between(Time.milliseconds(-2), Time.milliseconds(1))
        .process (new ProcessJoinFunction<Integer, Integer, String(){

    @Override
    public void processElement(Integer left, Integer right, Context ctx,
            Collector<String> out) {
        out.collect(first + "," + second);
    }
});
037-1

图2-3 interval join

由图2-3可以看到,判断两个流的数据可以连接的依据是两个流的数据符合lowerBound和upperBound界定的范围,即

streamA.data.ts + lowerBound <= streamB.data.ts.ts <= streamA.data.ts + upperBound

图2-3中streamA的每个数据元素都可以根据lowerBound和upperBound在streamB上界定一个可以连接的范围,比如:当图中streamA的数据元素2被处理的时候,发现streamB的数据元素0和1满足界定范围,这时输出2,0和2,1;在图中streamB的数据元素1进入算子之后,也会根据范围界限找到符合范围条件的streamA的0数据元素,然后输出0,1。

我们接下来看下Flink中interval join是怎么实现的。关键代码是IntervalJoinOperator中的processElement方法,实现过程与图2-3给出的逻辑一致。首先,需要两个状态保存两个流的数据,这里用的是MapState;然后,处理数据元素,遍历另一个流的数据,查询是否有满足界定范围的数据,如果有的话就将其发送出去;最后,注册一个状态清理函数,用来清理掉永远无法连接上的过期数据。

这里只介绍了几个常用的Transformation和算子,像Connect、CoMap、Split、Select之类的操作和转换就不一一介绍了,有兴趣的读者可以通过官网和源代码学习了解。