第2章 安装及配置Kafka
本章将介绍Kafka集群的安装与配置,包含以下内容:安装与配置基础环境、安装Kafka集群、安装与配置Kafka监控工具、编译Kafka源代码,以及将Kafka源代码导入代码编辑器中等。其中涉及的实战内容并不复杂。
2.1 本章教学视频说明
视频内容:基础环境的准备、安装Kafka集群、安装与配置Kafka监控工具、编译Kafka源代码,以及将Kafka源代码导入代码编辑器等。
视频时长:19分钟。
视频截图见图2-1所示。
图2-1 本章教学视频截图
2.2 安装与配置基础环境
需安装以下几个软件。
1. Linux操作系统
Kafka设计之初便是以Linux操作系统作为前提的,因此Linux操作系统能完美支持Kafka。本节以64位CentOS 6.6为例。
提示:
CentOS(Community Enterprise Operating System,社区企业操作系统)是Linux发行版之一。
2. Java软件开发工具包(Java Development Kit, JDK)
Kafka的源代码是利用Scala语言编写的,它需要运行在Java虚拟机(Java Virtual Machine, JVM)上。因此,在安装Kafka之前需要先安装JDK。
3. ZooKeeper
Kafka是一个分布式消息中间件系统,它依赖ZooKeeper管理和协调Kafka集群的各个代理(Broker)节点。因此,在安装Kafka集群之前需要先安装ZooKeeper集群。
在安装CentOS、JDK、ZooKeeper之前,需要准备好这些软件的安装包。安装包选择rpm或tar.gz类型均可。本书选择的是64位操作系统下的tar.gz类型的安装包,版本信息与下载地址见表2-1。
表2-1 版本信息与下载地址
2.2.1 安装并配置Linux操作系统
目前,市场上Linux操作系统的版本有很多,如RedHat、Ubuntu、CentOS等。读者可以根据自己的喜好选取合适的Linux操作系统,这对学习本书的影响不大。
CentOS 6.6安装包下载界面如图2-2所示。本书选择64位CentOS 6.6的镜像文件进行下载。
图2-2 CentOS操作系统下载预览
提示:
如果有现成的物理机或者云主机供学习使用,则可以跳过下面内容,直接进入2.2.2小节开始学习。如果是自行安装虚拟机学习使用,则请继续阅读下面内容。
在Windows操作系统中,安装Linux操作系统虚拟机可以使用VMware或VirtualBox。
在Mac操作系统中,安装Linux操作系统虚拟机可以使用Parallels Desktop或VirtualBox。
提示:
无论在Windows操作系统中还是在Mac操作系统环境中,VirtualBox软件都是免费的。而VMware和Parallels Desktop均属于商业产品,用户需要付费使用。
使用这些软件安装Linux操作系统虚拟机,不涉及复杂的操作,均是直接单击“下一步”按钮,直到最后单击“完成”按钮。
1.配置网络
安装完Linux操作系统虚拟机后,如果虚拟机需要连接外网,应做一个简单的网络配置。具体操作命令如下:
# 打开网络配置文件 [hadoop@dn1~]$ vi /etc/sysconfig/network-scripts/ifcfg-eth0 # 修改ONBOOT的值为yes ONBOOT=yes # 保存并退出
完成网络配置后,重启虚拟机使配置生效。具体操作命令如下:
# 重启Linux操作系统虚拟机。如果是非root用户,则重启可能需要使用sudo命令 [hadoop@dn1~]$ sudo reboot
2.配置hosts系统文件
这里安装的是三台Linux操作系统虚拟机。在其中一台配置好主机的hosts文件,然后使用复制命令将该hosts文件分发到其他两台机器中。
(1)在其中一个主机上配置hosts文件,具体操作命令如下:
# 打开dn1节点的hosts文件并编辑 [hadoop@dn1~]$ sudo vi /etc/hosts # 添加如下内容 10.211.55.5 dn1 10.211.55.6 dn2 10.211.55.8 dn3 # 保存并退出
(2)使用Linux的复制命令分发文件。具体操作命令如下:
# 在/tmp目录中添加一个临时主机名文本文件 [hadoop@dn1~]$ vi /tmp/add.list # 添加如下内容 dn2 dn3 # 保存并退出 # 将/etc/hosts文件复制到/tmp目录中 [hadoop@dn1~]$ cp /etc/hosts /tmp # 使用scp命令将hosts文件下发到其他主机 [hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp /tmp/hosts $i:/tmp; done
(3)登录到其他主机,将/tmp目录下的hosts文本文件复制到/etc目录中。
2.2.2 实例1:安装与配置Java运行环境
本书选择的是Oracle官方的JDK8,版本号为8u144,如图2-3所示。
图2-3 JDK下载版本预览
提示:
在学习本书时,可能Oracle官方网站的JDK版本号又更新了,选择其他版本号的JDK进行下载也是可以的,这并不影响对本书内容的学习。
实例描述
在Linux操作系中安装JDK。其中涉及下载安装包、解压安装包并配置、同步安装包等操作。
1.安装JDK
由于CentOS操作系统可能会自带OpenJDK环境,所以,在安装JDK之前,需要先检查CentOS操作系统中是否存在OpenJDK环境。如存在,则需要先将其卸载。
具体操作步骤如下。
(1)卸载CentOS操作系统自带JDK环境。如果不存在自带的JDK环境,则可跳过此步骤。
# 查找Java安装依赖库 [hadoop@dn1~]$ rpm -qa | grep java # 卸载Java依赖库 [hadoop@dn1~]$ yum -y remove java*
(2)将下载的JDK安装包解压缩到指定目录下(可自行指定),详细操作命令如下。
# 解压JDK安装包到当前目录 [hadoop@dn1~]$ tar -zxvf jdk-8u144-linux-x64.tar.gz # 移动JDK到/data/soft/new目录下,并改名为jdk [hadoop@dn1~]$ mv jdk-8u144-linux-x64 /data/soft/new/jdk
2.配置JDK
将JDK解压缩到指定目录后,需要配置JDK的全局环境变量,具体操作步骤如下。
(1)添加JDK全局变量,具体操作命令如下。
# 打开当前用户下的.bash_profile文件并进行编辑 [hadoop@dn1~]$ vi ~/.bash_profile # 添加如下内容 export JAVA_HOME=/data/soft/new/jdk export $PATH:$JAVA_HOME/bin # 进行保存并退出
(2)若要使配置的内容立即生效,则需要执行以下命令。
# 使用source命令或者英文点(.)命令,让配置文件立即生效 [hadoop@dn1~]$ source ~/.bash_profile
(3)验证JDK环境是否安装成功,具体操作命令如下。
# 使用Java语言version命令来检验 [hadoop@dn1~]$ java -version
如果操作系统终端显示了对应的JDK版本号(如图2-4所示),则认为JDK环境配置成功。
图2-4 JDK打印版本信息
3.同步安装包
将第一台主机上解压后的JDK文件夹和环境变量配置文件.bash_profile分别同步到其他两个主机上。具体操作命令如下。
#在/tmp目录中添加一个临时主机名文本文件 [hadoop@dn1~]$ vi /tmp/add.list # 添加如下内容 dn2 dn3 # 保存并退出 #使用scp命令同步JDK文件夹到指定目录 [hadoop@dn1~]$ for i in 'cat /tmp/add.list'; do scp -r /data/soft/new/jdk $i: /data/soft/new/; done #使用scp命令将.bash_profile文件分发到其他主机 [hadoop@dn1~]$ for i in 'cat /tmp/add.list'; do scp ~/.bash_profile $i:~/; done
2.2.3 实例2:配置SSH免密码登录
Secure Shell简称SSH,由IETF的网络小组所制定。SSH协议建立在应用层基础上,专为远程登录会话和其他网络服务提供安全性保障。
提示:
国际互联网工程任务组(The Internet Engineering Task Force, IETF)是一个公开性质的大型民间国际团体,汇集了大量与互联网架构和“互联网正常运作”相关的网络设计者、运营者、投资人及研究人员。
1.了解SSH协议
利用SSH协议可以有效地防止在远程管理过程中重要信息的泄露。SSH起初是UNIX操作系统上的一个应用程序,后来扩展到其他操作系统平台。
正确使用SSH协议可以弥补网络中的漏洞。几乎所有的UNIX平台(例如Linux、AIX、Solaris)都可以运行SSH客户端。
提示:
AIX是IBM基于AT&T Unix System V开发的一套类似UNIX的操作系统,可运行在利用IBM专有的Power系列芯片设计的小型机上。
Solaris是Sun MicroSystems研发的计算机操作系统,它是UNIX操作系统的衍生版本之一。
在Windows、Linux和MacOS操作系统上的SSH客户端,可以使用SSH协议登录到Linux服务器。在SSH工具中输入Linux服务器的用户名和密码,或者在Linux服务器中添加客户端的公钥来进行登录。
登录的流程如图2-5所示。
图2-5 SSH登录
2.配置SSH免密登录
在Kafka集群启动时,实现三台主机免密码登录。这里使用SSH来实现。
实例描述
在Linux操作系统中配置SSH免密登录,涉及创建密钥、认证授权、文件赋权等操作。
具体操作步骤如下:
(1)创建密钥。
在Linux操作系统中,使用ssh-keygen命令来创建密钥文件,具体操作命令如下。
# 生成当前节点的私钥和公钥 [hadoop@dn1~]$ ssh-keygen -t rsa
接下来只需按Enter键,不用设置任何信息。命令操作结束后会在/home/hadoop/.ssh/目录下生成对应的私钥和公钥等文件。
(2)认证授权。
将公钥(id_rsa.pub)文件中的内容追加到authorized_keys文件中,具体操作命令如下。
# 将公钥(id_rsa.pub)文件内容追加到authorized_keys [hadoop@dn1~]$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
(3)文件赋权。
在当前账号下,需要给authorized_keys文件赋予600权限,否则会因为权限限制导致登录失败。文件权限操作命令如下。
# 赋予600权限 [hadoop@dn1~]$ chmod 600~/.ssh/authorized_keys
(4)在其他节点上创建密钥。
在Kafka集群的其他节点下,使用ssh-keygen -t rsa命令生成对应的公钥。然后在第一台主机上使用Linux同步命令将authorized_keys文件分发到其他节点的/home/hadoop/.ssh/目录中。详细操作命令如下。
#在/tmp目录中添加一个临时主机名文本文件 [hadoop@dn1~]$ vi /tmp/add.list # 添加如下内容 dn2 dn3 # 保存并退出 #使用scp命令同步authorized_keys文件到指定目录 [hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp ~/.ssh/authorized_keys $i:/home/hadoop/.ssh; done
提示:
如果在登录过程中系统没有提示输入密码,即表示免密码登录配置成功。反之,则配置失败。读者需核对配置步骤是否和本书一致。
为了方便维护集群,通常在所有主机中选择一台主机作为“管理者”,让其负责下发配置文件。这台主机与其他主机之间的免密关系如图2-6所示。
图2-6 SSH免密关系
在维护集群(例如执行启动、查看状态、停止等操作)时,通常在拥有“管理者”权限的主机上执行批处理脚本。
2.2.4 实例3:安装与配置Zookeeper
Zookeeper是一个分布式应用程序协调服务系统,是大数据生态圈的重要组件。Kafka、Hadoop、HBase等系统均依赖Zookeeper来提供一致性服务。
Zookeeper是将复杂且容易出错的核心服务进行封装,然后对外提供简单易用、高效稳定的接口。
实例描述
Zookeeper安装涉及下载软件包、配置Zookeeper系统文件、配置环境变量、启动Zookeeper等操作。
1.安装Zookeeper
(1)下载Zookeeper软件包。
按表2-1中的地址下载3.4.6版本安装包,然后将其解压到指定位置。本书所有的安装包都会被解压到/data/soft/new目录下。
(2)解压软件包。
对Zookeeper软件包进行解压和重命名,具体操作命令如下:
# 解压文件命令 [hadoop@dn1~]$ tar –zxvf zookeeper-3.4.6.tar.gz # 重命名zookeeper-3.4.6文件夹为zookeeper [hadoop@dn1~]$ mv zookeeper-3.4.6 zookeeper # 创建状态数据存储文件夹 [hadoop@dn1~]$ mkdir –p /data/soft/new/zkdata
2.配置Zookeeper系统文件
(1)配置zoo.cfg文件。
在启动Zookeeper集群之前,需要配置Zookeeper集群信息。
读者可以将Zookeeper安装目录下的示例配置文件重命名,即,将zoo_sample.cfg修改为zoo.cfg。按如下所示编辑zoo.cfg文件。
# 配置需要的属性值 # zookeeper数据存放路径地址 dataDir=/data/soft/new/zkdata # 客户端端口号 clientPort=2181 # 各个服务节点地址配置 server.1=dn1:2888:3888 server.2=dn2:2888:3888 server.3=dn3:2888:3888
(2)配置注意事项。
在配置的dataDir目录下创建一个myid文件,该文件里面写入一个0~255的整数,每个Zookeeper节点上这个文件中的数字要是唯一的。本书的这些数字是从1开始的,依次对应每个Kafka节点。主机与代理节点(Broker)的对应关系如图2-7所示。
图2-7 主机与代理节点(Broker)的对应关系
(3)操作细节。
文件中的数字要与DataNode节点下的Zookeeper配置的数字保持一致。例如,server.1=dn1:2888:3888,则dn1主机下的myid配置文件应该填写数字1。
在dn1主机上配置好ZooKeeper环境后,可使用scp命令将其传输到其他节点,具体命令如下。
#在/tmp目录中添加一个临时主机名文本文件 [hadoop@dn1~]$ vi /tmp/add.list # 添加如下内容 dn2 dn3 # 保存并退出 #使用scp命令同步Zookeepers文件夹到指定目录 [hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp -r /data/soft/new/zookeeper $i:/data/soft/new; done
完成文件传输后,dn2主机和dn3主机上的myid文件中的数字分别被修改为2和3。
3.配置环境变量
在Linux操作系统中,可以对Zookeeper做全局的环境变量配置。这样做的好处是,可以很方便地使用Zookeeper脚本,不用切换到Zookeeper的bin目录下再操作。具体操作命令如下。
# 配置环境变量 [hadoop@dn1~]$ vi ~/.bash_profile # 配置zookeeper全局变量 export ZK_HOME=/data/soft/new/zookeeper export PATH=$PATH: $ZK_HOME/bin # 保存编辑内容,并退出
之后,用以下命令使刚刚配置的环境变量立即生效:
# 使环境变量立即生效 [hadoop@dn1~]$ source ~/.bash_profile
接着,在其他两台主机上也做相同的配置操作。
4.启动Zookeeper
在安装了Zookeeper的节点上,分别执行启动进程的命令:
# 在不同的节点上启动zookeeper服务进程 [hadoop@dn1~]$ zkServer.sh start [hadoop@dn2~]$ zkServer.sh start [hadoop@dn3~]$ zkServer.sh start
但这样管理起来不够方便。可以对上述启动命令进行改进,例如编写一个分布式启动脚本(zk-daemons.sh),具体如下:
# 编写Zookeeper分布式启动脚本,可以输入start|stop|restart|status等命令 #! /bin/bash hosts=(dn1 dn2 dn3) for i in ${hosts[@]} do ssh hadoop@$i "source /etc/profile; zkServer.sh $1" & done
5.验证
完成启动命令后,在终端中输入jps命令。若显示QuorumPeerMain进程名称,即表示服务进程启动成功。也可以使用Zookeeper的状态命令status来查看,具体操作命令如下。
# 使用status命令来查看 [hadoop@dn1~]$ zk-daemons.sh status
结果如图2-8所示。在Zookeeper集群运行正常的情况下,若有三个节点,则会选举出一个Leader和两个Follower。
图2-8 进程状态预览结果
提示:
读者也可以查看Zookeeper的运行日志zookeeper.out文件,其中记录了Zookeeper的启动过程以及运行过程。
2.3 实例4:部署Kafka
安装Kafka比较简单,单机模式和分布式模式的部署步骤基本一致。由于生产环境所使用的操作系统一般是Linux,所以本书Kafka集群的部署也是基于Linux操作系统来完成的。
实例描述:
按两种模式部署(单机模式部署和分布式模式)Kafka系统,并观察结果。
2.3.1 单机模式部署
如果是测试环境或需要在本地调试Kafka应用程序代码,则会以单机模式部署一个Kafka系统。
部署的步骤也非常简单:启动一个Standalone模式的Zookeeper,然后启动一个Kafka Broker进程。
提示:
本书选择的Kafka安装包版本是0.10.2.0,读者在学习本书时,Kafka官方可能发布了更新的版本。读者可以选择更新的版本来安装,其配置过程依然可以参考本书所介绍的,这并不影响对本书的学习。
1.下载Kafka安装包
访问Kafka官方网站,找到下载地址,然后在Linux操作系统中使用wget命令进行下载。具体操作命令如下。
# 使用wget命令下载安装包 [hadoop@dn1~]$ wget https://archive.apache.org/dist/kafka/0.10.2.0\ /kafka_2.11-0.10.2.0.tgz
2.解压Kafka安装包
下载了Kafka安装包后,在Linux操作系统指定位置进行解压操作。具体操作命令如下。
# 解压安装包 [hadoop@dn1~]$ tar -zxvf kafka_2.11-0.10.2.0.tgz # 重命名 [hadoop@dn1~]$ mv kafka_2.11-0.10.2.0 kafka
3.配置Kafka全局变量
在/home/hadoop/.bash_profile文件中,配置Kafka系统的全局变量。具体操作命令如下。
# 编辑.bash_profile文件 [hadoop@dn1~]$ vi ~/.bash_profile # 添加如下内容 export KAFKA_HOME=/data/soft/new/kafka export PATH=$PATH:$KAFKA_HOME/bin # 保存并退出
接着,使用source命令使刚刚配置的环境变量立即生效:
# 使用source命令使配置立即生效 [hadoop@dn1~]$ source ~/.bash_profile
4.配置Kafka系统
配置单机模式的Kafka系统步骤比较简单,只需要在$KAFKA_HOME/conf/server.properties文件中做少量的配置即可。具体操作命令如下。
# 配置server.properties文件 [hadoop@dn1~]$ vi $KAFKA_HOME/conf/server.properties # 修改如下内容 broker.id=0 # 设置一个broker唯一ID log.dirs=/data/soft/new/kafka/data # 设置消息日志存储路径 zookeeper.connect=localhost:2181 # 指定Zookeeper的连接地址 # 然后保存并退出
5.启动Zookeeper
以Standalone模式启动Zookeeper进程,具体操作命令如下。
# 启动Standalone模式Zookeeper [hadoop@dn1~]$ zkServer.sh start # 查看Zookeeper状态 [hadoop@dn1~]$ zkServer.sh status # 终端会显示如下内容 JMX enabled by default Using config: /data/soft/new/zookeeper/bin/../conf/zoo.cfg Mode: standalone
6.启动Kafka单机模式
在当前主机上使用Kafka命令来启动Kafka系统,具体操作命令如下。
# 启动Kafka单机模式 [hadoop@dn1~]$ kafka-server-start.sh $KAFKA_HOME/conf/server.properties &
启动成功后,终端会打印出如图2-9所示信息。
图2-9 Kafka启动信息
2.3.2 分布式模式部署
在生产环境中,一般会以分布式模式来部署Kafka系统,以便组建集群。
在分布式模式中,不推荐使用Standalone模式的Zookeeper,这样具有一定的风险。如果使用的是Standalone模式的Zookeeper,则一旦Zookeeper出现故障则导致整个Kafka集群不可用。所以,一般在生产环境中会以集群的形式来部署ZooKeeper。
1.下载
和单机模式的下载步骤一致。
2.解压
可参考单机模式的解压模式和重命名方法。
3.配置Kafka全局变量
可参考单机模式的全局配置过程。
4.配置Kafka系统
在分布式模式下配置Kafka系统和单机模式不一致。打开$KAFKA_HOME/conf/server. properties文件,编辑相关属性,具体修改内容见代码2-1。
代码2-1 Kafka系统属性文件配置
# 设置Kafka节点唯一ID broker.id=0 # 开启删除Kafka主题属性 delete.topic.enable=true # 非SASL模式配置Kafka集群 listeners=PLAINTEXT://dn1:9092 # 设置网络请求处理线程数 num.network.threads=10 # 设置磁盘IO请求线程数 num.io.threads=20 # 设置发送buffer字节数 socket.send.buffer.bytes=1024000 # 设置收到buffer字节数 socket.receive.buffer.bytes=1024000 # 设置最大请求字节数 socket.request.max.bytes=1048576000 # 设置消息记录存储路径 log.dirs=/data/soft/new/kafka/data # 设置Kafka的主题分区数 num.partitions=6 # 设置主题保留时间 log.retention.hours=168 # 设置Zookeeper的连接地址 zookeeper.connect=dn1:2181, dn2:2181, dn3:2181 # 设置Zookeeper连接超时时间 zookeeper.connection.timeout.ms=60000
5.同步安装包
配置好一个主机上的Kafka系统后,使用Linux同步命令将配置好的Kafka文件夹同步到其他的主机上。具体操作命令如下。
#在/tmp目录中添加一个临时主机名文本文件 [hadoop@dn1~]$ vi /tmp/add.list # 添加如下内容 dn2 dn3 # 保存并退出 #使用scp命令同步Kafka文件夹到指定目录中 [hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp -r /data/soft/new/kafka $i:/data/soft/new; done
由于Kafka集群中每个代理(Broker)节点的ID必须唯一,所以同步完成后需要将其他两台主机上的broker.id属性值修改为1和2(或者是其他不重复的正整数)。
6.启动Zookeeper集群
在启动Kafka集群之前,需要先启动Zookeeper集群。
启动Zookeeper集群无须在每台主机上分别执行Zookeeper启动命令,只需执行分布式启动命令即可:
# 分布式命令启动Zookeeper [hadoop@dn1~]$ zk-daemons.sh start
7.启动Kafka集群
Kafka系统本身没有分布式启动Kafka集群的功能,只有单个主机节点启动Kafka进程的脚本。可以通过封装单个节点启动Kafka进程的步骤,来实现分布式启动Kafka集群,具体见代码2-2。
代码2-2 Kafka分布式启动
#! /bin/bash # 配置Kafka代理(Broker)地址信息 hosts=(dn1 dn2 dn3) for i in ${hosts[@]} do # 执行启动Kafka进程命令 ssh hadoop@$i "source /etc/profile; kafka-server-start.sh $KAFKA_HOME/config/server.properties" & done
8.验证
启动Kafka集群后,可以通过一些简单的Kafka命令来验证集群是否正常。具体如下。
# 使用list命令来展示Kafka集群的所有主题(Topic)名 [hadoop@dn1~]$ kafka-topics.sh --list -zookeeper dn1:2181, dn2:2181, dn3:2181
执行后,Linux终端会打印出所有的Kafka主题(Topic)名称,如图2-10所示。
图2-10 Kafka主题(Topic)名称预览
从图2-10中可以看出,除打印Kafka业务数据的主题(Topic)名称外,还打印出Kafka系统内部主题——_ _consumer_offsets,该主题用来记录Kafka消费者(Consumer)产生的消费记录,其中包含偏移量(Offset)、时间戳(Timestamp)和线程名等信息。
提示:
这里读者有一个大致的了解即可,后面的章会详细介绍Kafka系统的内部主题。
2.4 实例5:安装与配置Kafka监控工具
在实际业务场景中,需要频繁关注Kafka集群的运行情况。例如,查看集群的代理(Broker)节点健康状态、主题(Topic)列表、消费组(Consumer Group)列表、每个主题所对应的分区(Partition)列表等。
当业务场景并不复杂时,可以使用Kafka提供的命令工具,配合Zookeeper客户端命令来快速地实现。但是,随着业务场景的复杂化,消费组和主题的增加,再使用Kafka和Zookeeper命令监控则会增加维护的成本,这时Kafka监控系统便显得尤为重要。
实例描述
在Github开源社区中下载Kafka Eagle源代码,编译获取安装包,然后执行安装步骤,并观察执行结果。
2.4.1 获取并编译Kafka Eagle源代码
Kafka Eagle监控系统的源代码托管在Github上。
1.下载
打开浏览器,输入“https://github.com”进入Github官网,然后搜索“Kafka Eagle”关键字,获取具体下载地址为https://github.com/smartloli/kafka-eagle。
然后直接单击“Clone or download”按钮进行下载,将下载的kafka-eagle-master.zip文件上传到Linux服务器中。
提示:
也可以在Linux服务器上执行以下Git命令下载Kafka Eagle源代码:
[hadoop@dn1~]$ git clone https://github.com/smartloli/kafka-eagle
2.编译
Kafka Eagle是用Java语言开发的,通过Maven构建。Maven是对Java语言进行编译的一个工具。截止到本书编写完时,Kafka Eagle发布了1.2.1版本,支持在Mac、Linux和Windows环境下运行,同时兼容Kafka-0.8.x、Kafka-0.9.x、Kafka-0.10.x和Kafka-1.0.x及以上版本。
Kafka Eagle源代码编译在MacOS、Linux和Windows环境下均可操作。这里以MacOS环境来演示,具体操作命令如下。
# 进入kafka eagle目录 dengjiedeMacBook-Pro:workspace dengjie$ cd kafka-egale # 执行编译脚本 dengjiedeMacBook-Pro:kafka-egale dengjie$ ./build.sh
编译成功后,会在kafka-egale/kafka-eagle-web/target目录中生成打包好的压缩文件,编译结果如图2-11所示。
图2-11 编译Kafka Eagle的结果
2.4.2 安装与配置Kafka Eagle
1.解压缩安装并重命名
将编译好的kafka-eagle-web-1.2.1-bin.tar.gz安装包进行解压缩安装并重命名:
# 解压 [hadoop@dn1~]$ tar -zxvf kafka-eagle-web-1.2.1-bin.tar.gz # 重命名 [hadoop@dn1~]$ mv kafka-eagle-web-1.2.1 kafka-eagle
2.配置环境变量
在.bash_profile文件中配置KE_HOME环境变量:
# 编辑~/.bash_profile文件 [hadoop@dn1~]$ vi ~/.bash_profile # 添加如下内容 export KE_HOME=/data/soft/new/kafka-eagle export PATH=$PATH:$KE_HOME/bin # 保存并退出
然后使用source命令使配置的环境变量立即生效:
# 使用source命令 [hadoop@dn1~]$ source ~/.bash_profile
3.配置Kafka Eagle系统文件
进入$KE_HOME/conf目录中,编辑system-config.properties配置文件,配置内容见代码2-3。
代码2-3 Kafka Eagle配置文件
###################################### # 设置Kafka多集群的Zookeeper地址 ###################################### kafka.eagle.zk.cluster.alias=cluster1 cluster1.zk.list=dn1:2181, dn2:2181, dn3:2181 #cluster2.zk.list=tdn1:2181, tdn2:2181, tdn3:2181 ###################################### # 配置Zookeeper连接池大小 ###################################### kafka.zk.limit.size=25 ###################################### # 浏览器访问Kafka Eagle的端口地址 ###################################### kafka.eagle.webui.port=8048 ###################################### # Kafka的消费信息是否存储在Topic中 ###################################### kafka.eagle.offset.storage=kafka ###################################### # 配置邮件告警服务器 ###################################### kafka.eagle.mail.enable=false kafka.eagle.mail.sa=alert_sa kafka.eagle.mail.username=alert_sa@126.com kafka.eagle.mail.password=123456 kafka.eagle.mail.server.host=smtp.126.com kafka.eagle.mail.server.port=25 ###################################### # 管理员删除Topic的口令 ###################################### kafka.eagle.topic.token=keadmin ###################################### # 是否开启Kafka SASL安全认证 ###################################### kafka.eagle.sasl.enable=false kafka.eagle.sasl.protocol=SASL_PLAINTEXT kafka.eagle.sasl.mechanism=PLAIN kafka.eagle.sasl.client=/data/soft/new/kafka-eagle/conf/kafka_client_jaas.conf ###################################### # Kafka Eagle数据存储到MySQL ###################################### #kafka.eagle.driver=com.mysql.jdbc.Driver #kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke? useUnicode=true&characterEncoding=U TF-8&zeroDateTimeBehavior=convertToNull #kafka.eagle.username=root #kafka.eagle.password=123456 ###################################### # Kafka Eagle数据默认存储到Sqlite ###################################### kafka.eagle.driver=org.sqlite.JDBC kafka.eagle.url=jdbc:sqlite:/data/soft/new/kafka-eagle/db/ke.db kafka.eagle.username=root kafka.eagle.password=root
4.启动Kafka Eagle系统
配置完成后,在Linux控制台执行启动命令:
# Kafka Eagle通过ke.sh脚本来控制系统,参数有启动(start)、停止(stop)、重启(restart)等 [hadoop@dn1~]$ ke.sh start
启动成功后,控制台会打印出对应的日志信息,如图2-12所示。
图2-12 启动Kafka Eagle系统
控制台日志中显示了一个用户名为admin、密码为123456的账号,可用于登录系统。
5.预览
(1)在浏览器中输入http://dn1:8048/ke,访问Kafka Eagle系统,之后按要求输入用户名和密码,如图2-13所示。输入正确的用户名和密码后单击“Signin”按钮。
图2-13 Kafka Eagle登录界面
(2)进入Kafka Eagle系统主界面中,如图2-14所示。
图2-14 Kafka Eagle主界面
6.停止Kafka Eagle系统
停止Kafka Eagle系统的方式有两种:① 通过执行$KE_HOME/bin/ke.sh脚本来停止;② 通过Linux操作系统的kill命令来直接停止。
(1)通过指定stop参数来停止Kafka Eagle系统:
# 停止Kafka Eagle系统 [hadoop@dn1~]$ ke.sh stop
(2)通过Linux操作系统的kill命令停止Kafka Eagle系统:
# 使用kill直接停止Kafka Eagle系统 [hadoop@dn1~]$ kill -9 `ps -fe |grep Bootstrap | grep kafka-eagle | awk -F ' ' '{print $2}'`
2.5 实例6:编译Kafka源代码
在学习Kafka技术时,阅读Kafka的源代码是很有必要的。在实际生产环境中,Kafka系统可能会随业务场景的复杂化、数据量的增加等出现异常,要修复这类异常问题则需要打补丁,然后重新编译Kafka源代码。
本节将介绍在MacOS操作系统下编译Kafka源代码。在其他操作系统中,编译Kafka源代码的过程基本类似,只不过是环境变量配置有所区别。
Kafka系统的核心模块是使用Scala语言编写的,所以,可以使用Gradle工具进行编译和构建。
实例描述
编译Kafka源代码需要准备如下环境:(1)安装与配置Scala运行环境;(2)安装与配置Gradle。同时,执行2.5.3小节的编译步骤,观察执行结果。
2.5.1 安装与配置Scala运行环境
本书所使用的Kafka版本是0.10.2.0,从Kafka官方网站可知,该版本所需要的Scala版本在2.10以上。本书中所选择的Kafka安装包是kafka_2.11-0.10.1.1.tgz,所以,这里选择最新的Scala-2.12版本进行安装与配置,如图2-15所示。
图2-15 Kafka系统依赖的Scala版本
1.下载Scala安装包
访问Scala官方网站,获取软件包下载地址:http://www.scala-lang.org/download,然后选择对应的安装包进行下载,如图2-16所示
图2-16 下载Scala安装包
2.安装Scala
下载完成后,将Scala安装包解压缩到指定目录进行安装,具体操作命令如下。
# 这里解压缩到MacOS操作系统的指定目录 dengjiedeMacBook-Pro:~ dengjie$ tar -zxvf scala-2.12.3.tgz # 将Scale-2.12.3移动到/usr/local目录下,并重命为Scale dengjiedeMacBook-Pro:~ dengjie$ mv scala-2.12.3 /usr/local/scala
3.配置Scala环境变量
完成安装后,在.bash_profile文件中配置Scala运行环境变量,具体操作命令如下。
# 打开~/.bash_profile文件 dengjiedeMacBook-Pro:~ dengjie$ vi ~/.bash_profile # 添加如下内容 export SCALA_HOME=/usr/local/scala export PATH=$PATH:$SCALA_HOME/bin # 保存并退出
然后用source命令使配置的环境变量立即生效,具体操作命令如下。
# 使用source命令 dengjiedeMacBook-Pro:~ dengjie$ source ~/.bash_profile
4.验证
安装与配置好Scala环境后,在操作系统终端中输入Scala命令来验证环境是否配置成功。具体操作命令如下。
# 输入版本验证命令 dengjiedeMacBook-Pro:~ dengjie$ scala -version
如打印出如图2-17所示的信息,则表示安装与配置成功。
图2-17 Scala版本信息
2.5.2 安装与配置Gradle
通过浏览器访问Gradle的官方地址https://gradle.org/install,获取MacOS操作系统安装与配置Gradle的方法。
通过以下命令可以一键完成Gradle的安装与配置。
# 使用brew来进行一键安装与配置 dengjiedeMacBook-Pro:~ dengjie$ brew install gradle
安装与配置完成后,在操作系统终端中输入以下Gradle命令来进行版本验证。
# 输入版本验证命令 dengjiedeMacBook-Pro:~ dengjie$ gradle -version
如打印出如图2-18所示的信息,则表示安装与配置成功。
图2-18 Gradle版本信息
2.5.3 了解Kafka源代码的编译过程
访问Kafka官网地址http://kafka.apache.org/downloads下载Kafka源代码压缩包。本书使用的Kafka版本是kafka-0.10.2.0,这里选择kafka-0.10.2.0-src.tgz压缩包。将下载的Kafka源代码解压到MacOS操作系统的指定目录,Kafka源代码目录结构如图2-19所示。
图2-19 Kafka源代码目录结构
1.离线下载依赖包
如果编译环境网络状况不好,则在执行编译命令之前可以先下载核心依赖包gradle-3.3-all.zip,然后再将它移到gradle/wrapper/目录中,最后修改gradle/wrapper/gradle-wrapper.properties。
具体修改内容见代码2-4。
代码2-4 修改编译配置文件
#Wed May 10 10:25:30 CST 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists # 上面内容保持不变,修改Gradle核心依赖包路径地址 distributionUrl=gradle-3.3-all.zip
提示:
gradle-3.3-all.zip包的下载地址为https://services.gradle.org/distributions。
2.在线编译Kafka源代码
如果编译网络环境状态良好,则无需配置任何配置文件,直接执行以下编译命令即可。
# 先清理无效文件,然后再执行编译命令 dengjiedeMacBook-Pro:kafka dengjie$ ./gradlew clean && ./gradlew releaseTarGz
成功执行上述编译命令后,会自动下载gradle-3.3-bin.zip依赖包,下载完成后会存放在/Users/dengjie/.gradle/wrapper/dists目录中。下载过程所需的时间,完全取决于当时的网络状况。
编译成功后,操作系统控制台会打印出如图2-20所示的信息。
图2-20 成功编译Kafka源代码
编译之后的Kafka二进制压缩包文件,会自动存放在core/build/distributions目录中。这里的压缩包文件和在Kafka官网上提供的一样,读者可以直接使用编译好的Kafka安装包。
2.6 实例7:将Kafka源代码导入编辑器
在实际应用场景中,可能会遇到一些Kafka异常问题,需要阅读Kafka源代码来分析异常问题产生的原因。
如果直接打开Kafka源代码查看,则阅读起来会很不方便,所以需要借助代码编辑器来阅读Kafka源代码。这里列举两种常见的代码编辑器——IntelliJ IDEA和Eclipse。可以通过访问各自的官方网站来获取软件安装包,其下载地址见表2-2。
表2-2 代码编辑器下载地址
实例描述
从表2-2中获取编辑器安装包,按照下列两种情况将Kafka源代码导入:(1)在IntelliJ IDEA编辑器中导入Kafka源代码;(2)在Eclipse编辑器中导入Kafka源代码。
2.6.1 导入IntelliJ IDEA编辑器
IntelliJ IDEA简称IDEA,是Java语言开发的集成环境。它在智能代码提示、重构、版本控制工具(如Git、SVN等)、代码分析等方面的功能非常完善。
IDEA是JetBrains公司的产品,目前分为旗舰版和社区版。
· 旗舰版包含所有功能,但是需要付费购买;
· 社区版属于免费产品,功能较少,但对于阅读Kafka源代码来说已足够了。
1.将Kafka源代码转成IDEA结构
Kafka源代码中提供了Gradle工具,可以很方便地将Kafka源代码转换成IDEA结构。只需执行一条转换命令即可,具体操作命令如下。
# 进入Kafka源代码目录,然后执行下列命令 dengjiedeMacBook-Pro:kafka dengjie$ gradle idea
执行命令后,如果转换成功,则控制台会打印出如图2-21所示的信息。
图2-21 转成IDEA结构
之后,在Kafka源代码目录会生产三个文件——kafka.iml、kafka.iws和kafka.ipr,如图2-22所示。
图2-22 IDEA结构文件
2.导入Kafka代码
打开IDEA社区版代码编辑器,然后选择菜单“File”-“Open”命令,在弹出的对话框中选中Kafka源代码目录并单击“Open”按钮,弹出如图2-23所示对话框供用户选择。
图2-23 IDEA代码编辑器对话框
单击“New Window”按钮,表示重新在一个新的IDEA编辑器窗口中导入Kafka源代码。之后等待代码编辑器自动下载需要的依赖包。初始化完成后会出现如图2-24所示的结果。
图2-24 IDEA完成代码导入
2.6.2 导入Eclipse编辑器
Eclipse是一款著名的跨平台开源集成开发环境,最开始主要用于Java语言开发。通过安装不同的插件,Eclipse可以支持不同的计算机编程语言,比如Scala、C++、Python等。
Eclipse代码编辑器的所有功能都是免费的。使用它,无论开发项目功能,还是阅读源代码都不错。
访问Scala-IDE官网网站 http://scala-ide.org/download/sdk.html,获取“Mac OS X Cocoa 64 bit”软件安装包,如图2-25所示。
图2-25 Scala IDE for Eclipse下载
提示:
可根据实际的操作系统来选择软件安装包。本书的环境是Mac操作系统,故选择Mac操作系统的软件安装包。
1.将Kafka源代码转成Eclipse结构
Kafka源代码中提供了Gradle工具,它可以很方便地将Kafka源代码转换成Eclipse结构。只需执行以下换命令。
# 进入Kafka源代码目录,然后执行下列命令 dengjiedeMacBook-Pro:kafka dengjie$ gradle eclipse
如果转换成功,则控制台会打印出如图2-26所示的结果。
图2-26 转成Eclipse结构
2.导入Kafka代码
(1)打开“Scala IDE for Eclipse”代码编辑器,然后选择菜单“File”-“Import”命令,在弹出的对话框中找到“Gradle”选项并展开选择“Existing Gradle Project”选项,然后单击“Next”按钮,如图2-27所示。
图2-27 选择“Gradle”选项
(2)弹出一个对话框,提示需要选择已存在的Gradle项目,单击“Browser”按钮并选择对应的Kafka源代码目录,单击“Finish”按钮,如图2-28所示。
图2-28 选择Kafka源代码
(3)代码编辑器开始自行下载依赖包,在下载完成后会出现如图2-29所示的结果。
图2-29 完成代码导入
2.7 了解元数据的存储分布
在Kafka系统中,核心组件的元数据信息均存储在Zookeeper系统中。这些元数据信息具体包含:控制器选举次数、代理节点和主题、配置、管理员操作、控制器。它们在Zookeeper系统中的分布如图2-30所示。
图2-30 Kafka元数据在Zookeeper系统中的分布
1.控制器选举次数
在Kafka系统中,控制器每进行一次选举次数,都会在Zookeeper系统/controller_epoch节点下进行记录,该值为一个数字。在Kafka集群中,第一个代理节点(Broker)启动时,该值为1。
在Kafka集群中,如果遇到代理节点宕机或者变更,则Kafka集群会重新选举新的控制器。每次控制器发生变化时,Zookeeper系统/controller_epoch节点中的值就会加1。
提示:
在Zookeeper系统中,元数据存储的格式为“英文斜杠+英文名称”,例如:/admin。通常会将这种存储类型称之为节点,如“/admin节点”。
2.代理节点和主题
· 在Zookeeper系统/brokers节点中,存储着Kafka代理节点和主题的元数据信息。
· 在Zookeeper系统/brokers/ids节点中,存储着代理节点的ID值。
· 在Zookeeper系统/brokers/topics节点中,存储着主题和分区的元数据信息。
3.配置
在Kafka系统中,修改主题属性这类操作会被存储到Zookeeper系统/config节点中。/config节点主要包含以下三个子节点。
· topic:存储着Kafka集群主题的额外属性,比如修改过主题的属性操作;
· client:存储着客户端和主题配置信息,包含消费者应用和生产者应用;
· changes:存储着修改信息。
4.管理员操作
在执行管理员操作(比如删除、分配等)时,在Zookeeper系统/admin节点会生成相应的子节点,内容如下。
· delete_topics:存储着待删除主题名的标记;
· reassign_partitions:存储着重新分配分区操作的命令;
· preferred_replica_election:存储着恢复Leader分区平衡操作的命令。
5.控制器
在Kafka系统正常运行时,在Zookeeper系统/controller节点下会存储一个Kafka代理节点的ID值。该ID值与Kafka代理节点ID相同,表示代理节点上存在控制器功能。
2.8 了解控制器的选举流程
控制器,其实就是Kafka系统的一个代理节点。它除具有一般代理节点的功能外,还具有选举主题分区Leader节点的功能。
提示:
只有当代理节点上存在控制器时才具有这种功能。
在启动Kafka系统时,其中一个代理节点(Broker)会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。
在Kafka系统运行过程中,如果当前的控制器出现故障导致不可用,则Kafka系统会从其他正常运行的代理节点中重新选举出新的控制器。
2.8.1 了解控制器的启动顺序
在Kafka集群中,每个代理节点(Broker)在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点。具体选举主题分区Leader节点的步骤如下。
(1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器。
(2)其他的代理节点陆续启动时,也会尝试在Zookeeper系统里面创建/controller节点。但由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点。这样确保了Kafka集群控制器的唯一性。
(3)其他的代理节点,会在控制器上注册相应的监听器。各个监听器负责监听各自代理节点的状态变化,当监听到节点状态发生变化时,会触发相应的监听函数进行处理。
1.查看控制器创建的优先级
控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。具体操作命令如下所示:
(1)启动Kafka集群的各节点。
顺序依次是:dn1、dn2、dn3。脚本内容见代码2-5。
代码2-5 按照执行顺序启动Kafka集群
#! /bin/bash # Kafka代理节点地址,按照指定顺序启动 hosts=(dn1 dn2 dn3) # 打印启动分布式脚本信息 mill=`date "+%N"` tdate=`date "+%Y-%m-%d %H:%M:%S, ${mill:0:3}"` echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation # 执行分布式开启命令 function start() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"` ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i] begins to execute the startup operation.; kafka-server-start.sh $KAFKA_HOME/config/server.properties>/dev/null" & sleep 1 done } # 执行分布式关闭命令 function stop() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"` ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i] begins to execute the shutdown operation.; kafka-server-stop.sh>/dev/null; " & sleep 1 done } # 查看Kafka代理节点状态 function status() { for i in ${hosts[@]} do smill=`date "+%N"` stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"` ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i] status message is :; jps | grep Kafka; " & sleep 1 done } # 判断输入的Kafka命令参数是否有效 case "$1" in start) start ;; stop) stop ;; status) status ;; *) echo "Usage: $0 {start|stop|status}" RETVAL=1 esac
然后,在Zookeeper系统中查看/controller临时节点的内容,具体操作命令如下。
# 进入Zookeeper集群 [hadoop@dn1 bin]$ zkCli.sh -server dn1:2181 # 执行查看命令 [zk: dn1:2181(CONNECTED) 1] get /controller
执行上述命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器。输出结果如图2-31所示。
图2-31 控制器内容结果
提示:
本书的Kafka集群配置的代理节点ID分别是0、1、2,它们分别对应的主机名是dn1、dn2、dn3。
(2)修改Kafka集群节点启动顺序。
新的启动顺序为:dn3、dn1、dn2。修改代码2-5中第4行内容,变更信息如下。
# 修改代码2-5中第4行内容 hosts=(dn3 dn1 dn2)
然后,执行该脚本再次重启Kafka集群。重启命令如下所示。
# 重启Kafka集群命令 # 先执行停止命令 [hadoop@dn1 bin]$ kafka-daemons.sh stop # 然后执行启动命令 [hadoop@dn1 bin]$ kafka-daemons.sh start
接着,在Zookeeper系统中执行“get /controller”命令,查看输出结果。可以看到代理节点2(即dn3节点)上成功创建了控制器,如图2-32所示。
图2-32 修改启动顺序后的控制器内容
2.切换控制器所属的代理节点
当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。
为了观察选举变化过程,可以先将dn3代理节点的Kafka进程停止,让Kafka集群中的控制器处理关闭状态。具体操作命令如下。
# 使用kill命令关闭Kafka进程 [hadoop@dn3 bin]$ kill -9 `ps -fe | grep kafka | grep server | awk -F ' ' '{print $2}'`
然后,在Zookeeper系统中执行查看命令,输出结果如图2-33所示。与图2-32中的结果对比,控制器已经从dn3节点切换到dn2节点了。
图2-33 新的控制器内容
接着,去查看controller_epoch的值是否有增加。操作命令如下。
# 进入Zookeeper集群中 [hadoop@dn1 bin]$ zkCli.sh -server dn1:2181 # 执行查看命令 [zk: dn1:2181(CONNECTED) 1] get /controller_epoch
执行上述命令后,可以看到选举次数已经累加了一次,如图2-34所示。
图3-34 查看选举次数
2.8.2 了解主题分区Leader节点的选举过程
选举控制器的核心思想是:各个代理节点公平竞抢在Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。
整个控制器的选举流程如图2-35所示。
图2-35 控制器选举流程图
从图2-35中可知,当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController类、ZookeeperLeaderElector类、LeaderChangeListener类、SessionExpirationListener类。
下面围绕这四个核心的类,详细介绍Kafka系统控制器Leader选举流程。
1.了解KafkaController类的作用
KafkaController类在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数——onControllerFailover和onControllerResignation。具体实现见代码2-6。
代码2-6 KafkaController实现代码
class KafkaController (val config: KafkaConfig, zkUtils: ZkUtils, val brokerState: BrokerState, time: Time, metrics: Metrics, threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup{ private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId, time) // 初始化ZK选举实例 /** 准备选举 */ def startup() = { inLock(controllerContext.controllerLock) { info("Controller starting up") // 打印开始选举信息 registerSessionExpirationListener() // 注册会话过期监听器 isRunning = true // 标记正常运行 controllerElector.startup // 开始执行选举逻辑 info("Controller startup complete") // 完成所有选举操作后,打印选举完成日志 } } }
在onControllerFailover回调函数中初始化Leader依赖模块,包括在Zookeeper系统中递增控制器选举次数。
当Kafka系统当前代理节点不再是Leader角色时,会触发onControllerResignation回调函数重新进行注册选举。KafkaController类启动后,会向Zookeeper系统注册会话超时监听器,并尝试选举Leader。
2.了解ZookeeperLeaderElector类的作用
ZookeeperLeaderElector类实现了主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现会话超时”这种情况。
ZookeeperLeaderElector类主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑。具体实现见代码2-7。
代码2-7 ZookeeperLeaderElector实现代码
class ZookeeperLeaderElector(controllerContext: ControllerContext, electionPath: String, onBecomingLeader: () => Unit, onResigningAsLeader: () => Unit, brokerId: Int, time: Time) extends LeaderElector with Logging { var leaderId = -1 // 如果不存在,则在Zookeeper系统上创建一个选举路径 val index = electionPath.lastIndexOf("/") if (index > 0) controllerContext.zkUtils. makeSurePersistentPathExists(electionPath.substring(0, index)) // 实例化一个Leader变更监听器 val leaderChangeListener = new LeaderChangeListener /** 开始选举 */ def startup { // 加锁 inLock(controllerContext.controllerLock) { controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) elect // 执行选举 } } }
ZookeeperLeaderElector类完成选举前的准备工作后,开始执行startup()函数来订阅数据变化监听器,同时调用elect方法来执行选举Leader的逻辑。
通常情况下,触发执行elect方法的条件有以下几点:
· 代理节点启动。
· 在上一次创建临时节点成功后,由于网络原因或服务器故障等导致连接中断。然后调用resign()函数并删除Zookeeper系统中的/controller节点。
· 在上一次创建临时节点成功后,由于网络原因或服务器故障等导致连接中断。再次进入elect方法,发现Kafka系统中已经有代理节点成了Leader。
· 在上一次创建临时节点成功后,在执行onBecomingLeader()函数时抛出了异常信息,执行业务逻辑后,再尝试选举Leader。
elect方法的具体实现逻辑见代码2-8。
代码2-8 elect实现逻辑
def elect: Boolean = { val timestamp = time.milliseconds.toString val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp)) // 获取LeadedID leaderId = getControllerID // 判断LeadedID if(leaderId ! = -1) { debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId)) return amILeader } try { // 实例化ZKCheckedEphemeral val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath, electString, controllerContext.zkUtils.zkConnection.getZookeeper, JaasUtils.isZkSecurityEnabled()) zkCheckedEphemeral.create() // 创建临时节点/controller info(brokerId + " successfully elected as leader") leaderId = brokerId onBecomingLeader() // 成为Leader } catch { // 异常处理 case _: ZkNodeExistsException => leaderId = getControllerID if (leaderId ! = -1) debug("Broker %d was elected as leader instead of broker %d".format(leaderId, brokerId)) else warn("A leader has been elected but just resigned, this will result in another round of election") case e2: Throwable => error("Error while electing or becoming leader on broker %d".format(brokerId), e2) resign() } amILeader } // 关闭 def close = { leaderId = -1 } def amILeader : Boolean = leaderId == brokerId // 放弃Leader选举,并删除临时节点 def resign() = { leaderId = -1 controllerContext.zkUtils.deletePath(electionPath) }
在Zookeeper系统中,创建临时节点/controller时,如果产生ZkNodeExistsException类异常,则说明Kafka系统中已经有代理节点成了Leader。
而如果是执行onBecomingLeader()方法出现异常,则说明初始化Leader的相关模块存在问题。若是初始化失败,则调用resign()函数删除Zookeeper系统/controller节点上的数据。Zookeeper系统中的/controller节点被删除,会触发LeaderChangeListener监听器尝试重新选举Leader,这样避免了Kafka系统中控制器无Leader的问题。
3.了解LeaderChangeListener类的作用
如果节点数据发生变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader()函数。
当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举。通过调用onResigningAsLeader()函数重新选择其他正常运行的代理节点成为新的Leader,具体实现见代码2-9。
代码2-9 LeaderChangeListener实现类
class LeaderChangeListener extends IZkDataListener with Logging { /** 回调函数,处理数据变更 */ @throws[Exception] def handleDataChange(dataPath: String, data: Object) { val shouldResign = inLock(controllerContext.controllerLock) { val amILeaderBeforeDataChange = amILeader leaderId = KafkaController.parseControllerId(data.toString) info("New leader is %d".format(leaderId)) // 如果旧的Leader不再是Leader就会重新被选举 amILeaderBeforeDataChange && ! amILeader } if (shouldResign) onResigningAsLeader() } /** 删除临时节点 */ @throws[Exception] def handleDataDeleted(dataPath: String) { val shouldResign = inLock(controllerContext.controllerLock) { debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader" .format(brokerId, dataPath)) amILeader } if (shouldResign) onResigningAsLeader() inLock(controllerContext.controllerLock) { elect } } }
在选举的过程中,执行elect方法中的实现逻辑时会调用onBecomingLeader方法。该方法相当于KafkaController类中的onControllerFailover方法,也是用于选举当前代理节点作为新的控制器。
当Kafka集群中的某个代理节点成为新的Leader后,会初始化Leader的所有功能模块,例如注册分区监听器、注册副本监听器、注册分区状态机、注册副本状态机等。
在执行数据变更监听器逻辑时,会调用onResigningAsLeader方法。该方法相当于KafkaController类中的onControllerResignation方法,也是用来重新分配控制器,是控制器内部清理数据结构所必须的步骤。
4. SessionExpirationListener类的作用
当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession()函数会被调用。对于Zookeeper系统中会话过期的连接,会先进行一次判断:
· 如果当前的控制器ID和代理节点ID相同,则Kafka会跳过重新选举的环节。
· 如果当前控制器的ID和代理节点ID不同,则Kafka会关闭当前的控制器,然后尝试重新选举。
具体实现见代码2-10。
代码2-10 SessionExpirationListener实现类
class SessionExpirationListener() extends IZkStateListener with Logging { this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], " /** 会话过期,重新建立连接 */ @throws[Exception] def handleNewSession() { info("ZK expired; shut down all controller components and try to re-elect") // 判断控制器ID和代理节点ID是否相同 if (controllerElector.getControllerID() ! = config.brokerId) { onControllerResignation() inLock(controllerContext.controllerLock) { controllerElector.elect } } else { info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId)) } } }
2.8.3 了解注册分区和副本状态机
Kafka系统的控制器主要负责管理主题、分区和副本。
Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。
使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。
之后,服务端以异步的方式来创建主题。当服务端完成主题创建操作后,可以去$KAFKA_HOME/data中查看实际数据,或者访问Zookeeper系统查看元数据。
例如,查看Zookeeper系统/brokers/topics节点中的一个主题,操作命令如下。
# 查看/brokers/topics节点中的主题[zk: dn1:2181(CONNECTED) 28] get /brokers/topics/ip_login
执行上述命令后,输出结果如图2-36所示。
图2-36 查看主题分区和副本
通过上述命令可以查看指定主题的分区和副本分配信息,以及每个分区上的Leader所在代理节点。
提示:
主题元数据信息是通过分区索引值和代理节点ID来表示的。例如"4":[2,1,0]表示的是,分区索引值为4, Leader所在代理节点ID为2。其他分区上的Leader所在的节点以此类推。
代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。具体实现见代码2-11。
代码2-11 创建分区和副本监听器
def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch(zkUtils.zkClient) // 在/brokers/topics节点注册监听器 registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() // 注册分区状态机 replicaStateMachine.registerListeners() // 注册副本状态机 initializeControllerContext() // 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) replicaStateMachine.startup() // 启动副本状态机 partitionStateMachine.startup() // 启动分区状态机 // 在自动故障转移中为所有主题注册分区更改监听器 controllerContext.allTopics.foreach(topic => partitionStateMachine. registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d". format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。
主题的副本状态机通过registerListeners()函数,在Zookeeper系统中/brokers/ids节点上注册了一个代理节点监听器BrokerChangeListener。通过BrokerChangeListener监听器来监听/brokers/ids子节点下的变化。
创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这会触发分区和副本状态机注册监听器。
2.8.4 了解分区自动均衡和分区重新分配
Kafka系统在启动时,会通过控制器来管理主题分区自动平衡,以及重新分配分区。
1.了解分区自动平衡
在初始化控制器时,在onControllerFailover()函数中如果读取到“auto.leader.rebalance.enable”的属性值为true,则会开启分区自动均衡。
可以通过调用checkAndTriggerPartitionRebalance()函数来实现,见代码2-12。
代码2-12 分区自动均衡触发器
private def checkAndTriggerPartitionRebalance(): Unit = { if (isActive) { trace("checking need to trigger partition rebalance") // 获取所有在线的代理节点 var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null inLock(controllerContext.controllerLock) { preferredReplicasForTopicsByBrokers = controllerContext.partitionReplicaAssignment.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy { case (_, assignedReplicas) => assignedReplicas.head } } debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers) // 遍历所有代理节点,判断是否需要触发副本机制 preferredReplicasForTopicsByBrokers.foreach { case(leaderBroker, topicAndPartitionsForBroker) => { var imbalanceRatio: Double = 0 var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null inLock(controllerContext.controllerLock) { topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) => controllerContext.partitionLeadershipInfo.contains(topicPartition) && controllerContext.partitionLeadershipInfo(topicPartition). leaderAndIsr.leader ! = leaderBroker } debug("topics not in preferred replica " + topicsNotInPreferredReplica) val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker trace("leader imbalance ratio for broker %d is %f" .format(leaderBroker, imbalanceRatio)) } // 检查均衡比率。如果大于阈值,则触发重新均衡 if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) { topicsNotInPreferredReplica.keys.foreach { topicPartition => inLock(controllerContext.controllerLock) { // 当代理节点处于存活状态,并且没有分区被重新分配以及副本没有优先选择操作,才会触发检 查操作 if (controllerContext.liveBrokerIds.contains(leaderBroker) && controllerContext.partitionsBeingReassigned.isEmpty && controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty && !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && controllerContext.allTopics.contains(topicPartition.topic)) { onPreferredReplicaElection(Set(topicPartition), true) } } } } } } } }
分区自动均衡是将分区的优先副本选择为Leader。如果分区副本是通过Kafka系统自动分配的,则会确保分区的副本被分配在不同的代理节点上。
提示:
优先副本是指排在最前面的副本。例如,"4":[2,1,0],分区索引值为4,副本分别是2、1、0,而副本节点ID为2的排在最前面,那么它会被优先选择为Leader。
在初始化控制器时,会判断每个代理节点的分区是否均衡,通过“leader.imbalance. per.broker.percentage”属性值来判断,默认值是10%。
判别主题分区不均衡方法是:每个代理节点上的分区Leader非优先副本的总数与该代理节点上分区总数的比值大于均衡阈值,则判断主题分区不均衡。具体计算公式如下。
# 计算不均衡的公式 比值 = 每个代理节点上分区Leader非优先副本总数 / 该代理节点上分区的总数
如果比值imbalanceRatio超过默认值的10%,则触发自动均衡操作。通过调用onPreferredReplicaElection()函数执行优先选择副本操作,让优先选择副本成为分区的Leader,这样就能实现分区自动均衡功能。
2.了解分区重新分配
通过Kafka Eagle监控工具可以直接查看主题(ip_login)的分区及副本详情,如图2-37所示。
图2-37 主题分区和副本详情
当客户端修改主题(ip_login)的分区时,会在Zookeeper系统的/admin节点下创建一个reassign_partitions子节点,分区和副本的分配策略会被写入/admin/reassign_partitions节点中。正常情况下,这个过程执行得很快,分区重新配置操作完成后,reassign_partitions节点会被自动删除。
/admin/reassign_partitions节点的数据发生更新会触发PartitionsReassignedListener监听器来完成一系列的检测处理。最后调用KafkaController.onPartitionReassignment()函数来完成分区的重新分配操作。具体实现见代码2-13。
代码2-13 实现重新分配分区
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) { val reassignedReplicas = reassignedPartitionContext.newReplicas if (! areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) { info("New replicas %s for partition %s being " .format(reassignedReplicas.mkString(", "), topicAndPartition) + "reassigned not yet caught up with the leader") val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet // 1. 在Zookeeper上更新AR updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq) // 2. 发送LeaderAndIsr请求给每个副本 updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), newAndOldReplicas.toSeq) // 3. 给重新分配的分区开启新的副本 startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList) info("Waiting for new replicas %s for partition %s being " .format(reassignedReplicas.mkString(", "), topicAndPartition) + "reassigned to catch up with the leader") } else { // 4. 等待所有的副本与Leader完成同步 val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition) .toSet -- reassignedReplicas.toSet // 5. 重新分配副本 reassignedReplicas.foreach { replica => replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica( topicAndPartition.topic, topicAndPartition.partition, replica)), OnlineReplica) } // 6. 将AR设置为内存中RAR // 7. 发送一个新的Leader和新的AR moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext) // 8. 清理离线或是不在isr列表中的副本 // 9. 清理不存在的副本 stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas) // 10. 在Zookeeper中更新AR updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas) // 11. 更新Zookeeper系统中的/admin/reassign_partitions节点 removePartitionFromReassignedPartitions(topicAndPartition) info("Removed partition %s from the list of reassigned partitions in zookeeper" .format(topicAndPartition)) controllerContext.partitionsBeingReassigned.remove(topicAndPartition) // 12. 选举Leader后,副本和isr信息改变,重新发送元数据更新请求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition)) deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic)) } }
2.9 小结
学习本章内容,需要注意两个容易出现错误的地方:一个是配置实现各个主机之间免密码登录,另一个是准备好编译Kafka源代码的环境。
本章的主要内容正好帮助读者达到了该目的,读者可以参考本章内容,轻松搭建一个分布式的Kafka集群。