注:默认已安装jdk,maven的前提下
1.下载
http://rocketmq.apache.org/release_notes/release-notes-4.2.0/
2.启动
解压后进入bin目录:D:\rocketmq-4.2\bin
2.1 执行命令:start mqnamesrv.cmd 启动nameserver,成功后不要关闭窗口
启动成功可以看到success:
2.2 执行命令:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 启动broker 执行成功后不要关闭窗口
启动成功是一个空的命令行窗口:
2.3 执行完已经启动成功,会有三个窗口,完整的执行命令的窗口图:
3.安装rocketmq-console插件(可选,不是必须)
下载地址:https://github.com/apache/rocketmq-externals
3.1 如果已安装git,使用git命令clone https://github.com/apache/rocketmq-externals即可下载
3.2 如果未安装git,点击下载:
3.3 下载完成后进入解压目录的rocketmq-console项目找到application.properties配置文件,此处我的目录是:D:\rocketmq-externals-master\rocketmq-console\src\main\resources
3.4 打开目录配置rocketmq-console的启动端口(它本质是一个spring-boot项目,这里的端口相当于启动项目,也就是tomcat端口,不要与其它项目端口冲突),以及刚才启动的rocketmq的ip及端口号:
3.5 配置完成后,打开cmd进入rocketmq-console项目根目录,我的目录是D:\rocketmq-externals-master\rocketmq-console
执行maven命令跳过测试打包:mvn clean package -Dmaven.test.skip=true 第一次需要下载依赖会有些慢
3.6 打包成功后,target目录下会生成rocketmq-console-ng-1.0.0.jar,我的目录为D:\rocketmq-externals-master\rocketmq-console\target
3.7 使用java命令java -jar rocketmq-console-ng-1.0.0.jar运行这个jar包
3.8 访问rocketmq监控页面
http://localhost:8001 以实际地址为准
4 测试rocketmq
4.1 启动消费者监听,然后模拟生产者发送一条数据
4.2 测试结果,第一条打印信息是生产者打印的,可以看到SEND_OK代表生产成功,第二条打印信息是消费者打印的,已经成功消费到数据,如果数据中含有中文,消费时注意要用UTF-8,否则乱码
4.3 安装的监控页面也可以看到刚才生产的消息,点击MESSAGE DETAIL按钮可以查看消息内容:
6 测试代码
6.1 依赖:
<!-- RocketMQ -->
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>3.5.8</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.5.8</version>
</dependency>
6.2 完整测试代码:
package com.haocang.clean.util;
import java.util.List;
import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQBrokerException;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.alibaba.rocketmq.remoting.exception.RemotingException;
import com.alibaba.rocketmq.shade.com.alibaba.fastjson.JSON;
import com.haocang.itc.pojo.MqBean;
/**
* RocketMq alibaba版本操作工具类
* 生产/消费
* @author shenke
*/
public class RocketMqUtil {
private RocketMqUtil(){
}
// 默认生产者组名称
private static final String DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME = "test_producer";
// 默认连接地址
private static final String DEFAULT_ROCKETMQ_ADDRESS = "localhost:9876";
// 默认生产者应用名称
private static final String DEFAULT_ROCKETMQ_INSTANCENAME = "test";
// 默认生产者最大消息长度
private static final Integer DEFAULT_ROCKETMQ_MAXMESSAGESIZE = Integer.MAX_VALUE;
// 默认编码
public static final String DEFAULT_ROCKETMQ_ENCODING = "UTF-8";
// 默认消费者组名称
private static final String DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME = "test_consumer";
// 默认监听主题
private static final String DEFAULT_ROCKETMQ_TOPIC = "test";
// 默认监听过滤条件
private static final String DEFAULT_ROCKETMQ_SUB_EXPRESSION = "test1";
/**
* 初始化生产者服务
* 默认配置
* @return
*/
private static synchronized DefaultMQProducer initProducer() {
return initProducer(
DEFAULT_ROCKETMQ_PRODUCER_GROUPNAME,
DEFAULT_ROCKETMQ_ADDRESS,
DEFAULT_ROCKETMQ_INSTANCENAME,
DEFAULT_ROCKETMQ_MAXMESSAGESIZE
);
}
/**
* 初始化生产者服务
* 可选配置
* @param rocketmqAddress
* @return
*/
private static synchronized DefaultMQProducer initProducer(
String groupName, String rocketmqAddress,String instanceName, int maxMessageSize) {
DefaultMQProducer producer = new DefaultMQProducer(groupName);
producer.setNamesrvAddr(rocketmqAddress);
producer.setInstanceName(instanceName);
producer.setMaxMessageSize(maxMessageSize);
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
return producer;
}
/**
* 关闭生产者服务
*/
public static synchronized void closeProducer(DefaultMQProducer producer){
if (producer != null) {
producer.shutdown();
}
}
/**
* 初始化消费者服务
* 使用默认配置
* @return
*/
private static DefaultMQPushConsumer initConsumer(){
return initConsumer(DEFAULT_ROCKETMQ_ADDRESS, DEFAULT_ROCKETMQ_CONSUMER_GROUPNAME, DEFAULT_ROCKETMQ_TOPIC, DEFAULT_ROCKETMQ_SUB_EXPRESSION);
}
/**
* 初始化消费者服务
* 使用自定义配置
* @param topic
* @param subExpression
* @return
*/
private static DefaultMQPushConsumer initConsumer(String namesrvAddr, String consumerGroup, String topic, String subExpression) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(namesrvAddr);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
try {
consumer.subscribe(topic, subExpression);
} catch (MQClientException e1) {
e1.printStackTrace();
}
return consumer;
}
/**
* 关闭消费者服务
*/
public static void closeConsumer(DefaultMQPushConsumer consumer){
if (consumer != null) {
consumer.shutdown();
}
}
/**
* 生产消息
* 使用默认配置
* @param message
* @param close 生产完毕是否关闭,若不关闭则会阻塞
* @return
*/
public static SendResult send(Message message, boolean close){
SendResult sendResult = null;
DefaultMQProducer producer = initProducer();
try {
sendResult = producer.send(message);
if(close){
closeProducer(producer);
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
/**
* 生产消息
* 使用自定义配置
* @param message
* @param groupName
* @param rocketmqAddress
* @param instanceName
* @param maxMessageSize
* @param close 生产完毕是否关闭,若不关闭则会阻塞
* @return
*/
public static SendResult send(Message message, String groupName,String rocketmqAddress,String instanceName, int maxMessageSize, boolean close){
SendResult sendResult = null;
try {
DefaultMQProducer producer = initProducer(groupName, rocketmqAddress, instanceName, maxMessageSize);
sendResult = producer.send(message);
if(close){
closeProducer(producer);
}
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
return sendResult;
}
/**
* 消费消息
* 使用默认配置
* @param messageListenerConcurrently
* @throws MQClientException
*/
public static void consumer(MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
DefaultMQPushConsumer consumer = initConsumer();
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
}
/**
* 消费消息
* 使用自定义配置
* @param topic
* @param subExpression
* @param messageListenerConcurrently
* @throws MQClientException
*/
public static void consumer(String namesrvAddr, String consumerGroup, String topic, String subExpression, MessageListenerConcurrently messageListenerConcurrently) throws MQClientException{
DefaultMQPushConsumer consumer = initConsumer(namesrvAddr, consumerGroup, topic, subExpression);
consumer.registerMessageListener(messageListenerConcurrently);
consumer.start();
}
public static void main(String[] args) {
try {
consumer(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
for (MessageExt messageExt : msgs) {
System.out.println(new String(messageExt.getBody(), "UTF-8"));
}
} catch (Exception e) {
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
} catch (Exception e) {
}
String data = "[{\"id\":\"001\",\"name\":\"张三\",\"age\":\"21\"},{\"id\":\"002\",\"name\":\"李四\",\"age\":\"22\"}]";
SendResult send = send(new Message("test", "test1", data.getBytes()), true);
System.out.println(send.toString());
}
}
7 总结
本文只是demo,详细阐述如果在windows上搭建rocketmq服务及测试,实际生产环境中需要更精准的配置,以及具体的业务问题,比如订单系统中如何保证消息顺序消费,当存在多个监听时如何避免消息重复消费,如何让每个监听都能消费到,如何回溯消息,如何定时消费消息,延时消费消息等一系列的问题,可以参考官方文档:http://rocketmq.apache.org/docs/quick-start/