Kafka并不难学!入门、进阶、商业实战
上QQ阅读APP看本书,新人免费读10天
设备和账号都新为新人

第2章 安装及配置Kafka

本章将介绍Kafka集群的安装与配置,包含以下内容:安装与配置基础环境、安装Kafka集群、安装与配置Kafka监控工具、编译Kafka源代码,以及将Kafka源代码导入代码编辑器中等。其中涉及的实战内容并不复杂。

2.1 本章教学视频说明

视频内容:基础环境的准备、安装Kafka集群、安装与配置Kafka监控工具、编译Kafka源代码,以及将Kafka源代码导入代码编辑器等。

视频时长:19分钟。

视频截图见图2-1所示。

图2-1 本章教学视频截图

2.2 安装与配置基础环境

需安装以下几个软件。

1. Linux操作系统

Kafka设计之初便是以Linux操作系统作为前提的,因此Linux操作系统能完美支持Kafka。本节以64位CentOS 6.6为例。

提示:

CentOS(Community Enterprise Operating System,社区企业操作系统)是Linux发行版之一。

2. Java软件开发工具包(Java Development Kit, JDK)

Kafka的源代码是利用Scala语言编写的,它需要运行在Java虚拟机(Java Virtual Machine, JVM)上。因此,在安装Kafka之前需要先安装JDK。

3. ZooKeeper

Kafka是一个分布式消息中间件系统,它依赖ZooKeeper管理和协调Kafka集群的各个代理(Broker)节点。因此,在安装Kafka集群之前需要先安装ZooKeeper集群。

在安装CentOS、JDK、ZooKeeper之前,需要准备好这些软件的安装包。安装包选择rpm或tar.gz类型均可。本书选择的是64位操作系统下的tar.gz类型的安装包,版本信息与下载地址见表2-1。

表2-1 版本信息与下载地址

2.2.1 安装并配置Linux操作系统

目前,市场上Linux操作系统的版本有很多,如RedHat、Ubuntu、CentOS等。读者可以根据自己的喜好选取合适的Linux操作系统,这对学习本书的影响不大。

CentOS 6.6安装包下载界面如图2-2所示。本书选择64位CentOS 6.6的镜像文件进行下载。

图2-2 CentOS操作系统下载预览

提示:

如果有现成的物理机或者云主机供学习使用,则可以跳过下面内容,直接进入2.2.2小节开始学习。如果是自行安装虚拟机学习使用,则请继续阅读下面内容。

在Windows操作系统中,安装Linux操作系统虚拟机可以使用VMware或VirtualBox。

在Mac操作系统中,安装Linux操作系统虚拟机可以使用Parallels Desktop或VirtualBox。

提示:

无论在Windows操作系统中还是在Mac操作系统环境中,VirtualBox软件都是免费的。而VMware和Parallels Desktop均属于商业产品,用户需要付费使用。

使用这些软件安装Linux操作系统虚拟机,不涉及复杂的操作,均是直接单击“下一步”按钮,直到最后单击“完成”按钮。

1.配置网络

安装完Linux操作系统虚拟机后,如果虚拟机需要连接外网,应做一个简单的网络配置。具体操作命令如下:

# 打开网络配置文件
[hadoop@dn1~]$ vi /etc/sysconfig/network-scripts/ifcfg-eth0

# 修改ONBOOT的值为yes
ONBOOT=yes

# 保存并退出

完成网络配置后,重启虚拟机使配置生效。具体操作命令如下:

# 重启Linux操作系统虚拟机。如果是非root用户,则重启可能需要使用sudo命令
[hadoop@dn1~]$ sudo reboot

2.配置hosts系统文件

这里安装的是三台Linux操作系统虚拟机。在其中一台配置好主机的hosts文件,然后使用复制命令将该hosts文件分发到其他两台机器中。

(1)在其中一个主机上配置hosts文件,具体操作命令如下:

# 打开dn1节点的hosts文件并编辑
[hadoop@dn1~]$ sudo vi /etc/hosts

# 添加如下内容
10.211.55.5    dn1
10.211.55.6    dn2
10.211.55.8    dn3

# 保存并退出

(2)使用Linux的复制命令分发文件。具体操作命令如下:

# 在/tmp目录中添加一个临时主机名文本文件
[hadoop@dn1~]$ vi /tmp/add.list

# 添加如下内容
dn2
dn3
# 保存并退出

