一、Maven引入
<!--消息队列 RocketMQ-->
<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
<version>1.7.9.Final</version>
</dependency>
二、登录阿里云创建主题与组
1、创建主题
2、主题授权
2、创建组
3、获取用户Key
二、代码编写
1、配置代码
@Configuration
public class RocketMQConfig {
public Properties getProperties(){
Properties properties=new Properties();
/**
* 键的首字母必须大写
*/
//
properties.setProperty("AccessKey","xxxxxxxxx");
//
properties.setProperty("SecretKey","xxxxxxxxxxxxxxxxxx");
//
properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, "3000");
// 顺序消息消费失败进行重试前的等待时间,单位(毫秒)
properties.put(PropertyKeyConst.SuspendTimeMillis, "100");
// 消息消费失败时的最大重试次数
properties.put(PropertyKeyConst.MaxReconsumeTimes, "20");
//
properties.put(PropertyKeyConst.NAMESRV_ADDR, "http://onsaddr.mq-internet-access.mq-internet.aliyuncs.com:80");
return properties;
}
}
2、生产消息
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.order.OrderProducer;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionExecuter;
import com.aliyun.openservices.ons.api.transaction.TransactionProducer;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import com.hrz.common.utils.HashUtil;
import com.hrz.push.config.RocketMQConfig;
import com.hrz.push.service.MessageService;
import com.hrz.push.service.impl.LocalTransactionCheckerImpl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.UUID;
@Component
@Slf4j
public class ProducerClient {
@Autowired
private RocketMQConfig rocketMQConfig;
@Autowired
private MessageService messageService;
/**
* 1、发送普通消息
*
* @param message
* @return
*/
public boolean sendNormalMessage(Message message,String groupId) {
Properties properties=rocketMQConfig.getProperties();
properties.setProperty(PropertyKeyConst.GROUP_ID,groupId);
Producer producer = ONSFactory.createProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
try {
SendResult sendResult = producer.send(message);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
return true;
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
e.printStackTrace();
}
return false;
}
/**
* 2、发送顺序消息
*
* @param message
* @return
*/
public boolean sendOrderMessage(Message message,String groupId,Integer shardingNo) {
Properties properties=rocketMQConfig.getProperties();
properties.setProperty(PropertyKeyConst.GROUP_ID,groupId);
OrderProducer producer = ONSFactory.createOrderProducer(properties);
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
try {
// 设置代表消息的业务关键属性,请尽可能全局唯一。
// 以方便您在无法正常收到消息情况下,可通过控制台查询消息并补发。
// 注意:不设置也不会影响消息正常收发
message.setKey(UUID.randomUUID().toString());
// 分区顺序消息中区分不同分区的关键字段,sharding key 于普通消息的 key 是完全不同的概念。
// 全局顺序消息,该字段可以设置为任意非空字符串。
String shardingKey = "Hrz_" + shardingNo;
SendResult sendResult = producer.send(message, shardingKey);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
log.info(new Date() + " 发送顺序消息成功,消息主题为:" + message.getTopic() + " 消息 is: " + sendResult.getMessageId());
return true;
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
log.error(new Date() + " 发送顺序消息成功,消息主题为:" + message.getTopic());
e.printStackTrace();
}
return false;
}
/**
* 2、发送事务消息
*
* @param message
* @return
*/
public boolean sendTransactionMessage(Message message) {
TransactionProducer producer = ONSFactory.createTransactionProducer(rocketMQConfig.getProperties(),
new LocalTransactionCheckerImpl());
try {
SendResult sendResult = producer.send(message, new LocalTransactionExecuter() {
@Override
public TransactionStatus execute(Message msg, Object arg) {
// 消息 ID(有可能消息体一样,但消息 ID 不一样,当前消息 ID 在控制台无法查询)
String msgId = msg.getMsgID();
// 消息体内容进行 crc32,也可以使用其它的如 MD5
long crc32Id = HashUtil.crc32Code(msg.getBody());
// 消息 ID 和 crc32id 主要是用来防止消息重复
// 如果业务本身是幂等的,可以忽略,否则需要利用 msgId 或 crc32Id 来做幂等
// 如果要求消息绝对不重复,推荐做法是对消息体 body 使用 crc32或 md5来防止重复消息
//业务自己的参数对象,这里只是一个示例,需要您根据实际情况来处理
Object businessServiceArgs = new Object();
TransactionStatus transactionStatus = TransactionStatus.Unknow;
try {
//要求方法返回为boolean
boolean isCommit = messageService.test("测试");
if (isCommit) {
// 本地事务成功则提交消息
transactionStatus = TransactionStatus.CommitTransaction;
} else {
// 本地事务失败则回滚消息
transactionStatus = TransactionStatus.RollbackTransaction;
}
} catch (Exception e) {
log.error("消息 Id:{}", msgId, e);
}
System.out.println(msg.getMsgID());
log.warn("消息 Id:{}事务状态:{}", msgId, transactionStatus.name());
return transactionStatus;
}
}, null);
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
System.out.println(new Date() + " 发送消息失败. 主题为:" + message.getTopic());
e.printStackTrace();
}
// demo example 防止进程退出(实际使用不需要这样)
// TimeUnit.MILLISECONDS.sleep(Integer.MAX_VALUE);
return true;
}
/**
* 3、发送延时消息
*
* @param message
* @param delay
* @return
*/
public boolean sendDelayMessage(Message message, Integer delay) {
Producer producer = ONSFactory.createProducer(rocketMQConfig.getProperties());
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 注意:不设置也不会影响消息正常收发
message.setKey("ORDERID_100");
try {
// 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在 3 秒后投递
// long delayTime = System.currentTimeMillis() + 3000;
long delayTime = System.currentTimeMillis() + delay;
// 设置消息需要被投递的时间
message.setStartDeliverTime(delayTime);
SendResult sendResult = producer.send(message);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
return true;
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
e.printStackTrace();
}
return false;
}
/**
* 4、收发定时消息
*
* @param message
* @param datetime
* @return
*/
public boolean sendFixedDelayMessage(Message message, String datetime) {
Producer producer = ONSFactory.createProducer(rocketMQConfig.getProperties());
// 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 注意:不设置也不会影响消息正常收发
message.setKey("ORDERID_100");
try {
// 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如 2016-03-07 16:21:00 投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
// long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-03-07 16:21:00").getTime();
long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(datetime).getTime();
// 设置消息需要被投递的时间
message.setStartDeliverTime(timeStamp);
SendResult sendResult = producer.send(message);
// 同步发送消息,只要不抛异常就是成功
if (sendResult != null) {
log.info(new Date() + " 发送普通消息成功,消息主题为:" + message.getTopic() + " msgId is: " + sendResult.getMessageId());
return true;
}
} catch (Exception e) {
// 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
log.error(new Date() + " 发送普通消息失败,消息主题为:" + message.getTopic());
e.printStackTrace();
}
return false;
}
}
3、消费消息
import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import com.aliyun.openservices.ons.api.order.OrderConsumer;
import com.hrz.common.utils.StringUtils;
import com.hrz.push.config.RocketMQConfig;
import com.hrz.push.entity.ConsumerMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Properties;
@Component
@Slf4j
public class ConsumerClient {
@Autowired
private RocketMQConfig rocketMQConfig;
/**
* 1、普通订阅
*
* @param consumerMessage
*/
public void normalSubscribe(ConsumerMessage consumerMessage) {
Properties properties = rocketMQConfig.getProperties();
//以 "GID_" 或者 "GID-" 开头
properties.put(PropertyKeyConst.GROUP_ID, consumerMessage.getGroupId());
// 集群订阅方式 (默认)
if (consumerMessage.isModel()) {
// 广播订阅方式
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
}
// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
Consumer consumer = ONSFactory.createConsumer(properties);
// 1. * 表示订阅所有消息
// 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
if(StringUtils.isNullOrEmpty(consumerMessage.getTag())){
consumerMessage.setTag("*");
}
//消息监听
consumer.subscribe(consumerMessage.getTopic(), consumerMessage.getTag(), new MessageListener() {
//订阅多个 Tag
public Action consume(Message message, ConsumeContext context) {
log.info("订阅消息内容->: " + message.getTopic() + "->" + message.getKey() + "->" + new String(message.getBody()));
log.info("订阅消息ID->: "+message.getMsgID());
//提交订阅,消息依然存在
return Action.CommitMessage;
}
});
consumer.start();
log.info("消息订阅成功执行。");
}
/**
* 2、顺序订阅
* @param consumerMessage
*/
public void orderSubscribe(ConsumerMessage consumerMessage) {
Properties properties = rocketMQConfig.getProperties();
//以 "GID_" 或者 "GID-" 开头
properties.put(PropertyKeyConst.GROUP_ID, consumerMessage.getGroupId());
// 集群订阅方式 (默认)
if (consumerMessage.isModel()) {
// 广播订阅方式
properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);
}
// 在订阅消息前,必须调用 start 方法来启动 Consumer,只需调用一次即可。
OrderConsumer consumer = ONSFactory.createOrderedConsumer(properties);
// 1. * 表示订阅所有消息
// 2. TagA || TagB || TagC 表示订阅 TagA 或 TagB 或 TagC 的消息
if(StringUtils.isNullOrEmpty(consumerMessage.getTag())){
consumerMessage.setTag("*");
}
//消息监听
consumer.subscribe(consumerMessage.getTopic(), consumerMessage.getTag(), new MessageOrderListener() {
@Override
public OrderAction consume(Message message, ConsumeOrderContext consumeOrderContext) {
log.info("订阅消息内容->: " + message.getTopic() + "->" + message.getKey() + "->" + new String(message.getBody()));
log.info("订阅消息ID->: "+message.getMsgID());
//提交订阅,消息依然存在
return OrderAction.Success;
}
});
consumer.start();
log.info("消息订阅成功执行。");
}
}
四、测试
1、测试代码
@RestController
@RequestMapping(value = "/push", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
@Slf4j
@Api(value = "1、消息队列服务", tags = {"1、消息队列服务"})
public class RocketMQController {
@Autowired
private ConsumerClient consumerClient;
@Autowired
private ProducerClient producerClient;
/**
* 1、订阅普通消息
*
* @return
*/
@RequestMapping(value = "/consumer", method = RequestMethod.GET)
public String consumer() {
ConsumerMessage consumerMessage = new ConsumerMessage();
consumerMessage.setTopic("TOPIC_HRZ_TEST");
consumerMessage.setGroupId("GID_HRZ_TEST");
consumerClient.normalSubscribe(consumerMessage);
return "消费成功";
}
/**
* 1、订阅普通消息
*
* @return
*/
@RequestMapping(value = "/orderConsumer", method = RequestMethod.GET)
public String orderConsumer() {
ConsumerMessage consumerMessage = new ConsumerMessage();
consumerMessage.setTopic("TOPIC_HRZ_ORDER_TEST");
consumerMessage.setGroupId("GID_HRZ_TEST");
consumerClient.orderSubscribe(consumerMessage);
return "消费成功";
}
/**
* 1、发送普通消息
*
* @return
*/
@RequestMapping(value = "/sendNormalMessage", method = RequestMethod.GET)
public String sendNormalMessage() {
try {
Message message = new Message();
//设置主题
message.setTopic("TOPIC_HRZ_TEST");
message.setBody("测试".getBytes());
// 设置代表消息的业务关键属性,全局唯一。
message.setKey("OrderId_" + Snowflake.getId());
//C0A80137276C18B4AAC20261E0600000
producerClient.sendNormalMessage(message, "GID_HRZ_TEST");
} catch (Exception e) {
e.printStackTrace();
}
return "发送普通消息成功";
}
/**
* 2、发送顺序消息
*
* @return
*/
@RequestMapping(value = "/sendOrderMessage", method = RequestMethod.GET)
public String sendOrderMessage() {
try {
Message message = new Message();
//设置主题
message.setTopic("TOPIC_HRZ_ORDER_TEST");
message.setBody("顺序消息".getBytes());
// 设置代表消息的业务关键属性,全局唯一。
message.setKey("OrderId_" + Snowflake.getId());
//C0A80137276C18B4AAC20261E0600000
producerClient.sendOrderMessage(message, "GID_HRZ_TEST", 1);
} catch (Exception e) {
e.printStackTrace();
}
return "发送顺序消息成功";
}
}
2、测试方法
3、平台查看
订阅成功后查看。
注意事项
如果使用 SpringBoot请不要使用单元测试来测试订阅,要使用控制器来测试。