安装
先去Kafka官网下载,最好与安装的Scala版本一致。点进去,复制镜像链接,然后:
cd /usr/local
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxvf kafka_2.12-2.3.0.tgz
其实就已经好了,也可以添加下PATH。新版本的Kafka集成了Zookeeper,无需独立下载。
运行测试
主要参考官网的Kafka的quickstart,可以装个tmux来方便操作。
yum install tmux
然后建tmux session,tmux的操作这里就不详述了,启动zookeeper。
cd /usr/local/kafka_2.12-2.3.0
tmux new -s zk
bin/zookeeper-server-start.sh config/zookeeper.properties
然后ctrl+b > d 退出tmux,继续建一个tmux session,启动kafka。
tmux new -s kafka
bin/kafka-server-start.sh config/server.properties
然后ctrl+b > d 退出tmux,创建一个test的topic。
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
bin/kafka-topics.sh --list --bootstrap-server localhost:9092 # 查看下
然后建一个tmux session,用来生成消息。
tmux new -s producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test # 发两条消息试试
> hello world!
> hello kafka
然后ctrl+b > d 退出tmux,建一个tmux session,用来消费消息。
tmux new -s consumer
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
可以接收到刚才的消息,并持续接受。
配置集群
先配置zk,vim config/zookeeper.properties
,每个节点的参数都一样。
dataDir=/data/log/zookeeper
dataLogDir=/data/log/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
tickTime=2000
initLimit=5
syncLimit=5
server.0=10.14.11.111:2888:3888
server.1=10.14.11.112:2888:3888
server.2=10.14.11.113:2888:3888
再配置Kafka,vim config/server.properties
,
broker.id=0 # 另外两个节点可以是1 2
listeners=PLAINTEXT://10.14.11.111:9092 # 根据节点不同
# 下面的配置相同。
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# A comma separated list of directories under which to store log files
log.dirs=/data/log/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
delete.topic.enable=true
zookeeper.connect=10.14.11.111:2181,10.14.11.112:2181,10.14.11.113:2181
zookeeper.connection.timeout.ms=6000
配置完毕,启动时需要先启动各个节点的zk:
bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
然后再启动各个节点的Kafka :
bin/kafka-server-start.sh ./config/server.properties # 或者
bin/kafka-server-start.sh -daemon ./config/server.properties &
运行Spark Streaming + Kafka + Python
跟SparkStreaming的连接,可以参考这里运行Spark Streaming + Kafka + Python
可视化管理Kafka-manager
去https://github.com/yahoo/kafka-manager下载,找一个release版本
cd /usr/local
wget https://github.com/yahoo/kafka-manager/archive/2.0.0.2.tar.gz
tar -zxvf 2.0.0.2.tar.gz
cd kafka-manager-2.0.0.2
./sbt clean dist # 编译,时间很长,1小时左右
cd ..
mv kafka-manager-2.0.0.2 kafka-manager-2.0.0.2-1
mv kafka-manager-2.0.0.2-1/target/universal/kafka-manager-2.0.0.2.zip ./
unzip kafka-manager-2.0.0.2.zip # 解压编译出来的压缩包
cd kafka-manager-2.0.0.2
vim conf/application.conf
然后修改zookeeper配置:
然后启动:
bin/kafka-manager
就可以在浏览器9000端口查看(记得打开端口的防火墙、安全组策略什么的),简单配置下即可查看。
够了。