RocketMQ快速上手

RocketMQ 是阿里开源的一款高性能、高吞吐量的消息中间件。

RocketMQ 架构介绍

RocketMQ架构分为4部分,Producer、Consumer、Nameserver、Broker,四部分均可集群部署。

Producer、Consumer为客户端。Producer是消息发送端,Consumer是消费消息端。开发过程中,我们会对这一部分关注更多。

Nameserver、Broker为服务端。Namesever可以看作架构的大脑,负责服务的发现和路由;Broker可以看作消息的中转站,Producer发送的消息会先到Broker,然后由Consumer消费。

RocketMQ 架构

RocketMQ 安装

下面介绍如何在本地安装RocketMQ,并使用源码中自带的demo发送消息、消费消息。

环境准备:

  1. 64bit OS, Linux/Unix/Mac 平台
  2. 64bit JDK 1.8+;
  3. Maven 3.2.x;

下载代码:RocketMQ项目由github托管,我们将从github上下载rocketMQ的源码
链接:RocketMQ github仓库

如果装有git,可以通过git checkout 代码到本地,如果git没有也可以直接下载安装包解压得到文件,文件目录如下


RocketMQ项目目录

安装步骤如下:

构建,进入文件目录,通过maven构建项目

mvn -Prelease-all -DskipTests clean install -U

稍等片刻,构建成功

[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.4.0 4.4.0 ........................ SUCCESS [  3.836 s]
[INFO] rocketmq-logging 4.4.0 ............................. SUCCESS [  2.971 s]
[INFO] rocketmq-remoting 4.4.0 ............................ SUCCESS [  2.395 s]
[INFO] rocketmq-common 4.4.0 .............................. SUCCESS [  3.386 s]
[INFO] rocketmq-client 4.4.0 .............................. SUCCESS [  4.092 s]
[INFO] rocketmq-store 4.4.0 ............................... SUCCESS [  2.340 s]
[INFO] rocketmq-srvutil 4.4.0 ............................. SUCCESS [  0.589 s]
[INFO] rocketmq-filter 4.4.0 .............................. SUCCESS [  1.187 s]
[INFO] rocketmq-acl 4.4.0 ................................. SUCCESS [  0.862 s]
[INFO] rocketmq-broker 4.4.0 .............................. SUCCESS [  2.778 s]
[INFO] rocketmq-tools 4.4.0 ............................... SUCCESS [  1.686 s]
[INFO] rocketmq-namesrv 4.4.0 ............................. SUCCESS [  0.986 s]
[INFO] rocketmq-logappender 4.4.0 ......................... SUCCESS [  0.845 s]
[INFO] rocketmq-openmessaging 4.4.0 ....................... SUCCESS [  0.742 s]
[INFO] rocketmq-example 4.4.0 ............................. SUCCESS [  0.843 s]
[INFO] rocketmq-test 4.4.0 ................................ SUCCESS [  1.221 s]
[INFO] rocketmq-distribution 4.4.0 4.4.0 .................. SUCCESS [  6.102 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 37.160 s
[INFO] Finished at: 2019-02-17T22:26:28+08:00
[INFO] ------------------------------------------------------------------------

进入构建后的输出目录,按顺序依次启动Name ServerBroker

cd distribution/target/apache-rocketmq

启动Name Server,nohup表示不挂断执行,terminal退出时MQ的进程不会退出

nohup sh bin/mqnamesrv &

查看Name Server 启动日志

tail -f ~/logs/rocketmqlogs/namesrv.log

启动成功

2019-02-23 22:38:09 INFO main - tls.client.keyPath = null
2019-02-23 22:38:09 INFO main - tls.client.keyPassword = null
2019-02-23 22:38:09 INFO main - tls.client.certPath = null
2019-02-23 22:38:09 INFO main - tls.client.authServer = false
2019-02-23 22:38:09 INFO main - tls.client.trustCertPath = null
2019-02-23 22:38:09 INFO main - Using OpenSSL provider
2019-02-23 22:38:10 INFO main - SSLContext created for server
2019-02-23 22:38:10 INFO NettyEventExecutor - NettyEventExecutor service started
2019-02-23 22:38:10 INFO FileWatchService - FileWatchService service started
2019-02-23 22:38:10 INFO main - The Name Server boot success. serializeType=JSON

启动Broker,并指定注册到Name Server的地址 127.0.0.1:9786

nohup sh bin/mqbroker -n 127.0.0.1:9876 &

查看Broker 启动日志

tail -f ~/logs/rocketmqlogs/broker.log 

启动成功

2019-02-23 22:46:18 WARN main - Load default discard message hook service: DefaultTransactionalMessageCheckListener
2019-02-23 22:46:18 INFO main - The broker dose not enable acl
2019-02-23 22:46:18 INFO FileWatchService - FileWatchService service started
2019-02-23 22:46:18 INFO PullRequestHoldService - PullRequestHoldService service started
2019-02-23 22:46:18 INFO brokerOutApi_thread_1 - register broker to name server 127.0.0.1:9876 OK
2019-02-23 22:46:18 INFO main - Start transaction service!
2019-02-23 22:46:18 INFO main - The broker[zhangjianweideMacBook-Pro.local, 192.168.1.113:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 415169 bytes
2019-02-23 22:46:28 INFO brokerOutApi_thread_2 - register broker to name server 127.0.0.1:9876 OK

至此,RocketMQ的服务端已全部启动运行了,下面我们将通过源码中的示例验证,让Producer发送消息,Consumer来消费消息。

运行示例代码

接下来我们就可以运行示例代码,简单验证一下服务端是否启动并开始工作。

设置Name Server 地址到环境变量,Producer和Consumer工作时都会用到此地址

export NAMESRV_ADDR=127.0.0.1:9876

启动客户端,验证发送与接收消息,producer和consumer的启动没有先后顺序。

启动Producer,Producer启动后将发送多条Topic为“TopicTest”的消息,然后自动终止进程

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360003E3, offsetMsgId=C0A8017100002A9F00000000000BCFA1, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=998]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360103E4, offsetMsgId=C0A8017100002A9F00000000000BD055, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=1], queueOffset=1001]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360203E5, offsetMsgId=C0A8017100002A9F00000000000BD109, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=2], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360303E6, offsetMsgId=C0A8017100002A9F00000000000BD1BD, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=3], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360403E7, offsetMsgId=C0A8017100002A9F00000000000BD271, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=999]
......

启动Consumer,Consumer启动后将订阅Topic名称为“TopicTest”消息,持续监听,收到消息后进行消费,将消息打印输出

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=983, sysFlag=0, bornTimestamp=1550936154570, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154571, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BA571, commitLogOffset=763249, bodyCRC=1431313338, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205508, UNIQ_KEY=C0A8017107605E2DE80C765C35CA03A7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 53], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=995, sysFlag=0, bornTimestamp=1550936154615, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154616, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC5C9, commitLogOffset=771529, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F703D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=994, sysFlag=0, bornTimestamp=1550936154613, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154614, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC2F9, commitLogOffset=770809, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F503D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]]
......

以上就是示例代码的演示

关闭服务端:进入安装目录中,依次关闭broker、namesrv,如果忘记关闭,下次启动时会提示端口冲突。

cd distribution/target/apache-rocketmq
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

小结

本文介绍了RocketMQ架构的主要轮廓,讲解如何安装环境、运行示例代码。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容