原创:阿里巴巴 辽天RocketMQ官微12月10日
本文主要介绍rocketmq-spring-boot支持的高级用例,包括发送顺序消息,异步发送,消费过滤以及事务消息发送。该项目git地址:https://github.com/apache/rocketmq-spring
文章主要内容包括以下几个方面:
1 前言
2 API和注解列表
3 消息发送端
4 消息消费端
5 发送事务消息
5.1 定义回查实现类
5.2 RocketMQTemplate发送事务消息
1 前言
首先在这里向大家报告一个好消息,rocketmq-spring-boot项目经过6个多月的孵化(孵化项目repo: https://github.com/apache/rocketmq-externals),在今年12月初正式毕业。今后的维护和增强将在新的release仓库中进行,具体的地址是:https://github.com/apache/rocketmq-spring
我们把原来单一的project模块按照Spring Boot的规范划分成了四个子模块:
rocketmq-spring-boot-parent (父pom文件,定义相关的依赖管理和Plugin,供其它几个模块引用)
rocketmq-spring-boot (定义auto-configuration实现,具体RocketMQ相关的自动配置和Bean创建代码都集中在这里)
rocketmq-spring-starter (将rocketmq-spring-boot和其它的依赖打包生成全量的依赖,用户引用它即可完成所有rocketmq-spring的客户端操作)
rocketmq-spring-samples (使用示例,展示如何使用spring-boot方式发送和消费消息)
相对于孵化器版本,本次代码进行了较大的调整。目前已经支持spring-boot 2.0,推荐使用孵化器版本的用户尽快切换到新release的版本。请参考samples来体验spring-boot方式发送和消费消息的编码和使用方式:
https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples。
下面具体的介绍一下rocketmq-spring-boot的一些使用细节。
2 API和注解列表
编写代码时需要按消息发送者(Producer)和消息消费者(Consumer)分别进行代码编写,会使用到如下的API或注解:
API
类型
发送端/消费端
说明
RocketMQTemplate
API
发送端
发送端负责发送消息的API,它与@Resource注解一同使用,进行对象的声明和实例化。
RocketMQTransactionListener
注解
发送端
修饰事务回查Listener
TransactionListener
API
消费端
事务回查Listener接口,实现本地事务执行方法和本地事务回查方法
RocketMQMessageListener
注解
消费端
修饰消息消费Listener
RocketMQListener<T>
API
消费端
消息消费Listener接口,实现消费消息的处理逻辑的方法
RocketMQPushConsumerLife
cycleListener
API
消费端
该接口用来实现Push方式消费时,消费者的信息配置,如消费时间和消费位点方式
注:关于上述API或注解的使用方式,我们提供了如何使用Spring-Boot发送和消费RocketMQ消息的例子,可以直接参考sample的源码(https://github.com/apache/rocketmq-externals/tree/master/samples/rocketmq-spring-boot-starter-sample)。下面的文档是对示例的简单说明。
3 消息发送端
在使用RocketMQTemplate编写客户端时,需要执行如下的步骤:
1. 定义Spring-Boot的 application.properties (注:如果全部使用默认的配置,可以不定义这个文件)
## application.properties
spring.rocketmq.nameServer=127.0.0.1:9876
#你可以根据自己的name-server信息进行修改
spring.rocketmq.producer.group=my-group
## 其他的配置信息spring.rocketmq.producer.retryTimesWhenSendAsyncFailed=0
spring.rocketmq.producer.sendMessageTimeout=300000
spring.rocketmq.producer.compressMessageBodyOverHowmuch=4096
spring.rocketmq.producer.maxMessageSize=4194304
spring.rocketmq.producer.retryAnotherBrokerWhenNotStoreOk=false
spring.rocketmq.producer.retryTimesWhenSendFailed=2
2. 声明RocketMQTemplate,并根据发送方式的不同选择合适的方法进行消息发送。
3. 如果是异步发送还需要在异步调用方法中,设置回调的Callback对象。
import org.apache.rocketmq.spring.starter.core.RocketMQTemplate;
...
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner{
@Resource
private RocketMQTemplate rocketMQTemplate;
public static void main(String[] args){
SpringApplication.run(ProducerApplication.class, args);
}
public void run(String... args) throws Exception {
// 以同步的方式发送消息,构造器构造对象消息给指定的topic
sendResult = rocketMQTemplate.syncSend(springTopic, MessageBuilder.withPayload("Hello, World! I'm from spring message").build());
System.out.printf("string-topic syncSend2 sendResult=%s %n", sendResult); // 异步方式发送用户定义对象类型的消息,并实现回调接口SendCallback
rocketMQTemplate.asyncSend(orderPaidTopic, new OrderPaidEvent("T_001", new BigDecimal("88.00")), new SendCallback() {
// 实现消息发送成功的后续处理
public void onSuccess(SendResult var1) {
System.out.printf("async onSucess SendResult=%s %n", var1);
}
// 实现消息发送失败的后续处理
public void onException(Throwable var1) {
System.out.printf("async onException Throwable=%s %n", var1);
}
});
// 指定topic的同时,设置tag值,以便消费端可以根据tag值进行选择性消费
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag0", "I'm from tag0");
// tag0 will not be consumer-selected
rocketMQTemplate.convertAndSend(msgExtTopic + ":tag1", "I'm from tag1");
}
@Data
@AllArgsConstructor
public class OrderPaidEvent implements Serializable{
private String orderId;
private BigDecimal paidMoney;
}
}
4 消息消费端
在消息消费端,只需要根据发送消息的类型实现RocketMQListener<T>并将它声明成Spring @Service和@RocketMQMessageListener,同时在相应的onMessage()方法里对拉取到的消息做处理。在@RocketMQMessageListener注解中可以定义如下具体的消费属性:
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documentedpublic @interface RocketMQMessageListener {
String consumerGroup();// 指定consumerGroup
String topic();// 指定消费的topic
SelectorType selectorType() default SelectorType.TAG; // 指定消费过滤方式: TAG, SQL92
String selectorExpress() default "*"; // 根据过滤方式,定义选择表达式
ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY; // 消费方式:并发,顺序
MessageModel messageModel() default MessageModel.CLUSTERING; // 消费模式: 集群, 广播
int consumeThreadMax() default 64; //消费的并发线程数
}
以下消费端代码,根据指定tag过滤消费信息并声明消费起点的Push方式消费
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.starter.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.starter.core.RocketMQListener;
import org.apache.rocketmq.spring.starter.core.RocketMQPushConsumerLifecycleListener;
import org.springframework.stereotype.Service;
// 消费监听标签里定义了消费相关的属性,包括:主题,选择表达式,消费组
@Service
@RocketMQMessageListener(topic = "message-ext-topic", selectorExpress = "tag1", consumerGroup = "${spring.application.name}-message-ext-consumer")
public class MessageExtConsumer implements RocketMQListener<MessageExt>, RocketMQPushConsumerLifecycleListener {
@Override
// 实现消息的消费处理
public void onMessage(MessageExt message) {
System.out.printf("------- MessageExtConsumer received message, msgId:%s, body:%s %n ", message.getMsgId(), new String(message.getBody()));
}
// 设置从当前时间点开始消费消息
public void prepareStart(DefaultMQPushConsumer consumer) {
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis()));
}
}
5 发送消息事务
对于事务消息与普通消息发送只在消息发布端区别,在消费端编写代码是没有区别的,所以这里只介绍消息发送端。发送事务消息需要在发送端做如下的编程处理:
5.1 定义回查实现类
实现RocketMQLocalTransactionListener接口,并将它使用注解@RocketMQTransactionListener来声明:
@RocketMQTransactionListener(txProducerGroup = TX_PGROUP_NAME)
public class TransactionListenerImpl implements RocketMQLocalTransactionListener{
// 实现执行本地事务的逻辑,并返回本地事务执行状态
public LocalTransactionState executeLocalTransaction(Message msg, Object arg){
// 实现执行本地事务的逻辑
...
// 可以根据具体的本地事务的执行情况返回 RocketMQLocalTransactionState.COMMIT, ROLLBACK 或UNKNOWN 三种状态
return ...
}
// 实现本地事务回查的逻辑,并返回本地事务执行状态
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 实现本地事务回查的逻辑
...
// 可以根据具体的本地事务的执行情况返回 RocketMQLocalTransactionState.COMMIT, ROLLBACK 或UNKNOWN 三种状态
return ...
}
}
@RocketMQTransactionListener注解用来定义TransactionListener的可配置属性:
@Target({ElementType.TYPE, ElementType.ANNOTATION_TYPE})
@Documented@Component
public @interface RocketMQTransactionListener {
/**
* 定义事务消息发送者组名
* Declare the txProducerGroup that is used to relate callback event to the listener, rocketMQTemplate must send a
* transactional message with the declared txProducerGroup.
* <p>
* <p>It is suggested to use the default txProducerGroup if your system only needs to define a TransactionListener class.
*/
String txProducerGroup() defaultRocketMQConfigUtils.ROCKETMQ_TRANSACTION_DEFAULT_GLOBAL_NAME;
// 以下定义事务回查ExecutorService的属性信息
/**
* Set ExecutorService params -- corePoolSize
*/
int corePoolSize() default 1;
/**
* Set ExecutorService params -- maximumPoolSize
*/
int maximumPoolSize() default 1;
/**
* Set ExecutorService params -- keepAliveTime
*/
long keepAliveTime() default 1000 * 60; //60ms
/**
* Set ExecutorService params -- blockingQueueSize
*/
int blockingQueueSize() default 2000;}
5.2 在RocketMQTemplate中使用特定的方法来发送事务消息
// 构造Spring Message请求消息
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
// 指定在@RocketMQTransactionListener中声明的txProducerGroup, 使用这个事务发布者组来发送事务消息和回查状态
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME, msg, null);
如果社区的朋友有兴趣,欢迎大家提建议或者PR来改进和加强spring-boot代码。在源代码库的README(https://github.com/apache/rocketmq-spring/blob/master/README_zh_CN.md)部分,我们整理了一部分的FAQ的问题,如果有更多的疑问请给我们留言或Email,我们会整理到网站,来让大家更方便的使用。
大家有疑问也可以留言,会为大家一一作答。