RocketMQ入门手册

前言

继我上一篇博客后
分布式消息队列RocketMQ学习教程①
上一篇博客最主要介绍了几种常用的MQ,所以本博客再简单介绍一下RocketMQ的原理和简单的例子,基于Java实现,希望可以帮助学习者

RoketMQ搭建Linux版

“工于利其事,必先利其器”,所以我们首先需要搭建好RocketMQ,
考虑到学习者不一定有Linux系统的服务器,所以本博客介绍一下Linux和Window系统的两种安装方法,以补充上一篇博客

因为阿里已经将RocketMQ捐给Apache了,所以现在我们需要去Apache官网下载
RocketMQ官网

注意RocketMQ是基于Java开发的,所以安装前必须安装JDK,Linux JDK安装的可以看分布式消息队列RocketMQ学习教程①
下载文件解压后,可以看到conf文件夹里有2m-noslave、2m-2s-async、2m-2s-sync文件夹

2m-noslave 两主,无从的配置

2m-2s-async 两主,两从,同步复制数据的配置

2m-2s-sync 两主,两从,异步复制数据的配置

我们找到2m-noslave的broker-a.properties文件,修改完善配置
broker-a.properties

#所属集群名字  
brokerClusterName=DefaultCluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分号分割
namesrvAddr=127.0.0.1:9876

#关键
brokerIP1=127.0.0.1

#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=48
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88

#这里是我的 日志配置
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort


#限制的消息大小  
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=ASYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH

#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

先介绍一下linux系统的
一般将压缩文件解压到/usr/local

cd /usr/local

tar -xzf apache-rocketmq.tar.gz

mv apache-rocketmq rocketmq

mkdir /usr/rocketmq/logs

环境变量配置

vim /etc/profile

修改如下配置

export JAVA_HOME=/usr/java/jdk1.8.0_102
export ROCKETMQ_HOME=/usr/local/rocketmq
export PATH=$PATH:$JAVA_HOME/bin:/usr/local/src/redis-3.2.8/bin:$ROCKETMQ_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib

启动mqnamesrv

cd /usr/local/rocketmq/bin
nohup sh /usr/local/rocketmq/bin/mqnamesrv >/usr/local/rocketmq/logs/mqnamesrv.log 2>&1 &

启动Broker

nohup sh /usr/local/rocketmq/bin/mqbroker  -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties > /usr/local/rocketmq/logs/mqbroker.log  2>&1 &

要设置自动创建Topic,需要加上
autoCreateTopicEnable=true

关闭Broker服务
sh mqshutdown broker

启动成功可以用jps查看


这里写图片描述

RocketMQ搭建Window版

1、下载RocketMQ后,解压到D:\alibaba-rocketmq

2、在D:\alibaba-rocketmq,Ctrl+Shift,右键,打开dom界面,输入如下命令行
start /b bin/mqnamesrv.exe >D:\alibaba-rocketmq\logs\mqnamesrv.log
查看nameserver是否启动
jps -v

3、启动Broker

start /b bin/mqbroker.exe -n "127.0.0.1:9876" autoCreateTopicEnable=true >D:\alibaba-rocketmq\logs\mqbroker.log
Caused by: com.alibaba.rocketmq.client.exception.MQClientException: No route info of this topic, huang_1
See http://docs.aliyun.com/cn#/pub/ons/faq/exceptions&topic_not_exist for further details.
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.sendDefaultImpl(DefaultMQProducerImpl.java:525) ~[rocketmq-client-3.5.3.jar:na]
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:1011) ~[rocketmq-client-3.5.3.jar:na]
    at com.alibaba.rocketmq.client.impl.producer.DefaultMQProducerImpl.send(DefaultMQProducerImpl.java:970) ~[rocketmq-client-3.5.3.jar:na]
    at com.alibaba.rocketmq.client.producer.DefaultMQProducer.send(DefaultMQProducer.java:90) ~[rocketmq-client-3.5.3.jar:na]
    at com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl.send(ProducerImpl.java:107) ~[ons-client-1.2.3.jar:na]

