1 简介
kafka 依赖zookeeper,zookeeper 简称ZK,集群官方推荐是奇数个server,奇数个最少3个,这样坏一个还有两个,可以正常选举。(官方解释:对于复制模式,需要最少三个服务器,并且强烈建议您有奇数个服务器。如果只有两个服务器,那么,如果有一个服务器失败,则没有足够的机器形成多数法定人数。两个服务器本质上不如单个服务器稳定,因为有两个单点故障。)
注意事项:
1、 kafka的java环境要统一,小版本号要一致。
2、 如果安装kafka集群的服务器主机名不是localhost,需要在/etc/hosts文件里填写集群主机名和IP地址
2 搭建集群并配置集群
2.1 搭建基础环境
由于zookeeper要依赖java环境,并且之后的Elasticsearch也需要依赖java所以首先需要搭建java1.8的基础环境。JDK版本为8u172。
2.1.1 下载jdk1.8:
下载地址:
2.1.2 上传到服务器并解压
[root@kafka11 opt]#tar –zxvf jdk-8u172-linux-x64.tar.gz
2.1.3 修改环境变量
[root@kafka11 ~]# vim /etc/profile
#jdk1.8
export JAVA_HOME=/opt/jdk1.8.0_172
export JAVA_BIN=/opt/jdk1.8.0_172/bin
export PATH=$PATH:$JAVA_HOME/bin
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME JAVA_BIN PATH CLASSPATH
2.1.4 刷新配置文件
[root@kafka11 ~]# source /etc/profile
2.2 搭建zookeeper集群
版本为:zookeeper3.4.12,官网下地址:
https://archive.apache.org/dist/zookeeper/
2.2.1 配置zookeeper
配置文件模板在$zookeeperhome/conf/zoo_sample.cfg。复制此模版并且改名为zoo.cfg
[root@kafka11 conf]# cp zoo_sample.cfg zoo.cfg
[root@kafka11 conf]# vim zoo.cfg
lientCnxns=200
tickTime=2000
initLimit=5
syncLimit=2
dataDir=/data/zookeeper/data ##注意此目录需要手动创建
dataLogDir=/data/zookeeper/logs ##此目录需要手动创建
clientPort=2181
server.1=10.10.4.11:2888:3888
server.2=10.10.4.12:2888:3888
server.3=10.10.4.13:2888:3888
2.2.2 创建目录及myid
[root@kafka11 data]# mkdir –p /data/zookeeper/data
[root@kafka11 data]# mkdir –p /data/zookeeper/logs
[root@kafka11 ~]# cd /data/zookeeper/data/
[root@kafka11 data]# echo 1 > myid ##此次注意,要和配置文件里的server.*的*对应
2.2.3 其他节点配置
注意:myid一定要注意,要和配置文件的server后边对应的数字相同相同
2.2.4 Zookeeper命令
开启:[root@kafka12 ~]# /data/zookeeper-3.4.12/bin/zkServer.sh start
停止:[root@kafka12 ~]# /data/zookeeper-3.4.12/bin/zkServer.sh stop
查看状态:[root@kafka12 ~]# /data/zookeeper-3.4.12/bin/zkServer.sh status
使用jps命令查看zookeeper进程
#执行命令jps
20348 Jps
4233 QuorumPeerMain
如果没有jps命令可以看端口或者进程
2.3 搭建kafka集群
2.3.1 创建目录并下载安装软件
下载地址:版本号为2.11-0.11.0.3
https://archive.apache.org/dist/kafka/0.11.0.3/kafka_2.11-0.11.0.3.tgz
2.3.2 配置kafka(我的配置文件)
[root@kafka11 config]# cat server.properties
#每个server需要单独配置broker id,如果不配置系统会自动配置。
broker.id=1 ##集群其他服务器需要改动
delete.topic.enable=true
port=9092
host.name=10.10.4.11 ##集群其他服务器需要改动
listeners=PLAINTEXT://:9092
advertised.host.name=10.10.4.11 ##集群其他服务器需要改动
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs ##此目录要先创建,不会自动创建。
num.partitions=3 ##分区个数
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=10.10.4.11:2181,10.10.4.12:2181,10.10.4.13:2181
zookeeper.connection.timeout.ms=6000
session.timeout.ms=10s
max.poll.records=100
max.poll.interval.ms=600000
group.initial.rebalance.delay.ms=0
2.3.3 创建目录
[root@kafka11 config]# mkdir -p /data/kafka/logs
2.3.4 启动kafka集群
[root@kafka11 config]# cd /data/kafka_2.11-0.11.0.3/bin/
[root@kafka11 bin]# ./kafka-server-start.sh -daemon ../config/server.properties
-daemon参数是kafka后台运行的参数。
2.3.5 检查是否启动
#执行命令jps
20348 Jps
4233 QuorumPeerMain
18991 Kafka
如果没有jps命令可以看端口或者进程
3 Kafka相关命令
1、kafka启动命令:
./kafka-server-start.sh -daemon ../config/server.properties
2、kafka查看topic命令:
./kafka-topics.sh --zookeeper 10.10.4.11:2181 --list
3、查看指定topic的详细:
./kafka-topics.sh --zookeeper 10.10.4.11:2181 --topic test_property --describe
4、创建topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-ref-7
--replication-factor:指定副本个数
--partitions:指定分区个数
--topic:指定topic名
5、写入数据:
./kafka-producer-perf-test.sh --num-records 10000000 --topic test-ref-9 --record-size 500 --throughput 100000 --producer-props bootstrap.servers=10.10.4.11:9092,10.10.4.12:9092,10.10.4.13:9092
--num-records:记录的条数
--topic:指定topic的名字
--record-size:一条记录大小
--throughput:吞吐大小(自己感觉无用,只是弹屏的多少)
--producer-props bootstrap.servers=10.10.4.11:9092,10.10.4.12:9092,10.10.4.13:9092:指定kafka集群
6、消费数据:
./kafka-consumer-perf-test.sh --messages 10000000 --threads 3 --zookeeper localhost:2181 --num-fetch-threads 3 --topic test-ref-8
--messages:指定消费条目数
--threads:指定线程数
--num-fetch-threads 3:指定消费人数
4 kafka监控工具kafka-manager1.3.3.18安装使用
4.1 环境要求
Kafka 0.8.. or 0.9.. or 0.10.. or 0.11..
Java 8+
sbt0.13.*
4.2 安装部署kafka-manager
4.2.1 安装sbt
[root@kafka11 ~]# curl https://bintray.com/sbt/rpm/rpm > bintray-sbt-rpm.repo
[root@kafka11 ~]# mv bintray-sbt-rpm.repo /etc/yum.repos.d/
[root@kafka11 ~]# yum install sbt –y
4.2.2 下载
由于下载编译网站国内访问效果不佳在网上找了个编译好的1.3.3.18版本的可以直接使用
链接:https://pan.baidu.com/s/1XRXFEmpS4f7KsT5zaaXfTw 提取码:pb5g
4.2.3 安装配置
unzip kafka-manager-1.3.3.18.zip
在解压后的conf目录中打开 application.conf文件,修改其中的zookeeper配置信息:
kafka-manager.zkhosts="hadoop4:2181,hadoop5:2181,hadoop6:2181"
##用逗号隔开
4.2.4 指定端口启动
nohup bin/kafka-manager -Dconfig.file=conf/application.conf -Dhttp.port 9001 &
或者修改配置文件
添加http.port=9001
4.3 使用manager
1、添加一个集群
点击【Cluster】>【Add Cluster】打开如下添加集群的配置界面:
输入集群的名字(如Kafka-1)和 Zookeeper 服务器地址(如10.10.4.11:2181)。选择最接近的Kafka版本(如0.10.1.0)。
注意:如果没有在 Kafka 中配置过 JMX_PORT,千万不要选择第一个复选框。
Enable JMX Polling。如果选择了该复选框,Kafka-manager 可能会无法启动。
2、新建主题
点击【Topic】>【Create】可以方便的创建并配置主题。如下显示。
5 Kafka监控工具KafkaOffsetMonitor配置及使用
5.1 安装及配置
1、正常直接去官网下载最新的jar包,上传到某服务器上,可以直接运行就可以运行起来了:https://github.com/quantifind/KafkaOffsetMonitor/releases
但是在使用过程中有可能会发现页面反应缓慢或者无法显示相应内容的情况。据说这是由于jar包中的某些js等文件需要连接到网络,或者需要翻墙导致的。网上找的一个修改版的KafkaOffsetMonitor对应jar包,可以完全在本地运行,经过测试效果不错。我已经上传到我的网盘:链接:https://pan.baidu.com/s/1sCZQl6K0zcPGeS54MTVoGw 提取码:njsd
2、上传解压
2、在解压目录里创建一个新的脚本
#!/bin/bash
java -cp KafkaOffsetMonitor-assembly-0.2.0.jar \
com.quantifind.kafka.offsetapp.OffsetGetterWeb \
--zk 10.10.4.11:2181,10.10.4.12:2181,10.10.4.13:2181 \
--port 8088 \
--refresh 5.minutes \
--retain 1.day
--zk localhost:2181——指的是zookeeper的IP和端口号
--port 8088——指的是KafkaOffsetMonitor访问的端口号,即监控kafka的端口号(当前系统没有用到的端口号)
--refresh 10.seconds——10秒一刷新
--retain 1.days——http://localhost:8089页面保持1天
3、然后运行脚本
之后会有提示让你访问:IP:8088
5.2 应用(只有在消费的时候可以看到数据)
1、 刚进入界面
2、有一个Visualizations选项卡,点击其中的Cluster Overview可以查看当前Kafka集群的Broker情况