# 将/etc/hosts文件复制到/tmp目录中
[hadoop@dn1~]$ cp /etc/hosts /tmp
# 使用scp命令将hosts文件下发到其他主机
[hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp /tmp/hosts $i:/tmp; done

(3)登录到其他主机,将/tmp目录下的hosts文本文件复制到/etc目录中。

2.2.2 实例1:安装与配置Java运行环境

本书选择的是Oracle官方的JDK8,版本号为8u144,如图2-3所示。

图2-3 JDK下载版本预览

提示:

在学习本书时,可能Oracle官方网站的JDK版本号又更新了,选择其他版本号的JDK进行下载也是可以的,这并不影响对本书内容的学习。

实例描述

在Linux操作系中安装JDK。其中涉及下载安装包、解压安装包并配置、同步安装包等操作。

1.安装JDK

由于CentOS操作系统可能会自带OpenJDK环境,所以,在安装JDK之前,需要先检查CentOS操作系统中是否存在OpenJDK环境。如存在,则需要先将其卸载。

具体操作步骤如下。

(1)卸载CentOS操作系统自带JDK环境。如果不存在自带的JDK环境,则可跳过此步骤。

# 查找Java安装依赖库
[hadoop@dn1~]$ rpm -qa | grep java
# 卸载Java依赖库
[hadoop@dn1~]$ yum -y remove java*

(2)将下载的JDK安装包解压缩到指定目录下(可自行指定),详细操作命令如下。

# 解压JDK安装包到当前目录
[hadoop@dn1~]$ tar -zxvf jdk-8u144-linux-x64.tar.gz
# 移动JDK到/data/soft/new目录下,并改名为jdk
[hadoop@dn1~]$ mv jdk-8u144-linux-x64 /data/soft/new/jdk

2.配置JDK

将JDK解压缩到指定目录后,需要配置JDK的全局环境变量,具体操作步骤如下。

(1)添加JDK全局变量,具体操作命令如下。

# 打开当前用户下的.bash_profile文件并进行编辑
[hadoop@dn1~]$ vi ~/.bash_profile

# 添加如下内容
export JAVA_HOME=/data/soft/new/jdk
export $PATH:$JAVA_HOME/bin

# 进行保存并退出

(2)若要使配置的内容立即生效,则需要执行以下命令。

# 使用source命令或者英文点(.)命令,让配置文件立即生效
[hadoop@dn1~]$ source ~/.bash_profile

(3)验证JDK环境是否安装成功,具体操作命令如下。

# 使用Java语言version命令来检验
[hadoop@dn1~]$ java -version

如果操作系统终端显示了对应的JDK版本号(如图2-4所示),则认为JDK环境配置成功。

图2-4 JDK打印版本信息

3.同步安装包

将第一台主机上解压后的JDK文件夹和环境变量配置文件.bash_profile分别同步到其他两个主机上。具体操作命令如下。

#在/tmp目录中添加一个临时主机名文本文件
[hadoop@dn1~]$ vi /tmp/add.list

# 添加如下内容
dn2
dn3
# 保存并退出
#使用scp命令同步JDK文件夹到指定目录
[hadoop@dn1~]$ for i in 'cat /tmp/add.list'; do scp -r /data/soft/new/jdk
 $i: /data/soft/new/; done
#使用scp命令将.bash_profile文件分发到其他主机
[hadoop@dn1~]$ for i in 'cat /tmp/add.list'; do scp ~/.bash_profile
 $i:~/; done

2.2.3 实例2:配置SSH免密码登录

Secure Shell简称SSH,由IETF的网络小组所制定。SSH协议建立在应用层基础上,专为远程登录会话和其他网络服务提供安全性保障。

提示:

国际互联网工程任务组(The Internet Engineering Task Force, IETF)是一个公开性质的大型民间国际团体,汇集了大量与互联网架构和“互联网正常运作”相关的网络设计者、运营者、投资人及研究人员。

1.了解SSH协议

利用SSH协议可以有效地防止在远程管理过程中重要信息的泄露。SSH起初是UNIX操作系统上的一个应用程序,后来扩展到其他操作系统平台。

正确使用SSH协议可以弥补网络中的漏洞。几乎所有的UNIX平台(例如Linux、AIX、Solaris)都可以运行SSH客户端。

提示:

AIX是IBM基于AT&T Unix System V开发的一套类似UNIX的操作系统,可运行在利用IBM专有的Power系列芯片设计的小型机上。

Solaris是Sun MicroSystems研发的计算机操作系统,它是UNIX操作系统的衍生版本之一。

在Windows、Linux和MacOS操作系统上的SSH客户端,可以使用SSH协议登录到Linux服务器。在SSH工具中输入Linux服务器的用户名和密码,或者在Linux服务器中添加客户端的公钥来进行登录。

登录的流程如图2-5所示。

图2-5 SSH登录

2.配置SSH免密登录

在Kafka集群启动时,实现三台主机免密码登录。这里使用SSH来实现。

实例描述

在Linux操作系统中配置SSH免密登录,涉及创建密钥、认证授权、文件赋权等操作。

具体操作步骤如下:

(1)创建密钥。

在Linux操作系统中,使用ssh-keygen命令来创建密钥文件,具体操作命令如下。

# 生成当前节点的私钥和公钥
[hadoop@dn1~]$ ssh-keygen -t rsa

接下来只需按Enter键,不用设置任何信息。命令操作结束后会在/home/hadoop/.ssh/目录下生成对应的私钥和公钥等文件。

(2)认证授权。

将公钥(id_rsa.pub)文件中的内容追加到authorized_keys文件中,具体操作命令如下。

# 将公钥(id_rsa.pub)文件内容追加到authorized_keys
[hadoop@dn1~]$ cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

(3)文件赋权。

在当前账号下,需要给authorized_keys文件赋予600权限,否则会因为权限限制导致登录失败。文件权限操作命令如下。

# 赋予600权限
[hadoop@dn1~]$ chmod 600~/.ssh/authorized_keys

(4)在其他节点上创建密钥。

在Kafka集群的其他节点下,使用ssh-keygen -t rsa命令生成对应的公钥。然后在第一台主机上使用Linux同步命令将authorized_keys文件分发到其他节点的/home/hadoop/.ssh/目录中。详细操作命令如下。

#在/tmp目录中添加一个临时主机名文本文件
[hadoop@dn1~]$ vi /tmp/add.list
# 添加如下内容
dn2
dn3
# 保存并退出

#使用scp命令同步authorized_keys文件到指定目录
[hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp ~/.ssh/authorized_keys
 $i:/home/hadoop/.ssh; done

提示:

如果在登录过程中系统没有提示输入密码,即表示免密码登录配置成功。反之,则配置失败。读者需核对配置步骤是否和本书一致。

为了方便维护集群,通常在所有主机中选择一台主机作为“管理者”,让其负责下发配置文件。这台主机与其他主机之间的免密关系如图2-6所示。

图2-6 SSH免密关系

在维护集群(例如执行启动、查看状态、停止等操作)时,通常在拥有“管理者”权限的主机上执行批处理脚本。

2.2.4 实例3:安装与配置Zookeeper

Zookeeper是一个分布式应用程序协调服务系统,是大数据生态圈的重要组件。Kafka、Hadoop、HBase等系统均依赖Zookeeper来提供一致性服务。

Zookeeper是将复杂且容易出错的核心服务进行封装,然后对外提供简单易用、高效稳定的接口。

实例描述

Zookeeper安装涉及下载软件包、配置Zookeeper系统文件、配置环境变量、启动Zookeeper等操作。

1.安装Zookeeper

(1)下载Zookeeper软件包。

按表2-1中的地址下载3.4.6版本安装包,然后将其解压到指定位置。本书所有的安装包都会被解压到/data/soft/new目录下。

(2)解压软件包。

对Zookeeper软件包进行解压和重命名,具体操作命令如下:

# 解压文件命令
[hadoop@dn1~]$ tar –zxvf zookeeper-3.4.6.tar.gz
# 重命名zookeeper-3.4.6文件夹为zookeeper
[hadoop@dn1~]$ mv zookeeper-3.4.6 zookeeper
# 创建状态数据存储文件夹
[hadoop@dn1~]$ mkdir –p /data/soft/new/zkdata

2.配置Zookeeper系统文件

(1)配置zoo.cfg文件。

在启动Zookeeper集群之前,需要配置Zookeeper集群信息。

读者可以将Zookeeper安装目录下的示例配置文件重命名,即,将zoo_sample.cfg修改为zoo.cfg。按如下所示编辑zoo.cfg文件。

# 配置需要的属性值
# zookeeper数据存放路径地址
dataDir=/data/soft/new/zkdata
# 客户端端口号
clientPort=2181
# 各个服务节点地址配置
server.1=dn1:2888:3888
server.2=dn2:2888:3888
server.3=dn3:2888:3888

(2)配置注意事项。

在配置的dataDir目录下创建一个myid文件,该文件里面写入一个0~255的整数,每个Zookeeper节点上这个文件中的数字要是唯一的。本书的这些数字是从1开始的,依次对应每个Kafka节点。主机与代理节点(Broker)的对应关系如图2-7所示。

图2-7 主机与代理节点(Broker)的对应关系

(3)操作细节。

文件中的数字要与DataNode节点下的Zookeeper配置的数字保持一致。例如,server.1=dn1:2888:3888,则dn1主机下的myid配置文件应该填写数字1。

在dn1主机上配置好ZooKeeper环境后,可使用scp命令将其传输到其他节点,具体命令如下。

#在/tmp目录中添加一个临时主机名文本文件
[hadoop@dn1~]$ vi /tmp/add.list

# 添加如下内容
dn2
dn3
# 保存并退出

#使用scp命令同步Zookeepers文件夹到指定目录
[hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp -r
 /data/soft/new/zookeeper $i:/data/soft/new; done

完成文件传输后,dn2主机和dn3主机上的myid文件中的数字分别被修改为2和3。

3.配置环境变量

在Linux操作系统中,可以对Zookeeper做全局的环境变量配置。这样做的好处是,可以很方便地使用Zookeeper脚本,不用切换到Zookeeper的bin目录下再操作。具体操作命令如下。

# 配置环境变量
[hadoop@dn1~]$ vi ~/.bash_profile
# 配置zookeeper全局变量
export ZK_HOME=/data/soft/new/zookeeper
export PATH=$PATH: $ZK_HOME/bin
# 保存编辑内容,并退出

之后,用以下命令使刚刚配置的环境变量立即生效:

# 使环境变量立即生效
[hadoop@dn1~]$ source ~/.bash_profile

接着,在其他两台主机上也做相同的配置操作。

4.启动Zookeeper

在安装了Zookeeper的节点上,分别执行启动进程的命令:

# 在不同的节点上启动zookeeper服务进程
[hadoop@dn1~]$ zkServer.sh start
[hadoop@dn2~]$ zkServer.sh start
[hadoop@dn3~]$ zkServer.sh start

但这样管理起来不够方便。可以对上述启动命令进行改进,例如编写一个分布式启动脚本(zk-daemons.sh),具体如下:

# 编写Zookeeper分布式启动脚本,可以输入start|stop|restart|status等命令
#! /bin/bash
hosts=(dn1 dn2 dn3)
for i in ${hosts[@]}
      do
          ssh hadoop@$i "source /etc/profile; zkServer.sh $1" &
      done

5.验证

完成启动命令后,在终端中输入jps命令。若显示QuorumPeerMain进程名称,即表示服务进程启动成功。也可以使用Zookeeper的状态命令status来查看,具体操作命令如下。

# 使用status命令来查看
[hadoop@dn1~]$ zk-daemons.sh status

结果如图2-8所示。在Zookeeper集群运行正常的情况下,若有三个节点,则会选举出一个Leader和两个Follower。

图2-8 进程状态预览结果

提示:

读者也可以查看Zookeeper的运行日志zookeeper.out文件,其中记录了Zookeeper的启动过程以及运行过程。

2.3 实例4:部署Kafka

安装Kafka比较简单,单机模式和分布式模式的部署步骤基本一致。由于生产环境所使用的操作系统一般是Linux,所以本书Kafka集群的部署也是基于Linux操作系统来完成的。

实例描述:

按两种模式部署(单机模式部署和分布式模式)Kafka系统,并观察结果。

2.3.1 单机模式部署

如果是测试环境或需要在本地调试Kafka应用程序代码,则会以单机模式部署一个Kafka系统。

部署的步骤也非常简单:启动一个Standalone模式的Zookeeper,然后启动一个Kafka Broker进程。

提示:

本书选择的Kafka安装包版本是0.10.2.0,读者在学习本书时,Kafka官方可能发布了更新的版本。读者可以选择更新的版本来安装,其配置过程依然可以参考本书所介绍的,这并不影响对本书的学习。

1.下载Kafka安装包

访问Kafka官方网站,找到下载地址,然后在Linux操作系统中使用wget命令进行下载。具体操作命令如下。

# 使用wget命令下载安装包
[hadoop@dn1~]$ wget https://archive.apache.org/dist/kafka/0.10.2.0\
/kafka_2.11-0.10.2.0.tgz

2.解压Kafka安装包

下载了Kafka安装包后,在Linux操作系统指定位置进行解压操作。具体操作命令如下。

# 解压安装包
[hadoop@dn1~]$ tar -zxvf kafka_2.11-0.10.2.0.tgz
# 重命名
[hadoop@dn1~]$ mv kafka_2.11-0.10.2.0 kafka

3.配置Kafka全局变量

在/home/hadoop/.bash_profile文件中,配置Kafka系统的全局变量。具体操作命令如下。

# 编辑.bash_profile文件
[hadoop@dn1~]$ vi ~/.bash_profile
# 添加如下内容
export KAFKA_HOME=/data/soft/new/kafka
export PATH=$PATH:$KAFKA_HOME/bin
# 保存并退出

接着,使用source命令使刚刚配置的环境变量立即生效:

# 使用source命令使配置立即生效
[hadoop@dn1~]$ source ~/.bash_profile

4.配置Kafka系统

配置单机模式的Kafka系统步骤比较简单,只需要在$KAFKA_HOME/conf/server.properties文件中做少量的配置即可。具体操作命令如下。

# 配置server.properties文件
[hadoop@dn1~]$ vi $KAFKA_HOME/conf/server.properties

# 修改如下内容
broker.id=0                                 # 设置一个broker唯一ID
log.dirs=/data/soft/new/kafka/data          # 设置消息日志存储路径
zookeeper.connect=localhost:2181            # 指定Zookeeper的连接地址

# 然后保存并退出

5.启动Zookeeper

以Standalone模式启动Zookeeper进程,具体操作命令如下。

# 启动Standalone模式Zookeeper
[hadoop@dn1~]$ zkServer.sh start
# 查看Zookeeper状态
[hadoop@dn1~]$ zkServer.sh status

# 终端会显示如下内容
JMX enabled by default
Using config: /data/soft/new/zookeeper/bin/../conf/zoo.cfg
Mode: standalone

6.启动Kafka单机模式

在当前主机上使用Kafka命令来启动Kafka系统,具体操作命令如下。

# 启动Kafka单机模式
[hadoop@dn1~]$ kafka-server-start.sh $KAFKA_HOME/conf/server.properties &

启动成功后,终端会打印出如图2-9所示信息。

图2-9 Kafka启动信息

2.3.2 分布式模式部署

在生产环境中,一般会以分布式模式来部署Kafka系统,以便组建集群。

在分布式模式中,不推荐使用Standalone模式的Zookeeper,这样具有一定的风险。如果使用的是Standalone模式的Zookeeper,则一旦Zookeeper出现故障则导致整个Kafka集群不可用。所以,一般在生产环境中会以集群的形式来部署ZooKeeper。

1.下载

和单机模式的下载步骤一致。

2.解压

可参考单机模式的解压模式和重命名方法。

3.配置Kafka全局变量

可参考单机模式的全局配置过程。

4.配置Kafka系统

在分布式模式下配置Kafka系统和单机模式不一致。打开$KAFKA_HOME/conf/server. properties文件,编辑相关属性,具体修改内容见代码2-1。

代码2-1 Kafka系统属性文件配置

# 设置Kafka节点唯一ID
broker.id=0
# 开启删除Kafka主题属性
delete.topic.enable=true
# 非SASL模式配置Kafka集群
listeners=PLAINTEXT://dn1:9092
# 设置网络请求处理线程数
num.network.threads=10
# 设置磁盘IO请求线程数
num.io.threads=20
# 设置发送buffer字节数
socket.send.buffer.bytes=1024000
# 设置收到buffer字节数
socket.receive.buffer.bytes=1024000
# 设置最大请求字节数
socket.request.max.bytes=1048576000
# 设置消息记录存储路径
log.dirs=/data/soft/new/kafka/data
# 设置Kafka的主题分区数
num.partitions=6
# 设置主题保留时间
log.retention.hours=168
# 设置Zookeeper的连接地址
zookeeper.connect=dn1:2181, dn2:2181, dn3:2181
# 设置Zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000

5.同步安装包

配置好一个主机上的Kafka系统后,使用Linux同步命令将配置好的Kafka文件夹同步到其他的主机上。具体操作命令如下。

#在/tmp目录中添加一个临时主机名文本文件
[hadoop@dn1~]$ vi /tmp/add.list

# 添加如下内容
dn2
dn3
# 保存并退出

#使用scp命令同步Kafka文件夹到指定目录中
[hadoop@dn1~]$ for i in `cat /tmp/add.list`; do scp -r /data/soft/new/kafka
 $i:/data/soft/new; done

由于Kafka集群中每个代理(Broker)节点的ID必须唯一,所以同步完成后需要将其他两台主机上的broker.id属性值修改为1和2(或者是其他不重复的正整数)。

6.启动Zookeeper集群

在启动Kafka集群之前,需要先启动Zookeeper集群。

启动Zookeeper集群无须在每台主机上分别执行Zookeeper启动命令,只需执行分布式启动命令即可:

# 分布式命令启动Zookeeper
[hadoop@dn1~]$ zk-daemons.sh start

7.启动Kafka集群

Kafka系统本身没有分布式启动Kafka集群的功能,只有单个主机节点启动Kafka进程的脚本。可以通过封装单个节点启动Kafka进程的步骤,来实现分布式启动Kafka集群,具体见代码2-2。

代码2-2 Kafka分布式启动

#! /bin/bash
# 配置Kafka代理(Broker)地址信息
hosts=(dn1 dn2 dn3)
for i in ${hosts[@]}
    do
    # 执行启动Kafka进程命令
    ssh hadoop@$i "source /etc/profile; kafka-server-start.sh
    $KAFKA_HOME/config/server.properties" &
    done

8.验证

启动Kafka集群后,可以通过一些简单的Kafka命令来验证集群是否正常。具体如下。

# 使用list命令来展示Kafka集群的所有主题(Topic)名
[hadoop@dn1~]$ kafka-topics.sh --list -zookeeper
dn1:2181, dn2:2181, dn3:2181

执行后,Linux终端会打印出所有的Kafka主题(Topic)名称,如图2-10所示。

图2-10 Kafka主题(Topic)名称预览

从图2-10中可以看出,除打印Kafka业务数据的主题(Topic)名称外,还打印出Kafka系统内部主题——_ _consumer_offsets,该主题用来记录Kafka消费者(Consumer)产生的消费记录,其中包含偏移量(Offset)、时间戳(Timestamp)和线程名等信息。

提示:

这里读者有一个大致的了解即可,后面的章会详细介绍Kafka系统的内部主题。

2.4 实例5:安装与配置Kafka监控工具

在实际业务场景中,需要频繁关注Kafka集群的运行情况。例如,查看集群的代理(Broker)节点健康状态、主题(Topic)列表、消费组(Consumer Group)列表、每个主题所对应的分区(Partition)列表等。

当业务场景并不复杂时,可以使用Kafka提供的命令工具,配合Zookeeper客户端命令来快速地实现。但是,随着业务场景的复杂化,消费组和主题的增加,再使用Kafka和Zookeeper命令监控则会增加维护的成本,这时Kafka监控系统便显得尤为重要。

实例描述

在Github开源社区中下载Kafka Eagle源代码,编译获取安装包,然后执行安装步骤,并观察执行结果。

2.4.1 获取并编译Kafka Eagle源代码

Kafka Eagle监控系统的源代码托管在Github上。

1.下载

打开浏览器,输入“https://github.com”进入Github官网,然后搜索“Kafka Eagle”关键字,获取具体下载地址为https://github.com/smartloli/kafka-eagle

然后直接单击“Clone or download”按钮进行下载,将下载的kafka-eagle-master.zip文件上传到Linux服务器中。

提示:

也可以在Linux服务器上执行以下Git命令下载Kafka Eagle源代码:

[hadoop@dn1~]$ git clone https://github.com/smartloli/kafka-eagle

2.编译

Kafka Eagle是用Java语言开发的,通过Maven构建。Maven是对Java语言进行编译的一个工具。截止到本书编写完时,Kafka Eagle发布了1.2.1版本,支持在Mac、Linux和Windows环境下运行,同时兼容Kafka-0.8.x、Kafka-0.9.x、Kafka-0.10.x和Kafka-1.0.x及以上版本。

Kafka Eagle源代码编译在MacOS、Linux和Windows环境下均可操作。这里以MacOS环境来演示,具体操作命令如下。

# 进入kafka eagle目录
dengjiedeMacBook-Pro:workspace dengjie$ cd kafka-egale
# 执行编译脚本
dengjiedeMacBook-Pro:kafka-egale dengjie$ ./build.sh

编译成功后,会在kafka-egale/kafka-eagle-web/target目录中生成打包好的压缩文件,编译结果如图2-11所示。

图2-11 编译Kafka Eagle的结果

2.4.2 安装与配置Kafka Eagle

1.解压缩安装并重命名

将编译好的kafka-eagle-web-1.2.1-bin.tar.gz安装包进行解压缩安装并重命名:

# 解压
[hadoop@dn1~]$ tar -zxvf kafka-eagle-web-1.2.1-bin.tar.gz
# 重命名
[hadoop@dn1~]$ mv kafka-eagle-web-1.2.1 kafka-eagle

2.配置环境变量

在.bash_profile文件中配置KE_HOME环境变量:

# 编辑~/.bash_profile文件
[hadoop@dn1~]$ vi ~/.bash_profile

# 添加如下内容
export KE_HOME=/data/soft/new/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
# 保存并退出

然后使用source命令使配置的环境变量立即生效:

# 使用source命令
[hadoop@dn1~]$ source ~/.bash_profile

3.配置Kafka Eagle系统文件

进入$KE_HOME/conf目录中,编辑system-config.properties配置文件,配置内容见代码2-3。

代码2-3 Kafka Eagle配置文件

######################################
# 设置Kafka多集群的Zookeeper地址
######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=dn1:2181, dn2:2181, dn3:2181
#cluster2.zk.list=tdn1:2181, tdn2:2181, tdn3:2181

######################################
# 配置Zookeeper连接池大小
######################################
kafka.zk.limit.size=25

######################################
# 浏览器访问Kafka Eagle的端口地址
######################################
kafka.eagle.webui.port=8048

######################################
# Kafka的消费信息是否存储在Topic中
######################################
kafka.eagle.offset.storage=kafka

######################################
# 配置邮件告警服务器
######################################
kafka.eagle.mail.enable=false
kafka.eagle.mail.sa=alert_sa
kafka.eagle.mail.username=alert_sa@126.com
kafka.eagle.mail.password=123456
kafka.eagle.mail.server.host=smtp.126.com
kafka.eagle.mail.server.port=25

######################################
# 管理员删除Topic的口令
######################################
kafka.eagle.topic.token=keadmin

######################################
# 是否开启Kafka SASL安全认证
######################################
kafka.eagle.sasl.enable=false
kafka.eagle.sasl.protocol=SASL_PLAINTEXT
kafka.eagle.sasl.mechanism=PLAIN
kafka.eagle.sasl.client=/data/soft/new/kafka-eagle/conf/kafka_client_jaas.conf

######################################
# Kafka Eagle数据存储到MySQL
######################################
#kafka.eagle.driver=com.mysql.jdbc.Driver
#kafka.eagle.url=jdbc:mysql://127.0.0.1:3306/ke? useUnicode=true&characterEncoding=U
TF-8&zeroDateTimeBehavior=convertToNull
#kafka.eagle.username=root
#kafka.eagle.password=123456

######################################
# Kafka Eagle数据默认存储到Sqlite
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/data/soft/new/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=root

4.启动Kafka Eagle系统

配置完成后,在Linux控制台执行启动命令:

# Kafka Eagle通过ke.sh脚本来控制系统,参数有启动(start)、停止(stop)、重启(restart)等
[hadoop@dn1~]$ ke.sh start

启动成功后,控制台会打印出对应的日志信息,如图2-12所示。

图2-12 启动Kafka Eagle系统

控制台日志中显示了一个用户名为admin、密码为123456的账号,可用于登录系统。

5.预览

(1)在浏览器中输入http://dn1:8048/ke,访问Kafka Eagle系统,之后按要求输入用户名和密码,如图2-13所示。输入正确的用户名和密码后单击“Signin”按钮。

图2-13 Kafka Eagle登录界面

(2)进入Kafka Eagle系统主界面中,如图2-14所示。

图2-14 Kafka Eagle主界面

6.停止Kafka Eagle系统

停止Kafka Eagle系统的方式有两种:① 通过执行$KE_HOME/bin/ke.sh脚本来停止;② 通过Linux操作系统的kill命令来直接停止。

(1)通过指定stop参数来停止Kafka Eagle系统:

# 停止Kafka Eagle系统
[hadoop@dn1~]$ ke.sh stop

(2)通过Linux操作系统的kill命令停止Kafka Eagle系统:

# 使用kill直接停止Kafka Eagle系统
[hadoop@dn1~]$ kill -9 `ps -fe |grep Bootstrap | grep kafka-eagle
 | awk -F ' ' '{print $2}'`

2.5 实例6:编译Kafka源代码

在学习Kafka技术时,阅读Kafka的源代码是很有必要的。在实际生产环境中,Kafka系统可能会随业务场景的复杂化、数据量的增加等出现异常,要修复这类异常问题则需要打补丁,然后重新编译Kafka源代码。

本节将介绍在MacOS操作系统下编译Kafka源代码。在其他操作系统中,编译Kafka源代码的过程基本类似,只不过是环境变量配置有所区别。

Kafka系统的核心模块是使用Scala语言编写的,所以,可以使用Gradle工具进行编译和构建。

实例描述

编译Kafka源代码需要准备如下环境:(1)安装与配置Scala运行环境;(2)安装与配置Gradle。同时,执行2.5.3小节的编译步骤,观察执行结果。

2.5.1 安装与配置Scala运行环境

本书所使用的Kafka版本是0.10.2.0,从Kafka官方网站可知,该版本所需要的Scala版本在2.10以上。本书中所选择的Kafka安装包是kafka_2.11-0.10.1.1.tgz,所以,这里选择最新的Scala-2.12版本进行安装与配置,如图2-15所示。

图2-15 Kafka系统依赖的Scala版本

1.下载Scala安装包

访问Scala官方网站,获取软件包下载地址:http://www.scala-lang.org/download,然后选择对应的安装包进行下载,如图2-16所示

图2-16 下载Scala安装包

2.安装Scala

下载完成后,将Scala安装包解压缩到指定目录进行安装,具体操作命令如下。

# 这里解压缩到MacOS操作系统的指定目录
dengjiedeMacBook-Pro:~ dengjie$ tar -zxvf scala-2.12.3.tgz
# 将Scale-2.12.3移动到/usr/local目录下,并重命为Scale
dengjiedeMacBook-Pro:~ dengjie$ mv scala-2.12.3 /usr/local/scala

3.配置Scala环境变量

完成安装后,在.bash_profile文件中配置Scala运行环境变量,具体操作命令如下。

# 打开~/.bash_profile文件
dengjiedeMacBook-Pro:~ dengjie$ vi ~/.bash_profile

# 添加如下内容
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

# 保存并退出

然后用source命令使配置的环境变量立即生效,具体操作命令如下。

# 使用source命令
dengjiedeMacBook-Pro:~ dengjie$ source ~/.bash_profile

4.验证

安装与配置好Scala环境后,在操作系统终端中输入Scala命令来验证环境是否配置成功。具体操作命令如下。

# 输入版本验证命令
dengjiedeMacBook-Pro:~ dengjie$ scala -version

如打印出如图2-17所示的信息,则表示安装与配置成功。

图2-17 Scala版本信息

2.5.2 安装与配置Gradle

通过浏览器访问Gradle的官方地址https://gradle.org/install,获取MacOS操作系统安装与配置Gradle的方法。

通过以下命令可以一键完成Gradle的安装与配置。

# 使用brew来进行一键安装与配置
dengjiedeMacBook-Pro:~ dengjie$ brew install gradle

安装与配置完成后,在操作系统终端中输入以下Gradle命令来进行版本验证。

# 输入版本验证命令
dengjiedeMacBook-Pro:~ dengjie$ gradle -version

如打印出如图2-18所示的信息,则表示安装与配置成功。

图2-18 Gradle版本信息

2.5.3 了解Kafka源代码的编译过程

访问Kafka官网地址http://kafka.apache.org/downloads下载Kafka源代码压缩包。本书使用的Kafka版本是kafka-0.10.2.0,这里选择kafka-0.10.2.0-src.tgz压缩包。将下载的Kafka源代码解压到MacOS操作系统的指定目录,Kafka源代码目录结构如图2-19所示。

图2-19 Kafka源代码目录结构

1.离线下载依赖包

如果编译环境网络状况不好,则在执行编译命令之前可以先下载核心依赖包gradle-3.3-all.zip,然后再将它移到gradle/wrapper/目录中,最后修改gradle/wrapper/gradle-wrapper.properties。

具体修改内容见代码2-4。

代码2-4 修改编译配置文件

#Wed May 10 10:25:30 CST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
# 上面内容保持不变,修改Gradle核心依赖包路径地址
distributionUrl=gradle-3.3-all.zip

提示:

gradle-3.3-all.zip包的下载地址为https://services.gradle.org/distributions

2.在线编译Kafka源代码

如果编译网络环境状态良好,则无需配置任何配置文件,直接执行以下编译命令即可。

# 先清理无效文件,然后再执行编译命令
dengjiedeMacBook-Pro:kafka dengjie$ ./gradlew clean && ./gradlew
releaseTarGz

成功执行上述编译命令后,会自动下载gradle-3.3-bin.zip依赖包,下载完成后会存放在/Users/dengjie/.gradle/wrapper/dists目录中。下载过程所需的时间,完全取决于当时的网络状况。

编译成功后,操作系统控制台会打印出如图2-20所示的信息。

图2-20 成功编译Kafka源代码

编译之后的Kafka二进制压缩包文件,会自动存放在core/build/distributions目录中。这里的压缩包文件和在Kafka官网上提供的一样,读者可以直接使用编译好的Kafka安装包。

2.6 实例7:将Kafka源代码导入编辑器

在实际应用场景中,可能会遇到一些Kafka异常问题,需要阅读Kafka源代码来分析异常问题产生的原因。

如果直接打开Kafka源代码查看,则阅读起来会很不方便,所以需要借助代码编辑器来阅读Kafka源代码。这里列举两种常见的代码编辑器——IntelliJ IDEA和Eclipse。可以通过访问各自的官方网站来获取软件安装包,其下载地址见表2-2。

表2-2 代码编辑器下载地址

实例描述

从表2-2中获取编辑器安装包,按照下列两种情况将Kafka源代码导入:(1)在IntelliJ IDEA编辑器中导入Kafka源代码;(2)在Eclipse编辑器中导入Kafka源代码。

2.6.1 导入IntelliJ IDEA编辑器

IntelliJ IDEA简称IDEA,是Java语言开发的集成环境。它在智能代码提示、重构、版本控制工具(如Git、SVN等)、代码分析等方面的功能非常完善。

IDEA是JetBrains公司的产品,目前分为旗舰版和社区版。

· 旗舰版包含所有功能,但是需要付费购买;

· 社区版属于免费产品,功能较少,但对于阅读Kafka源代码来说已足够了。

1.将Kafka源代码转成IDEA结构

Kafka源代码中提供了Gradle工具,可以很方便地将Kafka源代码转换成IDEA结构。只需执行一条转换命令即可,具体操作命令如下。

# 进入Kafka源代码目录,然后执行下列命令
dengjiedeMacBook-Pro:kafka dengjie$ gradle idea

执行命令后,如果转换成功,则控制台会打印出如图2-21所示的信息。

图2-21 转成IDEA结构

之后,在Kafka源代码目录会生产三个文件——kafka.iml、kafka.iws和kafka.ipr,如图2-22所示。

图2-22 IDEA结构文件

2.导入Kafka代码

打开IDEA社区版代码编辑器,然后选择菜单“File”-“Open”命令,在弹出的对话框中选中Kafka源代码目录并单击“Open”按钮,弹出如图2-23所示对话框供用户选择。

图2-23 IDEA代码编辑器对话框

单击“New Window”按钮,表示重新在一个新的IDEA编辑器窗口中导入Kafka源代码。之后等待代码编辑器自动下载需要的依赖包。初始化完成后会出现如图2-24所示的结果。

图2-24 IDEA完成代码导入

2.6.2 导入Eclipse编辑器

Eclipse是一款著名的跨平台开源集成开发环境,最开始主要用于Java语言开发。通过安装不同的插件,Eclipse可以支持不同的计算机编程语言,比如Scala、C++、Python等。

Eclipse代码编辑器的所有功能都是免费的。使用它,无论开发项目功能,还是阅读源代码都不错。

访问Scala-IDE官网网站 http://scala-ide.org/download/sdk.html,获取“Mac OS X Cocoa 64 bit”软件安装包,如图2-25所示。

图2-25 Scala IDE for Eclipse下载

提示:

可根据实际的操作系统来选择软件安装包。本书的环境是Mac操作系统,故选择Mac操作系统的软件安装包。

1.将Kafka源代码转成Eclipse结构

Kafka源代码中提供了Gradle工具,它可以很方便地将Kafka源代码转换成Eclipse结构。只需执行以下换命令。

# 进入Kafka源代码目录,然后执行下列命令
dengjiedeMacBook-Pro:kafka dengjie$ gradle eclipse

如果转换成功,则控制台会打印出如图2-26所示的结果。

图2-26 转成Eclipse结构

2.导入Kafka代码

(1)打开“Scala IDE for Eclipse”代码编辑器,然后选择菜单“File”-“Import”命令,在弹出的对话框中找到“Gradle”选项并展开选择“Existing Gradle Project”选项,然后单击“Next”按钮,如图2-27所示。

图2-27 选择“Gradle”选项

(2)弹出一个对话框,提示需要选择已存在的Gradle项目,单击“Browser”按钮并选择对应的Kafka源代码目录,单击“Finish”按钮,如图2-28所示。

图2-28 选择Kafka源代码

(3)代码编辑器开始自行下载依赖包,在下载完成后会出现如图2-29所示的结果。

图2-29 完成代码导入

2.7 了解元数据的存储分布

在Kafka系统中,核心组件的元数据信息均存储在Zookeeper系统中。这些元数据信息具体包含:控制器选举次数、代理节点和主题、配置、管理员操作、控制器。它们在Zookeeper系统中的分布如图2-30所示。

图2-30 Kafka元数据在Zookeeper系统中的分布

1.控制器选举次数

在Kafka系统中,控制器每进行一次选举次数,都会在Zookeeper系统/controller_epoch节点下进行记录,该值为一个数字。在Kafka集群中,第一个代理节点(Broker)启动时,该值为1。

在Kafka集群中,如果遇到代理节点宕机或者变更,则Kafka集群会重新选举新的控制器。每次控制器发生变化时,Zookeeper系统/controller_epoch节点中的值就会加1。

提示:

在Zookeeper系统中,元数据存储的格式为“英文斜杠+英文名称”,例如:/admin。通常会将这种存储类型称之为节点,如“/admin节点”。

2.代理节点和主题

· 在Zookeeper系统/brokers节点中,存储着Kafka代理节点和主题的元数据信息。

· 在Zookeeper系统/brokers/ids节点中,存储着代理节点的ID值。

· 在Zookeeper系统/brokers/topics节点中,存储着主题和分区的元数据信息。

3.配置

在Kafka系统中,修改主题属性这类操作会被存储到Zookeeper系统/config节点中。/config节点主要包含以下三个子节点。

· topic:存储着Kafka集群主题的额外属性,比如修改过主题的属性操作;

· client:存储着客户端和主题配置信息,包含消费者应用和生产者应用;

· changes:存储着修改信息。

4.管理员操作

在执行管理员操作(比如删除、分配等)时,在Zookeeper系统/admin节点会生成相应的子节点,内容如下。

· delete_topics:存储着待删除主题名的标记;

· reassign_partitions:存储着重新分配分区操作的命令;

· preferred_replica_election:存储着恢复Leader分区平衡操作的命令。

5.控制器

在Kafka系统正常运行时,在Zookeeper系统/controller节点下会存储一个Kafka代理节点的ID值。该ID值与Kafka代理节点ID相同,表示代理节点上存在控制器功能。

2.8 了解控制器的选举流程

控制器,其实就是Kafka系统的一个代理节点。它除具有一般代理节点的功能外,还具有选举主题分区Leader节点的功能。

提示:

只有当代理节点上存在控制器时才具有这种功能。

在启动Kafka系统时,其中一个代理节点(Broker)会被选举为控制器,负责管理主题分区和副本状态,还会执行分区重新分配的管理任务。

在Kafka系统运行过程中,如果当前的控制器出现故障导致不可用,则Kafka系统会从其他正常运行的代理节点中重新选举出新的控制器。

2.8.1 了解控制器的启动顺序

在Kafka集群中,每个代理节点(Broker)在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点。具体选举主题分区Leader节点的步骤如下。

(1)第一个启动的代理节点,会在Zookeeper系统里面创建一个临时节点/controller,并写入该节点的注册信息,使该节点成为控制器。

(2)其他的代理节点陆续启动时,也会尝试在Zookeeper系统里面创建/controller节点。但由于/controller节点已经存在,所以会抛出“创建/controller节点失败异常”的信息。创建失败的代理节点会根据返回的结果,判断出在Kafka集群中已经有一个控制器被成功创建了,所以放弃创建/controller节点。这样确保了Kafka集群控制器的唯一性。

(3)其他的代理节点,会在控制器上注册相应的监听器。各个监听器负责监听各自代理节点的状态变化,当监听到节点状态发生变化时,会触发相应的监听函数进行处理。

1.查看控制器创建的优先级

控制器创建的优先级是按照Kafka系统代理节点成功启动的顺序来创建的。用户可以通过改变Kafka系统代理节点的启动顺序,来查看控制器的创建优先级。具体操作命令如下所示:

(1)启动Kafka集群的各节点。

顺序依次是:dn1、dn2、dn3。脚本内容见代码2-5。

代码2-5 按照执行顺序启动Kafka集群

#! /bin/bash

# Kafka代理节点地址,按照指定顺序启动
hosts=(dn1 dn2 dn3)

# 打印启动分布式脚本信息
mill=`date "+%N"`
tdate=`date "+%Y-%m-%d %H:%M:%S, ${mill:0:3}"`

echo [$tdate] INFO [Kafka Cluster] begins to execute the $1 operation

# 执行分布式开启命令
function start()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i]
begins
 to execute the startup operation.; kafka-server-start.sh
 $KAFKA_HOME/config/server.properties>/dev/null" &
            sleep 1
        done
}

# 执行分布式关闭命令
function stop()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i]
begins
 to execute the shutdown operation.; kafka-server-stop.sh>/dev/null; " &
            sleep 1
        done
}

