kafka笔记
一、kafak简介
1、消息队列
- 消息队列:用于存放消息的组件
- 程序员可以将消息放入到队列中,也可以从消息队列中获取消息
- 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的
- 消息 队列中间件:消息队列的组件,例如——kafka,RabbitMQ,ActiveMQ,RocketMQ等
2、kafka的应用场景
- 异步处理
- 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
- 比较常见的:发送短信验证码、发送邮件
- 系统解耦
- 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
- 使用消息队列可以将系统进行解耦,现在一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦
- 流量削峰
- 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发
- 日志处理
- 可以使用消息队列作为临时存储,或者一种通信管道
3、消息队列的两种模型
- 生产者、消费者模型
- 生产者负责将消息生产到MQ中
- 消费者负责从MQ中获取消息
- 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
- 消息队列的模式
- 点对点:一个消费者消费一个消息
- 发布订阅:多个消费者消费一个消息
二、操作kafka
启动kafka服务
./bin/kafka-server-start.sh ./config/server.properties
创建一个topic
.bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
查看topic
./bin/kafka-topics.sh --list --zookeeper localhost:2181
删除主题
./bin/kafka-topics.sh --delete topic test --zookeeper localhost:2181
启动一个生产者,生产消息
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic tets
启动一个消费者,消费消息
./bin/kafka-console-consumer.sh --bookstrap-server localhost:9092 --topic test --from-beginning
设置多代理集群
- 伪集群方式
cp ./config/server.properties ./config/server-1.properties
cp ./config/server.properties ./config/server-2.properties
# 编辑新文件,设置如下属性
vim ./config/server-1.properties
broker.id=1
listeners=PLAINTEXT://9093
log.dir=/tmp/kafka-logs-1
vim ./config/server-2.properties
broker.id=2
listeners=PLAINTEXT://9092
log.dir=/tmp/kafka-logs-2
# 启动这两个新的节点
./bin/kafka-server-start.sh ./config/server-1.properties &
./bin/kafka-server-start.sh ./config/server-2.properties &
- 伪集群生产者
./bin/kafka-console-producer.sh --broker-list localhost:9092,localhost:9093,localhost:9094 --topic test
- 伪集群消费者
# 0.9之前的打开方式
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 -topic test --from-beginning
# 0.9之后的打开方式
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
停止kafka服务
./bin/kafka-server-stop.sh
三、操作zookeeper
1、zookeeper服务命令
-
启动zk服务
sh ./bin/zkServer.sh start
-
查看zk服务状态
sh ./bin/zkServer.sh status
-
停止zk服务
sh ./bin/zkServer.sh stop
-
重启zk服务
sh ./bin/zkServer.sh restart
2、zk客户端命令
-
连接到zk服务
zkCli.sh -server 127.0.0.1:2181
-
连接到zk服务后命令行工具的一些简单操作
- 查看当前zookeeper中所包含的内容:ls /
- 查看当前节点数据并能看到更新次数等数据:ls2 /
- 创建文件,并设置初始内容:create /zk "test" —— 创建一个新的znode节点’zk‘以及与它关联的字符串
- 获取文件内容:get /zk
- 修改文件内容:set /zk
- 删除文件:delete /zk
- 退出客户端:quit
- 帮助命令:help
四、kafka中的重要概念
1、broker
- broker服务器进程,生产者、消费者都要连接broker
- 一个集群由多个broker组成,共同实现kafka集群的负载均衡、容错
2、producer
- 生产者,负责生产消息
3、consumer
- 消费者,负责消费消息
4、topic
- 主题:一个kafka集群中,可以包含多个topic。一个topic可以包含多个分区
- 是一个逻辑结构,生产、消费消息都需要指定topic
5、partition
- kafka集群的分布式就是由分区来实现的。一个topic中的消息可以分布在topic中的不同partition中
6、replica
- 副本,实现kafka集群的容错,实现partition的容错 ,一个topic至少包含大于1个的副本
7、consumer group
- 消费者组,一个消费者组中的消费者可以共同消费topic中的分区数据。每一个消费者组都有一个唯一的名字。配置group.id一样的消费者是属于同一组中
- 一个消费者组中可以包含多个消费者,共同来消费topic中的数据
- 一个topic中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费
- 有多少个分区,那么就可以被同一个组内的多个消费者消费
8、offset
- 偏移量,相对消费者、partition来说,可以通过offset来拉取数据
9、幂等性
-
生产者消息重复问题
- kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,当kafka返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应过程失败了,此时生产者会重试,继续发送没有发送成功的消息,kafka又会保存一条一模一样的消息
-
在kafka中可以开启幂等性
当kafka的生产者生产消息时,会增加一个pid(生产者唯一编号)和sequence number(针对消息的一个递增序列)
发送消息,会连着pid和sequence number一块发送
kafka接收到消息,会将消息和pid、sequence number一并保存下来
如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid、sequence number是否需要再保存一条消息
判断条件:生产者发送过来的sequence number是否小于等于partition中消息对应的sequence
kafka中的分区副本机制
1、生产者的分区写入策略
- 轮询策略,消息会均匀地分部到每个partition
- 写入消息的时候,key为null的时候,默认使用的是轮询策略
- 随机策略(不使用)
- 按key写入策略,key.hash()%分区的数量
- 自定义分区策略(类似于MapReduce指定分区)
乱序问题
- 在kafka中生产者是有写入策略,如果topoic有多个分区,就会将数据分散在不同的partition中存储
- 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中
- 如果只有一个分区,消息是有序的
2、消费组Consumer Group Rebalance机制
- 再均衡:在某些情况下,消费者组中的消费者消费的分区会产生变化,会导致消费者分配不均匀(例如:有两个消费者消费3个,因为某个partition崩溃了,还有一个消费者当前没有分区要消费),kafka consumer group就会启用rebalance机制,重新平衡这个consumer group内的消费者消费的分区分配
- 触发时机
- 消费者数量发生变化
- 某个消费者crash
- 新增消费者
- topic的数量发生变化
- 某个topic被删除
- partition的数量发生变化
- 删除partition
- 新增partition
- 消费者数量发生变化
- 不良影响
- 发生rebalance,所有的consumer将不再工作,共同来参与再均衡,直到每个消费者都已经被成功分配所需要消费的分区为止(rebalance结束)
3、消费者的分区分配策略
分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少
- Range分配策略(范围分配策略):kafka默认的分配策略
- n:分区的数量/消费者的数量
- m:分区的数量%消费者的数量
- 前m个消费者消费m+1个分区
- 剩余的消费者消费n个分区
- RoundRobin分配策略(轮询分配策略)
- 消费者挨个分配消费的分区
- Striky粘性分配策略
- 在没有发生rebalance跟轮询分配策略是一致的
- 发生了rebalance,轮询分配策略是重新走一遍轮询分配的过程 。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
- 减少上下文的切换
4、副本的ACK机制
producer是不断地往kafka中写入数据,写入数据会有一个返回结果,表示写入成功。这里对应有一个ACKs的配置
- acks=0:生产者只管写入,不管是否写入成功,可能会有数据丢失。性能是最好的
- acks=1:生产者会等到leader分区写入成功后,返回成功,接着发送下一条
- acks=-1/all:确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的
根据业务情况来选择ack机制,如果要求高性能,对部分数据丢失感觉影响不大,可以选择0/1.如果要求数据一定不能够丢失的就得配置为-1/all
分区中是有leader和follower的感念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup
六、kafka原理
1、leader和follower
- kafka中的leader和follower是相对分区有意义,不是相对broker
- kafka在创建topic的时候,会尽量分配分区的leader在不同的broker中,其实就是负载均衡
- leader职责:读写数据
- follower职责:同步数据、参与选举(leader crash之后,会选举一个follower重新成为分区的leader)
- 注意和zookeeper区分
- ZK的leader负责读、写,follower可以读取
- kafka的leader负责读写、follower不能读写数据(确保每个消费者消费的数据是一致的),kafka一个topic有多个分区leader,一样可以实现数据操作的负载均衡
2、kafka读写流程
-
写流程
- 通过Zookeeper找partition对应的leader,leader是负责写的
- producer开始写入数据
- ISR里面的follower开始同步数据,并返回给leader ACK
- 返回给producer ACK
-
读流程
- 通过Zookeeper找partition对应的leader,leader是负责读的
- 通过Zookeeper找到消费者对应的offset
- 然后开始从offset往后顺序拉取数据
- 提交offset(自动提交:每隔多少秒提交一次offset。手动提交:放入到事务中提交)
3、kafak的物理存储
- kafka的数据组织结构
- topic
- partition
- segment
- .log数据文件
- .index(稀疏索引)
- .timeindex(根据时间做的索引)
- 深入了解读数据的流程
- 消费者的offset是一个针对partition全局offset
- 可以根据这个offset找到segment段
- 接着需要将全局的offset转换成segment的局部offset
- 根据局部的offset,就可以从(.index稀疏索引)找到对应的数据位置
- 开始顺序读取
4、消息传递的语义性
Flink里面有对应的每种不同机制的保证,提供Exactly-Once保障(二阶段事务提交方式)
- At-most once:最多一次(只管把数据消费到,不管有没有成功,可能有数据丢失)
- At-least once:最少一次(有可能会出现重复消费)
- Exactly-once:仅有一次(事务性的保障,保证消息有且仅被处理一次)
5、kafka的消息不丢失
- broker消息不丢失:因为有副本relicas的存在,会不断地从leader中同步副本,所以,一个brokercrash,不会导致数据丢失,除非是只有一个副本
- 生产者消息不丢失:ACK机制(配置成ALL/-1)、配置0或1有可能会存在丢失
- 消费者消息不丢失:重点控制offset
- At-least once:一种数据可能会重复消费
- Exactly once:仅被一次消费
6、数据积压
- 数据积压指的是消费者因为有一些外部的IO、一些比较耗时的操作(Full GC -- Stop the world),就会造成消息在partition中一直存在得不到消费,就会产生数据积压
- 在企业中,我们要有监控系统,如果出现这种情况,需要尽快处理。虽然后续的Spark Streaming/Flink 可以实现背压机制,但是数据积累太多一定对实时系统它的实时性是有影响的