RocketMq-Windows单机安装

注:默认已安装jdk,maven的前提下

1.下载

http://rocketmq.apache.org/release_notes/release-notes-4.2.0/

image.png

2.启动

解压后进入bin目录:D:\rocketmq-4.2\bin
2.1 执行命令:start mqnamesrv.cmd 启动nameserver,成功后不要关闭窗口
启动成功可以看到success:


nameserver启动成功窗口

2.2 执行命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 启动broker 执行成功后不要关闭窗口
启动成功是一个空的命令行窗口:


broker启动成功窗口

2.3 执行完已经启动成功,会有三个窗口,完整的执行命令的窗口图:
执行命令的窗口图

3.安装rocketmq-console插件(可选,不是必须)

下载地址:https://github.com/apache/rocketmq-externals
3.1 如果已安装git,使用git命令clone https://github.com/apache/rocketmq-externals即可下载
3.2 如果未安装git,点击下载:

下载rocktemq-console

3.3 下载完成后进入解压目录的rocketmq-console项目找到application.properties配置文件,此处我的目录是:D:\rocketmq-externals-master\rocketmq-console\src\main\resources
application.properties配置文件目录

3.4 打开目录配置rocketmq-console的启动端口(它本质是一个spring-boot项目,这里的端口相当于启动项目,也就是tomcat端口,不要与其它项目端口冲突),以及刚才启动的rocketmq的ip及端口号:
配置rocketmq-console

3.5 配置完成后,打开cmd进入rocketmq-console项目根目录,我的目录是D:\rocketmq-externals-master\rocketmq-console
执行maven命令跳过测试打包:mvn clean package -Dmaven.test.skip=true 第一次需要下载依赖会有些慢
build项目成功的命令行窗口

3.6 打包成功后,target目录下会生成rocketmq-console-ng-1.0.0.jar,我的目录为D:\rocketmq-externals-master\rocketmq-console\target
maven打包的jar

3.7 使用java命令java -jar rocketmq-console-ng-1.0.0.jar运行这个jar包
启动成功,配置的端口号为8001

3.8 访问rocketmq监控页面
http://localhost:8001 以实际地址为准
监控页面

4 测试rocketmq

4.1 启动消费者监听,然后模拟生产者发送一条数据


测试生产消费

4.2 测试结果,第一条打印信息是生产者打印的,可以看到SEND_OK代表生产成功,第二条打印信息是消费者打印的,已经成功消费到数据,如果数据中含有中文,消费时注意要用UTF-8,否则乱码


测试结果

4.3 安装的监控页面也可以看到刚才生产的消息,点击MESSAGE DETAIL按钮可以查看消息内容:
生产成功的消息

消息详细

6 测试代码

6.1 依赖:

<!-- RocketMQ -->
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-all</artifactId>
    <version>3.5.8</version>
    <type>pom</type>
</dependency>
<dependency>
    <groupId>com.alibaba.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>3.5.8</version>
</dependency>

6.2 完整测试代码:

package com.haocang.clean.util;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.shade.com.alibaba.fastjson.JSON;
import com.haocang.itc.pojo.MqBean;

/**
 * RocketMq alibaba版本操作工具类
 * 生产/消费
 * @author shenke
 */
public class RocketMqUtil {
    
    private RocketMqUtil(){
        
    }
    
    // 默认生产者组名称
    private static final String DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME = "test_producer";
    // 默认连接地址
    private static final String DEFAULT_ROCKETMQ_ADDRESS = "localhost:9876";
    // 默认生产者应用名称
    private static final String DEFAULT_ROCKETMQ_INSTANCENAME = "test";
    // 默认生产者最大消息长度
    private static final Integer DEFAULT_ROCKETMQ_MAXMESSAGESIZE = Integer.MAX_VALUE;
    // 默认编码
    public static final String DEFAULT_ROCKETMQ_ENCODING = "UTF-8";
    // 默认消费者组名称
    private static final String DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME = "test_consumer";
    // 默认监听主题
    private static final String DEFAULT_ROCKETMQ_TOPIC = "test";
    // 默认监听过滤条件
    private static final String DEFAULT_ROCKETMQ_SUB_EXPRESSION = "test1";
    
    /**
     * 初始化生产者服务
     * 默认配置
     * @return
     */
    private static synchronized DefaultMQProducer initProducer() {
        return initProducer(
                DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME,
                DEFAULT_ROCKETMQ_ADDRESS,
                DEFAULT_ROCKETMQ_INSTANCENAME,
                DEFAULT_ROCKETMQ_MAXMESSAGESIZE
        );
    }
 
