概述
生产者 producer 在发送消息的时候,每个消息发送到 broker 只存储在某一个 quene 上。那么 producer 是怎么选择 queue 呢?
下面主要通过以下5种方式进行分析。
1、自定义 MessageQueueSelector 实现
2、SelectMessageQueueByHash hash 选择 queue。
3、 SelectMessageQueueByRandom 随机选择 queue。
4、 SelectMessageQueueByMachineRoom 机房选择queue。
5、默认发送队列选择实现
1、自定义 MessageQueueSelector 实现
下面这个示例是 rocketmq 官网上的一个示例。
public class Producer {
public static void main(String[] args) throws UnsupportedEncodingException {
try {
MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
Message msg =
new Message("TopicTest", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
}
从示例中可以看到 producer.send(msg, new MessageQueueSelector(){}, orderId)
在发送的时候 自定义了一个 MessageQueueSelector。
MessageQueueSelector 的 selelct(List<MessageQueue> mqs, Message msg, Object arg) 方法中有三个参数。
- List<MessageQueue> mqs :topic 中的所有 queue 的集合。
- Message msg:发送的消息
- Object arg:上面示例中 send 方法的第三个参数。
通过实现 select 方法,通过 arg 参数进行取模 mqs.size() 进行选择队列。
RocketMQ 已实现的 MessageQueueSelector
rocketmq 源码中已经提供了几种 MessageQueueSelector 的实现。如下图:
- SelectMessageQueueByHash:通过 hash 进行选择 queue。
- SelectMessageQueueByRandom:随机选择 queue。
- SelectMessageQueueByMachineRoom:机房选择queue。
2、SelectMessageQueueByHash
public class SelectMessageQueueByHash implements MessageQueueSelector {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = arg.hashCode();
if (value < 0) {
value = Math.abs(value);
}
value = value % mqs.size();
return mqs.get(value);
}
}
通过 arg 的 hash,通过 mqs.size() 进行取模,来选择要存储的队列。
3、SelectMessageQueueByRandom
public class SelectMessageQueueByRandom implements MessageQueueSelector {
private Random random = new Random(System.currentTimeMillis());
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
int value = random.nextInt(mqs.size());
return mqs.get(value);
}
}
随机产生一个小于等于 mqs.size() 的随机正整数,来选择要存储的队列。
4、SelectMessageQueueByMachineRoom
public class SelectMessageQueueByMachineRoom implements MessageQueueSelector {
private Set<String> consumeridcs;
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
return null;
}
public Set<String> getConsumeridcs() {
return consumeridcs;
}
public void setConsumeridcs(Set<String> consumeridcs) {
this.consumeridcs = consumeridcs;
}
}
这个未实现,还是要通过自己的场景进行实现。
5、默认是轮询进行发送消息
如果直接调用 SendResult send(final Message msg)
方法,RocketMQ 是如何选择队列的呢?
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
1、int index = tpInfo.getSendWhichQueue().getAndIncrement();
获取 一个自增的index。
2、然后 int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
进行选择一个 queue。
通过上面的代码可以看出,默认是通过轮询的方式进行选择发送队列的。
ThreadLocalIndex 实现
public class ThreadLocalIndex {
private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
private final Random random = new Random();
public int getAndIncrement() {
Integer index = this.threadLocalIndex.get();
if (null == index) {
index = Math.abs(random.nextInt());
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
}
index = Math.abs(index + 1);
if (index < 0)
index = 0;
this.threadLocalIndex.set(index);
return index;
}
@Override
public String toString() {
return "ThreadLocalIndex{" +
"threadLocalIndex=" + threadLocalIndex.get() +
'}';
}
}
从 getAndIncrement() 方法中,可以看出。
为每个线程分配一个随机数,然后每次调用都自增 1。