# 查看Kafka代理节点状态
function status()
{
    for i in ${hosts[@]}
        do
            smill=`date "+%N"`
            stdate=`date "+%Y-%m-%d %H:%M:%S, ${smill:0:3}"`
            ssh hadoop@$i "source /etc/profile; echo [$stdate] INFO [Kafka Broker $i]
status
message is :; jps | grep Kafka; " &
            sleep 1
        done
}

# 判断输入的Kafka命令参数是否有效
case "$1" in
  start)
      start
      ;;
  stop)
      stop
      ;;
  status)
      status
      ;;
  *)
      echo "Usage: $0 {start|stop|status}"
      RETVAL=1
esac

然后,在Zookeeper系统中查看/controller临时节点的内容,具体操作命令如下。

# 进入Zookeeper集群
[hadoop@dn1 bin]$ zkCli.sh -server dn1:2181

# 执行查看命令
[zk: dn1:2181(CONNECTED) 1] get /controller

执行上述命令后,可以看到代理节点0(即dn1节点)上成功创建了控制器。输出结果如图2-31所示。

图2-31 控制器内容结果

提示:

本书的Kafka集群配置的代理节点ID分别是0、1、2,它们分别对应的主机名是dn1、dn2、dn3。

(2)修改Kafka集群节点启动顺序。

新的启动顺序为:dn3、dn1、dn2。修改代码2-5中第4行内容,变更信息如下。

# 修改代码2-5中第4行内容
hosts=(dn3 dn1 dn2)

然后,执行该脚本再次重启Kafka集群。重启命令如下所示。

# 重启Kafka集群命令
# 先执行停止命令
[hadoop@dn1 bin]$ kafka-daemons.sh stop
# 然后执行启动命令
[hadoop@dn1 bin]$ kafka-daemons.sh start

接着,在Zookeeper系统中执行“get /controller”命令,查看输出结果。可以看到代理节点2(即dn3节点)上成功创建了控制器,如图2-32所示。

图2-32 修改启动顺序后的控制器内容

2.切换控制器所属的代理节点

当控制器被关闭或者与Zookeeper系统断开连接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变更通知,各个代理节点会尝试到Zookeeper系统中创建一个控制器的临时节点。第一个成功在Zookeeper系统中创建的代理节点,将会成为新的控制器。每个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。

为了观察选举变化过程,可以先将dn3代理节点的Kafka进程停止,让Kafka集群中的控制器处理关闭状态。具体操作命令如下。

# 使用kill命令关闭Kafka进程
[hadoop@dn3 bin]$ kill -9 `ps -fe | grep kafka | grep server |
awk -F ' ' '{print $2}'`

