本文展示 RocketMQ 同步发送消息的 Java 代码示例。
所有示例使用 Maven 工程构建,需要添加 rocketmq-client
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
Java 代码
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import java.io.UnsupportedEncodingException;
public class SyncProducer {
public static void main(String[] args)
throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException,
MQBrokerException {
// 初始化消息生产者,需要指定消息生产者组名称
DefaultMQProducer producer = new DefaultMQProducer("synchronous-group");
// 显示设置 NameServer 的地址
producer.setNamesrvAddr("localhost:9876");
// 启动消息生产者实例
producer.start();
// 循环发送消息10次
for (int i = 0; i < 10; i++) {
// 创建消息实例,指定 Topic 、 Tag 和消息体
Message msg = new Message("TopicDemo", "TagSynchronous", ("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
// 调用消息生产者实例发送消息给消息代理
SendResult sendResult = producer.send(msg);
// 打印消息发送结果
System.out.printf("%s%n", sendResult);
}
// 关闭不再使用消息生产者
producer.shutdown();
}
}
运行代码前先在 RocketMQ Console 上查看一下 Topic 列表,没有名称为 TopicDemo 的 Topic。
运行代码,日志打印如下:
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD74B0000, offsetMsgId=0A00000700002A9F000000000002C5BA, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7680001, offsetMsgId=0A00000700002A9F000000000002C675, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD76B0002, offsetMsgId=0A00000700002A9F000000000002C730, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7730003, offsetMsgId=0A00000700002A9F000000000002C7EB, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7760004, offsetMsgId=0A00000700002A9F000000000002C8A6, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7790005, offsetMsgId=0A00000700002A9F000000000002C961, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD77D0006, offsetMsgId=0A00000700002A9F000000000002CA1C, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7800007, offsetMsgId=0A00000700002A9F000000000002CAD7, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7830008, offsetMsgId=0A00000700002A9F000000000002CB92, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=1], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A000007589418B4AAC2196BD7870009, offsetMsgId=0A00000700002A9F000000000002CC4D, messageQueue=MessageQueue [topic=TopicDemo, brokerName=LAPTOP-C375ASPB, queueId=2], queueOffset=2]
刷新 RocketMQ Console 的 Topic 列表,可以看到刚刚发送的消息
点击 STATUS 可以查看当前消息状态
测试环境使用单主(Master)部署,只启动了一个 NameServer 和一个 Broker,从 TopicDemo STATUS 中可以看到,一个 Topic 会被分为一个或多个 Message Queue,此处便分为了 4 个 Message Queue
上一篇:RocketMQ Console 的安装及运行
下一篇: