架构图:
一台机器上安装3个kafka节点,然后把这三个kakfa节点注册到同一台zk上。
zk版本: 3.4.13
zookeeper-3.4.13.tar.gz
kafka版本 : 2.11-2.1.0.0
一、安装zk
1.从zk官网上下载3.4.13的zk
http://apache.mirror.colo-serv.net/zookeeper/zookeeper-3.4.13/
2.创建zk的安装目录
2.把zk的安装包上传到服务器上
3.解压
4.修改配置文件
4.1 在zookeeper目录下,创建俩个目录,一个是:data,另一个用于日志logs
mkdir data
mkdir logs
4.2.复制出来一份配置文件
cp zoo_sample.cfg zoo.cfg
4.3.修改zoo.cfg配置文件
修改zoo.cfg配置文件里面的内容
#指定zk的基准时间间隔
tickTime=2000
#指定zk存放数据文件夹的目录为 /usr/local/zookeeper/data
dataDir= /usr/local/zookeeper/data
#指定zk存放日志文件夹的目录为:/usr/local/zookeeper/logs
dataLogDir=/usr/local/zookeeper/logs
#指定客户端访问zk的端口号为2181
clientPort=2181
4.4.保存配置文件
5.修改系统环境变量,把zk的启动命令加入到系统环境变量中
5.1 编辑/etc/profile文件
vim /etc/profile
把zk相关的启动命令放入profile中:
#add zk config to profile
#zk dir
export ZK_HOME=/usr/local/zookeeper/zookeeper-3.4.13
#append zk bin to system path
export PATH=${ZK_HOME}/bin:$PATH
5.2 使profile配置生效
source /etc/profile
6.启动zk
zkServer.sh start
此时,即表明zk已经成功启动。
我们还可以使用zkServer.sh status命令来查看zk的运行状态。
二、安装kafka
1.下载kakfa安装包
kafka_2.11-2.1.0
kafka_2.11-2.1.0.tgz
2.上传至服务器
3.在/usr/local下创建kafka的安装目录
创建kafka的安装目录:
mkdir kafka
4.解压到指定路径下
解压至/usr/local/kafka目录下:
[root@iz2ze38jexm12odh3o3k5ez ~]# tar -zxvf kafka_2.11-2.1.0.tgz -C /usr/local/kafka/
5.在kafka目录下创建一个目录kafka-logs用于kafka存放文件
mkdir kafka-logs
6.修改kafka配置
kafka的几个重要配置:
broker.id #broker实例的id
log.dirs # kafka数据的存放地址
port #broker server的服务端口号
zookeeper.connect #zookeeper集群的地址
#zk相关配置
zookeeper.connect = localhost:2181
zookeeper.session.timeout.ms=6000
zookeeper.connection.timeout.ms =6000
zookeeper.sync.time.ms =2000
broker.id=0 # 每一个broker在集群中的唯一表示,要求是正数。当该服务器的IP地址发生改变时,broker.id没有变化,则不会影响 consumers的消息情况
num.network.threads=2 #broker处理消息的最大线程数,一般情况下数量为cpu核数
num.io.threads=8 #broker处理磁盘IO的线程数,数值为cpu核数2倍
socket.send.buffer.bytes=1048576 #socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.receive.buffer.bytes=1048576 #socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.request.max.bytes=104857600 #socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于 socket.request.max.bytes,会被topic创建时的指定参数覆盖
log.dirs=/usr/local/kafka/kafka-logs #kafka数据的存放地址,多个地址的话用逗号分割,多个目录分布在不同磁盘上可以提高读写性 能
num.partitions=2 #每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
log.retention.hours=168
log.segment.bytes=536870912 #topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes。-1没有大小 限log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖
log.retention.check.interval.ms=60000 #文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.cleaner.enable=false #是否开启日志清理
#
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=1000000 #ZooKeeper的最大超时时间,就是心跳的间隔,若是没有反映,那么认为已经死了,不易过 大
6.1 打开server.properties配置文件
进行如下配置:
配置zk信息:
保存配置。
7.启动kafka
利用单节点部署多个broker。不同的broker不同的id,监听端口以及日志目录
这里我们复制出来三份配置,broker.id分别设置为1,2,3,端口号为:9092,9093,9094
日志目录: kafka-logs、 kafka-logs2、 kafka-logs3
依次启动三个broker:
bin/kafka-server-start.sh -daemon config/server.properties &