然后,在Zookeeper系统中执行查看命令,输出结果如图2-33所示。与图2-32中的结果对比,控制器已经从dn3节点切换到dn2节点了。

图2-33 新的控制器内容

接着,去查看controller_epoch的值是否有增加。操作命令如下。

# 进入Zookeeper集群中
[hadoop@dn1 bin]$ zkCli.sh -server dn1:2181

# 执行查看命令
[zk: dn1:2181(CONNECTED) 1] get /controller_epoch

执行上述命令后,可以看到选举次数已经累加了一次,如图2-34所示。

图3-34 查看选举次数

2.8.2 了解主题分区Leader节点的选举过程

选举控制器的核心思想是:各个代理节点公平竞抢在Zookeeper系统中创建/controller临时节点,最先创建成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。

整个控制器的选举流程如图2-35所示。

图2-35 控制器选举流程图

从图2-35中可知,当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController类、ZookeeperLeaderElector类、LeaderChangeListener类、SessionExpirationListener类。

下面围绕这四个核心的类,详细介绍Kafka系统控制器Leader选举流程。

1.了解KafkaController类的作用

KafkaController类在实例化ZookeeperLeaderElector类时,分别设置了两个关键的回调函数——onControllerFailover和onControllerResignation。具体实现见代码2-6。

代码2-6 KafkaController实现代码

class KafkaController (val config: KafkaConfig,
                          zkUtils: ZkUtils,
                          val brokerState: BrokerState,
                          time: Time,
                          metrics: Metrics,
                          threadNamePrefix: Option[String] = None)
                          extends Logging with KafkaMetricsGroup{

    private val controllerElector = new ZookeeperLeaderElector(controllerContext,
        ZkUtils.ControllerPath, onControllerFailover, onControllerResignation,
        config.brokerId, time)                     // 初始化ZK选举实例

    /** 准备选举 */
    def startup() = {
    inLock(controllerContext.controllerLock) {
            info("Controller starting up")         // 打印开始选举信息
            registerSessionExpirationListener()    // 注册会话过期监听器
            isRunning = true                       // 标记正常运行
            controllerElector.startup              // 开始执行选举逻辑
            info("Controller startup complete")    // 完成所有选举操作后,打印选举完成日志
    }
    }
}

在onControllerFailover回调函数中初始化Leader依赖模块,包括在Zookeeper系统中递增控制器选举次数。

当Kafka系统当前代理节点不再是Leader角色时,会触发onControllerResignation回调函数重新进行注册选举。KafkaController类启动后,会向Zookeeper系统注册会话超时监听器,并尝试选举Leader。

2.了解ZookeeperLeaderElector类的作用

ZookeeperLeaderElector类实现了主题分区的Leader节点选举功能,但是它并不会处理“代理节点与Zookeeper系统之间出现会话超时”这种情况。

ZookeeperLeaderElector类主要负责创建元数据存储路径、实例化变更监听器等,并通过订阅数据变更监听器来实时监听数据的变化,进而开始执行选举Leader的逻辑。具体实现见代码2-7。

代码2-7 ZookeeperLeaderElector实现代码

