前置文章:
RocketMQ-基础使用(一),该文主要涉及MQ基础、RocketMQ安装&集群搭建、RocketMQ监控平台。
官方基础使用样例,很多基础内容其实官方文档都有很详细的说明。日常使用如果时间充足,还是推荐查看官方文档。学习官方文档是一个良好的习惯。
零、本文纲要
一、RocketMQ-基础使用
- 前置文章基础指令
二、RocketMQ-发送消息
- 发送同步消息
- 发送异步消息
- 发送单向消息
三、RocketMQ-接收消息
- 消息接收
- 消息接收-负载均衡【默认】
- 消息接收-广播模式
四、RocketMQ-消息类型
- 顺序消息
- 延迟消息
- 批量消息
- 过滤消息
- 事务消息
一、RocketMQ-基础使用
0. 前置文章基础指令
Ⅰ 启动RocketMQ的基础指令
# Start Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
# Start Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
指定自定义配置文件启动nohup sh bin/mqbroker -n localhost:9876 -c conf/broker.conf &
Ⅱ 关闭RocketMQ的基础指令
# Shutdown Servers
sh bin/mqshutdown broker
sh bin/mqshutdown namesrv
二、RocketMQ-发送消息
发送同步消息 / 发送异步消息 / 发送单向消息
1. 发送同步消息
这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- ① 基础依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.0</version>
</dependency>
- ② 同步消息代码
/**
* 发送同步消息
*/
public class SyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.指定Nameserver地址,多个NameServer则用“;”隔开
producer.setNamesrvAddr("192.168.253.128:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("base", "tag1", ("Hello RocketMQ [" + i + "]").getBytes());
//5.发送消息
SendResult result = producer.send(msg);
//发送状态
SendStatus status = result.getSendStatus();
String msgId = result.getMsgId();
int queueId = result.getMessageQueue().getQueueId();
System.out.printf("发送状态:%s,消息ID:%s,队列:%d%n", status, msgId, queueId);
//线程睡1秒
Thread.sleep(1000);
}
//6.关闭生产者producer
producer.shutdown();
}
}
截取控制台输出
发送状态:SEND_OK,消息ID:C0A8026AC05818B4AAC28D428B270000,队列:3
2. 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。
/**
* 发送异步消息
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group2");
//2.指定Nameserver地址,多个NameServer则用“;”隔开
producer.setNamesrvAddr("192.168.253.128:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("base", "tag2", ("AsyncMsg [" + i + "]").getBytes());
//5.发送异步消息
producer.send(msg, new SendCallback() {
/**
* 发送成功的回调函数
* @param sendResult 发送结果
*/
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果:" + sendResult);
}
/**
* 发送失败的回调函数
* @param throwable 发送异常
*/
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常:" + throwable);
}
});
//线程睡1秒
Thread.sleep(1000);
}
//6.关闭生产者producer
producer.shutdown();
}
}
与同步消息的不同之处在于通过回调函数来获取发送结果。
3. 发送单向消息
这种方式主要用在不特别关心发送结果的场景,比如:日志发送。
/**
* 发送单向消息
*/
public class OneWayProducer {
public static void main(String[] args) throws Exception, MQBrokerException {
//1.创建消息生产者producer,并制定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("group3");
//2.指定Nameserver地址,多个NameServer则用“;”隔开
producer.setNamesrvAddr("192.168.253.128:9876");
//3.启动producer
producer.start();
for (int i = 0; i < 10; i++) {
//4.创建消息对象,指定主题Topic、Tag和消息体
/**
* 参数一:消息主题Topic
* 参数二:消息Tag
* 参数三:消息内容
*/
Message msg = new Message("base", "tag3", ("OneWayMsg [" + i + "]").getBytes());
//5.发送单向消息
producer.send(msg);
//线程睡1秒
Thread.sleep(1000);
}
//6.关闭生产者producer
producer.shutdown();
}
}
三、RocketMQ-接收消息
1. 消息接收
/**
* 消息的接受者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group3");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.253.128:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("base", "tag1");
//设定消费模式:负载均衡|广播模式
//4.设置回调函数,处理消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
//接收消息内容
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.println(list);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
}
}
2. 消息接收-负载均衡【默认】
消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。【默认的消息消费方式】
consumer.setMessageModel(MessageModel.CLUSTERING);
3. 消息接收-广播模式
消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。
consumer.setMessageModel(MessageModel.BROADCASTING);
四、RocketMQ-消息类型
1. 顺序消息
- ① 基础分析
假定一个订单的顺序流程是:创建、付款、推送、完成。有张三、李四两人进行订单业务。
a、全局有序:
张三所有消息消费完,再消费李四消息,且内部有序;
一个Borker,一个MessageQueue;
b、局部有序:
只要保证各自消息内部的有序消费,交替消费两者的消息是可以的;
一个Borker,多个MessageQueue,一个MessageQueue对应一个订单。
所以,一般仅需保证局部有序即可。
实现方式:同一个用户的一个业务消息放到同一个队列,比如:订单号相同的消息进同一个队列。
- ② 代码实现
Ⅰ 消息生产者producer的核心代码
/**
* 参数一: 消息对象
* 参数二: 消息队列选择器 MessageQueueSelector
* 参数三: 选择队列业务标识,此处为订单ID
*/
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
/**
*
* @param list 消息队列
* @param message 消息对象
* @param o 业务标识的参数
* @return 消息队列
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object o) {
long orderId = (long) o;
long index = orderId % list.size(); //订单ID一致,则取模结果一致,最终选择的队列一致
return list.get((int) index);
}
}, order.getOrderId());
Ⅱ 消息消费者consumer的核心代码
此处是通过有序消息监听MessageListenerOrderly来实现的
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
for (MessageExt messageExt : list) {
System.out.println("线程名称:" + Thread.currentThread().getName() + " → " +
"消费消息:" + new String(messageExt.getBody()));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
2. 延迟消息
使用场景:比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
延迟消息使用限制:
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
Ⅰ 消息生产者producer的核心代码
msg.setDelayTimeLevel(2);
Ⅱ 消息消费者consumer的核心代码
无需调整。
注意:受限于网络情况,实际的延迟往往大于设置的延迟。
3. 批量消息
批量发送消息能显著提高传递小消息的性能。
限制:
a、相同的topic;
b、相同的waitStoreMsgOK;
c、不能是延时消息;
d、总大小不应超过4MB。
Ⅰ 消息生产者producer的核心代码
List<Message> messageList = new ArrayList<>();
Ⅱ 消息消费者consumer的核心代码
无需调整。
4. 过滤消息
一般过滤消息可通过 TAG / SQL92标准 来进行过滤
Ⅰ 消息生产者producer的核心代码
//方式一:通过Tag过滤的使用方法,消息发送方不做调整
//...
//方式二:通过sql过滤的使用方法,使用putUserProperty设置一些消息属性
msg.putUserProperty("a", String.valueOf(i));
Ⅱ 消息消费者consumer的核心代码
//方式一:通过Tag过滤的使用方法,consumer使用" || "分隔订阅不同的Tag即可
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
//方式二:通过sql过滤的使用方法,通过MessageSelector消息选择器的bySql方法过滤消息
consumer.subscribe("topic_sql_filter", MessageSelector.bySql("num > 5"));
方式二如果报错:The broker does not support consumer to filter message by SQL92
,
则需要在我们对应的Broker配置文件内做调整,添加enablePropertyFilter=true
,重启服务即可生效。
关于SQL92基础语法,RocketMQ只定义了一些基本语法来支持这个特性:
数值比较,比如:>,>=,<,<=,BETWEEN,=;
字符比较,比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号 AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
注意:只有使用push模式的消费者才能用使用SQL92标准的sql语句。
5. 事务消息
在【分布式事务-可靠消息最终一致性】的解决方案内使用的就是事务消息。
事务消息流程:正常事务消息的发送及提交,以及事务消息的补偿【事务状态回查】;
事务状态:
LocalTransactionState.COMMIT_MESSAGE 提交状态 允许消费消息;
LocalTransactionState.ROLLBACK_MESSAGE 回滚状态 删除消息,不允许被消费;
LocalTransactionState.UNKNOW 中间状态 需要回查事务。
Ⅰ 消息生产者producer的核心代码
/**
* 发送同步消息
*/
public class Producer {
public static void main(String[] args) throws Exception {
//1.创建消息生产者producer,并制定生产者组名
TransactionMQProducer producer = new TransactionMQProducer("group1");
//2.指定Nameserver地址,多个NameServer则用“;”隔开
producer.setNamesrvAddr("192.168.253.128:9876");
//3.设置消息事务的监听器
producer.setTransactionListener(new TransactionListener() {
/**
* 在该方法中执行本地的事务
* @param message 消息
* @param o
* @return 事务状态
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
String messageTags = message.getTags();
if (StringUtils.equals("TagA", messageTags)) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (StringUtils.equals("TagB", messageTags)) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else {
return LocalTransactionState.UNKNOW;
}
}
/**
* 该方法进行MQ事务状态的回查
* @param messageExt 消息
* @return 事务状态
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("消息的Tag:" + messageExt.getTags());
return LocalTransactionState.COMMIT_MESSAGE;
}
});
String[] tags = new String[]{"TagA", "TagB", "TagC"};
//4.启动producer
producer.start();
for (int i = 0; i < 3; i++) {
//5.创建消息对象,指定Topic、Tag、消息体
Message msg = new Message("topic_transaction", tags[i],
(tags[i] + " Hello transactionMsg " + i).getBytes(StandardCharsets.UTF_8));
//6.发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.println("发送结果:" + sendResult);
Thread.sleep(1000);
}
//7.关闭生产者producer,此处需要回查,所以不关闭
//producer.shutdown();
}
}
Ⅱ 消息消费者consumer的核心代码
/**
* 消息的接受者
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建消费者Consumer,制定消费者组名
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.指定Nameserver地址
consumer.setNamesrvAddr("192.168.253.128:9876");
//3.订阅主题Topic和Tag
consumer.subscribe("topic_transaction", "*");
//4.注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt messageExt : list) {
System.out.println("消费消息:" + new String(messageExt.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动消费者consumer
consumer.start();
System.out.println("消费者启动了...");
}
}
注意:事务消息不支持延时消息和批量消息。
五、结尾
以上即为RocketMQ-基础使用(二)的全部内容,感谢阅读。