
2.2 算子
上一节我们提到DataStream的相互转换会生成算子,本节我们来看下Flink中DataStream的转换有哪些,会生成哪些算子。篇幅所限,这里只选择一些有代表性的转换进行解释说明。
1. flatMap
作用:循环遍历Map中的元素并用相应的函数进行处理。
使用方式:
dataStream.flatMap(new FlatMapFunction<String, String>() { @Override public void flatMap(String value, Collector<String> out) throws Exception { for(String word: value.split(" ")){ out.collect(word); } } });
该方法会生成算子StreamFlatMap。
2. keyBy
作用:按一定规则分区,比如常用的根据某个字段进行keyBy操作,Flink会根据该字段值的hashCode进行分区。分区计算方式为:
public static int computeOperatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroupId) { return keyGroupId * parallelism / maxParallelism; }
这里的keyGroupId就是根据字段的hashCode和Flink的最大并行度计算出来的。
使用方式:
dataStream.keyBy("someKey") dataStream.keyBy(0)
该方法并不会生成一个算子,也就是说keyBy并没有生成运算拓扑的节点;但是keyBy依然生成了Transformation,也就是说它规定了上下两个节点数据的分区方式。
Flink还有其他几种分区方式。
- rebalance:重新平衡分区,用于均衡数据,保证下游每个分区(在流系统中基本可以认为分区和并发是一个概念)的负载相同。
- broadcast:广播分区,将输出的每条数据都发送到下游所有分区。
- shuffle:随机分区,将输出的数据随机分发到下游分区。
- forward:本地分区,将输出的数据分发到本地分区。
- rescale:重新缩放分区,上下游根据分区数量分配对应的分配方式,然后循环发送。比如,如果上游分区为2,而下游分区为4,那么一个上游分区会把数据分发给两个下游分区,而另一个上游分区则把数据分发给其他两个下游分区,分区方式是循环分发。反之,如果下游操作的分区为2,而上游操作的分区为4,那么两个上游分区会把数据分发给一个下游分区,而另两个上游分区则把数据分发给另一个下游分区。
- global:全局分区,所有数据进入下游第一个分区。
在Flink实现中,StreamPartitioner是分区接口类,每种分区对应一个StreamPartitioner的实现类。我们来看下rebalance对应的RebalancePartitioner。
在DataStream中的接口如下:
public DataStream<T> rebalance() { return setConnectionType(new RebalancePartitioner<T>()); }
可以看到设置了分区方式,分区方式(要注意的是StreamPartitioner并不是算子)就是RebalancePartitioner。
public class RebalancePartitioner<T> extends StreamPartitioner<T> { private static final long serialVersionUID = 1L; private int nextChannelToSendTo; @Override public void setup(int numberOfChannels) { super.setup(numberOfChannels); nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels); } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; } }
主要方法selectChannel的源代码实现比较简单,就是随机选择下发的分区。
3. aggregation
作用:聚合计算。
使用方式:
keyedStream.sum(0); keyedStream.sum("key"); keyedStream.min(0); keyedStream.min("key"); keyedStream.max(0); keyedStream.max("key"); keyedStream.minBy(0); keyedStream.minBy("key"); keyedStream.maxBy(0); keyedStream.maxBy("key");
该方法会生成算子StreamGroupedReduce,包括fold、reduce及aggregation,只能作用于KeyedStream。需要注意的一点是,这些聚合计算都是针对某个键(Key)的,如果要求全局的最大值、最小值,该方法是无法做到的。
4. window及window apply
作用:根据窗口聚合计算数据。
使用方式:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() { public void apply (Tuple tuple, Window window, Iterable<Tuple2<String, Integer>> values, Collector<Integer> out) throws Exception { int sum = 0; for (value t: values) { sum += t.f1; } out.collect (new Integer(sum)); } });
该方法会生成窗口。窗口分为Keyed Window和Non-Keyed Window(用WindowAll转换得到),二者的区别在于使用window转换之前是否进行keyBy操作。窗口将会在2.3节详细介绍。
5. union
作用:合并多个DataStream。
使用方式:
dataStream.union(otherStream1, otherStream2, ...);
该方法有个有意思的使用方式是可以合并数据本身,这样就可以得到一个两倍数据的流。该方法同样不会产生算子。
6. window join
作用:通过给定的键和窗口关联两个DataStream。
使用方式:
dataStream.join(otherStream) .where(<key selector>).equalTo(<key selector>) .window(TumblingEventTimeWindows.of(Time.seconds(3))) .apply (new JoinFunction () {...});
这里我们通过一个例子来看下Flink中window join是怎么实现的。假如我们有streamA(图2-2中用深灰色元素表示)和streamB(图2-2中用浅灰色元素表示)经过window join处理,伪代码如下(这段代码的主要内容就是两个流进行window join处理的用法示例):
DataStream<Integer> streamA = ... DataStream<Integer> streamB = ... streamA.join(streamB) .where(<KeySelector>) .equalTo(<KeySelector>) .window(TumblingEventTimeWindows.of(Time.milliseconds(2))) .apply(new JoinFunction<Integer, Integer, String> (){ @Override public String join(Integer first, Integer second) { return first + "," + second; } });

