
2.1 DataStream
在Flink中用DataSet和DataStream来表示数据集,DataSet表示有界的数据,DataStream表示无界的数据。当然这只是概念层面的抽象,DataStream并没有真正的数据。DataStream通过初始化Source来构造,通过一系列的转换来表达计算过程,最后通过Sinker把结果输出到外部系统。Flink内部集成了大量与外部系统交互的Source和Sink,这部分对应Flink中的Connectors模块;还有大量的Transformation,这部分对应Flink中的算子(Operator)。
我们来看个官方的例子:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class WindowWordCount { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<Tuple2<String, Integer>> dataStream = env .socketTextStream("localhost", 9999) .flatMap(new Splitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1); dataStream.print(); env.execute("Window WordCount"); } public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> { @Override public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception { for (String word: sentence.split(" ")) { out.collect(new Tuple2<String, Integer>(word, 1)); } } } }
在这个例子中,env.socketTextStream方法(从socket得到数据)得到DataStream,然后经过DataStream的各种转换,这里有flatMap、keyBy、window等转换,最后通过print把结果输出到标准输出(见图2-1)。

图2-1 Streaming Dataflow
上面的例子是通过socketTextStream从网络端口读取数据得到DataStream,还有一些其他方式,比如:通过读取文件,readFile (fileInputFormat, path);通过读取集合数据集,fromCollection (Collection)。当然,也可以通过方法StreamExecutionEnvironment.addSource (sourceFunction)来定制数据的读取,用户需要实现SourceFunction接口。我们来看下这个方法是怎么得到DataStream的,关键代码如下:
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) { // 此处省略不相关的代码 clean(function); final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName); }
可以看到该方法新建了一个DataStreamSource。继续看DataStreamSource你会发现,它继承了SingleOutputStreamOperator(这个类从命名看不是很清楚,很容易让人把它误认为是个算子,但实际上它是个DataStream子类),这样我们就得到了一个DataStream。
那么DataStream之间是怎么相互转换的呢?我们来看DataStream的flatMap方法:
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) { TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes( clean(flatMapper), getType(), Utils.getCallLocationName(), true); // 这里用FlatMapFunction构造了一个StreamOperator return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper))); }
这里构造了一个StreamFlatMap类型的算子,然后继续调用transform方法。我们接着看transform方法:
public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) { //构造Transformation OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>( this.transformation, operatorName, operator, outTypeInfo, environment.getParallelism()); SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator( environment, resultTransform); // 把所有的Transformation都保存到StreamExecutionEnvironment中 getExecutionEnvironment().addOperator(resultTransform); return returnStream; }
可以看到,其中最主要的工作是基于刚才的算子新建了一个OneInputTransformation,然后把该Transformation保存下来。那么StreamExecutionEnvironment中保存的Transformation用来做什么呢?实际上Flink根据这些Transformation生成整个运行的拓扑,整个生成过程大致如下:
1)根据Transformation生成StreamGraph;
2)根据StreamGraph生成JobGraph;
3)根据JobGraph生成可以调度运行的ExecutionGraph。
整个过程还会在第5章详细介绍,这里可以先大致了解下。这里用户的执行代码FlatMapFunction实际上是通过先传递给算子,然后由算子来调用执行的。
最后本例通过dataStream.print()将结果输出。同样,Flink提供了很多API来把结果写到外部系统,这里简单介绍下。
- writeAsText():输出字符串到文件。
- writeAsCsv():输出CSV格式文本。
- print()/printToErr():标准输出/标准错误输出。
- writeToSocket():输出到socket。
- addSink():addSink与addSource一样,提供可以供用户扩展的输出方式,用户需要实现SinkFunction接口。