@[toc]
1 示例模式
RocketMQ 事务消息示例包含一个生产者、消费者、NameServer 以及 Broker 服务,它们之间的关系如下:
RocketMQ架构上主要分为四部分[^1]:
Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡模块选择相应的Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。
Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。
NameServer:NameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。
BrokerServer:Broker主要负责消息的存储、投递和查询以及服务高可用保证。
2 安装与配置 RocketMQ
(1)下载解压
到 RocketMQ 官网下载二进制版本包,解压到磁盘下。
(2)JVM 版本
注意:openjdk11 不能运行(会抛出Error: Could not create the Java Virtual Machine.),jdk8 可以。
如果cmd 命令一闪而过,可以在命令代码的末尾加入 pause,以观察出错日志。
(3)配置环境变量
配置 ROCKETMQ_HOME 与 NAMESEV_ADDR 环境变量。其中 ROCKETMQ_HOME 是 RocketMQ 的解压后的路径,NAMESEV_ADDR 是 NameServer 的访问地址。
(4)在应用工程的 pom 中配置
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
3 运行服务
使用 powershell 分别启动 name server与 broker 服务。
3.1 启动 NameServer
启动 name server:
cd C:\programs\rocketmq-4.9.2\bin
.\mqnamesrv.cmd
输出:
Java HotSpot(TM) 64-Bit Server VM warning: Using the DefNew young collector with the CMS collector is deprecated and will likely be removed in a future release
Java HotSpot(TM) 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.
The Name Server boot success. serializeType=JSON
3.2 启动 broker
启动 broker:
cd C:\programs\rocketmq-4.9.2\bin
.\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
-n 指定 NameServer 地址,autoCreateTopicEnable 为 true 表示自动创建主题。
输出:
The broker[DENIRO-LEE, 192.168.17.1:10911] boot success. serializeType=JSON and name server is localhost:9876
4 生产者
4.1 事务监听器
事务监听器需要实现两个方法,其中的 executeLocalTransaction 方法用于执行本地事务,checkLocalTransaction 方法用于 broker 回查本地事务的状态。具体逻辑如下:
- 定义了一个原子整型计数器 transactionIndex,用于循环事务状态。
- 定义了一个线程安全 ConcurrentHashMap,名为 localTrans,用于存放事务 ID与状态码。
- 在 executeLocalTransaction 方法中,递增 transactionIndex,然后除以 3,求得余数。因为事务只有三种状态:UNKNOW、ROLLBACK_MESSAGE 和 COMMIT_MESSAGE,所以这里除数为 3。然后返回 UNKNOW 状态,让 broker 回查这批消息的事务状态。
- 在 checkLocalTransaction 方法中,会传入由 broker 给出的事务 ID。然后依据这个事务 ID 从 localTrans 中取出这个事务 ID 的状态码。接着依据不同的状态码,打印日志并返回对应的事务状态。
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String transactionId = msg.getTransactionId();
Integer status = localTrans.get(transactionId);
if (null != status) {
switch (status) {
case 0:
System.out.printf("%s%s%n", "事务ID -> "+transactionId," 返回 UNKNOW");
return LocalTransactionState.UNKNOW;
case 1:
System.out.printf("%s%s%n", "事务ID -> "+transactionId, "返回 COMMIT_MESSAGE");
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
System.out.printf("%s%s%n", "事务ID -> "+transactionId, "返回 ROLLBACK_MESSAGE");
return LocalTransactionState.ROLLBACK_MESSAGE;
default:
System.out.printf("%s%s%n", "事务ID -> "+transactionId, "返回 UNKNOW");
return LocalTransactionState.COMMIT_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
4.2 事务消息生产者
该生产者会产生 10 条事务消息。具体逻辑如下:
- 创建事务监听器 transactionListener。
- 创建事务消息生产者 producer。
- 生产者设置 NameServer的地址。
- 创建线程池执行器 ThreadPoolExecutor。
- 生产者设置线程池执行器。
- 生产者设置事务监听器。
- 启动生产者。
- 创建多个标签。
- 循环生成 10 条事务消息。每个消息创建后,使用生产者以事务的方式发送该消息,打印并休眠 10 ms。
- 线程休眠一段时间。用于响应 broker 的回查请求。
- 关闭生产者。
package com.deniro.rocketmq.transaction;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.example.transaction.TransactionListenerImpl;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.*;
/**
* 生产事务消息的生产者
*
* @author Deniro Lee
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer(
"deniro_transaction_message_group");
// 设置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("transactionMsg", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", "第" + (i+1) + "条 ->" + sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
5 消费者
消费者只能获取事务状态为 COMMIT_MESSAGE 的消息。具体逻辑如下:
- 创建消费者。
- 消费者设置NameServer的地址。
- 消费者订阅Topic。
- 消费者注册回调实现类来处理从broker拉取回来的消息。回调实现类的方法返回消息已经被成功消费的状态。
- 启动消费者。
package com.deniro.rocketmq.base;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 消费者
*
* @author Deniro Lee
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 实例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("deniro_transaction_message_group");
// 设置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
consumer.subscribe("transactionMsg", "*");
// 注册回调实现类来处理从broker拉取回来的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动消费者实例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
6 测试
生产者端发送 10 条事务消息:
第1条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF410000, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=432]
第2条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF550001, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=433]
第3条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF640002, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=434]
第4条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF740003, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=435]
第5条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF840004, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=436]
第6条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CF930005, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=437]
第7条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFA20006, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=3], queueOffset=438]
第8条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFB20007, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=0], queueOffset=439]
第9条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFC20008, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=1], queueOffset=440]
第10条 ->SendResult [sendStatus=SEND_OK, msgId=7F000001771418B4AAC24166CFD10009, offsetMsgId=null, messageQueue=MessageQueue [topic=transactionMsg, brokerName=DENIRO-LEE, queueId=2], queueOffset=441]
事务监听器对 3 取余数,依据余数的值来返回事务状态。
余数值 | 说明 |
---|---|
0 | 返回 UNKNOW。表示未知,这样 broker 会不断回查生产者的事务状态(限制为 15 次,可在 broker 端配置,超过限制则丢弃该消息并记录日志)。 |
1 | 返回 ROLLBACK_MESSAGE。表示事务回滚,这样消费者端就不会收到这条消息。 |
2 | 返回 COMMIT_MESSAGE。表示事务提交,这样消费者端就会收到这条消息,执行本地事务操作。 |
生产者端事务监听器输出:
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF640002返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF550001返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF840004返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF930005返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFB20007返回 COMMIT_MESSAGE
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFC20008返回 ROLLBACK_MESSAGE
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF410000 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CF740003 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFA20006 返回 UNKNOW
事务ID -> 7F000001771418B4AAC24166CFD10009 返回 UNKNOW
消费者端只会收到状态为 COMMIT_MESSAGE 的消息:
通过 RocketMQ 可以实现可靠消息最终一致性,适用于执行周期长且实时性要求不高的场景。优点是避免了分布式事务中的同步阻塞的影响,并实现了两个服务的解耦。不足是如果消费者端事务异常回滚,生产者端不会回滚。而且消息可能会被重复消费,因此需要在消费者端进行冥等处理。