1.准备
这里先安装单机版,集群版等把kakfa的api写完以后再去安装
要想安装kafka
则需要以下四个软件:
- java
- zookeeper
- kafka
- centos
其中关于java
的版本为1.8
,到oracle官网下载即可
其中zookeeper
去官网下载3.7.0
版本即可
其中kafka
到官网下载2.8.0
即可
至于centos
版本没有太大要求,centos7
即可
2.安装
2.0 centos
我这里用的虚拟机,ip地址为192.168.80.110
2.1 java
解压jdk的压缩包
tar xvf jdk-8u74-linux-x64.tar.gz -C /opt
mv /opt/jdk1.8.0_74 /opt/jdk
增加环境变量
vim /etc/profile
在文件中增加以下内容
export JAVA_HOME=/opt/jdk
export CLASSPATH=.
export PATH=$PATH:$JAVA_HOME/bin
让文件生效
source /etc/profile
查看是否配置成功
jps
2.2 zookeeper
2.2.1 安装
创建用户方便后期管理
创建组
groupadd kafka
创建用户
useradd -g kafka -s /bin/bash -d /home/kafka -m kafka
创建目录用来存储zookeeper
数据和日志
mkdir -p /var/data/zookeeper
mkdir -p /var/log/zookeeper
改变目录的拥有者和组
chown -R kafka:kafka /var/data/zookeeper
chown -R kafka:kafka /var/log/zookeeper
解压zookeeper
安装包
tar xvf apache-zookeeper-3.7.0-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.7.0-bin/ /opt/zookeeper
进入conf
目录修改配置文件
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
vim /opt/zookeeper/conf/zoo.cfg
修改的内容如下
#客户端与服务器之间维持心跳的间隔,每隔2000毫秒就会客户端就会像服务器发送一个心跳
tickTime=2000
# 允许从节点连接并同步到主节点的初始化连接时间
# 这个值是tickTime的倍数,所以initlimit是10* 2OOOms,也就是20s
initLimit=10
# 表示允许从节点与主节点处于不同步状态的时间上限,如果从节点在于主节点在这个时间内,没有完成同步,那么这个从节点就会被舍弃
# 这个值也是tickTime的倍数,所以syncLimit是 5 * 2000ms 也就是10s
syncLimit=5
# 存储内存中数据快照的位置
dataDir=/var/data/zookeeper
# 客户的端连接的端口
clientPort=2181
修改环境变量
vim /etc/profile
追加内容如下
export ZK_HOME=/opt/zookeeper
export PATH=$PATH:$ZK_HOME/bin
让配置文件生效
source /etc/profile
2.2.2 服务
把zookeeper
交给systemd
去管理,这样管理更加方便
在/etc/systemd/system
新建zookeeper.service
touch zookeeper.service
chmod u+x,o+x,g+x zookeeper.service
文件内容如下:
[Unit]
# 服务描述
Description=zookeeper.service
#
# 在网络服务启动后运行
After=network.target
#
[Service]
# 指定以什么用户去启动
User=kafka
# 指定用户组
Group=kafka
# Type 必须为 forking 否则无法启动,
Type=forking
#指定日志目录
Environment=ZOO_LOG_DIR=/var/log/zookeeper
# 指定要依赖的环境
Environment=JAVA_HOME=/opt/jdk
#
# 启动命令
ExecStart=/opt/zookeeper/bin/zkServer.sh start
#
# 停止命令
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
#
# 重启命令
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
# 启动失败后重启
Restart=on-failure
# 每次尝试重启间隔60秒
StartLimitInterval=60
# 最终尝试重启3000次
StartLimitBurst=3000
#
[Install]
WantedBy=multi-user.target
zookeeper命令操作如下
# 配置生效
systemctl daemon-reload
# 启动
systemctl start zookeeper
# 查看启动状态
systemctl status zookeeper
# 查看启动日志
journalctl -xe
# 重启
systemctl reload zookeeper
# 关闭
systemctl stop zookeeper
# 开启自启动
systemctl enable zookeeper
注意:启动之前关闭防火墙或者将
2181
端口加入防火墙
# 关闭防火强
systemctl stop firewalld
# 将端口加入防火墙
firewall-cmd --add-port=2181/tcp --permanent
firewall-cmd --reload
2.3 kafka
2.3.1 安装
解压安装包
tar xvf kafka_2.12-2.8.0.tgz -C /opt/
mv /opt/kafka_2.12-2.8.0/ /opt/kafka
chown -R kafka:kafka /opt/kafka/
创建目录用来存放数据
mkdir /var/data/kafka
chown -R kafka:kafka /var/data/kafka/
修改配置文件内容
vim /opt/kafka/config/server.properties
修改内容如下
# borker 的id,必须为一个正整数,默认值是0
# 如果是集群环境这个值在整个集群中必须是唯一的
broker.id=110
# kafka对外提供服务的端口默认是9092
listeners=PLAINTEXT://192.168.80.110:9092
advertised.listeners=PLAINTEXT://192.168.80.110:9092
# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
# broker处理磁盘IO 的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
# socket的发送缓冲区,socket的调优参数SO_SNDBUFF
socket.send.buffer.bytes=102400
# socket的接受缓冲区,socket的调优参数SO_RCVBUFF
socket.receive.buffer.bytes=102400
# socket请求的最大数值,防止serverOOM,message.max.bytes必然要小于socket.request.max.bytes,会被topic创建时的指定参数覆盖
socket.request.max.bytes=104857600
# kafka数据的存放地址,多个地址的话用逗号分割
# 如果指定了多个地址,那么broker就会根据"最少使用原则"
# 把同一个分区数据片段放在同一个路径下
log.dirs=/var/data/kafka
# 每个topic的分区个数,若是在topic创建时候没有指定的话 会被topic创建时的指定参数覆盖
num.partitions=1
# Kafka会使用可配置的线程池来处理日志片段:
# 服务器正常启动,用于打开每个分区的日志片段
# 服务器崩愤后重启,用于检查和截短每个分区的日志片段
# 服务器正常关闭,用于关闭日志片段。
# 默认情况下,每个日志目录只使用一个线程。
# 因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到并行操作
# 特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间
# 设置此参数时需要注意,所配置的数字对应的是log.dirs指定的单个日志目录。
# 也就是说,如果num.recovery.threads.per.data.dir被设为8,井且log.dirs指定了3个路径,那么总共需要24个线程。
num.recovery.threads.per.data.dir=1
# 默认情况下,Kafka会在如下几种情形下自动创建主题
# 消费者进行消费而topic不存在时
# 当一个生产者开始往主题写入消息时
# 当任意一个客户端向主题发送元数据请求时
# 当然很多时候自动创建topic并非是预期的,根据Kafka协议,如果一个主题不先被创建,根本无法知道它是否已经存在
# 当然如果想要手动创建主题,那么就可以把这个参数设为false
# auto.create.topics.enable=false
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# 数据默认保存时间 168小时 7天
log.retention.hours=168
# topic的分区是以一堆segment文件存储的,这个控制每个segment的大小
# 大小达到指定的上限(默认是lGB)时,当前`segment`就会被关闭
# 一个新的`segment`被打开
log.segment.bytes=1073741824
# 文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略
log.retention.check.interval.ms=300000
# zookeeper 连接地址
zookeeper.connect=localhost:2181
# 连接超时时间
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
关闭防火墙或者将9092
端口加入防火墙
# 关闭防火强
systemctl stop firewalld
# 将端口加入防火墙
firewall-cmd --add-port=9092/tcp --permanent
firewall-cmd --reload
配置环境变量
vim /etc/profile
在文件末尾追加以下内容
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin
让文件生效
source /etc/profile
2.3.2 服务
在/etc/systemd/system
新建kafka.service
文件
touch kafka.service
chmod u+x,o+x,g+x kafka.service
内容如下
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=forking
User=kafka
Group=kafka
Environment=JAVA_HOME=/opt/jdk
ExecStart=/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# 启动失败后重启
Restart=on-failure
# 每次尝试重启间隔60秒
StartLimitInterval=60
# 最终尝试重启3000次
StartLimitBurst=3000
[Install]
WantedBy=multi-user.target
kafka命令操作如下
# 配置生效
systemctl daemon-reload
# 启动
systemctl start kafka
# 查看启动状态
systemctl status kafka
# 查看启动日志
journalctl -xe
# 关闭
systemctl stop kafka
# 开启自启动
systemctl enable kafka
3. 监控
3.1 JMX
kafka
创建成功以后,需要对kafka
进行监控,例如topic
信息等等,因此可以安装kafka-eagle
去进行监控
由于kafka-eagle
是基于kafka
的jmx
监控的,因此需要先开启kafka
的jmx
监控
打开vim /opt/kafka/bin/kafka-server-start.sh
进行修改
在这个此处加入以下代码
export JMX_PORT=9999
打开vim /opt/kafka/bin/kafka-run-class.sh
进行修改
在此处将代码修改为以下内容
if [ -z "$KAFKA_JMX_OPTS" ]; then
KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote.rmi.port=$JMX_PORT -Djava.rmi.server.hostname=192.168.80.110 -Dcom.sun.anagement.jmxremote.local.only=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "
fi
修改完成后重新启动kafka
systemctl restart kafka
3.2 eagle
下载:https://github.com/smartloli/kafka-eagle
这里下载的版本为2.0.0
解压安装包
tar -xvf kafka-eagle-bin-2.0.0.tar.gz -C .
cd kafka-eagle-bin-2.0.0
tar -zxvf kafka-eagle-web-2.0.0-bin.tar.gz -C /opt
mv /opt/kafka-eagle-web-2.0.0/ /opt/kafka-eagle
chown -R kafka:kafka /opt/kafka-eagle
修改配置文件vim /opt/kafka-eagle/conf/system-config.properties
内容如下:
# 集群列表 目前就一个broker,一个就足够了
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
######################################
# zookeeper enable acl
######################################
cluster1.zk.acl.enable=false
######################################
# broker size online list
######################################
cluster1.kafka.eagle.broker.size=20
######################################
# zk client thread limit
######################################
kafka.zk.limit.size=25
######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
######################################
# kafka offset storage
######################################
cluster1.kafka.eagle.offset.storage=kafka
######################################
# kafka metrics, 15 days by default
######################################
kafka.eagle.metrics.charts=true
kafka.eagle.metrics.retain=15
######################################
# kafka sql topic records max
######################################
kafka.eagle.sql.topic.records.max=5000
kafka.eagle.sql.fix.error=true
######################################
# delete kafka topic token
######################################
kafka.eagle.topic.token=keadmin
######################################
# kafka sasl authenticate
######################################
cluster1.kafka.eagle.sasl.enable=false
######################################
# kafka ssl authenticate
######################################
cluster1.kafka.eagle.ssl.enable=false
######################################
# kafka sqlite jdbc driver address
######################################
kafka.eagle.driver=org.sqlite.JDBC
kafka.eagle.url=jdbc:sqlite:/opt/kafka-eagle/db/ke.db
kafka.eagle.username=root
kafka.eagle.password=www.kafka-eagle.org
配置环境变量 vim /etc/profile
追加以下内容
export KE_HOME=/opt/kafka-eagle
export PATH=$PATH:$KE_HOME/bin
让配置生效
source /etc/profile
将端口添加到防火墙或者关闭防火墙
# 关闭防火墙
systemctl stop firewalld
# 添加端口
firewall-cmd --add-port=8048/tcp --permanent
firewall-cmd --reload
创建服务文件vim /etc/systemd/system/kafka-eagle.service
,内容如下:
[Unit]
Description=Kafka Eagle
After=kafka.service
[Service]
Environment=KE_HOME=/opt/kafka-eagle
Environment=JAVA_HOME=/opt/jdk
User=kafka
Group=kafka
Type=forking
ExecStart=/opt/kafka-eagle/bin/ke.sh start
ExecReload=/opt/kafka-eagle/bin/ke.sh restart
ExecStop=/opt/kafka-eagle/bin/ke.sh stop
#启动失败后重启
Restart=on-failure
#每次尝试重启间隔60秒
StartLimitInterval=60
#最终尝试重启50次
StartLimitBurst=50
[Install]
WantedBy=multi-user.target
kafka-eagle
命令操作如下
# 配置生效
systemctl daemon-reload
# 启动
systemctl start kafka-eagle
# 查看启动状态
systemctl status kafka-eagle
# 查看启动日志
journalctl -xe
# 关闭
systemctl stop kafka-eagle
# 开启自启动
systemctl enable kafka-eagle
启动成功后,访问地址192.168.80.110:8048
地址,即可看到监控网页
账号密码为 admin/123456
,登录成功以后即可看到监控kafka的详情
至于网页内容操作,先放一放,等后面讲完api的使用再回头细看该网页