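Preface
This post is an introductory look at what RocketMQ is for and how to use it at a basic level.
RocketMQ offers:
- Cluster deployment, load balancing, and horizontal scalability
- The capacity to accumulate messages on the order of hundreds of millions
- Zero-copy I/O, sequential disk writes, and random reads
- A rich set of APIs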
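To make "sequential writes, random reads" concrete, here is a minimal sketch using a memory-mapped file from the JDK. Memory mapping is one common way to avoid extra buffer copies on the JVM; that RocketMQ's storage layer works along these lines is my understanding rather than something this post shows. The class name `MappedLogSketch` and its length-prefixed record layout are made up for illustration and are not RocketMQ's format.

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical toy log: appends records at the tail, reads them back by offset.
public class MappedLogSketch {
    private final MappedByteBuffer buffer;
    private int writePosition = 0; // offset where the next record will be appended

    public MappedLogSketch(Path file, int capacity) throws IOException {
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping remains valid after the channel is closed.
            this.buffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
        }
    }

    /** Sequential write: appends a length-prefixed record and returns its start offset. */
    public int append(String body) {
        byte[] bytes = body.getBytes(StandardCharsets.UTF_8);
        int offset = writePosition;
        buffer.position(offset);
        buffer.putInt(bytes.length);
        buffer.put(bytes);
        writePosition = buffer.position();
        return offset;
    }

    /** Random read: reads the record that starts at the given offset. */
    public String read(int offset) {
        int length = buffer.getInt(offset);
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            bytes[i] = buffer.get(offset + 4 + i);
        }
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("mapped-log", ".bin");
        MappedLogSketch log = new MappedLogSketch(file, 1024);
        int first = log.append("Hello RocketMQ0");
        int second = log.append("Hello RocketMQ1");
        // Records can be read in any order once their offsets are known.
        System.out.println(log.read(second));
        System.out.println(log.read(first));
    }
}
```

The sketch has no capacity check and is not thread-safe; it only shows that writes always go to the tail while reads can jump to any stored offset.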
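Basic concepts
Like RabbitMQ, RocketMQ has producers and consumers:
- Producer: the message producer, responsible for producing messages.
- Consumer: the message consumer, responsible for consuming messages.
- Push Consumer: a kind of consumer; you register a listener on the Consumer object, and the listener is invoked when messages arrive.
- Pull Consumer: a kind of consumer; it must actively request messages from the broker.
- Producer Group: a set of producers, generally used to send one category of messages.
- Consumer Group: a set of consumers, generally used to receive and consume one category of messages.
- Broker: the MQ message server, which receives, stores, and delivers messages.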
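The difference between the push and pull styles can be sketched with a toy in-memory model. Everything here (`PushPullSketch`, `ToyBroker`, and its methods) is hypothetical and uses only the JDK; it is not the RocketMQ API, just an illustration of who initiates delivery.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

public class PushPullSketch {

    /** Hypothetical toy broker: stores messages for pulling and notifies registered listeners. */
    public static class ToyBroker {
        private final Queue<String> stored = new ArrayDeque<>();
        private final List<Consumer<String>> listeners = new ArrayList<>();

        /** Push style: the consumer registers a callback once. */
        public void registerListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        /** Called by a producer: store the message and notify push listeners. */
        public void send(String message) {
            stored.add(message);
            for (Consumer<String> listener : listeners) {
                listener.accept(message);
            }
        }

        /** Pull style: the consumer asks for the next message; null means none is waiting. */
        public String pull() {
            return stored.poll();
        }
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Push consumer: collects whatever the broker hands to it.
        List<String> pushed = new ArrayList<>();
        broker.registerListener(pushed::add);

        broker.send("Hello RocketMQ0");
        broker.send("Hello RocketMQ1");

        // Pull consumer: decides for itself when to fetch.
        String first = broker.pull();
        String second = broker.pull();

        System.out.println("pushed: " + pushed);
        System.out.println("pulled: " + first + ", " + second);
    }
}
```

In the push style the consumer code never asks for anything; in the pull style nothing arrives until the consumer calls `pull()`.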
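Installing RocketMQ on Windows
For installation, follow one of the many tutorials available online and then adjust the configuration as needed.
RocketMQ client
Before starting, download the dashboard client from GitHub: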
https://github.com/apache/rocketmq-dashboard
After downloading, edit the configuration file application.yml directly:
rocketmq:
  config:
    # if this value is empty,use env value rocketmq.config.namesrvAddr NAMESRV_ADDR | now, default localhost:9876
    # configure multiple namesrv addresses to manage multiple different clusters
    # replace the entries below with your local address, or any address and port that can reach your running RocketMQ service
    namesrvAddrs:
      - 127.0.0.1:9876
      - 127.0.0.2:9876
Creating the project
Create a Maven project.
- pom.xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <rocketmq.version>4.3.0</rocketmq.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>${rocketmq.version}</version>
    </dependency>
</dependencies>
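The producer and consumer code in the following sections reads the name server address from `Const.NAMESRV_ADDR_SINGLE`, a class this post does not show. A minimal sketch of what it might look like, assuming a single local name server on the same address used in the dashboard configuration (the value is an assumption; replace it with your own):

```java
// Hypothetical constants class; the original project's version is not shown in this post.
public final class Const {
    // Assumed value: one name server running locally on port 9876.
    public static final String NAMESRV_ADDR_SINGLE = "127.0.0.1:9876";

    private Const() {
        // constants holder, not meant to be instantiated
    }
}
```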
Creating the Producer class
Create a DefaultMQProducer object, start it, and send a few messages:
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

public class Producer {
    public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("test_producer_name");
        // Const is the project's own constants class holding the name server address
        producer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        producer.start();
        for (int i = 0; i < 5; i++) {
            // 1. Create the message
            Message message = new Message("test_topic", // topic
                    "TagA", // tag
                    "key" + i, // user-defined key, a unique identifier
                    ("Hello RocketMQ" + i).getBytes()); // message body (byte[])
            // 2. Send the message; a topic has 4 queues by default
            SendResult sendResult = producer.send(message);
            System.out.println("Message sent: " + sendResult);
        }
        // Shut down only after every message has been sent
        producer.shutdown();
    }
}
Output:
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B760000, offsetMsgId=C0A8010500002A9F000000000005812E, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=1], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B800001, offsetMsgId=C0A8010500002A9F00000000000581F0, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=2], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B820002, offsetMsgId=C0A8010500002A9F00000000000582B2, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=3], queueOffset=2]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B850003, offsetMsgId=C0A8010500002A9F0000000000058374, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=0], queueOffset=1]
Message sent: SendResult [sendStatus=SEND_OK, msgId=C0A8010500E018B4AAC27F460B8B0004, offsetMsgId=C0A8010500002A9F0000000000058436, messageQueue=MessageQueue [topic=test_topic, brokerName=SC-202111172212, queueId=1], queueOffset=2]
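The queueId values in the output above run 1, 2, 3, 0, 1, which is what you would expect if the producer rotates through the topic's four default queues in turn. The sketch below reproduces that pattern with a plain counter. `RoundRobinQueueSketch` is a hypothetical class, not the client's actual selection code, and the starting index of 1 is chosen only to match this particular run.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector over a fixed number of queues.
public class RoundRobinQueueSketch {
    private final int queueCount;
    private final AtomicInteger counter;

    public RoundRobinQueueSketch(int queueCount, int start) {
        this.queueCount = queueCount;
        this.counter = new AtomicInteger(start);
    }

    /** Returns the next queue id, wrapping back to 0 after the last queue. */
    public int nextQueueId() {
        // floorMod keeps the result non-negative even if the counter overflows
        return Math.floorMod(counter.getAndIncrement(), queueCount);
    }

    public static void main(String[] args) {
        RoundRobinQueueSketch selector = new RoundRobinQueueSketch(4, 1);
        StringBuilder ids = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            ids.append(selector.nextQueueId()).append(' ');
        }
        System.out.println(ids.toString().trim()); // prints "1 2 3 0 1"
    }
}
```

Spreading messages across queues this way is what lets several consumers in one group share the load, since each queue can be assigned to a different consumer.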
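Creating the Consumer class
- Create a DefaultMQPushConsumer object
- Set the NamesrvAddr and the starting consume position (ConsumeFromWhere)
- Subscribe to the topic with subscribe
- Register a listener with registerMessageListener and consume messages in it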
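import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name");
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        // Start from the latest offset when this group has no stored offset yet
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // "*" subscribes to every tag under the topic
        consumer.subscribe("test_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // With the default batch size, each call delivers a single message
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Ask the broker to redeliver this message later
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.err.println("consumer start...");
    }
}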
Output:
topic: test_topic,tags: TagA, keys: key0,body: Hello RocketMQ0
topic: test_topic,tags: TagA, keys: key1,body: Hello RocketMQ1
topic: test_topic,tags: TagA, keys: key2,body: Hello RocketMQ2
topic: test_topic,tags: TagA, keys: key3,body: Hello RocketMQ3
topic: test_topic,tags: TagA, keys: key4,body: Hello RocketMQ4
Message redelivery
Suppose some code in the consumer throws an exception, so the message is redelivered. Once the redelivery count reaches a chosen threshold, record the message and leave it to a scheduled task for compensation.
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer_name");
        consumer.setNamesrvAddr(Const.NAMESRV_ADDR_SINGLE);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        consumer.subscribe("test_topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                MessageExt me = msgs.get(0);
                try {
                    String topic = me.getTopic();
                    String tags = me.getTags();
                    String keys = me.getKeys();
                    // Simulate a failure for one specific message
                    if ("key1".equals(keys)) {
                        System.err.println("Message consumption failed..");
                        int a = 1 / 0;
                    }
                    String msgBody = new String(me.getBody(), RemotingHelper.DEFAULT_CHARSET);
                    System.err.println("topic: " + topic + ",tags: " + tags + ", keys: " + keys + ",body: " + msgBody);
                } catch (Exception e) {
                    e.printStackTrace();
                    // Number of times this message has already been redelivered
                    int reconsumeTimes = me.getReconsumeTimes();
                    System.err.println("reconsumeTimes: " + reconsumeTimes);
                    if (reconsumeTimes >= 3) {
                        // Log the failure here...
                        // ...and hand the message off for compensation.
                        // Returning success stops further redelivery.
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.err.println("consumer start...");
    }
}
Log output:
reconsumeTimes: 3
Message consumption failed..
java.lang.ArithmeticException: / by zero
at com.bfxy.rocketmq.quickstart.Consumer$1.consumeMessage(Consumer.java:42)
at org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService$ConsumeRequest.run(ConsumeMessageConcurrentlyService.java:417)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
reconsumeTimes: 3
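The consumer code leaves the "record it and compensate with a scheduled task" step as comments. One way that step might look, sketched with JDK classes only: failed message keys go into an in-memory queue, and a scheduled task drains it. `CompensationSketch`, `Decision`, and the threshold constant are hypothetical names, and a real system would more likely persist the failures to a database than keep them in memory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CompensationSketch {
    // Assumed threshold, matching the value 3 used in the consumer example
    public static final int MAX_RECONSUME_TIMES = 3;

    /** What the listener should do after a failed attempt. */
    public enum Decision { RETRY_LATER, RECORD_AND_ACK }

    // Keys of messages that exhausted their retries and await compensation
    private final Queue<String> failedKeys = new ConcurrentLinkedQueue<>();

    /** Decides between another retry and recording the message for compensation. */
    public Decision onFailure(String messageKey, int reconsumeTimes) {
        if (reconsumeTimes >= MAX_RECONSUME_TIMES) {
            failedKeys.add(messageKey);
            return Decision.RECORD_AND_ACK;
        }
        return Decision.RETRY_LATER;
    }

    /** One compensation pass: drains the recorded keys and returns the ones handled. */
    public List<String> compensateOnce() {
        List<String> handled = new ArrayList<>();
        String key;
        while ((key = failedKeys.poll()) != null) {
            handled.add(key); // real compensation logic would go here
        }
        return handled;
    }

    public static void main(String[] args) throws InterruptedException {
        CompensationSketch sketch = new CompensationSketch();
        System.out.println(sketch.onFailure("key1", 0)); // below the threshold
        System.out.println(sketch.onFailure("key1", 3)); // threshold reached

        // Run the compensation pass periodically on a background thread
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(
                () -> System.out.println("compensated: " + sketch.compensateOnce()),
                0, 1, TimeUnit.SECONDS);
        Thread.sleep(200);
        scheduler.shutdown();
        scheduler.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```

In the listener, `RETRY_LATER` would map to returning `ConsumeConcurrentlyStatus.RECONSUME_LATER` and `RECORD_AND_ACK` to `ConsumeConcurrentlyStatus.CONSUME_SUCCESS`, so the broker stops redelivering a message once it has been recorded.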
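Conclusion
This post covered a RocketMQ producer sending messages and a consumer receiving them, then simulated an exception in the consumer to show where logging or message compensation can be added.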