基本消息发送有三种姿势:同步、异步、单向。
- 同步:消息发送到 Broker 成功后,返回发送成功结果;这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。
- 异步:消息发送出去后立即返回结果,可以在发送成功的消息回调中,查看消息是否发送成功;异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应。
- 单向:消息发送出去,Broker 不返回结果。这种方式主要用在不特别关心发送结果的场景,例如日志发送。
一、同步发送
在 第一章 RocketMQ 搭建调试环境 中,演示了消息的同步发送。
SendResult sendResult = producer.send(msg)
二、异步发送
改造 org.apache.rocketmq.example.simple.AsyncProducer
public class AsyncProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
producer.setNamesrvAddr("127.0.0.1:9876");
// https://blog.csdn.net/heihaozi/article/details/119145266
DefaultChannelId.newInstance();
producer.start();
// 异步发送失败重试,可能导致消息重复发送,需要保证消息幂等性
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("TopicTest",
"TagA",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送模式
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
}
三、单向发送
改造 org.apache.rocketmq.example.simple.OnewayProducer
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// 单向发送
producer.sendOneway(msg);
四、消息消费
在 第一章 RocketMQ 搭建调试环境 中,演示了消息的接收。