创建maven工程,引入RocketMQ依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.2</version>
</dependency>
创建Producer发送同步消息,异步消息,单向消息
public class Producer {
/**
* RocketMQ 将消息发送到指定队列
*/
public static void asyncMsgCustomQueue() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
producer.start();
producer.setSendMsgTimeout(10000);
//1、创建消息
Message message = new Message("test_quick_topic", //主题
"TagA", //标签
"keyA", //用户自定义的key,唯一的标识
("hello RocketMq").getBytes());//消息体
//2、将消息发送到指定队列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
Integer queueNumber = (Integer) arg;
int size = list.size();
int index = queueNumber % size;
return list.get(index);
}
},1);
producer.shutdown();
}
/**
* RocketMQ 发送单向消息
*/
public static void syncOneWay() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
//同步发送消息,如果5秒内没有发送成功,则重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
producer.start();
//设置消息发送超时时间,默认为3000ms
producer.setSendMsgTimeout(5000);
//1、创建消息
Message message = new Message("test_quick_topic", //主题
"TagA", //标签
"keyA", //用户自定义的key,唯一的标识
("hello RocketMq").getBytes());//消息体
//2、发送单向消息
producer.sendOneway(message);
System.out.println("单向消息已发出");
producer.shutdown();
}
/**
* RocketMQ 异步消息发送
*/
public static void asyncMsg() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
producer.setRetryTimesWhenSendAsyncFailed(0);
producer.start();
//设置消息发送超时时间,默认为3000ms
producer.setSendMsgTimeout(5000);
//1、创建消息
Message message = new Message("test_quick_topic", //主题
"TagA", //标签
"keyA", //用户自定义的key,唯一的标识
("hello RocketMq").getBytes());//消息体
//2、发送异步消息
producer.send(message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("msgId:" + sendResult.getMsgId()+",status:"+sendResult.getSendStatus());
//通过status可以知道此异步消息是否发送成功
//如果要保障100%可靠性投递的消息的话,在获取到status的时候对消息数据做变更操作
}
@Override
public void onException(Throwable throwable) {
//消息发送失败的话,进行补偿或者重发
System.out.println("消息发送失败");
}
});
producer.shutdown();
}
/**
* RocketMQ 同步消息发送
*/
public static void syncMsg() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
//同步发送消息,如果5秒内没有发送成功,则重试3次
producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
producer.start();
//设置消息发送超时时间,默认为3000ms
producer.setSendMsgTimeout(5000);
//1、创建消息
Message message = new Message("test_quick_topic", //主题
"TagA", //标签
"keyA", //用户自定义的key,唯一的标识
("hello RocketMq").getBytes());//消息体
//2、发送同步消息
SendResult sendResult = producer.send(message,5000L);
System.out.println("消息发出:" + sendResult);
producer.shutdown();
}
}
创建Consumer消费消息
Consumer集群消费模式
/**
* @Description: Clustering集群模式
*/
public class Consumer1 {
public Consumer1() {
try {
String group_name = "test_model_producer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
//consumer.subscribe("test_model_topic","*");
consumer.subscribe("test_model_topic","TagA || TagB");
consumer.setMessageModel(MessageModel.CLUSTERING);//集群模式
consumer.registerMessageListener(new ConsumerListener());
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Consumer1 consumer1 = new Consumer1();
System.out.println("consumer1 start...");
}
}
public class ConsumerListener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = null;
try {
for (MessageExt message : list) {
messageExt = message;
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
String keys = messageExt.getKeys();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
}
} catch (Exception e) {
//获取当前消息被重发了多少次
int reconsumeTimes = messageExt.getReconsumeTimes();
System.out.println("reconsumeTimes:" + reconsumeTimes);
if (reconsumeTimes == 3) {
//如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
//记录日志目的就是为了做补偿处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息消费失败会自动重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
Consumer广播消费模式
/**
* @Description: Broadcasting广播模式
*
* Broadcasting广播模式下
* 同一group_name下的Consumer通过consumer.subscribe("test_model_topic","TagA");监听同一topic,用tags来区分消费消息是不行的
* 这种效果必须要在Listener里面手动实现
*
* 不同group_name下的Consumer可以通过监听同一topic,用tags来区分消费消息
*
*/
public class Consumer1 {
public Consumer1() {
try {
String group_name = "test_model_producer_name";
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group_name);
consumer.setNamesrvAddr("192.168.218.131:9876;192.168.218.132:9876");
consumer.subscribe("test_model_topic","*");
consumer.setMessageModel(MessageModel.BROADCASTING);//广播模式
consumer.registerMessageListener(new Consumer1Listener());
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Consumer1 consumer1 = new Consumer1();
System.out.println("consumer1 start...");
}
}
public class Consumer1Listener implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = null;
try {
for (MessageExt message : list) {
messageExt = message;
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
//广播模式下,当Tags的值为"TagA"时,consumer1才进行消费
if("TagA".equals(tags)){
String keys = messageExt.getKeys();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
}
}
} catch (Exception e) {
//获取当前消息被重发了多少次
int reconsumeTimes = messageExt.getReconsumeTimes();
System.out.println("reconsumeTimes:" + reconsumeTimes);
if (reconsumeTimes == 3) {
//如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
//记录日志目的就是为了做补偿处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息消费失败会自动重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
顺序消息
Rocketmq的顺序消息需要满足2点:
- 1、Producer端保证发送消息有序,且发送到同一个队列。
- 2、consumer端保证消费同一个队列。
生产端:
RocketMQ可以严格的保证消息有序。但这个顺序,不是全局顺序,只是局部(queue)顺序。同一个queue里面,RocketMQ的确是能保证FIFO的。
确保消息放到同一个queue中,需要使用 MessageQueueSelector。
Producer向指定messageQueue发送消息
/**
* @author: huangyibo
* @Date: 2019/11/30 1:57
* @Description: 顺序消息 保障同一个topic下的同一组消息发往同一个MessageQueue即可
*/
public class OrderProducer {
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("test_order_producer_group_name");
producer.setNamesrvAddr(Const.NAMESER_ADDR);
producer.setSendMsgTimeout(10000);
producer.start();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
//这5条消息是一个大的业务操作
for (int i = 0; i < 5; i++) {
//时间戳
String body = dateStr + "Hello RocketMQ " + i;
//参数: topic tag message
Message message = new Message("test_order_topic","TagA","keyA"+i,body.getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送数据:如果使用顺序消费,则需自己实现MessageQueueSelector 保证消息进入同一个队列
SendResult sendResult = producer.send(message, new MessageQueueSelector() {
/**
*
* @param list 队列集合
* @param message 消息对象
* @param arg 业务标识的参数
* @return
*/
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
Integer id = (Integer) arg;
System.out.println("id : " + id);
int queueIndex = id % list.size();
return list.get(queueIndex);
}
}, 1);//1是队列的下标
System.out.println(sendResult + ",body:"+body);
}
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}catch (MQClientException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
消费端:
需要使用 MessageListenerOrderly 来消费数据。
MessageListenerOrderly与MessageListenerConcurrently区别:
- MessageListenerOrderly:有序消费,同一队列的消息同一时刻只能一个线程消费,可保证消息在同一队列严格有序消费。
- MessageListenerConcurrently:并发消费。
Consumer消费者,编写OrderListener实现MessageListenerOrderly接口
public class OrderConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_order_producer_group_name");
consumer.setNamesrvAddr(Const.NAMESER_ADDR);
/**
* 设置consumer第一次启动是从队列头部开始消费还是从队列尾部开始消费
* 如果非第一次启动,那么按照上一次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//订阅的主题,以及过滤的标签内容
consumer.subscribe("test_order_topic","TagA");
//注册监听
consumer.registerMessageListener(new OrderListener());
consumer.start();
System.out.println("consumer started......");
}
}
public class OrderListener implements MessageListenerOrderly {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
MessageExt messageExt = null;
try {
for (MessageExt message : list) {
messageExt = message;
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
//广播模式下,当Tags的值为"TagB"时,consumer2才进行消费
String keys = messageExt.getKeys();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
}
} catch (Exception e) {
//获取当前消息被重发了多少次
int reconsumeTimes = messageExt.getReconsumeTimes();
System.out.println("reconsumeTimes:" + reconsumeTimes);
if (reconsumeTimes == 3) {
//如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
//记录日志目的就是为了做补偿处理
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
//消息消费失败会自动重试
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
}
事物消息
创建事物消息Producer和事务监听器
/**
* @Description: 事务监听器
*/
public class TransactionListenerImpl implements TransactionListener {
/**
* 执行本地事务
* @param message
* @param arg
* @return
*/
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
System.out.println("------执行本地事物----");
String callArg = (String) arg;
System.out.println("callArg:" + callArg);
System.out.println("message:" + message);
//begin tx
//数据库的落库操作
//tx.commit
return LocalTransactionState.COMMIT_MESSAGE;
}
/**
* 未决事务,Rocket服务器端回查客户端
* @param messageExt
* @return
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
System.out.println("------回调消息检查,查询数据库是否存在该条消息----"+messageExt);
return LocalTransactionState.COMMIT_MESSAGE;
}
}
public class TransactionProducer {
public static void main(String[] args) throws Exception {
TransactionMQProducer producer = new TransactionMQProducer("tx_producer_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("tx_producer_group_name" + "-check-thread");
return thread;
}
});
producer.setNamesrvAddr(Const.NAMESER_ADDR);
producer.setExecutorService(executorService);
//这个对象主要做两件事情,1、异步执行本地事物,2、回查
TransactionListener transactionListener = new TransactionListenerImpl();
producer.setTransactionListener(transactionListener);
producer.setSendMsgTimeout(10000);
producer.start();
Message message = new Message("tx_topic","TagA","Key",
("hello rocketmq tx").getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendMessageInTransaction(message,"我是回调的参数");
Thread.sleep(Integer.MAX_VALUE);
producer.shutdown();
}
}
创建消费者
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tx_producer_group_name");
consumer.setConsumeThreadMin(10);
consumer.setConsumeThreadMax(20);
consumer.setNamesrvAddr(Const.NAMESER_ADDR);
/**
* 设置consumer第一次启动是从队列头部开始消费还是从队列尾部开始消费
* 如果非第一次启动,那么按照上一次消费的位置继续消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//订阅的主题,以及过滤的标签内容
consumer.subscribe("tx_topic","*");
//注册监听
consumer.registerMessageListener(new Listener());
consumer.start();
System.out.println("tx consumer started...");
}
}
class Listener implements MessageListenerConcurrently{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
MessageExt messageExt = null;
try {
for (MessageExt message : list) {
messageExt = message;
String topic = messageExt.getTopic();
String tags = messageExt.getTags();
//广播模式下,当Tags的值为"TagB"时,consumer2才进行消费
String keys = messageExt.getKeys();
String body = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
System.out.println("topic:" + topic + ",tags:" + tags + ",keys:" + keys + ",body:" + body);
}
} catch (Exception e) {
//获取当前消息被重发了多少次
int reconsumeTimes = messageExt.getReconsumeTimes();
System.out.println("reconsumeTimes:" + reconsumeTimes);
if (reconsumeTimes == 3) {
//如果一条消息被重发了3次,且还是没有消费成功,那么就记录日志
//记录日志目的就是为了做补偿处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
//消息消费失败会自动重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
参考:
https://www.cnblogs.com/fqybzhangji/p/11044119.html
https://www.cnblogs.com/hzmark/p/orderly_message.html
https://www.cnblogs.com/hzmark/p/rocket_txn.html
https://baijiahao.baidu.com/s?id=1638994745278332160&wfr=spider&for=pc