    /**
     * 初始化生产者服务
     * 可选配置
     * @param rocketmqAddress
     * @return
     */
    private static synchronized DefaultMQProducer initProducer(
        String groupName, String rocketmqAddress,String instanceName, int maxMessageSize) {
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(rocketmqAddress);
        producer.setInstanceName(instanceName);
        producer.setMaxMessageSize(maxMessageSize);
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        return producer;
    }
    
    /**
     * 关闭生产者服务
     */
    public static synchronized void closeProducer(DefaultMQProducer producer){
        if (producer != null) {
            producer.shutdown();
        }
    }
    
    /**
     * 初始化消费者服务
     * 使用默认配置
     * @return
     */
    private static DefaultMQPushConsumer initConsumer(){
        return initConsumer(DEFAULT_ROCKETMQ_ADDRESS, DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME, DEFAULT_ROCKETMQ_TOPIC, DEFAULT_ROCKETMQ_SUB_EXPRESSION);
    }
    
    /**
     * 初始化消费者服务
     * 使用自定义配置
     * @param topic
     * @param subExpression
     * @return
     */
    private static DefaultMQPushConsumer initConsumer(String namesrvAddr, String consumerGroup, String topic, String subExpression) {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
        consumer.setNamesrvAddr(namesrvAddr);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        try {
            consumer.subscribe(topic, subExpression);
        } catch (MQClientException e1) {
            e1.printStackTrace();
        }
        return consumer;
    }
    
    /**
     * 关闭消费者服务
     */
    public static void closeConsumer(DefaultMQPushConsumer consumer){
        if (consumer != null) {
            consumer.shutdown();
        }
    }
    
    /**
     * 生产消息
     * 使用默认配置
     * @param message
     * @param close 生产完毕是否关闭,若不关闭则会阻塞
     * @return
     */
    public static SendResult send(Message message, boolean close){
        SendResult sendResult = null;
        DefaultMQProducer producer = initProducer();
        try {
            sendResult = producer.send(message);
            if(close){
                closeProducer(producer);
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
        return sendResult;
    }
    
    /**
     * 生产消息
     * 使用自定义配置
     * @param message
     * @param groupName
     * @param rocketmqAddress
     * @param instanceName
     * @param maxMessageSize
     * @param close 生产完毕是否关闭,若不关闭则会阻塞
     * @return
     */
    public static SendResult send(Message message, String groupName,String rocketmqAddress,String instanceName, int maxMessageSize, boolean close){
        SendResult sendResult = null;
        try {
            DefaultMQProducer producer = initProducer(groupName, rocketmqAddress, instanceName, maxMessageSize);
            sendResult = producer.send(message);
            if(close){
                closeProducer(producer);
            }
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
        return sendResult;
    }
    
    /**
     * 消费消息
     * 使用默认配置
     * @param messageListenerConcurrently
     * @throws MQClientException 
     */
    public static void consumer(MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
        DefaultMQPushConsumer consumer = initConsumer();
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.start();
    }
    
    /**
     * 消费消息
     * 使用自定义配置
     * @param topic
     * @param subExpression
     * @param messageListenerConcurrently
     * @throws MQClientException 
     */
    public static void consumer(String namesrvAddr, String consumerGroup, String topic, String subExpression, MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
        DefaultMQPushConsumer consumer = initConsumer(namesrvAddr, consumerGroup, topic, subExpression);
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.start();
    }
    
    public static void main(String[] args) {
        try {
            consumer(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        for (MessageExt messageExt : msgs) {
                            System.out.println(new String(messageExt.getBody(), "UTF-8"));
                        }
                    } catch (Exception e) {
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
        } catch (Exception e) {
        }
        
        String data = "[{\"id\":\"001\",\"name\":\"张三\",\"age\":\"21\"},{\"id\":\"002\",\"name\":\"李四\",\"age\":\"22\"}]";
        SendResult send = send(new Message("test", "test1", data.getBytes()), true);
        System.out.println(send.toString());
    }

}

7 总结

本文只是demo,详细阐述如果在windows上搭建rocketmq服务及测试,实际生产环境中需要更精准的配置,以及具体的业务问题,比如订单系统中如何保证消息顺序消费,当存在多个监听时如何避免消息重复消费,如何让每个监听都能消费到,如何回溯消息,如何定时消费消息,延时消费消息等一系列的问题,可以参考官方文档:http://rocketmq.apache.org/docs/quick-start/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容