本文仅展示核心代码,全部代码,请移步:git-soomq
1,服务端
服务端的设计就非常简单了,最核心的就是消息的存取,以及响应生产者和消费者的网络请求
分为2部分:
1.1 消息文件
消息的存储我们参考kafka,并简化其逻辑,因为是最简单的mq,我们只考虑单机的情况的就行,每个topic存储2个文件
topicname.index
topicname.data
.index 文件存储格式为:
消息顺序号:消息截止位置
.data 文件按照顺序存储具体的消息
文件操作:
package com.esoo.mq.server.message;
import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import java.io.RandomAccessFile;
/**
* 为每个topic创建一个对象进行管理
*/
public class MessageFile {
private String topic;
private Long offset;
//索引文件
private RandomAccessFile indexFile = null ;
//数据文件
private RandomAccessFile dataFile = null ;
//追加消息(生产者进行调用)
public ProcessorCommand appendMsg(ProcessorCommand in){
try {
//加锁,避免竞争,文件乱码
synchronized (in.getResult().getTopic()) {
//读取index文件最后一行
String lastLine = readLastLine(indexFile, null);
int lastOffset = 1;
//消息体追加到data文件中,并返回文件末尾位置,作为本条消息的offset
long lastindex = writeEndLine(dataFile, in.getResult().getBody());
if (lastLine != null && !lastLine.equals("")) {
String index[] = lastLine.split(":");
lastOffset = Integer.valueOf(index[0]);
lastOffset = lastOffset + 1;
}
//组装本条消息index 序列号:消息体末尾位置
String insertMsgIndex = lastOffset + ":" + lastindex + "\t\n";
writeEndLine(indexFile, insertMsgIndex.getBytes());
in.setSuccess(true);
}
}catch (Exception e){
e.printStackTrace();
in.setSuccess(false);
in.setExmsg(e.getMessage());
}
return in;
}
//读取消息,消费者进行调用
public ProcessorCommand readMsg(ProcessorCommand in){
try {
synchronized (in.getResult().getTopic()) {
// 消息定位位置
int seekIn = 0;
// 消息体大小
int bodySize = 0;
//先定位到开始
indexFile.seek(0);
String indesMap=null;
//遍历index文件,找到上一个消息 offset 与本消息offset 进行相减就是消息体大小
while ((indesMap = indexFile.readLine())!=null){
String index[] = indesMap.split(":");
int inNum = Integer.valueOf(String.valueOf(index[0]).trim());
int off = Integer.valueOf(String.valueOf(index[1]).trim());
if (inNum == in.getResult().getOffset()) {
seekIn = off;
}
if (inNum == (in.getResult().getOffset() + 1)) {
bodySize = off - seekIn;
}
}
if (bodySize == 0) {
in.setSuccess(false);
in.setExmsg("offset is end");
return in;
}
//定位到具体位置
dataFile.seek(seekIn);
//进行消息读取
byte[] b = new byte[bodySize];
dataFile.read(b);
in.getResult().setBody(b);
in.setSuccess(true);
System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
}
}catch (Exception e){
e.printStackTrace();
in.setSuccess(false);
in.setExmsg(e.getMessage());
}
return in;
}
//写消息到最后一行
public static long writeEndLine(RandomAccessFile file, byte[] msg)
throws Exception {
// 文件长度,字节数
long fileLength = file.length();
// 将写文件指针移到文件尾。
file.seek(fileLength);
file.write(msg);
return file.getFilePointer();
}
//读取最后一行的消息
public static String readLastLine(RandomAccessFile file, String charset) throws Exception {
long len = file.length();
if (len == 0L) {
return "";
} else {
long pos = len - 1;
while (pos > 0) {
pos--;
file.seek(pos);
if (file.readByte() == '\n') {
break;
}
}
if (pos == 0) {
file.seek(0);
}
byte[] bytes = new byte[(int) (len - pos)];
file.read(bytes);
if (charset == null) {
return new String(bytes);
} else {
return new String(bytes, charset);
}
}
}
public static String readByOffset(RandomAccessFile file, String charset) throws Exception {
return null;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public Long getOffset() {
return offset;
}
public void setOffset(Long offset) {
this.offset = offset;
}
public RandomAccessFile getIndexFile() {
return indexFile;
}
public void setIndexFile(RandomAccessFile indexFile) {
this.indexFile = indexFile;
}
public RandomAccessFile getDataFile() {
return dataFile;
}
public void setDataFile(RandomAccessFile dataFile) {
this.dataFile = dataFile;
}
}
1.2 网络编程
利用netty 开放端口,响应生产者与消费者,每个消息包装成一个commod,commod类型
- 消息类型(消费/生产)
- 消息topic
- 消息体(生产时用)
- 消息顺序号(消费时用)
- 处理结果(成功/失败)
- 处理消息(失败时添加原因)
网络启动
package com.esoo.mq.server;
import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class SooMQServer {
private static Integer serverPort=9870;
ServerBootstrap b = new ServerBootstrap();
public void start(){
//创建reactor 线程组
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
try {
//1 设置reactor 线程组
b.group(bossLoopGroup, workerLoopGroup);
//2 设置nio类型的channel
b.channel(NioServerSocketChannel.class);
//3 设置监听端口
b.localAddress(serverPort);
//4 设置通道的参数
b.option(ChannelOption.SO_KEEPALIVE, true);
b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
//5 装配子通道流水线
b.childHandler(new ChannelInitializer<SocketChannel>() {
//有连接到达时会创建一个channel
protected void initChannel(SocketChannel ch) throws Exception {
// pipeline管理子通道channel中的Handler
// 向子channel流水线添加一个handler处理器
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
ClassResolvers.cacheDisabled(null)));
ch.pipeline().addLast(new NettySooMqServerOutHandler());
ch.pipeline().addLast(new NettySooMqServerHandler());
}
});
// 6 开始绑定server
// 通过调用sync同步方法阻塞直到绑定成功
ChannelFuture channelFuture = b.bind().sync();
System.out.println(" 服务器启动成功,监听端口: " +
channelFuture.channel().localAddress());
// 7 等待通道关闭的异步任务结束
// 服务监听通道会一直等待通道关闭的异步任务结束
ChannelFuture closeFuture = channelFuture.channel().closeFuture();
closeFuture.sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 8 优雅关闭EventLoopGroup,
// 释放掉所有资源包括创建的线程
workerLoopGroup.shutdownGracefully();
bossLoopGroup.shutdownGracefully();
}
}
}
网络逻辑分发
注意:回写给客户端的消息体类型必须与入参保持一致,否则netty无法解析
package com.esoo.mq.server.netty.handler;
import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.processor.Processor;
import com.esoo.mq.server.processor.ProcessorFactory;
import io.netty.channel.*;
@ChannelHandler.Sharable
public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ProcessorCommand command = (ProcessorCommand) msg;
System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
msg = processor.handle(command);
ChannelFuture f = ctx.writeAndFlush(msg);
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
System.out.println("msg ctx send");
}
});
}catch (Exception e){
e.printStackTrace();
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println(cause.getMessage());
ctx.close();
}
}
生产者
package com.esoo.mq.server.processor;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;
public class SendMessageProcessor implements Processor<Message,Message> {
@Override
public ProcessorCommand handle(ProcessorCommand task) {
MessageFile file = MessageFileFactory.getTopicFile(task.getResult().getTopic());
task = file.appendMsg(task);
return task;
}
}
消费者
package com.esoo.mq.server.processor;
import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;
public class ReadMessageProcessor implements Processor<Message,Message> {
@Override
public ProcessorCommand handle(ProcessorCommand task) {
Message msg = task.getResult();
MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
task = file.readMsg(task);
return task;
}
}