class ZookeeperLeaderElector(controllerContext: ControllerContext,
                      electionPath: String,
                      onBecomingLeader: () => Unit,
                      onResigningAsLeader: () => Unit,
                      brokerId: Int,
                      time: Time)
    extends LeaderElector with Logging {

    var leaderId = -1
    // 如果不存在,则在Zookeeper系统上创建一个选举路径
    val index = electionPath.lastIndexOf("/")
    if (index > 0)
    controllerContext.zkUtils.
makeSurePersistentPathExists(electionPath.substring(0, index))
    // 实例化一个Leader变更监听器
    val leaderChangeListener = new LeaderChangeListener

    /** 开始选举 */
    def startup {
        // 加锁
        inLock(controllerContext.controllerLock) {
            controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath,
            leaderChangeListener)
            elect        // 执行选举
        }
    }
}

ZookeeperLeaderElector类完成选举前的准备工作后,开始执行startup()函数来订阅数据变化监听器,同时调用elect方法来执行选举Leader的逻辑。

通常情况下,触发执行elect方法的条件有以下几点:

· 代理节点启动。

· 在上一次创建临时节点成功后,由于网络原因或服务器故障等导致连接中断。然后调用resign()函数并删除Zookeeper系统中的/controller节点。

· 在上一次创建临时节点成功后,由于网络原因或服务器故障等导致连接中断。再次进入elect方法,发现Kafka系统中已经有代理节点成了Leader。

· 在上一次创建临时节点成功后,在执行onBecomingLeader()函数时抛出了异常信息,执行业务逻辑后,再尝试选举Leader。

elect方法的具体实现逻辑见代码2-8。

代码2-8 elect实现逻辑

def elect: Boolean = {
    val timestamp = time.milliseconds.toString
    val electString = Json.encode(Map("version" -> 1,
      "brokerid" -> brokerId, "timestamp" -> timestamp))

  // 获取LeadedID
  leaderId = getControllerID

  // 判断LeadedID
    if(leaderId ! = -1) {
      debug("Broker  %d  has  been  elected  as  leader,  so  stopping  the  election
process.".format(leaderId))
      return amILeader
    }

    try {
    // 实例化ZKCheckedEphemeral
    val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
                                  electString,

    controllerContext.zkUtils.zkConnection.getZookeeper,
                                  JaasUtils.isZkSecurityEnabled())
    zkCheckedEphemeral.create()                    // 创建临时节点/controller
    info(brokerId + " successfully elected as leader")
    leaderId = brokerId
    onBecomingLeader()                             // 成为Leader
    } catch {
    // 异常处理
    case _: ZkNodeExistsException =>
      leaderId = getControllerID

      if (leaderId ! = -1)
debug("Broker %d was elected as leader instead of
 broker %d".format(leaderId, brokerId))
      else
      warn("A leader has been elected but just resigned,
       this will result in another round of election")

  case e2: Throwable =>
    error("Error while electing or becoming leader on broker %d".format(brokerId),
e2)
    resign()
  }
  amILeader
}

// 关闭
def close = {
  leaderId = -1
}

def amILeader : Boolean = leaderId == brokerId

// 放弃Leader选举,并删除临时节点
def resign() = {
  leaderId = -1
  controllerContext.zkUtils.deletePath(electionPath)
}

在Zookeeper系统中,创建临时节点/controller时,如果产生ZkNodeExistsException类异常,则说明Kafka系统中已经有代理节点成了Leader。

而如果是执行onBecomingLeader()方法出现异常,则说明初始化Leader的相关模块存在问题。若是初始化失败,则调用resign()函数删除Zookeeper系统/controller节点上的数据。Zookeeper系统中的/controller节点被删除,会触发LeaderChangeListener监听器尝试重新选举Leader,这样避免了Kafka系统中控制器无Leader的问题。

3.了解LeaderChangeListener类的作用

如果节点数据发生变化,则Kafka系统中的其他代理节点可能已经成为Leader,接着Kafka控制器会调用onResigningAsLeader()函数。

当Kafka代理节点宕机或者被人为误删除时,则处于该节点上的Leader会被重新选举。通过调用onResigningAsLeader()函数重新选择其他正常运行的代理节点成为新的Leader,具体实现见代码2-9。

代码2-9 LeaderChangeListener实现类

class LeaderChangeListener extends IZkDataListener with Logging {
  /** 回调函数,处理数据变更 */
    @throws[Exception]
    def handleDataChange(dataPath: String, data: Object) {
    val shouldResign = inLock(controllerContext.controllerLock) {
      val amILeaderBeforeDataChange = amILeader
      leaderId = KafkaController.parseControllerId(data.toString)
      info("New leader is %d".format(leaderId))
      // 如果旧的Leader不再是Leader就会重新被选举
      amILeaderBeforeDataChange && ! amILeader
    }

    if (shouldResign)
      onResigningAsLeader()
    }

    /** 删除临时节点 */
    @throws[Exception]
    def handleDataDeleted(dataPath: String) {
    val shouldResign = inLock(controllerContext.controllerLock) {
      debug("%s leader change listener fired for path %s to handle data deleted:
        trying to elect as a leader"
        .format(brokerId, dataPath))
      amILeader
    }

    if (shouldResign)
      onResigningAsLeader()

    inLock(controllerContext.controllerLock) {
      elect
    }
    }
  }

在选举的过程中,执行elect方法中的实现逻辑时会调用onBecomingLeader方法。该方法相当于KafkaController类中的onControllerFailover方法,也是用于选举当前代理节点作为新的控制器。

当Kafka集群中的某个代理节点成为新的Leader后,会初始化Leader的所有功能模块,例如注册分区监听器、注册副本监听器、注册分区状态机、注册副本状态机等。

在执行数据变更监听器逻辑时,会调用onResigningAsLeader方法。该方法相当于KafkaController类中的onControllerResignation方法,也是用来重新分配控制器,是控制器内部清理数据结构所必须的步骤。

4. SessionExpirationListener类的作用

当Kafka系统的代理节点和Zookeeper系统建立连接后,SessionExpirationListener中的handleNewSession()函数会被调用。对于Zookeeper系统中会话过期的连接,会先进行一次判断:

· 如果当前的控制器ID和代理节点ID相同,则Kafka会跳过重新选举的环节。

· 如果当前控制器的ID和代理节点ID不同,则Kafka会关闭当前的控制器,然后尝试重新选举。

具体实现见代码2-10。

代码2-10 SessionExpirationListener实现类

class SessionExpirationListener() extends IZkStateListener with Logging {
    this.logIdent = "[SessionExpirationListener on " + config.brokerId + "], "

    /** 会话过期,重新建立连接 */
    @throws[Exception]
    def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
// 判断控制器ID和代理节点ID是否相同
    if (controllerElector.getControllerID() ! = config.brokerId) {
      onControllerResignation()
      inLock(controllerContext.controllerLock) {
        controllerElector.elect
      }
    } else {
      info("ZK expired, but the current controller id %d is the same as this broker id,
            skip re-elect".format(config.brokerId))
    }
    }
}

2.8.3 了解注册分区和副本状态机

Kafka系统的控制器主要负责管理主题、分区和副本。

Kafka系统在操作主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。

使用Kafka应用接口或者是Kafka系统脚本创建一个主题时,服务端会将创建后的结果返回给客户端。当客户端收到创建成功的提示时,其实服务端并没有实际创建主题,而只是在Zookeeper系统的/brokers/topics节点中创建了该主题对应的子节点名称。

之后,服务端以异步的方式来创建主题。当服务端完成主题创建操作后,可以去$KAFKA_HOME/data中查看实际数据,或者访问Zookeeper系统查看元数据。

例如,查看Zookeeper系统/brokers/topics节点中的一个主题,操作命令如下。

# 查看/brokers/topics节点中的主题[zk: dn1:2181(CONNECTED) 28] get /brokers/topics/ip_login

执行上述命令后,输出结果如图2-36所示。

图2-36 查看主题分区和副本

通过上述命令可以查看指定主题的分区和副本分配信息,以及每个分区上的Leader所在代理节点。

提示:

主题元数据信息是通过分区索引值和代理节点ID来表示的。例如"4":[2,1,0]表示的是,分区索引值为4, Leader所在代理节点ID为2。其他分区上的Leader所在的节点以此类推。

代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,所以在控制器调用onControllerFailover()函数时,会在初始化阶段分别创建分区状态机和副本状态机。具体实现见代码2-11。

代码2-11 创建分区和副本监听器

def onControllerFailover() {
    if(isRunning) {
info("Broker %d starting become controller state
 transition".format(config.brokerId))
    readControllerEpochFromZookeeper()
    incrementControllerEpoch(zkUtils.zkClient)

    // 在/brokers/topics节点注册监听器
    registerReassignedPartitionsListener()
    registerIsrChangeNotificationListener()
    registerPreferredReplicaElectionListener()
    partitionStateMachine.registerListeners()      // 注册分区状态机
    replicaStateMachine.registerListeners()        // 注册副本状态机
    initializeControllerContext()

    // 在控制器初始化之后,在状态机启动之前,需要发送更新元数据请求
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

    replicaStateMachine.startup()                  // 启动副本状态机
    partitionStateMachine.startup()                // 启动分区状态机

    // 在自动故障转移中为所有主题注册分区更改监听器
    controllerContext.allTopics.foreach(topic => partitionStateMachine.
            registerPartitionChangeListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".
            format(config.brokerId, epoch))
    maybeTriggerPartitionReassignment()
    maybeTriggerPreferredReplicaElection()
    if (config.autoLeaderRebalanceEnable) {
      info("starting the partition rebalance scheduler")
      autoRebalanceScheduler.startup()
      autoRebalanceScheduler.schedule("partition-rebalance-thread",
        checkAndTriggerPartitionRebalance,
        5,
        config.leaderImbalanceCheckIntervalSeconds.toLong,
        TimeUnit.SECONDS)
    }
    deleteTopicManager.start()
  }
  else
    info("Controller has been shut down, aborting startup/failover")
}

主题的分区状态机通过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。

主题的副本状态机通过registerListeners()函数,在Zookeeper系统中/brokers/ids节点上注册了一个代理节点监听器BrokerChangeListener。通过BrokerChangeListener监听器来监听/brokers/ids子节点下的变化。

创建一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这会触发分区和副本状态机注册监听器。

2.8.4 了解分区自动均衡和分区重新分配

Kafka系统在启动时,会通过控制器来管理主题分区自动平衡,以及重新分配分区。

1.了解分区自动平衡

在初始化控制器时,在onControllerFailover()函数中如果读取到“auto.leader.rebalance.enable”的属性值为true,则会开启分区自动均衡。

可以通过调用checkAndTriggerPartitionRebalance()函数来实现,见代码2-12。

代码2-12 分区自动均衡触发器

private def checkAndTriggerPartitionRebalance(): Unit = {
    if (isActive) {
    trace("checking need to trigger partition rebalance")
    // 获取所有在线的代理节点
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]]
    = null
    inLock(controllerContext.controllerLock) {
      preferredReplicasForTopicsByBrokers =
        controllerContext.partitionReplicaAssignment.filterNot(p =>
deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic)).groupBy {
          case (_, assignedReplicas) => assignedReplicas.head
        }
    }
    debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)
    // 遍历所有代理节点,判断是否需要触发副本机制
    preferredReplicasForTopicsByBrokers.foreach {
      case(leaderBroker, topicAndPartitionsForBroker) => {
        var imbalanceRatio: Double = 0
        var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
        inLock(controllerContext.controllerLock) {
          topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter {
            case (topicPartition, _) =>
                  controllerContext.partitionLeadershipInfo.contains(topicPartition)
&&
            controllerContext.partitionLeadershipInfo(topicPartition).
            leaderAndIsr.leader ! = leaderBroker
          }
          debug("topics not in preferred replica " + topicsNotInPreferredReplica)
          val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
          val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
          imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble /
            totalTopicPartitionsForBroker
          trace("leader imbalance ratio for broker %d is %f"
            .format(leaderBroker, imbalanceRatio))
        }
        // 检查均衡比率。如果大于阈值,则触发重新均衡
        if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble /
100)) {
          topicsNotInPreferredReplica.keys.foreach { topicPartition =>
            inLock(controllerContext.controllerLock) {
            // 当代理节点处于存活状态,并且没有分区被重新分配以及副本没有优先选择操作,才会触发检
查操作
            if (controllerContext.liveBrokerIds.contains(leaderBroker) &&
                controllerContext.partitionsBeingReassigned.isEmpty &&

controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty &&
                !deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic)
&&
                controllerContext.allTopics.contains(topicPartition.topic)) {
              onPreferredReplicaElection(Set(topicPartition), true)
            }
            }
          }
        }
      }
    }
    }
}