图2-2 window join
图2-2中圆圈内的数字表示数据元素本身及事件时间,经过处理之后得到图2-2下方给出的数字组合(这里假设图2-2中给定的同一个窗口内数据的键是一样的,即每个窗口内的数据都满足join条件)。
可以看出这里join的行为和普通的inner join非常类似。为了更好地理解join的结果,我们来看下其源代码实现。window join实现可以从JoinedStreams的apply方法着手。
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) { // 清除闭包 function = input1.getExecutionEnvironment().clean(function); coGroupedWindowedStream = input1.coGroup(input2) .where(keySelector1) .equalTo(keySelector2) .window(windowAssigner) .trigger(trigger) .evictor(evictor) .allowedLateness(allowedLateness); return coGroupedWindowedStream.apply( new JoinCoGroupFunction<>(function), resultType); }
这里可以看到,window join是通过coGroup来实现的,生成一个CoGroupedStreams,然后应用JoinCoGroupFunction。那么coGroup又是怎么实现window join的呢?我们继续来看CoGroupedStreams的apply方法(略去了无关代码):
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) { DataStream<TaggedUnion<T1, T2>> taggedInput1 = input1 .map(new Input1Tagger<T1, T2>()) .setParallelism(input1.getParallelism()) .returns(unionType); DataStream<TaggedUnion<T1, T2>> taggedInput2 = input2 .map(new Input2Tagger<T1, T2>()) .setParallelism(input2.getParallelism()) .returns(unionType); // 1 DataStream<TaggedUnion<T1, T2>> unionStream = taggedInput1.union(taggedInput2); // 2 windowedStream = new KeyedStream<TaggedUnion<T1, T2>, KEY>( unionStream, unionKeySelector, keyType).window(windowAssigner); // 3 return windowedStream.apply( new CoGroupWindowFunction<T1, T2, T, KEY, W>(function), resultType); }
这部分代码非常清楚地展示了其实现过程:代码1调用union把两个DataStream联合在一起,代码2生成一个WindowedStream,代码3对WindowedStream执行窗口函数。window join本质上还是通过union和window等更基础的算子实现的。我们再来看一下这个过程中传入的JoinCoGroupFunction:
private static class JoinCoGroupFunction<T1, T2, T> extends WrappingFunction<JoinFunction<T1, T2, T>> implements CoGroupFunction<T1, T2, T> { @Override public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception { for (T1 val1: first) { for (T2 val2: second) { out.collect(wrappedFunction.join(val1, val2)); } } } }
这个函数就是图2-2描述的不同数据相互连接配对的实现逻辑。window join的窗口还可以是滑动窗口、会话窗口,这里不再详细讲解,实现原理基本一样。
7. interval join
作用:通过给定的键和时间范围连接两个DataStream。假如有数据e1和e2分别来自两个DataStream,那么要让两个数据可以连接输出,需要
e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
interval join只支持基于事件时间的范围。
使用方式:
keyedStream.intervalJoin(otherKeyedStream) .between(Time.milliseconds(-2), Time.milliseconds(2)) // 下界和上界 .upperBoundExclusive(true) // 可选 .lowerBoundExclusive(true) // 可选 .process(new IntervalJoinFunction() {...});
Flink API中实现了两种join:一种是window join,另一种就是interval join。两种join最大的不同在于join的数据分组不一样:window join是在同一个时间窗口内连接;interval join是每个数据元素根据自己的时间都有一个join取值范围,这个范围是由lowerBound和upperBound决定的。我们通过一个例子来直观地看下interval join的过程,然后分析其实现。
如图2-3所示,我们有两个流streamA和streamB,其数据分别用深灰色和浅灰色的圆圈表示,圆圈中的数字代表数据元素本身及事件时间。interval join的伪代码如下:
DataStream<Integer> streamA = ... DataStream<Integer> streamB = ... streamA.keyBy(<KeySelector>) .intervalJoin(streamB.keyBy(<KeySelector>)) .between(Time.milliseconds(-2), Time.milliseconds(1)) .process (new ProcessJoinFunction<Integer, Integer, String(){ @Override public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) { out.collect(first + "," + second); } });

图2-3 interval join
由图2-3可以看到,判断两个流的数据可以连接的依据是两个流的数据符合lowerBound和upperBound界定的范围,即
streamA.data.ts + lowerBound <= streamB.data.ts.ts <= streamA.data.ts + upperBound
图2-3中streamA的每个数据元素都可以根据lowerBound和upperBound在streamB上界定一个可以连接的范围,比如:当图中streamA的数据元素2被处理的时候,发现streamB的数据元素0和1满足界定范围,这时输出2,0和2,1;在图中streamB的数据元素1进入算子之后,也会根据范围界限找到符合范围条件的streamA的0数据元素,然后输出0,1。
我们接下来看下Flink中interval join是怎么实现的。关键代码是IntervalJoinOperator中的processElement方法,实现过程与图2-3给出的逻辑一致。首先,需要两个状态保存两个流的数据,这里用的是MapState;然后,处理数据元素,遍历另一个流的数据,查询是否有满足界定范围的数据,如果有的话就将其发送出去;最后,注册一个状态清理函数,用来清理掉永远无法连接上的过期数据。
这里只介绍了几个常用的Transformation和算子,像Connect、CoMap、Split、Select之类的操作和转换就不一一介绍了,有兴趣的读者可以通过官网和源代码学习了解。