题目
题目内容
Topic类似于水坝(蓄积功能,消峰填谷之利器),Queue类似于水渠;每当新建一个Queue的时候,可以选择绑定到几个Topic,类似于水渠从水坝引水; 每个Topic可以被任意多个Queue绑定,这点与现实生活不太一样,因为数据可以多次拷贝; 在发送的时候,可以选择发送到Topic,也可以选择直接发送到Queue;直接发送到Queue的数据只能被对应Queue消费,不能被其他Queue读取到; 在消费的时候,除了要读取绑定的Topic的数据,还要去取直接发送到该Queue的数据。
程序目标
实现以下接口:
- Producer的createBytesMessageToTopic(topic, body)
- Producer的createBytesMessageToQueue(queue, body)
- Producer的send(message)
- PullConsumer的attachQueue(queue, topics)
- PullConsumer的poll()
程序校验逻辑
- 10~20个线程(位于同一进程中)各自独立调用Producer发送消息(每个线程启动一个Producer,每条消息随机发送到某个Topic或者Queue),持续时间T1,请注意把消息数据写入磁盘中
- 强行kill Producer进程,未写入磁盘的消息都会丢失
- 10~20个线程(位于同一进程中)独立调用Consumer收取消息(每个线程启动一个Consumer,attach到指定的Queue,不同的Consumer不会attach同一个Queue),验证消息顺序准确性,可靠性,消费持续的时间为T2,消费到的总消息数目为N
- 以N/(t1+t2)来衡量性能
补充说明
- 测试时,topic和queue的数目大约是100个(其中queue的数目与消费者线程数相等);
- 测试时,消息大小不会超过256K;
- 可靠性是指,消息不能丢失,且消息的内容不能被篡改;在测试消费的时候,会对消息的body,headers,properties的内容进行校验;
- header与properties中key和value都不会插入null或空值;
- 仅允许依赖JavaSE8包含的lib;
消息顺序的说明
- 顺序只针对单个topic或者queue,不同topic,不同queue,topic与queue之间都不用考虑顺序;
- 消息产品的一个重要特性是顺序保证,也就是消息消费的顺序要与发送的时间顺序保持一致;
- 在多发送端的情况下,保证全局顺序代价比较大,只要求各个发送端的顺序有保障即可; 举个例子P1发送M11,M12,M13,P2发送M21,M22,M23,在消费的时候,只要求保证M11,M12,M13(M21,M22,M23)的顺序,也就是说实际消费顺序为M11,M21,M12,M13,M22,M23 正确,M11,M21,M22,M12,M13,M23 正确,M11,M13,M21,M22,M23,M12 错误,M12与M13的顺序颠倒了;
题目解读
- 题目要求实现五个接口,分别对应于Producer生产消息、发送消息,Consumer绑定topic和queue、消费消息;
- topic可以存储数据,queue也可以存储数据;
- Producer可以把消息发送到任意的topic和queue中,但是一条消息只能发送到一个topic或queue中;
- Consumer和queue数量相等,两者一一对应,一个Consumer绑定一个queue和多个topic,不同的Consumer绑定不同的queue,topic可以相同;
- Consumer消费数据时,只保证对应Producer局部有序,即Consumer消费某topic/queue的消息时,来自同一Producer的数据其接收顺序与发送顺序相同;
答辩资料解读
初赛第六名代码链接(本人非作者):
https://github.com/whutjs/MessageSystem
生产者架构
- 每个生产者对应一个输出文件;
- 生产者每生产一条消息,就把消息编码后写入ByteBuffer中,再把ByteBuffer中二进制数据写入对应topic或queue的Cache中,然后清空ByteBuffer;
- 当topic/queue对应的Cache存满后,就把这个Cache中所有二进制数据写到输出文件对应的ByteBuffer中;
- 写入ByteBuffer时依次写入topic/queue的编号、数据长度的类型、数据长度和多条消息的二进制数据;
- ByteBuffer存满后就把数据刷到输出文件中;
消费者架构
- 一个文件对应一个FileReadCache,一个消费者对应一个FileReadCache;
- 每个消费者拥有一个ConcurrentLinkedQueue,调用poll()方法读取消息;
- 若queue中没有消息,且对应的FileReadCache没有读完,则通过FileReadCache解码出一条消息,然后MessageLoader把消息分发到订阅了此topic/queue的Consumer的queue中;
- 若queue中没有消息,且对应的FileReadCache已经读完,则降低该消费者线程的优先级,这样其它Consumer就会多占用CPU来解码消息,并把该消费者需要的消息发过来,存到queue中;
- 所有的FileReadCache都把对应的文件读完了,MessageLoader发送endMsg到所有Consumer的queue中,这样所有Consumer就消费结束;
消息存储结构
- 每个Producer对应一个输出文件,文件中存储二进制数据;
- 文件中数据的结构:topic/queue的编号、数据长度的类型、数据长度、多条消息的二进制数据,然后是下一个topic/queue的数据;
- 消息的结构:消息头标识、消息id的长度、消息id的二进制数据、key的长度、key的二进制数据、val的长度、val的二进制数据、消息体开头标识、消息体的长度、消息体的二进制数据,然后是下一条消息的数据;
如何解码二进制数据
- 因为文件是追加写入的,因此最开始的消息写在文件的最前面;
- 读取数据时,把当前topic/queue对应的整块Cache数据加载到ByteBuffer中,一次解析出一条消息,再把该消息push给订阅了该topic/queue的Consumer;
- 当前topic/queue的消息被解析完后,再加载下一个topic/queue对应的Cache数据;
如何保证数据的有序性
图中producer1发送3条数据给topic1、2条数据给topic2,consumer1订阅了topic1和topic2;需保证consumer1中来自topic1的3条数据其顺序与其在producer1中相同,来自topic2的2条数据其顺序与其在producer1相同,来自不同topic的数据没有顺序要求;
原理:File中消息的顺序与Producer一致,FileReadCache从前往后依次解析File中的消息,然后把消息push给Consumer,这样Consumer就保证了消息局部有顺;