RocketMQ 是阿里开源的一款高性能、高吞吐量的消息中间件。
RocketMQ 架构介绍
RocketMQ架构分为4部分,Producer、Consumer、Nameserver、Broker,四部分均可集群部署。
Producer、Consumer为客户端。Producer是消息发送端,Consumer是消费消息端。开发过程中,我们会对这一部分关注更多。
Nameserver、Broker为服务端。Namesever可以看作架构的大脑,负责服务的发现和路由;Broker可以看作消息的中转站,Producer发送的消息会先到Broker,然后由Consumer消费。
RocketMQ 安装
下面介绍如何在本地安装RocketMQ,并使用源码中自带的demo发送消息、消费消息。
环境准备:
- 64bit OS, Linux/Unix/Mac 平台
- 64bit JDK 1.8+;
- Maven 3.2.x;
下载代码:RocketMQ项目由github托管,我们将从github上下载rocketMQ的源码
链接:RocketMQ github仓库
如果装有git,可以通过git checkout 代码到本地,如果git没有也可以直接下载安装包解压得到文件,文件目录如下
安装步骤如下:
构建,进入文件目录,通过maven构建项目
mvn -Prelease-all -DskipTests clean install -U
稍等片刻,构建成功
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary:
[INFO]
[INFO] Apache RocketMQ 4.4.0 4.4.0 ........................ SUCCESS [ 3.836 s]
[INFO] rocketmq-logging 4.4.0 ............................. SUCCESS [ 2.971 s]
[INFO] rocketmq-remoting 4.4.0 ............................ SUCCESS [ 2.395 s]
[INFO] rocketmq-common 4.4.0 .............................. SUCCESS [ 3.386 s]
[INFO] rocketmq-client 4.4.0 .............................. SUCCESS [ 4.092 s]
[INFO] rocketmq-store 4.4.0 ............................... SUCCESS [ 2.340 s]
[INFO] rocketmq-srvutil 4.4.0 ............................. SUCCESS [ 0.589 s]
[INFO] rocketmq-filter 4.4.0 .............................. SUCCESS [ 1.187 s]
[INFO] rocketmq-acl 4.4.0 ................................. SUCCESS [ 0.862 s]
[INFO] rocketmq-broker 4.4.0 .............................. SUCCESS [ 2.778 s]
[INFO] rocketmq-tools 4.4.0 ............................... SUCCESS [ 1.686 s]
[INFO] rocketmq-namesrv 4.4.0 ............................. SUCCESS [ 0.986 s]
[INFO] rocketmq-logappender 4.4.0 ......................... SUCCESS [ 0.845 s]
[INFO] rocketmq-openmessaging 4.4.0 ....................... SUCCESS [ 0.742 s]
[INFO] rocketmq-example 4.4.0 ............................. SUCCESS [ 0.843 s]
[INFO] rocketmq-test 4.4.0 ................................ SUCCESS [ 1.221 s]
[INFO] rocketmq-distribution 4.4.0 4.4.0 .................. SUCCESS [ 6.102 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 37.160 s
[INFO] Finished at: 2019-02-17T22:26:28+08:00
[INFO] ------------------------------------------------------------------------
进入构建后的输出目录,按顺序依次启动Name Server
和Broker
cd distribution/target/apache-rocketmq
启动Name Server,nohup表示不挂断执行,terminal退出时MQ的进程不会退出
nohup sh bin/mqnamesrv &
查看Name Server 启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动成功
2019-02-23 22:38:09 INFO main - tls.client.keyPath = null
2019-02-23 22:38:09 INFO main - tls.client.keyPassword = null
2019-02-23 22:38:09 INFO main - tls.client.certPath = null
2019-02-23 22:38:09 INFO main - tls.client.authServer = false
2019-02-23 22:38:09 INFO main - tls.client.trustCertPath = null
2019-02-23 22:38:09 INFO main - Using OpenSSL provider
2019-02-23 22:38:10 INFO main - SSLContext created for server
2019-02-23 22:38:10 INFO NettyEventExecutor - NettyEventExecutor service started
2019-02-23 22:38:10 INFO FileWatchService - FileWatchService service started
2019-02-23 22:38:10 INFO main - The Name Server boot success. serializeType=JSON
启动Broker,并指定注册到Name Server的地址 127.0.0.1:9786
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
查看Broker 启动日志
tail -f ~/logs/rocketmqlogs/broker.log
启动成功
2019-02-23 22:46:18 WARN main - Load default discard message hook service: DefaultTransactionalMessageCheckListener
2019-02-23 22:46:18 INFO main - The broker dose not enable acl
2019-02-23 22:46:18 INFO FileWatchService - FileWatchService service started
2019-02-23 22:46:18 INFO PullRequestHoldService - PullRequestHoldService service started
2019-02-23 22:46:18 INFO brokerOutApi_thread_1 - register broker to name server 127.0.0.1:9876 OK
2019-02-23 22:46:18 INFO main - Start transaction service!
2019-02-23 22:46:18 INFO main - The broker[zhangjianweideMacBook-Pro.local, 192.168.1.113:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2019-02-23 22:46:28 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 415169 bytes
2019-02-23 22:46:28 INFO brokerOutApi_thread_2 - register broker to name server 127.0.0.1:9876 OK
至此,RocketMQ的服务端已全部启动运行了,下面我们将通过源码中的示例验证,让Producer发送消息,Consumer来消费消息。
运行示例代码
接下来我们就可以运行示例代码,简单验证一下服务端是否启动并开始工作。
设置Name Server 地址到环境变量,Producer和Consumer工作时都会用到此地址
export NAMESRV_ADDR=127.0.0.1:9876
启动客户端,验证发送与接收消息,producer和consumer的启动没有先后顺序。
启动Producer,Producer启动后将发送多条Topic为“TopicTest”的消息,然后自动终止进程
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360003E3, offsetMsgId=C0A8017100002A9F00000000000BCFA1, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=998]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360103E4, offsetMsgId=C0A8017100002A9F00000000000BD055, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=1], queueOffset=1001]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360203E5, offsetMsgId=C0A8017100002A9F00000000000BD109, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=2], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360303E6, offsetMsgId=C0A8017100002A9F00000000000BD1BD, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=3], queueOffset=999]
SendResult [sendStatus=SEND_OK, msgId=C0A8017107605E2DE80C765C360403E7, offsetMsgId=C0A8017100002A9F00000000000BD271, messageQueue=MessageQueue [topic=TopicTest, brokerName=zhangjianweideMacBook-Pro.local, queueId=0], queueOffset=999]
......
启动Consumer,Consumer启动后将订阅Topic名称为“TopicTest”消息,持续监听,收到消息后进行消费,将消息打印输出
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=983, sysFlag=0, bornTimestamp=1550936154570, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154571, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BA571, commitLogOffset=763249, bodyCRC=1431313338, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205508, UNIQ_KEY=C0A8017107605E2DE80C765C35CA03A7, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 51, 53], transactionId='null'}]]
ConsumeMessageThread_20 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=995, sysFlag=0, bornTimestamp=1550936154615, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154616, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC5C9, commitLogOffset=771529, bodyCRC=835257960, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F703D5, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 56, 49], transactionId='null'}]]
ConsumeMessageThread_18 Receive New Messages: [MessageExt [queueId=2, storeSize=180, queueOffset=994, sysFlag=0, bornTimestamp=1550936154613, bornHost=/192.168.1.113:56251, storeTimestamp=1550936154614, storeHost=/192.168.1.113:10911, msgId=C0A8017100002A9F00000000000BC2F9, commitLogOffset=770809, bodyCRC=1597161362, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1000, CONSUME_START_TIME=1550936205510, UNIQ_KEY=C0A8017107605E2DE80C765C35F503D1, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 57, 55, 55], transactionId='null'}]]
......
以上就是示例代码的演示
关闭服务端:进入安装目录中,依次关闭broker、namesrv,如果忘记关闭,下次启动时会提示端口冲突。
cd distribution/target/apache-rocketmq
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
小结
本文介绍了RocketMQ架构的主要轮廓,讲解如何安装环境、运行示例代码。