RocketMQ 快速开始quickstart && borker配置文件

producer

public class Producer1 {

    public static void main(String[] agrs) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {

        DefaultMQProducer producer = new
                DefaultMQProducer("TopicTest3-produceGroup1",ACLClient.getAclRPCHook());
        // Specify name server addresses.
        producer.setNamesrvAddr("10.1.54.46:9876");
        producer.setInstanceName("producer 1");
        //Launch the instance.
        producer.start();
        
        for (int i = 0; i < 10; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest3" /* Topic */,
                    "abc" /* Tag */,"OrderID188",
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            msg.putUserProperty("coal", String.valueOf(i));
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();

    }
}

consumer

public class AclConsumer {

    public static void main(String[] agrs) throws MQClientException {
        //指定Group和ACL
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup5", ACLClient.getAclRPCHook(), new AllocateMessageQueueAveragely());

        consumer.setNamesrvAddr("localhost:9876");
        consumer.setInstanceName("consumer 5");

        //集群订阅(MessageModel.CLUSTERING)
        //广播订阅(MessageModel.BROADCASTING)
        consumer.setMessageModel(MessageModel.BROADCASTING);

        //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
        //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
        //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //可以修改每次消费消息的数量,默认设置是每次消费一条
        consumer.setConsumeMessageBatchMaxSize(10);


        //设置consumer所订阅的Topic和Tag,*代表全部的Tag MessageSelector.byTag
        consumer.subscribe("TopicTest3", MessageSelector.bySql("coal between 2 and 7"));

        //注册消费的监听
        consumer.registerMessageListener(MessageListener.getInstance());

        consumer.start();
        System.out.println("consumer 2 is started");

    }
}

ACLClient

public class ACLClient {

    private static final String ACL_ACCESS_KEY = "RocketMQ2";
    private static final String ACL_SECRET_KEY = "12345678";

    static RPCHook getAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
    }

    private static final String TEST_ACL_ACCESS_KEY = "testKey";
    private static final String TEST_ACL_SECRET_KEY = "12345678";

    static RPCHook getTestAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(TEST_ACL_ACCESS_KEY,TEST_ACL_SECRET_KEY));
    }


    private static final String ERROR_ACL_ACCESS_KEY = "RocketMQ333";
    private static final String ERROR_ACL_SECRET_KEY = "12345673333";

    static RPCHook getErrorAclRPCHook() {
        return new AclClientRPCHook(new SessionCredentials(ERROR_ACL_ACCESS_KEY,ERROR_ACL_SECRET_KEY));
    }
}

Consumer 的 MessageListener

public class MessageListener implements MessageListenerConcurrently {

    private static MessageListener instance = null;

    public static MessageListener getInstance() {
        if (instance == null) {
            synchronized (MessageListener.class) {
                if (instance == null) {
                    instance = new MessageListener();
                }
            }
        }
        return instance;
    }

    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        for (MessageExt messageExt : msgs) {
            String messageBody = new String(messageExt.getBody());
            System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(
                    new Date()) + "消费响应: msgBody : " + messageBody);//输出消息内容
        }
        //ACK
        //CONSUME_SUCCESS 消费成功
        //RECONSUME_LATER 消费失败,需要稍后重新消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    private MessageListener() {
    }
}

Borker配置文件介绍

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

#  brokerClusterName = DefaultCluster
#  brokerName = broker-a
#  brokerId = 0
#  deleteWhen = 04
#  fileReservedTime = 48
#  brokerRole = ASYNC_MASTER
#  flushDiskType = ASYNC_FLUSH

# 所属集群名字 
brokerClusterName = default-rocketmq-cluster
# true后可以使用SQL92
enablePropertyFilter = true
#broker名字,注意此处不同的配置文件填写的不一样 
brokerName = broker-a
#0 表示 Master,>0 表示 Slave
brokerId = 0 
#nameServer地址,分号分割
brokerIP1 = 192.168.6.46
namesrvAddr = 192.168.6.46:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数 
defaultTopicQueueNums = 4 
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭 
autoCreateTopicEnable = true 
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 
autoCreateSubscriptionGroup = true 
#Broker 对外服务的监听端口 
listenPort = 10911 
#删除文件时间点,默认凌晨 4点 
deleteWhen = 04 
#文件保留时间,默认 48 小时 
fileReservedTime = 120 
#commitLog每个文件的大小默认1G 
mapedFileSizeCommitLog = 1073741824 
#ConsumeQueue每个文件默认存30W条,根据业务情况调整 
mapedFileSizeConsumeQueue = 300000 
#destroyMapedFileIntervalForcibly=120000 
#redeleteHangedFileInterval=120000 
#检测物理文件磁盘空间 
diskMaxUsedSpaceRatio = 88 
#存储路径 
storePathRootDir = D:\RocketMQ\target 
#commitLog 存储路径 
storePathCommitLog = D:\RocketMQ\target\commitLog 
#消费队列存储路径存储路径 
storePathConsumeQueue = D:\RocketMQ\target\consumequeue 
#消息索引存储路径 
storePathIndex = D:\RocketMQ\target\index 
#checkpoint 文件存储路径 
storeCheckpoint = D:\RocketMQ\target\checkpoint 
#Broker 的角色 
#- ASYNC_MASTER 异步复制Master 
#- SYNC_MASTER 同步双写Master 
#- SLAVE brokerRole=ASYNC_MASTER 
#刷盘方式 
#- ASYNC_FLUSH 异步刷盘 
#- SYNC_FLUSH 同步刷盘 
flushDiskType = ASYNC_FLUSH 
#checkTransactionMessageEnable=false 
#abort 文件存储路径 
abortFile = D:\RocketMQ\target\abort 
#限制的消息大小 
maxMessageSize = 65536 
#flushCommitLogLeastPages=4 
#flushConsumeQueueLeastPages=2 
#flushCommitLogThoroughInterval=10000 
#flushConsumeQueueThoroughInterval=60000
#acl控制
aclEnable=true
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,012评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,628评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,653评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,485评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,574评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,590评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,596评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,340评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,794评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,102评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,276评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,940评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,583评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,201评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,441评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,173评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,136评论 2 352

推荐阅读更多精彩内容