运行NameServer和Broker后,我们可以尝试用代码示例写一下:消息发送的三种方式
- 同步发送,适用场景:对可靠性高的业务场景
- 异步发送,适用场景:对响应时间比较敏感的业务场景,就是不允许长时间等待结果的场景
- 发送一次,适用场景:不关心发送结果场景。比如日志发送
依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.8.0</version>
</dependency>
同步发送
public class SyncProducer {
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
DefaultMQProducer defaultMQProducer = new DefaultMQProducer();
//设置生产者组
defaultMQProducer.setProducerGroup("syncProducer");
//设置nameserver
defaultMQProducer.setNamesrvAddr("localhost:9876");
//启动生产者
defaultMQProducer.start();
for (int i = 0; i < 10; i++) {
//构建消息 topic tag 内容
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//同步发送,且返回结果
SendResult sendResult = defaultMQProducer.send(msg);
System.out.println("发送结果"+sendResult);
}
//关闭生产者
defaultMQProducer.shutdown();
}
}
//运行结果
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2040000, offsetMsgId=C0A81FF100002A9F0000000000045402, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2230001, offsetMsgId=C0A81FF100002A9F00000000000454CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2260002, offsetMsgId=C0A81FF100002A9F0000000000045594, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22A0003, offsetMsgId=C0A81FF100002A9F000000000004565D, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=350]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22C0004, offsetMsgId=C0A81FF100002A9F0000000000045726, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E22E0005, offsetMsgId=C0A81FF100002A9F00000000000457EF, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2300006, offsetMsgId=C0A81FF100002A9F00000000000458B8, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=3], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2320007, offsetMsgId=C0A81FF100002A9F0000000000045981, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=0], queueOffset=351]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2340008, offsetMsgId=C0A81FF100002A9F0000000000045A4A, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=1], queueOffset=352]
发送结果SendResult [sendStatus=SEND_OK, msgId=7F000001503218B4AAC29874E2360009, offsetMsgId=C0A81FF100002A9F0000000000045B13, messageQueue=MessageQueue [topic=TopicTest, brokerName=aarondeMBP, queueId=2], queueOffset=352]
14:29:56.930 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:29:56.932 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
异步发送
public class AsyncProducer {
public static void main(String[] args) throws Exception {
//设置生产者名字
DefaultMQProducer producer = new DefaultMQProducer("asyncproducer");
// 设置nameserver的地址
producer.setNamesrvAddr("localhost:9876");
//生产者启动
producer.start();
//发送失败重试次数
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 10;
for (int i = 0; i < messageCount; i++) {
try {
final int index = i;
Message msg = new Message("async_topic",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
//异步发送,这里多了一个SendCallBack回调函数
producer.send(msg, new SendCallback() {
//发送成功
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功:"+sendResult);
}
//发送失败
@Override
public void onException(Throwable e) {
System.out.printf("发送失败:"+e);
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
TimeUnit.SECONDS.sleep(3);
producer.shutdown();
}
}
//运行结果
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF40001, offsetMsgId=C0A81FF100002A9F0000000000046438, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1A0004, offsetMsgId=C0A81FF100002A9F0000000000046790, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=1]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BFA0002, offsetMsgId=C0A81FF100002A9F000000000004650E, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=3], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C0B0003, offsetMsgId=C0A81FF100002A9F00000000000466BA, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=2], queueOffset=5]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868BF10000, offsetMsgId=C0A81FF100002A9F00000000000465E4, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0005, offsetMsgId=C0A81FF100002A9F0000000000046866, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=2]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C1C0006, offsetMsgId=C0A81FF100002A9F000000000004693C, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C280007, offsetMsgId=C0A81FF100002A9F0000000000046A12, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=3]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C2D0008, offsetMsgId=C0A81FF100002A9F0000000000046AE8, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=1], queueOffset=4]
发送成功:SendResult [sendStatus=SEND_OK, msgId=7F000001514218B4AAC298868C330009, offsetMsgId=C0A81FF100002A9F0000000000046BBE, messageQueue=MessageQueue [topic=async_topic, brokerName=aarondeMBP, queueId=0], queueOffset=4]
14:49:17.262 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:49:17.265 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.31.241:10911] result: true
发送一次
public class OnewayProducer {
public static void main(String[] args) throws Exception{
//初始化生产者
DefaultMQProducer producer = new DefaultMQProducer("onewayproducer");
// 置顶nameserver名字
producer.setNamesrvAddr("localhost:9876");
//启动生产者
producer.start();
for (int i = 0; i < 10; i++) {
//构建消息体
Message msg = new Message("oneway_topic" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " +
i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
//发送一次性消息
producer.sendOneway(msg);
}
Thread.sleep(5000);
producer.shutdown();
}
}
同步发送,异步发送,一次性发送是RocketMQ主要的三种消息发送方式。
来一个拓展:类似的Kafka和RabbitMQ有哪些消息发送方式,欢迎留言回答。
工作中,具体用什么发送方式,得取决你的业务场景。需要可靠性的,就用同步发送。需要低延时的,就用异步发送,如果压根不care发送结果,直接用一次性发送。
后续文章
- RocketMQ-入门(已更新)
- RocketMQ-消息发送(已更新)
- RocketMQ-消费信息
- RocketMQ-消费者的广播模式和集群模式
- RocketMQ-顺序消息
- RocketMQ-延迟消息
- RocketMQ-批量消息
- RocketMQ-过滤消息
- RocketMQ-事务消息
- RocketMQ-消息存储
- RocketMQ-高可用
- RocketMQ-高性能
- RocketMQ-主从复制
- RocketMQ-刷盘机制
- RocketMQ-幂等性
- RocketMQ-消息重试
- RocketMQ-死信队列
...
欢迎各位入(guan)股(zhu),后续文章干货多多。