为初学者而来~手工最简MQ(二)Broker

本文仅展示核心代码,全部代码,请移步:git-soomq

1,服务端

服务端的设计就非常简单了,最核心的就是消息的存取,以及响应生产者和消费者的网络请求
分为2部分:

1.1 消息文件

消息的存储我们参考kafka,并简化其逻辑,因为是最简单的mq,我们只考虑单机的情况的就行,每个topic存储2个文件

topicname.index
topicname.data

.index 文件存储格式为:
消息顺序号:消息截止位置
.data 文件按照顺序存储具体的消息

文件操作:

package com.esoo.mq.server.message;

import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;

import java.io.RandomAccessFile;

/**
 * 为每个topic创建一个对象进行管理
 */
public class MessageFile {
    private String topic;
    private Long offset;
    //索引文件
    private RandomAccessFile indexFile = null ;
    //数据文件
    private RandomAccessFile dataFile = null ;

    //追加消息(生产者进行调用)
    public ProcessorCommand appendMsg(ProcessorCommand in){

        try {
            //加锁,避免竞争,文件乱码
            synchronized (in.getResult().getTopic()) {

                //读取index文件最后一行
                String lastLine = readLastLine(indexFile, null);
                int lastOffset = 1;
                //消息体追加到data文件中,并返回文件末尾位置,作为本条消息的offset
                long lastindex =  writeEndLine(dataFile, in.getResult().getBody());
                if (lastLine != null && !lastLine.equals("")) {
                    String index[] = lastLine.split(":");
                    lastOffset = Integer.valueOf(index[0]);
                    lastOffset = lastOffset + 1;
                }
                //组装本条消息index 序列号:消息体末尾位置
                String insertMsgIndex = lastOffset + ":" + lastindex + "\t\n";
                writeEndLine(indexFile, insertMsgIndex.getBytes());
                in.setSuccess(true);
            }
        }catch (Exception e){
            e.printStackTrace();

            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;

    }

    //读取消息,消费者进行调用
    public ProcessorCommand readMsg(ProcessorCommand in){


        try {
            synchronized (in.getResult().getTopic()) {
                // 消息定位位置
                int seekIn = 0;
                // 消息体大小
                int bodySize = 0;
                //先定位到开始
                indexFile.seek(0);
                String indesMap=null;
                //遍历index文件,找到上一个消息 offset 与本消息offset 进行相减就是消息体大小
                while ((indesMap = indexFile.readLine())!=null){
                    String index[] = indesMap.split(":");
                    int inNum = Integer.valueOf(String.valueOf(index[0]).trim());
                    int off = Integer.valueOf(String.valueOf(index[1]).trim());
                    if (inNum == in.getResult().getOffset()) {
                        seekIn = off;
                    }
                    if (inNum == (in.getResult().getOffset() + 1)) {
                        bodySize = off - seekIn;
                    }
                }
                if (bodySize == 0) {
                    in.setSuccess(false);
                    in.setExmsg("offset is end");
                    return in;
                }
                //定位到具体位置
                dataFile.seek(seekIn);

                //进行消息读取
                byte[] b = new byte[bodySize];
                dataFile.read(b);
                in.getResult().setBody(b);

                in.setSuccess(true);
                System.out.println(" READ MSG IS: "+JSON.toJSONString(in));
            }
        }catch (Exception e){
            e.printStackTrace();
            in.setSuccess(false);
            in.setExmsg(e.getMessage());
        }
        return in;

    }

    //写消息到最后一行
    public static long writeEndLine(RandomAccessFile file, byte[] msg)
            throws Exception {
        // 文件长度,字节数
        long fileLength = file.length();
        // 将写文件指针移到文件尾。
        file.seek(fileLength);
        file.write(msg);
        return file.getFilePointer();

    }

    //读取最后一行的消息
    public static String readLastLine(RandomAccessFile file, String charset) throws Exception {

        long len = file.length();
        if (len == 0L) {
            return "";
        } else {
            long pos = len - 1;
            while (pos > 0) {
                pos--;
                file.seek(pos);
                if (file.readByte() == '\n') {
                    break;
                }
            }
            if (pos == 0) {
                file.seek(0);
            }
            byte[] bytes = new byte[(int) (len - pos)];
            file.read(bytes);
            if (charset == null) {
                return new String(bytes);
            } else {
                return new String(bytes, charset);
            }
        }

    }

    public static String readByOffset(RandomAccessFile file, String charset) throws Exception {

        return null;
    }



    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getOffset() {
        return offset;
    }

    public void setOffset(Long offset) {
        this.offset = offset;
    }

    public RandomAccessFile getIndexFile() {
        return indexFile;
    }

    public void setIndexFile(RandomAccessFile indexFile) {
        this.indexFile = indexFile;
    }

    public RandomAccessFile getDataFile() {
        return dataFile;
    }

    public void setDataFile(RandomAccessFile dataFile) {
        this.dataFile = dataFile;
    }
}

1.2 网络编程

利用netty 开放端口,响应生产者与消费者,每个消息包装成一个commod,commod类型

