Java语言快速实现简单MQ消息队列服务

使用 JAVA 语言自己动手来写一个MQ (类似ActiveMQ,RabbitMQ)

主要角色

首先我们必须需要搞明白 MQ (消息队列) 中的三个基本角色

ProducerBrokerConsumer

整体架构如下所示

自定义协议

首先从上一篇中介绍了协议的相关信息,具体厂商的 MQ(消息队列) 需要遵循某种协议或者自定义协议 , 消息的 生产者和消费者需要遵循其协议(约定)才能后成功地生产消息和生产消息 ,所以在这里我们自定义一个协议如下.

消息处理中心 : 如果接收到的信息包含"SEND"字符串,即视为生产者发送的消息,消息处理中心需要将此信息存储等待消费者消费

消息处理中心 : 如果接受到的信息为CONSUME,既视为消费者发送消费请求,需要将存储的消息队列头部的信息转发给消费者,然后将此消息从队列中移除

消息处理中心 : 如果消息处理中心存储的消息满3条仍然没有消费者进行消费,则不再接受生产者的生产请求

消息生产者:需要遵循协议将生产的消息头部增加"SEND:" 表示生产消息

消息消费者:需要遵循协议向消息处理中心发送"CONSUME"字符串表示消费消息

流程顺序

项目构建流程

下面将整个MQ的构建流程过一遍

新建一个 Broker 类,内部维护一个 ArrayBlockingQueue 队列,提供生产消息和消费消息的方法, 仅仅具备存储服务功能

新建一个 BrokerServer 类,将 Broker 发布为服务到本地9999端口,监听本地9999端口的 Socket 链接,在接受的信息中进行我们的协议校验, 这里 仅仅具备接受消息,校验协议,转发消息功能;

新建一个 MqClient 类,此类提供与本地端口9999的Socket链接 , 仅仅具备生产消息和消费消息的方法

测试:新建两个 MyClient 类对象,分别执行其生产方法和消费方法

具体使用流程

生产消息:客户端执行生产消息方法,传入需要生产的信息,该信息需要遵循我们自定义的协议,消息处理中心服务在接受到消息会根据自定义的协议校验该消息是否合法,如果合法如果合法就会将该消息存储到Broker内部维护的 ArrayBlockingQueue 队列中.如果 ArrayBlockingQueue 队列没有达到我们协议中的最大长度将将消息添加到队列中,否则输出生产消息失败.

消息消息:客户端执行消费消息方法, Broker服务 会校验请求的信息的信息是否等于 CONSUME ,如果验证成功则从Broker内部维护的 ArrayBlockingQueue 队列的 Poll 出一个消息返回给客户端

代码演示

消息处理中心 Broker

/**

* 消息处理中心

*/publicclassBroker {// 队列存储消息的最大数量privatefinalstaticintMAX_SIZE =3;// 保存消息数据的容器privatestaticArrayBlockingQueue messageQueue =newArrayBlockingQueue(MAX_SIZE);// 生产消息publicstaticvoidproduce(String msg) {if(messageQueue.offer(msg)) {            System.out.println("成功向消息处理中心投递消息:"+ msg +",当前暂存的消息数量是:"+ messageQueue.size());        }else{            System.out.println("消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!");        }        System.out.println("=======================");    }// 消费消息publicstaticString consume() {        String msg = messageQueue.poll();if(msg !=null) {// 消费条件满足情况,从消息容器中取出一条消息System.out.println("已经消费消息:"+ msg +",当前暂存的消息数量是:"+ messageQueue.size());        }else{            System.out.println("消息处理中心内没有消息可供消费!");        }        System.out.println("=======================");returnmsg;    }}

消息处理中心服务 BrokerServer

/**

* 用于启动消息处理中心

*/publicclassBrokerServerimplementsRunnable{publicstaticintSERVICE_PORT =9999;privatefinal Socket socket;publicBrokerServer(Socket socket){this.socket = socket;    }    @Overridepublicvoidrun(){try(                BufferedReaderin=newBufferedReader(newInputStreamReader(                        socket.getInputStream()));                PrintWriterout=newPrintWriter(socket.getOutputStream())        )        {while(true) {                String str =in.readLine();if(str ==null) {continue;                }                System.out.println("接收到原始数据:"+ str);if(str.equals("CONSUME")) {//CONSUME 表示要消费一条消息//从消息队列中消费一条消息String message = Broker.consume();out.println(message);out.flush();                }elseif(str.contains("SEND:")){//接受到的请求包含SEND:字符串 表示生产消息放到消息队列中Broker.produce(str);                }else{                    System.out.println("原始数据:"+str+"没有遵循协议,不提供相关服务");                }            }        }catch(Exception e) {            e.printStackTrace();        }    }publicstaticvoidmain(String[] args) throws Exception{        ServerSocket server =newServerSocket(SERVICE_PORT);while(true) {            BrokerServer brokerServer =newBrokerServer(server.accept());newThread(brokerServer).start();        }    }}

客户端 MqClient

/**

* 访问消息队列的客户端

*/publicclassMqClient{//生产消息publicstaticvoidproduce(String message) throws Exception{//本地的的BrokerServer.SERVICE_PORT 创建SOCKETSocket socket =newSocket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);try(                PrintWriterout=newPrintWriter(socket.getOutputStream())        ) {out.println(message);out.flush();        }    }//消费消息publicstaticStringconsume() throws Exception{        Socket socket =newSocket(InetAddress.getLocalHost(), BrokerServer.SERVICE_PORT);try(                BufferedReaderin=newBufferedReader(newInputStreamReader(                        socket.getInputStream()));                PrintWriterout=newPrintWriter(socket.getOutputStream())        ) {//先向消息队列发送命令out.println("CONSUME");out.flush();//再从消息队列获取一条消息String message =in.readLine();returnmessage;        }    }}

测试MQ

publicclassProduceClient{publicstaticvoidmain(String[] args)throwsException{        MqClient client =newMqClient();        client.produce("SEND:Hello World");    }}publicclassConsumeClient{publicstaticvoidmain(String[] args)throwsException{        MqClient client =newMqClient();        String message = client.consume();        System.out.println("获取的消息为:"+ message);    }}

我们多执行几次客户端的生产方法和消费方法就可以看到一个完整的MQ的通讯过程,下面是我执行了几次的一些日志

接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:1=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:2=======================接收到原始数据:SEND:Hello World成功向消息处理中心投递消息:SEND:Hello World,当前暂存的消息数量是:3=======================接收到原始数据:SEND:Hello World消息处理中心内暂存的消息达到最大负荷,不能继续放入消息!=======================接收到原始数据:Hello World原始数据:Hello World没有遵循协议,不提供相关服务接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:2=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:1=======================接收到原始数据:CONSUME已经消费消息:SEND:Hello World,当前暂存的消息数量是:0=======================接收到原始数据:CONSUME消息处理中心内没有消息可供消费!=======================

小结

本章示例代码主要源自分布式消息中间件实践一书 , 这里我们自己使用Java语言写了一个MQ消息队列 , 通过这个消息队列我们对MQ中的几个角色 "生产者,消费者,消费处理中心,协议" 有了更深的理解 ; 那么下一章节我们就来一块学习具体厂商的MQ RabbitMQ

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,634评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,951评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,427评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,770评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,835评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,799评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,768评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,544评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,979评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,271评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,427评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,121评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,756评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,375评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,579评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,410评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,315评论 2 352

推荐阅读更多精彩内容