Flink技术内幕:架构设计与实现原理
上QQ阅读APP看书,第一时间看更新

2.1 DataStream

在Flink中用DataSet和DataStream来表示数据集,DataSet表示有界的数据,DataStream表示无界的数据。当然这只是概念层面的抽象,DataStream并没有真正的数据。DataStream通过初始化Source来构造,通过一系列的转换来表达计算过程,最后通过Sinker把结果输出到外部系统。Flink内部集成了大量与外部系统交互的Source和Sink,这部分对应Flink中的Connectors模块;还有大量的Transformation,这部分对应Flink中的算子(Operator)。

我们来看个官方的例子:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())
                .keyBy(0)
                .timeWindow(Time.seconds(5))
                .sum(1);

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word: sentence.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

在这个例子中,env.socketTextStream方法(从socket得到数据)得到DataStream,然后经过DataStream的各种转换,这里有flatMap、keyBy、window等转换,最后通过print把结果输出到标准输出(见图2-1)。

028-1

图2-1 Streaming Dataflow

上面的例子是通过socketTextStream从网络端口读取数据得到DataStream,还有一些其他方式,比如:通过读取文件,readFile (fileInputFormat, path);通过读取集合数据集,fromCollection (Collection)。当然,也可以通过方法StreamExecutionEnvironment.addSource (sourceFunction)来定制数据的读取,用户需要实现SourceFunction接口。我们来看下这个方法是怎么得到DataStream的,关键代码如下:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
        String sourceName, TypeInformation<OUT> typeInfo) {
    // 此处省略不相关的代码
    clean(function);
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
}

可以看到该方法新建了一个DataStreamSource。继续看DataStreamSource你会发现,它继承了SingleOutputStreamOperator(这个类从命名看不是很清楚,很容易让人把它误认为是个算子,但实际上它是个DataStream子类),这样我们就得到了一个DataStream。

那么DataStream之间是怎么相互转换的呢?我们来看DataStream的flatMap方法:

public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {

    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(
            clean(flatMapper), getType(), Utils.getCallLocationName(), true);
    // 这里用FlatMapFunction构造了一个StreamOperator
    return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
}

这里构造了一个StreamFlatMap类型的算子,然后继续调用transform方法。我们接着看transform方法:

public <R> SingleOutputStreamOperator<R> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    //构造Transformation
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(
            environment, resultTransform);
    // 把所有的Transformation都保存到StreamExecutionEnvironment中
    getExecutionEnvironment().addOperator(resultTransform);
    return returnStream;
}

可以看到,其中最主要的工作是基于刚才的算子新建了一个OneInputTransformation,然后把该Transformation保存下来。那么StreamExecutionEnvironment中保存的Transformation用来做什么呢?实际上Flink根据这些Transformation生成整个运行的拓扑,整个生成过程大致如下:

1)根据Transformation生成StreamGraph;

2)根据StreamGraph生成JobGraph;

3)根据JobGraph生成可以调度运行的ExecutionGraph。

整个过程还会在第5章详细介绍,这里可以先大致了解下。这里用户的执行代码FlatMapFunction实际上是通过先传递给算子,然后由算子来调用执行的。

最后本例通过dataStream.print()将结果输出。同样,Flink提供了很多API来把结果写到外部系统,这里简单介绍下。

  • writeAsText():输出字符串到文件。
  • writeAsCsv():输出CSV格式文本。
  • print()/printToErr():标准输出/标准错误输出。
  • writeToSocket():输出到socket。
  • addSink():addSink与addSource一样,提供可以供用户扩展的输出方式,用户需要实现SinkFunction接口。