Deploying kafka_2.13

A Kafka + ZooKeeper deployment guide

1. Environment

Software: kafka_2.13-3.0.0; OS: CentOS 7.9; JDK: jdk1.8.0_311

1.1 Host resources

Public IP      Private IP      Hostname   Role
10.225.11.77   172.22.204.77   kafka01    JDK + Kafka + ZooKeeper + kafka-manager
10.225.11.78   172.22.204.78   kafka02    JDK + Kafka + ZooKeeper
10.225.11.79   172.22.204.79   kafka03    JDK + Kafka + ZooKeeper
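It also helps if the three nodes can resolve each other by hostname. A minimal sketch that prints /etc/hosts entries built from the table above (append the output to /etc/hosts on each node as root):

```shell
# Sketch: emit /etc/hosts lines for the three nodes (internal IPs and
# hostnames from the host table above); append the output as root.
hosts_block=$(printf '%s\n' \
  "172.22.204.77 kafka01" \
  "172.22.204.78 kafka02" \
  "172.22.204.79 kafka03")
echo "$hosts_block"
```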

1.2 Disable the firewall and SELinux

systemctl stop firewalld && systemctl disable firewalld
sed -i 's/=enforcing/=disabled/g' /etc/selinux/config
setenforce 0   # the sed edit only takes effect after a reboot; this disables SELinux immediately

1.3 Kernel tuning

Append the following to /etc/sysctl.conf and apply it with sysctl -p:

net.ipv4.ip_forward = 0
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0
net.ipv4.tcp_syncookies = 1
net.ipv4.tcp_fin_timeout = 30
net.ipv4.tcp_keepalive_time = 1200
net.ipv4.tcp_synack_retries = 3
net.ipv4.tcp_syn_retries = 3
net.ipv4.tcp_retrans_collapse = 0
net.ipv4.ip_local_port_range = 4000 65000
net.ipv4.tcp_max_syn_backlog = 8192
net.ipv4.tcp_max_tw_buckets = 50000
net.ipv4.tcp_timestamps = 0
net.core.somaxconn = 65535
vm.swappiness = 1
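A minimal sketch of persisting and applying these tunables. It writes to a temp file so it runs without root; the /etc/sysctl.d/99-kafka.conf path named in the comment is an assumed choice, not from the original setup:

```shell
# Sketch: persist two of the tunables above, then apply. Writing under
# /etc/sysctl.d needs root, so this demo targets a temp file; on the real
# host, use e.g. /etc/sysctl.d/99-kafka.conf (assumed name) instead.
conf="$(mktemp)"
cat > "$conf" <<'EOF'
net.core.somaxconn = 65535
vm.swappiness = 1
EOF
# sysctl -p "$conf"      # run as root on the real host to load the values
grep -c '=' "$conf"      # sanity check: two settings written
```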

1.4 Set hostnames

Run the matching command on each host:

hostnamectl set-hostname kafka01   # on 172.22.204.77
hostnamectl set-hostname kafka02   # on 172.22.204.78
hostnamectl set-hostname kafka03   # on 172.22.204.79

1.5 Downloads

Kafka releases: kafka.apache.org/downloads

  • Scala 2.12 - [kafka_2.12-3.0.0.tgz]
  • Scala 2.13 - [kafka_2.13-3.0.0.tgz]
    Builds are split by Scala version (2.12 and 2.13); this guide uses the 2.13 build.

JDK download (jdk-8u311-linux-x64.tar.gz); note that Oracle links carry a short-lived AuthParam token, so this exact URL may have expired:
https://download.oracle.com/otn/java/jdk/8u311-b11/4d5417147a92418ea8b615e228bb6935/jdk-8u311-linux-x64.tar.gz?AuthParam=1635213117_07067dc7718de0ceb6e3c9e7202f6aec

Kafka download (the closer.cgi mirror-picker URL returns an HTML page, so fetch the tarball from the archive directly):
wget https://archive.apache.org/dist/kafka/3.0.0/kafka_2.13-3.0.0.tgz

2. Deployment

Install everything on kafka01 first, then sync it to the other two hosts.

2.1 Install the JDK

mkdir -p /opt/app
tar -zxvf jdk-8u311-linux-x64.tar.gz -C /opt/app

Append the environment variables to /etc/profile, then run source /etc/profile:

# JDK
export JAVA_HOME=/opt/app/jdk1.8.0_311
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 

2.2 Verify the JDK

[root@172.22.204.77-kafka01:app]$java -version
java version "1.8.0_311"
Java(TM) SE Runtime Environment (build 1.8.0_311-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.311-b11, mixed mode)

2.3 Install Kafka

cd /opt/app/
tar -zxvf kafka_2.13-3.0.0.tgz 

2.4 Configure ZooKeeper

Create the data and log directories (on all three servers):

mkdir /opt/data/zookeeper -p
mkdir /opt/logs/kafka-logs -p
# myid on 172.22.204.77
echo 1 > /opt/data/zookeeper/myid
# myid on 172.22.204.78
echo 2 > /opt/data/zookeeper/myid
# myid on 172.22.204.79
echo 3 > /opt/data/zookeeper/myid
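Each myid must match that host's server.N line in zookeeper.properties. A sketch that derives the id from the last octet of the internal IP (the 77 -> 1 offset is just this cluster's convention; the temp-dir fallback only lets the sketch run without root, the real path is /opt/data/zookeeper):

```shell
# Sketch: derive this host's ZooKeeper myid from the last octet of its
# internal IP (.77 -> 1, .78 -> 2, .79 -> 3, matching the server.N entries).
ip="172.22.204.78"                       # stand-in for this host's internal IP
myid=$(( ${ip##*.} - 76 ))
datadir="${ZK_DATA_DIR:-$(mktemp -d)}"   # real path: /opt/data/zookeeper
mkdir -p "$datadir"
echo "$myid" > "$datadir/myid"           # '>' (overwrite): myid must be one line
cat "$datadir/myid"
```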

The ZooKeeper configuration is as follows:

vi config/zookeeper.properties
dataDir=/opt/data/zookeeper
clientPort=2181
maxClientCnxns=30
admin.enableServer=false
initLimit=5
syncLimit=2
server.1=172.22.204.77:2888:3888
server.2=172.22.204.78:2888:3888
server.3=172.22.204.79:2888:3888
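Java .properties files accept duplicate keys silently and keep only the last value, so a copied-and-edited config can hide an unintended override. A small self-contained check (the sample file is created inline for illustration):

```shell
# Sketch: flag duplicate keys in a .properties file. Java keeps the LAST
# value for a repeated key, so a duplicate usually signals a copy-paste slip.
cfg="$(mktemp)"
cat > "$cfg" <<'EOF'
dataDir=/opt/data/zookeeper
clientPort=2181
maxClientCnxns=0
maxClientCnxns=30
EOF
dups=$(grep -v '^#' "$cfg" | grep '=' | cut -d= -f1 | sort | uniq -d)
echo "duplicate keys: $dups"
```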

2.5 Configure Kafka

[root@172.22.204.77-kafka01:config]$cat server.properties 
broker.id=1
listeners=PLAINTEXT://172.22.204.77:9092
#advertised.listeners=PLAINTEXT://10.225.11.27:9092   # uncomment if clients connect via the public network
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/logs/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.22.204.78:2181,172.22.204.77:2181,172.22.204.79:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
offsets.topic.replication.factor=3
default.replication.factor=3

Edit consumer.properties as follows:

[root@172.22.204.77-kafka01:config]$cat consumer.properties |grep -v ^#|grep -v ^$
bootstrap.servers=localhost:9092
group.id=Test-group

2.6 Sync to the other two servers

cd /opt/app/
scp -r * 172.22.204.78:`pwd`
scp -r * 172.22.204.79:`pwd`
scp /etc/profile 172.22.204.78:/etc/profile
scp /etc/profile 172.22.204.79:/etc/profile

2.7 Adjust the configs on the other two hosts

Configuration on 172.22.204.78 (only broker.id and listeners change):

broker.id=2   # (changed)
listeners=PLAINTEXT://172.22.204.78:9092   # (changed)
#advertised.listeners=PLAINTEXT://10.225.11.28:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/logs/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.22.204.78:2181,172.22.204.77:2181,172.22.204.79:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
offsets.topic.replication.factor=3
default.replication.factor=3

Configuration on 172.22.204.79:

[root@172.22.204.79-kafka03:config]$cat server.properties
broker.id=3
listeners=PLAINTEXT://172.22.204.79:9092
#advertised.listeners=PLAINTEXT://10.225.11.27:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/logs/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=172.22.204.78:2181,172.22.204.77:2181,172.22.204.79:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
offsets.topic.replication.factor=3
default.replication.factor=3

2.8 Configuration notes

# Globally unique broker id; must not repeat across the cluster
broker.id=1
# Listener the broker binds to; producers and consumers connect on this port
listeners=PLAINTEXT://172.22.204.77:9092
# Address advertised to clients; use the public IP when clients connect from outside
advertised.listeners=PLAINTEXT://10.225.11.27:9092
# Number of threads handling network requests
num.network.threads=3
# Number of threads handling disk I/O
num.io.threads=8
# Socket send buffer size
socket.send.buffer.bytes=102400
# Socket receive buffer size
socket.receive.buffer.bytes=102400
# Maximum size of a single request
socket.request.max.bytes=104857600
# Directory where Kafka stores message data (log segments)
log.dirs=/opt/logs/kafka-logs
# Default number of partitions for new topics on this broker
num.partitions=1
# Threads per data directory for log recovery at startup and flushing at shutdown
num.recovery.threads.per.data.dir=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
# Maximum time a segment file is retained before deletion
log.retention.hours=168
# Maximum size of a single log segment file
log.segment.bytes=1073741824
# How often retention conditions are checked
log.retention.check.interval.ms=300000
# ZooKeeper ensemble connection string
zookeeper.connect=172.22.204.78:2181,172.22.204.77:2181,172.22.204.79:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# delete.topic.enable=true allows real topic deletion; otherwise topics are only marked as deleted
delete.topic.enable=true
# Replication factor for the internal __consumer_offsets topic
offsets.topic.replication.factor=3
default.replication.factor=3

2.9 Start ZooKeeper and Kafka

[root@172.22.204.77-kafka01:kafka_2.13-3.0.0]$cat start_zk.sh 
cd /opt/app/kafka_2.13-3.0.0;
nohup  bin/zookeeper-server-start.sh config/zookeeper.properties >/dev/null 2>&1 &
[root@172.22.204.77-kafka01:kafka_2.13-3.0.0]$cat start_kafka.sh 
cd /opt/app/kafka_2.13-3.0.0
nohup  bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

[root@172.22.204.77-kafka01:kafka_2.13-3.0.0]$cat stop_kafka.sh 
ps -ef |grep server.properties|grep -v grep |awk '{print $2}'|xargs kill -9

Note: bin/kafka-server-stop.sh shuts the broker down gracefully and is preferable to kill -9, which can force log recovery on the next start.

Start ZooKeeper first on all three nodes, then Kafka:

sh start_zk.sh
sh start_kafka.sh
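Kafka should only be started once ZooKeeper is reachable, so it can help to wait for the ports to open. A sketch of a helper using bash's /dev/tcp; the commented call shows how it would be used with this cluster's addresses:

```shell
# Sketch: wait until host:port accepts TCP connections (bash-only /dev/tcp).
wait_for_port() {
  local host=$1 port=$2 tries=${3:-30}
  local i
  for i in $(seq "$tries"); do
    # opening fd 3 succeeds only if the port is accepting connections
    (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null && return 0
    sleep 1
  done
  return 1
}
# wait_for_port 172.22.204.77 2181 && sh start_kafka.sh   # typical use
wait_for_port 127.0.0.1 1 1 || echo "port closed"
```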

3. Verify the cluster

3.1 Create a topic

[root@172.22.204.78-kafka02:bin]$./kafka-topics.sh  --create --bootstrap-server 172.22.204.78:9092  --replication-factor 1 --partitions 1 --topic topic-testgef
Created topic topic-testgef.

3.2 List topics

[root@172.22.204.77-kafka01:kafka_2.13-3.0.0]$bin/kafka-topics.sh --list --bootstrap-server 172.22.204.77:9092
topic-testgef

3.3 Start a console producer

bin/kafka-console-producer.sh --bootstrap-server 172.22.204.77:9092 --topic topic-testgef

(--broker-list still works in 3.0 but is deprecated in favor of --bootstrap-server.)

3.4 Start a console consumer

./kafka-console-consumer.sh --bootstrap-server 172.22.204.77:9092  --topic topic-testgef --from-beginning

3.5 Example session

[root@172.22.204.77-kafka01:kafka_2.13-3.0.0]$ bin/kafka-console-producer.sh --broker-list 172.22.204.77:9092 --topic topic-testgef
>aaa
>fdsafdsa
>fdsadfasfa
>fdsadffasfa

[root@172.22.204.78-kafka02:bin]$ ./kafka-console-consumer.sh --bootstrap-server 172.22.204.77:9092  --topic topic-testgef --from-beginning
aaa
fdsafdsa
fdsadfasfa
fdsadffasfa

4. kafka-manager

Omitted for now.
