
2.4 运行Spark程序(分布式方式)
本节介绍Spark程序的第二种运行方式:分布式运行,Spark程序分布在集群的多个节点上并行处理,这样可以利用集群强大的计算能力,且扩展方便。
Spark程序分布式运行要依赖特定的集群管理器,最常用的有 Yarn 和 Standalone。把Spark程序在Yarn上运行称为Spark on Yarn,同理,把Spark程序在Standalone上运行称为Spark on Standalone。
不管是Spark on Yarn还是Spark on Standalone,都统称为Spark程序的运行模式。
同时,根据 Client 和 Driver 是否在一个进程,又可以分为 client 和 cluster 两种部署模式,Spark on Yarn和Spark on Standalone都支持这两种部署模式。
因此,对Spark程序分布式运行来说,可以分为4类,如表2-1所示。本节将按照表中序号,依次介绍每种分类。
表2-1 Spark运行分类表

2.4.1 Spark on Yarn
如前所述,Spark on Yarn 有两种部署模式:client 和 cluster,本节针对这两种部署模式用示例进行说明。对于每个示例,会先介绍其具体的操作,然后介绍Spark程序在Yarn上的运行过程。
1.Spark on Yarn(client deploy mode)
本节以DFSReadWriteTest为例,说明Spark on Yarn 的client deploy mode。
DFSReadWriteTest 是 spark-examples_2.11-2.3.0.jar 自带的一个示例,它会读取本地文件进行单词计数,然后将本地文件上传到 HDFS,从 HDFS 读取该文件,使用 Spark 进行计数,最后比对两次计数的结果。
(1)提交Spark程序到Yarn上,以client deploy mode运行
运行命令如下。

具体参数说明如下。
●--class org.apache.spark.examples.DFSReadWriteTest,指明此次程序的Main Class;
●--master yarn,指明将程序提交到Yarn上运行;
●/home/user/spark-2.3.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.3.0.jar,程序所在的jar包;
●/etc/profile,表示本地文件路径(注意,不需要用file:///来表示本地文件路径);
●/output,是文件输出路径(位于HDFS上)。
要运行该程序,除了Yarn启动外,还要确保HDFS也已启动;
Spark在Yarn上运行,不需要在Spark中startall.sh脚本,也就是说不需要启动Master、Worker这些和Standalone有关的组件;
deploy mode 可以通过 spark-submit 后面传参--deploy-mode 来指定,默认是 client 模式,即 spark-submit 后面如果不加--deploy-mode,则部署模式是 client,如果要指定 cluster 模式,则要加--deploy-mode cluster,从上面的命令可以看出,其部署模式是client。
程序执行后,先得到/etc/profile的单词计数结果A;将profile上传到HDFS的/output目录;读取该文件,并使用RDD操作进行单词计数,计数结果写回/output/dfs_read_write_test目录,存储为文件B;最后比较A和B,如结果正确,则输出如下。

初次运行程序时,可能会有以下两个报错。
报错1的报错信息如下所示。

报错原因:没有设置环境变量:HADOOP_CONF_DIR 或YARN_CONF_DIR。
解决办法:在/etc/profile中增加下面的内容。


切回到普通用户,使刚才的配置生效。

报错2的报错信息如下所示。
报错原因:Container(容器)的内存超出了虚拟内存限制,Container 的虚拟内存为2.1GB,但使用了2.3GB。

Container是Yarn上的资源抽象,Yarn上的Container目前包括CPU和内存两种资源;
提交到Yarn上的程序,会在Container中运行。例如,Spark on Yarn程序运行时,其Executor就是在Container中运行的。
解决办法:
方法一:改变分配 Container 的最小物理内存值,将 yarn.scheduler.minimum-allocation-mb设置成2GB,重启Yarn,每个Container向RM申请的虚拟内存为2GB×2.1=4.2GB。
yarn.scheduler.minimum-allocation-mb:默认值是1GB;
yarn.scheduler.maximum-allocation-mb:默认值是8GB。
方法二:改变分配Container的虚拟内存比例,将yarn.nodemanager.vmem-pmem-ratio设置成3,重启Yarn,每个Container向RM申请的虚拟内存为1GB×3=3GB。
yarn.nodemanager.vmem-pmem-ratio:默认值是2.1,是Container中虚拟内存/物理内存的值。
方法三:不检查虚拟机内存限制,将yarn.nodemanager.vmem-check-enabled设置为false,重启Yarn。
yarn.nodemanager.vmem-check-enabled:默认值是 true,它会检查 Container 中虚拟内存的使用是否超过yarn.scheduler.minimum-allocation-mb*yarn.nodemanager.vmem-pmem-ratio。
(2)Spark程序在Yarn上的执行过程(client deploy mode)
本例中,DFSReadWriteTest是一个Spark程序,部署模式是client,那么,它在Yarn上的执行过程是怎样的?图 2-6 列出了Spark程序在 Yarn 上的执行过程(client deploy mode),同MapReduce程序一样,Spark程序在Yarn上也是运行在Container之中的。具体说明如下。
1)client模式下,Client和Driver在一个进程内,向Resource Manager发出请求;
2)Resource Manager 指定一个节点启动 Container,用来运行 AM(Application Master);AM向Resource Manager申请Container来执行程序,Resource Manager向AM返回可用节点;
3)AM同可用节点的NodeManager通信,在每个节点上启动Container,每个Container中运行一个Spark的Executor,Executor再运行若干Tasks;
4)Driver 与 Executor 通信,向其分配 Task 并运行,并监测其状态,直到整个任务完成;
5)总任务完成后,Driver清理Executor,通知AM、AM向ResourceManager请求释放Cotainer,所有资源清理完毕后,AM注销并退出、Client退出。

