架构
生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion
docker中kafka的使用
启动镜像
docker run -dit -p 9200:9200 kafka镜像id (端口映射之前有说过)
启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考
docker run -dit --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=172.17.0.3:2181/kafka -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.4:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
其中172.17.0.3需要改成自己docker的网桥连接地址
查看已启动容器
docker ps
查看所有容器
docker ps -a
启动未启动的容器
docker start 容器id
进入kafka容器
docker exec -it 容器id bash(默认进入/目录)
创建主题
./kafka-topics.sh --create --zookeeper 172.17.0.3:2181/kafka --replication-factor 1 --partitions 3 --topic sun
主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区复制到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分)。
查看topic信息
./kafka-topics.sh --describe --zookeeper 172.17.0.3/kafka --topic sun
集群部署
可以通过compose集群化部署过es,这里通过创建另一个compose.yml文件来部署kafka,配置文件参考 docker-compose集群部署
docker-compose -f docker-compose.yml -f docker-compose.prod.yml up -d
spring 中kafka生产消费
生产者:
./kafka-console-producer.sh --bootstrap-server node1:9092 --topic my-kafka-topic
消费者:
方式一:从当前主题的迁移量位置+1开始取数据
./kafka-console-consumer.sh --bootstrap-server node1:9092 --topic my-kafka-topic
方式二:从当前主题第一条消息开始消费
./kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic my-kafka-top
生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
单播消息:
当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup --topic my-kafka-topic
多播消息:
当存在一个生产者,多个消费组,不同消费组只有一个消费者收到消息
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup1 --topic my-kafka-topic
./kafka-console-consumer.sh --bootstrap-server node1:9092 --consumer-property group.id=testGroup2 --topic my-kafka-topic
查看消费组详细信息:
./kafka-console-consumer.sh --bootstrap-server node1:9092 --describe --group testGroup
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
cat 2 79668 79668 0
CURRENT-OFFSET:最后被消费的偏移量
LOG-END-OFFSET:消息总量(最后一条消息的偏移量)
LAG :积压了多少条消息
常见问题:
1、如何防止消息丢失
生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
消费者:把自动提交改成手动提交
2、如何防止消息的重复消费
针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;
3、消息积压
消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。
ps:
缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker
参考链接:
1、kafka安装教程--推荐
2、kafka配置文件server.properties参数说明
3、创建主题分区数
4、解决docker容器启动不了的问题
5、通过docker-compose集群部署
6、学习视频