出现以上异常启动时添加autoCreateTopicEnable=true

4、查看topic命令:mqadmin topicList -n "127.0.0.1:9876"

cd 到bin目录,执行下面命令
mqadmin updateTopic -t test_1 -b "127.0.0.1:10911" -n "127.0.0.1:9876"
添加如下参数到eclipse启动工程的VM参数里
-Drocketmq.namesrv.addr=127.0.0.1:9876

RocketMq监控平台搭建

需要去github下载,下载链接
rocketmq-console

下载后在rocketmq-console文件夹里,ctrl+shift,右键,在此处打开命令窗口,打开cmd窗口,主要要先搭建好maven环境

mvn clean package -Dmaven.test.skip=true

打包完成之后,我们去target文件夹找到rocketmq-console-ng-1.0.0.jar
然后

mkdir rocketmq-console
cd /usr/local/rocketmq-console

使用xftp上传rocketmq-console-ng-1.0.0.jar到/usr/local/rocketmq-console

nohup java -jar rocketmq-console-ng-1.0.0.jar --server.port=12581 --rocketmq.config.namesrvAddr=127.0.0.1:9876 >/usr/local/rocketmq-console/run.log 2>&1 &

端口检查

netstat -anp|grep 12581

部署成功,打开http://服务器IP:12581

这里写图片描述

编程实现MQ实例

maven加入配置

<dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>3.0.10</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.rocketmq</groupId>
            <artifactId>rocketmq-all</artifactId>
            <version>3.0.10</version>
            <type>pom</type>
        </dependency>

消息队列消费者消费消息实例

package com.mq.test;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;

import java.util.List;

public class MQConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(
                "mq-group");

        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setInstanceName("consumer");
        consumer.subscribe("TopicA-test", "TagA");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("RocketMQ Consumer Started...");
    }
}

消息队列生产者产生消息实例

package com.mq.test;

import java.util.Date;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class MQProducer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("mq-group");
//        producer.setNamesrvAddr("123.207.63.192:9876");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.setInstanceName("producer");
        producer.start();
        try {
            for (int i = 0; i < 10; i++) {
                Thread.sleep(1000);  //MQ每隔一秒发送一条消息
                Message msg = new Message("TopicA-test",// topic
                        "TagA",// tag
                        ("RocketMQ message"+i)
                                .getBytes()// body
                );
                SendResult sendResult = producer.send(msg);//发送消息
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.shutdown();//关闭消息生产者
    }
}

下面是来自github wiki的学习例子

Filter网络架构,以CPU资源换取宝贵的网卡流量资源

screenshot

启动Broker时,增加以下配置,可以自动加载Filter Server进程

filterServerNums=1

Filter样本(Consumer仅负责将代码上传到Filter Server,由Filter Server编译后执行)


package com.alibaba.rocketmq.example.filter;

import com.alibaba.rocketmq.common.filter.MessageFilter;
import com.alibaba.rocketmq.common.message.MessageExt;


public class MessageFilterImpl implements MessageFilter {

    @Override
    public boolean match(MessageExt msg) {
        String property = msg.getUserProperty("SequenceId");
        if (property != null) {
            int id = Integer.parseInt(property);
            if ((id % 3) == 0 && (id > 10)) {
                return true;
            }
        }

        return false;
    }
}


Consumer例子


    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroupNamecc4");
        
        String filterCode = MixAll.file2String("/home/admin/MessageFilterImpl.java");
        consumer.subscribe("TopicFilter7", "com.alibaba.rocketmq.example.filter.MessageFilterImpl",
            filterCode);

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        <br>
        consumer.start();

        System.out.println("Consumer Started.");
    }


附录

RocketMQ原理与安装教程

RocketMQ实例

阿里RocketMQ Quick Start

RocketMQ集群安装

rocketMq监控平台rocketmq-console搭建

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容