概念
Kafka:是一个分布式消息系统,由linkedin使用scala编写,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。具有高水平扩展和高吞吐量。
Kafka和其他主流分布式消息系统的对比
ActiveMQ | RabbitMQ | Kafka | |
---|---|---|---|
开发语言 | Java | Erlang | Java |
支持协议 | OpenWire、STOMP、REST、XMP、AMQP | AMQP | AMQP |
事物 | 支持 | 支持 | 不支持 |
集群 | 支持 | 支持 | 支持 |
负载均衡 | 支持 | 支持 | 支持 |
动态扩容 | 不支持 | 不支持 | 支持(zk) |
础知识
- 消费者:(Consumer):从消息队列中请求消息的客户端应用程序
- 生产者:(Producer) :向broker发布消息的应用程序
- AMQP服务端(broker):用来接收生产者发送的消息并将这些消息路由给服务器中的队列,便于fafka将生产者发送的消息,动态的添加到磁盘并给每一条消息一个偏移量,所以对于kafka一个broker就是一个应用程序的实例
- 主题(Topic):一个主题类似新闻中的体育、娱乐、教育等分类概念,在实际工程中通常一个业务一个主题
-分区(Partition):一个Topic中的消息数据按照多个分区组织,分区是kafka消息队列组织的最小单位,一个分区可以看作是一个FIFO( First Input First Output的缩写,先入先出队列)的队列 - 每一个分区都可以有多个副本,以防止数据的丢失
- 某一个分区中的数据如果需要更新,都必须通过该分区所有副本中的leader来更新
- 消费者可以分组,比如有两个消费者组A和B,共同消费一个topic:order_info,A和B所消费的消息不会重复,比如 order_info 中有100个消息,每个消息有一个id,编号从0-99,那么,如果A组消费0-49号,B组就消费50-99号
- 消费者在具体消费某个topic中的消息时,可以指定起始偏移量
kafka分区是提高kafka性能的关键所在,当你发现你的集群性能不高时,常用手段就是增加Topic的分区,分区里面的消息是按照从新到老的顺序进行组织,
消费者从队列头订阅消息,生产者从队列尾添加消息
Kafka架构
生产者生产消息、kafka集群、消费者获取消息这样一种架构,如下图:
image.png
kafka集群中的消息,是通过Topic(主题)来进行组织的,如下图:
image.png
工作图
image.png
Kafka集群搭建
Kafka集群是把状态保存在Zookeeper中的,首先要搭建Zookeeper集群
搭建Zookeeper集群
这里三台服务器分别是
192.1682.158
192.1682.152
192.1682.150
在三台服务器上分别安装kafka
kafka官网下载地址 http://kafka.apache.org/downloads
wget http://mirrors.hust.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz
tar -zxvf kafka_2.12-2.0.0.tgz -C /usr/local/
mv /usr/local/kafka_2.12-2.0.0/ /usr/local/kafka
配置文件说明
broker.id=0 #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
#listeners=PLAINTEXT://192.168.2.152:9092 #当前kafka对外提供服务的端口默认是9092
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880 #消息保存的最大值5M
default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880 #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181#设置zookeeper的连接端口
主要修改配置这几个地方
#每台服务器的broker.id都不能相同
broker.id=0
listeners=PLAINTEXT://:9092
zookeeper.connect=192.168.2.152:2181,192.168.2.150:2181,192.168.2.158:2181
三台服务器分别启动Kafka
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
检查服务是否启动
jps
image.png
在kafka集群中创建一个topic:
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.2.152:2181 --replication-factor 3 --partitions 1 --topic order
解释:
--replication-factor 3 #复制两份
--partitions 1 #创建1个分区
--topic #主题为order
查看一下自己创建的topic:
/usr/local/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.2.152:2181
image.png
在192.168.2.152机器上创建一个producer,发布者
/usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.2.152:9092 --topic order
在192.168.2.150与192.168.2.158机器上分别创建一个consumer,消费者者
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.150:9092 --topic order --from-beginning
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.2.158:9092 --topic order --from-beginning
测试
在发布者机器上输入内容
image.png
在两台消费者机器上可以看到
image.png
并且在zookeeper可以到kafka的一些情况
zkCli.sh -server 192.168.2.152:2181
image.png
上面的显示结果中:只有zookeeper是,zookeeper原生的,其他都是Kafka创建的
标注一个重要的
get /brokers/ids/0
image.png
删除topic命令
bin/kafka-topics.sh --delete --zookeeper ip:port --topic order
查看某个Topic的详情
/usr/local/kafka/bin/kafka-topics.sh --topic order --describe --zookeeper 192.168.2.152:2181
image.png
- PartitionCount 分区数量
- ReplicationFactor 复制因子数量
- leader 是在给出的所有partitons中负责读写的节点,每个节点都有可能成为leader
- replicas 显示给定partiton所有副本所存储节点的节点列表,不管该节点是否是leader或者是否存活。
- isr 副本都已同步的的节点集合,这个集合中的所有节点都是存活状态,并且跟leader同步