目前最好的异步消息队列处理器
阻塞队列
写一个生产者消费者模式,使用阻塞队列
Kafka入门
消息持久化:消息永久保存,存到硬盘里。硬盘顺序读写性能很高,可能高于内存的随机读写。所以才能保证海量数据高速度处理
高可靠性:靠分布式,可以做集群部署
高扩展性:简单配配就可以加一台服务器
术语
Broker:就是一台服务器的意思
Zookeeper:是一个独立的软件和应用,用来管理集群,可以用Zookeeper管理集群。可以单独下载也可以内置。
消息队列实现方式
点对点:生产者消费者,每个数据只被一个消费者消费
发布订阅模式(Kafka模式):数据可以被多个消费者使用
Topic:生产者发布的位置就是Topic,可以理解为文件夹。
Partition:对Topic位置进行分区如最开始的图片,可以同时向多个分区写入,追加的写入。例如生产者追加数据。读的话则按照顺序读连续的Offset作为一个消息
OffSet:就是存放的索引
RePlica:副本对数据做备份的,为了让数据更可靠,每一个分区都存不是一个,而是多个副本
Leader Replica:主副本从这个分区获取数据的时候是主副本给你数据
Follower Replica:只负责备份数据,如果主副本挂了,则会从副本中选择一个作为主副本
- 官网下载推荐版本,解压
- 配置zookeeper.properties,用于存放数据的目录
dataDir=/home/...
配置server.properties
log.dirs=/home/...
- 命令
去官网快速开始部分学习
启动zookeeper,使用配置文件
./zookeeper-server-start.sh config/zookeeper.properties
再开启一个窗口启动kafka。同上操作
再启动一个命令,使用kafka命令工具
//创建一个Topic主题,代表位置,也代表消息的分类
./kafka-topics.sh --create --bootstrap-server localhost:9092
--replication-factor 1 --partitions 1
--topic test
没有提示则创建好了
查看所有的主题,指定某个服务器
./kafka-topics.sh --list --bootstrap-server localhost:9092
向主题发送消息,调用生产者执行文件
//指定向哪个服务器发送消息以及主题
kafka-console-producer --broker-list localhost:9092 --topic test
//发送消息
>hello
>word
再开启一个窗口执行消费者
//指定服务器,它topic 并从头读数据
./kafka-console-consumer.sh --bootstrap-server localhost:9092 -- topic test --from-beginning
//输出hello word
Spring整合Kafka
- 引入依赖
spring-kafka - 配置Kafka
配置server、consumer - 访问Kafka
- 生产者
kafkaTemplate.send(topic, data);
- 生产者
- 消费者
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {}
配置项
-
consumer配置项
2.application.properties
# KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=community-consumer-group
spring.kafka.consumer.enable-auto-commit=true //自动提交,是否自动提交偏移量,要不要记录下来做提交
spring.kafka.consumer.auto-commit-interval=3000//间隔3s
-
测试类
测试类中定义
发送系统通知
三类定义三个不同的主题,
-
封装事件对象,更好的扩展。
把set方法返回this,实现链式编程。
-
事件生产者类,EventProducer
- 事件的消费者
事件的消费者主要是接收到事件对象后把它封装到Message类中,并存入数据库。参照数据库from_id为1 的数据格式。 - 评论点赞,关注,
取消赞没必要通知。
评论连接到评论详情,关注连接到用户主页,帖子也是连接到帖子详情。
like方法加一个postId参数,便于连接到帖子详情。 - 注意要把kafka zookeeper启动。windows下可以kafka会报错,我们把kafka-log目录删掉重启即可。
显示系统通知
- 通知列表
显示评论、点赞、关注三种类型的通知 - 通知详情
分页显示某一类主题所包含的通知 -
未读消息
在页面头部显示所有的未读消息数量
2, MessageController
把三种类型的数据都查出来,其中content是json传的数据,我么需要把它转成对象,但是传进去的有转义字符,我们需要把转义字符解析出来
String content = HtmlUtil.htmlUnescape(message.getContent());
Map<String, Object> data = JSONObject.parseObject(content, HashMap.class);
这样就得到了我们要的数据
查询评论类通知需要的数据是
另外两类差不多,直接复制然后修改一些名字即可。
关注去掉postId参数,因为不用连接到详情。
然后查询私信的未读消息数量以及未读通知的数量。计算出总的消息数。私信列表也要查询出通知的数量。
- 处理页面,
letter的头复制到notice即可唯一修改的就是active激活位置,active表示激活的页面标签,修改下即可。 -
通知详情
设置分页,消息已读,消息的发送人
页面,处理返回按钮,三个都是显示到同一个评论处理,我们复制三份,然后通过判断显示。
-
消息未读的数量,用拦截器处理
MessageInterceptor