  • 消息类型(消费/生产)
  • 消息topic
  • 消息体(生产时用)
  • 消息顺序号(消费时用)
  • 处理结果(成功/失败)
  • 处理消息(失败时添加原因)

网络启动

package com.esoo.mq.server;

import com.esoo.mq.server.netty.handler.NettySooMqServerHandler;
import com.esoo.mq.server.netty.handler.NettySooMqServerOutHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;

public class SooMQServer {
    private static Integer serverPort=9870;
    ServerBootstrap b = new ServerBootstrap();

    public void start(){
        //创建reactor 线程组
        EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerLoopGroup = new NioEventLoopGroup();

        try {
            //1 设置reactor 线程组
            b.group(bossLoopGroup, workerLoopGroup);
            //2 设置nio类型的channel
            b.channel(NioServerSocketChannel.class);
            //3 设置监听端口
            b.localAddress(serverPort);
            //4 设置通道的参数
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
            b.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

            //5 装配子通道流水线
            b.childHandler(new ChannelInitializer<SocketChannel>() {
                //有连接到达时会创建一个channel
                protected void initChannel(SocketChannel ch) throws Exception {
                    // pipeline管理子通道channel中的Handler
                    // 向子channel流水线添加一个handler处理器
                    ch.pipeline().addLast(new ObjectEncoder());
                    ch.pipeline().addLast(new ObjectDecoder(Integer.MAX_VALUE,
                            ClassResolvers.cacheDisabled(null)));
                    ch.pipeline().addLast(new NettySooMqServerOutHandler());
                    ch.pipeline().addLast(new NettySooMqServerHandler());
                }
            });
            // 6 开始绑定server
            // 通过调用sync同步方法阻塞直到绑定成功
            ChannelFuture channelFuture = b.bind().sync();
            System.out.println(" 服务器启动成功,监听端口: " +
                    channelFuture.channel().localAddress());

            // 7 等待通道关闭的异步任务结束
            // 服务监听通道会一直等待通道关闭的异步任务结束
            ChannelFuture closeFuture = channelFuture.channel().closeFuture();
            closeFuture.sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 8 优雅关闭EventLoopGroup,
            // 释放掉所有资源包括创建的线程
            workerLoopGroup.shutdownGracefully();
            bossLoopGroup.shutdownGracefully();
        }
    }

}

网络逻辑分发

注意:回写给客户端的消息体类型必须与入参保持一致,否则netty无法解析


netty
package com.esoo.mq.server.netty.handler;


import com.alibaba.fastjson.JSON;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.processor.Processor;
import com.esoo.mq.server.processor.ProcessorFactory;
import io.netty.channel.*;

@ChannelHandler.Sharable
public class NettySooMqServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        try {
            ProcessorCommand command = (ProcessorCommand) msg;
            System.out.println("["+ctx.channel().remoteAddress()+"] msg:"+JSON.toJSONString(msg));
            Processor processor = ProcessorFactory.getProcessorInstantiate(command.getType());
            msg = processor.handle(command);
            ChannelFuture f = ctx.writeAndFlush(msg);
            f.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("msg ctx send");
                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }



}

生产者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class SendMessageProcessor implements Processor<Message,Message> {

    @Override
    public ProcessorCommand handle(ProcessorCommand task) {
        MessageFile file = MessageFileFactory.getTopicFile(task.getResult().getTopic());
        task = file.appendMsg(task);
        return task;
    }


}

消费者

package com.esoo.mq.server.processor;

import com.esoo.mq.common.Message;
import com.esoo.mq.common.ProcessorCommand;
import com.esoo.mq.server.message.MessageFile;
import com.esoo.mq.server.message.MessageFileFactory;

public class ReadMessageProcessor implements Processor<Message,Message> {

    @Override
    public ProcessorCommand handle(ProcessorCommand task) {
        Message msg = task.getResult();
        MessageFile file = MessageFileFactory.getTopicFile(msg.getTopic());
        task = file.readMsg(task);
        return task;
    }


}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343