图2-6 Spark程序在Yarn上执行过程图(client deploy mode)
AM的作用是Container的申请、释放和管理,它是Yarn中的一个概念;
Spark Driver负责Spark任务的管理和监控;
Client负责Spark任务的提交。
2.Spark on Yarn(cluster deploy mode)
本节继续以DFSReadWriteTest为例,说明Spark on Yarn的cluster deploy mode。
(1)提交DFSReadWriteTest到Yarn运行(cluster deploy mode)
下面使用Cluster模式在Yarn上运行Spark程序,命令如下,增加了--deploy-mode cluster的参数。


(2)Spark程序在Yarn上执行过程(cluster deploy mode)
在cluster deploy mode下,Spark on Yarn 执行过程如图2-7所示,Client和Driver分离,Driver在另一个节点,Driver和AM合并在同一个进程内,执行过程如下。
1)Client向ResourceManager提交Application请求;
2)ResourceManager指定一个节点,启动Container来运行AM和Spark Driver;AM根据任务情况向ResourceManager申请Container;ResourceManager返回可以运行Container的NodeManger;
3)AM与这些NodeManager通信,启动Container,在Container中运行Executor;
4)Spark Driver与Executor通信,向它们分配Task,并监控Task执行状态;
5)所有Task执行完毕后,清理Executor(总任务执行完毕后,有的Executor已经执行完毕,有的 Executor 可能还在执行 Task),清理完毕后,Driver 通知 AM,AM 请求ResourceManager,释放所有Container;Client收到Application FINISHED后退出。

图2-7 Spark程序在Yarn上执行过程图(cluster deploy mode)
2.4.2 Spark on Standalone
Standalone是Spark自带的一个集群管理器,主/从式架构,包括Master和Worker两种角色,Master管理所有的Worker,Worker负责单个节点的管理。
Spark程序可以在Standalone上运行,好处是简单、方便,可以快速部署;缺点是不通用,只支持Spark,不支持MapReduce等,此外功能没有专门的集群管理器如Yarn等强大。
如前所述,Spark on Standalone有两种部署模式:client和cluster,本节针对这两种部署模式,用示例说明。每个示例,都会先介绍具体的操作,然后介绍Spark程序在 Standalone上的运行过程。
1.Spark on Standalone(client deploy mode)
(1)部署Standalone
本书的Spark Standalone框架部署如图2-8所示,分为6层,与之前类似,第1层到第5层是已有的,第6层为Standalone,也是本节需要构建的。Standalone包含在Spark Package中,因此,不需要要装额外的Package,直接在Spark中配置即可。

图2-8 Spark Standalone部署图
Standalone的Master和Worker都部署在scaladev虚拟机上,具体步骤如下。
1)配置slaves文件,该文件保存了整个集群中被管理节点的主机名。先复制模板文件;

2)编辑slaves文件;

3)将localhost修改为scaladev;

4)添加JAVA_HOME;

5)编辑spark-env.sh文件;

6)在最后一行增加下面的内容;

