发送普通消息
普通消息,也叫并发消息,是发送效率最高、使用场景最多的一类消息。发送普通消息的代码如下:
public static void main(String[] args) throws MQClientException, InterruptedException {
// 创建默认的生产者
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.start();
for (int i = 0; i < 128; i++)
try {
{
// 创建消息对象
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
} catch (Exception e) {
e.printStackTrace();
}
// 关闭
producer.shutdown();
}
image.gif
发送顺序消息
同步发送消息是,根据HashKey将消息发送到指定的分区中,每个分区中的消息都是按照发送顺序保存的,即分区有序。如果Topic的分区被设置为1,这个Topic的消息就是全局有序的。注意,顺序消息的发送必须是单线程,多线程将不再有序。顺序消息的消费和普通消息的消费方式不同。代码如下:
// 第一步 初始化生产者,配置生产者参数,启动生产者
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
// 初始化消息体
Message msg = new Message("TopicTest",
"TagA",
"Hello RocketMQ ".getBytes(RemotingHelper.DEFAULT_CHARSET));
Integer hashKey = 123;
// 核心操作 MessageQueueSelector ,根据 hashKey选择当前消息发送到那个分区中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, hashKey);
producer.shutdown();
image.gif
发送延迟消息
生产者发送消息后,消费者在指定时间才能消费消息,这类消息被称为延迟消息或定时消息。生产者发送延迟消息前需要设置延迟级别,目前开源版本支持18个延迟级别。
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
image.gif
Broker 在接收用户发送的消息后,首先将消息保存到名为 CHEDULE_TOPIC_XXXX的Topic中。此时,消费者无法消费该延迟消息。然后,由Broker端的定时投递任务定时投递给消费者。
保存延迟消息的实现逻辑 见类chduleMessageService。按照配置的延迟级别初始化多个任务,每秒执行一次。如果消息投递满足时间条件,那么将消息投递到原始Topic中。消费者此时可以消费该延迟消息。
发送事务消息
事务消息的发送、消费流程和延迟消息类似,都是先发送到对消费者不可见的Topic中。当事务消息被生产者提交后,会被二次投递到原始Topic中,此时消费者正常消费。事务消息的发送具体分为以下两步:
第一步:用户发送一个half消息到Broker,Broker设置 queueOffset = 0,即对消费者不可见。
第二步:用户本地事务处理成功,发送一个commit,消息到Broker,Broker修改queueOffset为正常值,达到重新投递的目的,此时消费者可以正常消费;如果本地事务处理失败,那么将发送一个RollBack消息给Broker,Broker删除half消息。
// 第一步 初始化事务消息生产者
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
// 第二步 配置生产者的各个参数和Broker回调,检查本地事务处理并启动生产者
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckThreadPoolMinSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(new TransactionCheckListener() {
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
int value = transactionIndex.getAndIncrement();
if (value % 6 == 0) {
throw new RuntimeException("Cloud not find db");
} else if (value % 5 == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (value % 4 == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
});
producer.start();
// 第三步 设置本地事务处理器,发送消息
Message msg = new Message("TopicTest",
"tags",
"KEY",
("Hello EocketMQ " + System.currentTimeMillis()).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, new LocalTransactionExecuter() {
private AtomicInteger transactionIndex = new AtomicInteger(1);
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
if (value == 0) {
throw new RuntimeException("Cloud not find db");
} else if (value % 5 == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if (value % 4 == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}, null);
System.out.printf("%s%n", sendResult);
image.gif
发送单向消息
单向消息的生产者只管发送过程,不管发送结果。单向消息主要用于日志传输等消息允许丢失的场景。
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876;");
producer.setInstanceName("instanceName");
producer.start();
Message msg = new Message("TopicTest", "TagA", "OrderID1", "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(msg);
image.gif
批量发送消息
批量发送消息能提高发送效率,提升系统吞吐量。批量消息发送有一以下3点注意事项:
(1)消息最好小于1MB
(2)同一批批量消息的Topic、waitStoreMsgOK属性必须一致
(3)批量消息不支持延迟消息
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroupName");
producer.start();
//If you just send messages of no more than 1MiB at a time, it is easy to use batch
//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
producer.send(messages);
}
image.gif
相对消费者而言,生产者的使用更加简单,一般主要关注消息类型、消息发送方法和方法参数。
常用的消息类型如下:
| 消息类型 | 优点 | 缺点 | 备注 |
| 普通消息(并发消息) | 性能最好,单机TPS级别为100 000 | 消息的生产和消费都是无序的 | 大部分场景使用 |
| 分区有序消息 | 单分区中消息有序,单机发送TPS万级别 | 单点问题,如果Broker宕机,则会导致发送失败 | 大部分有序消息场景适用 |
| 全局有序消息 | 类似传统的Queue,全部消息有序,单机发送TPS千级别 | 单点问题,如果Broker宕机,则会导致发送失败 | 极少场景使用 |
| 延迟消息 | RocketMQ自身支持,不需要额外使用组件,支持延迟特性 | 不能根据任意时间延迟,使用范围受限。Broker随着延迟级别增大支持越多,CPU压力越大,延迟时间不准确 | 非精确,延迟级别不多的场景,非常方便使用 |
| 事务消息 | RocketMQ自身支持,不需要额外使用组件支持事务特性 | RocketMQ事务是生产者事务,只有生产者参与,如果消费者处理失败则事务失效 | 简单事务处理可以使用 |