分区自动均衡是将分区的优先副本选择为Leader。如果分区副本是通过Kafka系统自动分配的,则会确保分区的副本被分配在不同的代理节点上。

提示:

优先副本是指排在最前面的副本。例如,"4":[2,1,0],分区索引值为4,副本分别是2、1、0,而副本节点ID为2的排在最前面,那么它会被优先选择为Leader。

在初始化控制器时,会判断每个代理节点的分区是否均衡,通过“leader.imbalance. per.broker.percentage”属性值来判断,默认值是10%。

判别主题分区不均衡方法是:每个代理节点上的分区Leader非优先副本的总数与该代理节点上分区总数的比值大于均衡阈值,则判断主题分区不均衡。具体计算公式如下。

# 计算不均衡的公式
比值 = 每个代理节点上分区Leader非优先副本总数 / 该代理节点上分区的总数

如果比值imbalanceRatio超过默认值的10%,则触发自动均衡操作。通过调用onPreferredReplicaElection()函数执行优先选择副本操作,让优先选择副本成为分区的Leader,这样就能实现分区自动均衡功能。

2.了解分区重新分配

通过Kafka Eagle监控工具可以直接查看主题(ip_login)的分区及副本详情,如图2-37所示。

图2-37 主题分区和副本详情

当客户端修改主题(ip_login)的分区时,会在Zookeeper系统的/admin节点下创建一个reassign_partitions子节点,分区和副本的分配策略会被写入/admin/reassign_partitions节点中。正常情况下,这个过程执行得很快,分区重新配置操作完成后,reassign_partitions节点会被自动删除。

/admin/reassign_partitions节点的数据发生更新会触发PartitionsReassignedListener监听器来完成一系列的检测处理。最后调用KafkaController.onPartitionReassignment()函数来完成分区的重新分配操作。具体实现见代码2-13。

代码2-13 实现重新分配分区

def onPartitionReassignment(topicAndPartition: TopicAndPartition,
        reassignedPartitionContext: ReassignedPartitionsContext) {
    val reassignedReplicas = reassignedPartitionContext.newReplicas
    if (! areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition,
    reassignedReplicas)) {
    info("New replicas %s for partition %s being "
        .format(reassignedReplicas.mkString(", "),
        topicAndPartition) +
      "reassigned not yet caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet
        -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
    val newAndOldReplicas = (reassignedPartitionContext.newReplicas
        ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
    // 1. 在Zookeeper上更新AR
    updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    // 2. 发送LeaderAndIsr请求给每个副本
    updateLeaderEpochAndSendRequest(topicAndPartition,
        controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    // 3. 给重新分配的分区开启新的副本
    startNewReplicasForReassignedPartition(topicAndPartition,
        reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being "
        .format(reassignedReplicas.mkString(", "), topicAndPartition) +
      "reassigned to catch up with the leader")
  } else {
    // 4. 等待所有的副本与Leader完成同步
    val                                oldReplicas                                =
controllerContext.partitionReplicaAssignment(topicAndPartition)
        .toSet -- reassignedReplicas.toSet
    // 5. 重新分配副本
    reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(
        topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    // 6. 将AR设置为内存中RAR
    // 7. 发送一个新的Leader和新的AR
    moveReassignedPartitionLeaderIfRequired(topicAndPartition,
        reassignedPartitionContext)
    // 8. 清理离线或是不在isr列表中的副本
    // 9. 清理不存在的副本
    stopOldReplicasOfReassignedPartition(topicAndPartition,
        reassignedPartitionContext, oldReplicas)
    // 10. 在Zookeeper中更新AR
    updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    // 11. 更新Zookeeper系统中的/admin/reassign_partitions节点
    removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions in zookeeper"
        .format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    // 12. 选举Leader后,副本和isr信息改变,重新发送元数据更新请求
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq,
        Set(topicAndPartition))
    deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
  }
}

2.9 小结

学习本章内容,需要注意两个容易出现错误的地方:一个是配置实现各个主机之间免密码登录,另一个是准备好编译Kafka源代码的环境。

本章的主要内容正好帮助读者达到了该目的,读者可以参考本章内容,轻松搭建一个分布式的Kafka集群。