7)启动Standalone集群;

8)验证,使用jps查看当前运行的Java进程,如下所示,如果Master和Worker都在,则说明启动、配置成功;

9)查看Standalone的Web监控界面,如图2-9所示。
在Host浏览器中输入http://192.168.0.226:8080,其中192.168.0.226是scaladev的IP地址,是Master所在的IP。在Web监控界面可以查看集群信息、Spark的Application运行信息等。

图2-9 Standalone的Web界面
(2)提交Spark程序到Standalone上,以client deploy mode运行
提交前应确保HDFS已经启动,HDFS上/output目录下已经清空。具体命令如下。

其中,-master spark://scaladev:7077表示连接Standalone集群,scaladev是Master所在的主机名,没有指定--deploy-mode cluster,则部署模式为默认的client。
(3)Spark程序在Standalone上的运行过程(client deploy mode)
client部署模式下,Spark程序在Standalone的运行过程如图2-10所示。
1)Client初始化,内部启动Client模块和Driver模块,并向Master发送Application请求;
2)Master接收请求,为其分配Worker,并通知Worker启动Executor;
3)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Excutorer的当前情况,继续发送Task,直到整个Job完成。

图2-10 Spark程序在Standalone上的运行过程(client deploy mode)
2.Spark on Standalone(cluster deploy mode)
(1)提交Spark程序到Standalone,以cluster deploy mode运行
具体命令如下。

有4点需要特别注意。
●采用cluster deploy mode 时,Driver 需要一个处理器,后续Executor还需要另外的处理器,如果虚拟机 scaladev 只有 1 个处理器的话,就会出现资源不足的警告,导致程序运行失败,如下所示;

解决办法为:增加虚拟机的处理器为两个。
● 命令参数中,--master spark://scaladev:6066 用来指定 Master 的 URL,cluster deploy mode下,Client会向Master提交Rest URL,spark://scaladev:6066就是Spark的Rest URL;如果还是使用原来的参数--master spark://scaladev:7077,则会报下面的错误;

●HDFS的路径前面要加hdfs://,因为Cluster Mode下,core-site.xml中的defaultFS设置不起作用;
●Client提交成功后就会退出,而不是等待Application结束后才退出。
cluster deploy mode的还有专门的Driver日志,位于Driver节点Spark目录的work目录下,会创建Driver开头的目录,例如:driver-20180815020432-0015,在这个下面有stderr和stdout两个日志文件。
(2)Spark程序在Standalone上的运行过程(cluster deploy mode)
cluster deploy mode下,Spark程序在Standalone的运行过程如图2-11所示。
1)Client初始化,内部启动Client模块,并向Master注册Driver模块,并等待Driver信息,待后续Driver模块正常运行,Client退出;
2)Master 接收请求,分配一个 Worker,并通知此 Worker 运行 Driver 模块,Driver 向Master发送Application请求;
3)Master接收请求,分配Worker,并通知这些Worker启动Executor;
4)Executor向Driver注册,Driver向Executor发送Task,Executor执行Task,并反馈执行状态,Driver再根据Executor的当前情况,继续发送Task,直到整个Job完成。

图2-11 Spark程序在Standalone上的运行过程(cluster deploy mode)
3.Spark on Standalone的日志
Spark程序运行时,其日志在排查问题时非常重要。其中,Standalone 的日志分为两类:框架日志和应用日志。
框架日志是指Master和Worker的日志,Master日志位于Master的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.master.Master-1-scaladev.out;Worker位于每个Worker节点的Spark目录下的logs目录下,文件名为:spark-user-org.apache.spark.deploy.worker.Worker-1-scaladev.out。
应用日志是指每个Spark程序运行的日志,因为一个Spark程序可能会启动多个Executor,每个Executor都会有一个日志文件,位于Executor所在的Worker节点的Spark目录的work目录下,每个Spark运行会分配一个ID,运行时在控制台会打印该ID的值,如下所示。

列出woker目录下的内容,命令如下。

显示内容如下,在Worker下,每个ID会有一个目录。

列出下面路径的内容,命令如下。

显示内容如下,可见每个Executor的日志在该目录下。

应用日志是分散在Worker节点上的,Executor在哪个Worker节点上运行,日志就在此Worker节点上。
此外,如果使用cluster部署模式,在Client的Spark目录work目录下,还会有对应的driver日志。