Flink与Kylin深度实践
上QQ阅读APP看书,第一时间看更新

2.5 Flink on YARN模式

Flink任务也可以运行在YARN上面,将Flink任务提交到YARN平台可以实现统一的任务资源调度管理,方便开发人员管理集群中的CPU和内存等资源。如图2-1所示,Flink on YARN也有两种模式:单个YARN Session模式和多个YARN Session模式。

环境要求:Hadoop至少为2.2版;HDFS及YARN服务启动正常。

图2-1 Flink on YARN模式

2.5.1 单个YARN Session模式

这种模式需要先启动集群,然后再提交作业,接着会向YARN申请资源空间,之后资源保持不变。如果资源不足,下一个作业就无法提交,只能等到YARN中的一个作业执行完成后释放资源,所以实际工作中一般不会使用这种模式。

这种模式不需要做任何配置,可以直接将任务提交到YARN集群,这之前需要提前启动HDFS以及YARN集群。

1.修改yarn-site.xml配置文件

在node01上执行以下命令开始修改yarn-site.xml。

添加以下配置属性到yarn-site.xml文件中。

然后在node01上将修改后的配置文件复制到node02与node03服务器,命令如下。

之后重新启动YARN集群。

2.修改Flink配置文件

在node01上执行以下命令修改Flink配置文件。

3.在HDFS上创建文件夹

命令如下。

4.在YARN中启动Flink集群

直接在node01上执行以下命令,在YARN中启动一个全新的Flink集群。

可以直接使用yarn-session.sh这个脚本来启动。也可以使用“--help”查看更多参数设置。

注意:

如果启动时YARN的内存太小,则可能报出以下错误。

此时需要修改yarn-site.xml添加以下配置,然后重启YARN。

这个参数的功能主要是让YARN集群跳过集群资源检查,避免由于虚拟机内存不够而导致任务提交失败。

5.查看YARN管理界面

访问YARN的8088管理界面http://node01:8088/cluster,发现其中有一个应用,这是为Flink单独启动的一个Session。

6.提交任务

使用Flink自带的jar包实现单词统计功能。

在node01上准备单词文件。

文件内容如下。

在HDFS上创建文件夹并上传文件。

在node01上执行以下命令,提交任务到Flink集群。

7.验证YARN Session的高可用性

通过node01的8088界面,查看YARN Session在哪一台机器上启动,然后关闭YARN Session进程,这时YARN Session会在另外一台机器上重新启动。

找到YarnSessionClusterEntrypoint所在的服务器,然后关闭该进程。

关闭进程之后,会发现YARN集群重新启动了一个YarnSessionClusterEntrypoint进程在其他机器上。如图2-2所示,YARN上又启动了一个新的任务。

图2-2 Flink on YARN的高可用性

2.5.2 多个YARN Session模式

这种模式的优点是一个任务对应一个Job,即每提交一个Job都会根据自身情况向YARN申请资源,直到Job执行完成,并不会影响下一个Job的正常运行,除非YARN上没有任何资源。

注意:

Client端必须设置YARN_CONF_DIR、HADOOP_CONF_DIR或者HADOOP_HOME环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

这种模式下不需要在YARN中启动任何集群,直接提交任务即可。

1.直接执行命令提交任务

编写提交任务的脚本并执行。

2.查看输出结果

在HDFS中执行以下命令查看输出结果。

3.查看“flink run”的帮助文档

使用“--help”查看帮助文档中的参数。

结果如下。

2.5.3“flink run”脚本分析

提交Flink任务时,可以加入以下这些参数。

1)默认查找当前YARN集群中已有YARN Session信息中的JobManager(所在路径:/tmp/.yarn-properties-root)。

2)连接指定主机和端口的JobManager。

3)启动一个新的YARN-Session。

注意:

YARN Session命令行的选项也可以使用“./bin/flink”获得。它们都有一个“y”或者“yarn”的前缀,例如:bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar。