1 Overview
Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated message system that uses ZooKeeper for coordination. Its defining strength is processing large volumes of data in real time, which suits many scenarios: Hadoop-based batch processing, low-latency real-time systems, Storm/Spark stream-processing engines, web/nginx logs and access logs, messaging services, and so on. Kafka is written in Scala; LinkedIn open-sourced it in 2011 and donated it to the Apache Software Foundation, where it became a top-level project in 2012.
1.1 Kafka features
- High throughput, low latency: Kafka handles hundreds of thousands of messages per second with latencies as low as a few milliseconds; each topic can be split into multiple partitions, and a consumer group consumes those partitions in parallel.
- Scalability: a Kafka cluster supports hot expansion (brokers can be added while it is running).
- Durability and reliability: messages are persisted to local disk, and replication guards against data loss.
- Fault tolerance: the cluster tolerates broker failures (with a replication factor of n, up to n-1 brokers can fail); see the sketch after this list.
- High concurrency: thousands of clients can read and write at the same time.
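As a rough sketch of how partitions and replicas interact (assuming the three-broker cluster built in the following sections and a hypothetical topic named demo): creating a topic with --replication-factor 3 keeps three copies of each partition, so the data survives the loss of two brokers, and --partitions 3 lets up to three consumers in one consumer group read it in parallel.
bin/kafka-topics.sh --create --zookeeper huice101:2181 --replication-factor 3 --partitions 3 --topic demo
bin/kafka-topics.sh --describe --zookeeper huice101:2181 --topic demo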
1.2 Common use cases
- Log aggregation
- Messaging
- User activity tracking
- Operational metrics
- Stream processing
2 Cluster installation and configuration
2.1 Download the package
http://kafka.apache.org
Upload the package to the Linux servers.
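Alternatively (assuming the 0.11.0.0 release used below is still available from the Apache archive), the package can be downloaded directly on the server:
[root@huice101 /server/tools]# wget https://archive.apache.org/dist/kafka/0.11.0.0/kafka_2.11-0.11.0.0.tgz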
2.2 Extract the archive and rename the directory
[root@huice101 /server/tools]# tar xf kafka_2.11-0.11.0.0.tgz -C /opt/module/
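The remaining steps refer to /opt/module/kafka, so the extracted directory (kafka_2.11-0.11.0.0) is presumably renamed, e.g.:
[root@huice101 /server/tools]# mv /opt/module/kafka_2.11-0.11.0.0 /opt/module/kafka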
2.3 Edit the configuration file
[root@huice101 /]# vim /opt/module/kafka/config/server.properties
num.network.threads=3
delete.topic.enable=true
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=huice101:2181,huice102:2181,huice103:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
2.4 Copy the kafka directory to the other two nodes
[root@huice101 /]# scp -rp /opt/module/kafka/ 10.0.0.102:/opt/module/
[root@huice101 /]# scp -rp /opt/module/kafka/ 10.0.0.103:/opt/module/
Then edit server.properties on each node, giving every broker a unique broker.id and setting listeners to that node's own address.
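For example (the ids and listener addresses below are assumptions; the ids only need to be unique across the cluster, and each listeners entry should point at that node's own hostname):
# on huice101
broker.id=1
listeners=PLAINTEXT://huice101:9092
# on huice102
broker.id=2
listeners=PLAINTEXT://huice102:9092
# on huice103
broker.id=3
listeners=PLAINTEXT://huice103:9092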
2.5 Start the service
./kafka-server-start.sh ../config/server.properties &
or
bin/kafka-server-start.sh config/server.properties &
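kafka-server-start.sh also accepts a -daemon flag, so a common alternative (run on every node) is to start the broker in the background and confirm with jps that a Kafka process is running:
/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
jps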
3 Testing the ZooKeeper + Kafka cluster
(1) Create a topic
/opt/module/kafka/bin/kafka-topics.sh --create --zookeeper huice101:2181,huice102:2181,huice103:2181 --replication-factor 3 --partitions 3 --topic test
(2) Describe the topic
/opt/module/kafka/bin/kafka-topics.sh --describe --zookeeper huice101:2181,huice102:2181,huice103:2181 --topic test
(3) List all topics
/opt/module/kafka/bin/kafka-topics.sh --list --zookeeper huice101:2181,huice102:2181,huice103:2181
(4) Test message production on the master node
1 Create a producer
/opt/module/kafka/bin/kafka-console-producer.sh --broker-list huice101:9092,huice102:9092,huice103:9092 --topic test
this is example ...
hello world
[2019-11-22 16:20:53,145] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: test-1. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
welcome to china
2 Consume the messages on the huice103 node
/opt/module/kafka/bin/kafka-console-consumer.sh --zookeeper huice101:2181,huice102:2181,huice103:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
welcome to china
hello world this is example ...
Typing messages into the producer makes the same content appear in the consumer, which confirms they are being consumed successfully.
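The deprecation warning above suggests switching to the new consumer via --bootstrap-server; a roughly equivalent command against the same brokers and topic would be:
/opt/module/kafka/bin/kafka-console-consumer.sh --bootstrap-server huice101:9092,huice102:9092,huice103:9092 --topic test --from-beginning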
(5) Delete the topic and stop the service
/opt/module/kafka/bin/kafka-topics.sh --delete --zookeeper huice101:2181,huice102:2181,huice103:2181 --topic test
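Because delete.topic.enable=true was set in server.properties earlier, the topic is actually removed rather than only marked for deletion; listing the topics again should confirm this:
/opt/module/kafka/bin/kafka-topics.sh --list --zookeeper huice101:2181,huice102:2181,huice103:2181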
/opt/module/kafka/bin/kafka-server-stop.sh