Preface
This article looks at how RocketMQ 5 selects the target queue for ordered messages.
SendMessageActivity
proxy/src/main/java/org/apache/rocketmq/proxy/grpc/v2/producer/SendMessageActivity.java
public class SendMessageActivity extends AbstractMessingActivity {

    public SendMessageActivity(MessagingProcessor messagingProcessor,
        GrpcClientSettingsManager grpcClientSettingsManager, GrpcChannelManager grpcChannelManager) {
        super(messagingProcessor, grpcClientSettingsManager, grpcChannelManager);
    }

    public CompletableFuture<SendMessageResponse> sendMessage(ProxyContext ctx, SendMessageRequest request) {
        CompletableFuture<SendMessageResponse> future = new CompletableFuture<>();
        try {
            if (request.getMessagesCount() <= 0) {
                throw new GrpcProxyException(Code.MESSAGE_CORRUPTED, "no message to send");
            }
            List<apache.rocketmq.v2.Message> messageList = request.getMessagesList();
            apache.rocketmq.v2.Message message = messageList.get(0);
            Resource topic = message.getTopic();
            validateTopic(topic);
            future = this.messagingProcessor.sendMessage(
                ctx,
                new SendMessageQueueSelector(request),
                topic.getName(),
                buildSysFlag(message),
                buildMessage(ctx, request.getMessagesList(), topic)
            ).thenApply(result -> convertToSendMessageResponse(ctx, request, result));
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
        return future;
    }

    //......
}
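SendMessageActivity extends AbstractMessingActivity. Its sendMessage method rejects an empty request, validates the topic of the first message, and then sends through messagingProcessor.sendMessage, passing new SendMessageQueueSelector(request) as the QueueSelector argument.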
SendMessageQueueSelector
protected static class SendMessageQueueSelector implements QueueSelector {

    private final SendMessageRequest request;

    public SendMessageQueueSelector(SendMessageRequest request) {
        this.request = request;
    }

    @Override
    public AddressableMessageQueue select(ProxyContext ctx, MessageQueueView messageQueueView) {
        try {
            apache.rocketmq.v2.Message message = request.getMessages(0);
            String shardingKey = null;
            if (request.getMessagesCount() == 1) {
                shardingKey = message.getSystemProperties().getMessageGroup();
            }
            AddressableMessageQueue targetMessageQueue;
            if (StringUtils.isNotEmpty(shardingKey)) {
                // With shardingKey
                List<AddressableMessageQueue> writeQueues = messageQueueView.getWriteSelector().getQueues();
                int bucket = Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size());
                targetMessageQueue = writeQueues.get(bucket);
            } else {
                targetMessageQueue = messageQueueView.getWriteSelector().selectOneByPipeline(false);
            }
            return targetMessageQueue;
        } catch (Exception e) {
            return null;
        }
    }
}
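SendMessageQueueSelector implements the QueueSelector interface. When the request carries exactly one message, its select method reads messageGroup from that message's system properties and uses it as the shardingKey. If the shardingKey is not empty, it computes a bucket with Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()) and takes writeQueues.get(bucket) as the targetMessageQueue. If the shardingKey is empty (or the request contains more than one message), it falls back to messageQueueView.getWriteSelector().selectOneByPipeline(false). Any exception thrown during selection is swallowed and the method returns null.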
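The branching described above can be sketched in isolation. This is only an illustration, not the proxy's code: QueueSelectionSketch and BucketHash are hypothetical names, message groups are passed as plain strings, the hash step is pluggable (the proxy uses Guava's Hashing.consistentHash there), and the round-robin fallback is an assumed stand-in for selectOneByPipeline(false), whose real behavior is not covered in this article.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class QueueSelectionSketch {

    /** Hypothetical stand-in for the hash step (Hashing.consistentHash in the proxy). */
    interface BucketHash {
        int bucket(long input, int buckets);
    }

    private final BucketHash hash;
    // Assumed fallback strategy; the real selectOneByPipeline(false) may behave differently.
    private final AtomicInteger roundRobin = new AtomicInteger();

    QueueSelectionSketch(BucketHash hash) {
        this.hash = hash;
    }

    /** Returns an index into the write queues for a request with the given message groups. */
    int select(List<String> messageGroups, int writeQueueCount) {
        String shardingKey = null;
        // Only a single-message request is routed by its message group.
        if (messageGroups.size() == 1) {
            shardingKey = messageGroups.get(0);
        }
        if (shardingKey != null && !shardingKey.isEmpty()) {
            // hashCode() is an int; it is widened to long for the hash step.
            return hash.bucket(shardingKey.hashCode(), writeQueueCount);
        }
        return Math.floorMod(roundRobin.getAndIncrement(), writeQueueCount);
    }

    public static void main(String[] args) {
        // floorMod is used here only so the sketch runs without Guava on the classpath.
        QueueSelectionSketch selector =
            new QueueSelectionSketch((h, n) -> (int) Math.floorMod(h, (long) n));
        System.out.println(selector.select(List.of("order-42"), 8));
        System.out.println(selector.select(List.of("order-42"), 8)); // same index as the line above
        System.out.println(selector.select(List.of(""), 8));         // falls back to round robin
    }
}
```

The point of the sketch is that a non-empty message group always yields the same index for a fixed number of write queues, which is what keeps messages of one group on one queue.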
Hashing.consistentHash
com/google/common/hash/Hashing.java
/**
* Assigns to {@code input} a "bucket" in the range {@code [0, buckets)}, in a uniform manner that
* minimizes the need for remapping as {@code buckets} grows. That is, {@code consistentHash(h,
* n)} equals:
*
* <ul>
* <li>{@code n - 1}, with approximate probability {@code 1/n}
* <li>{@code consistentHash(h, n - 1)}, otherwise (probability {@code 1 - 1/n})
* </ul>
*
* <p>This method is suitable for the common use case of dividing work among buckets that meet the
* following conditions:
*
* <ul>
* <li>You want to assign the same fraction of inputs to each bucket.
* <li>When you reduce the number of buckets, you can accept that the most recently added
* buckets will be removed first. More concretely, if you are dividing traffic among tasks,
* you can decrease the number of tasks from 15 and 10, killing off the final 5 tasks, and
* {@code consistentHash} will handle it. If, however, you are dividing traffic among
* servers {@code alpha}, {@code bravo}, and {@code charlie} and you occasionally need to
* take each of the servers offline, {@code consistentHash} will be a poor fit: It provides
* no way for you to specify which of the three buckets is disappearing. Thus, if your
* buckets change from {@code [alpha, bravo, charlie]} to {@code [bravo, charlie]}, it will
* assign all the old {@code alpha} traffic to {@code bravo} and all the old {@code bravo}
* traffic to {@code charlie}, rather than letting {@code bravo} keep its traffic.
* </ul>
*
* <p>See the <a href="http://en.wikipedia.org/wiki/Consistent_hashing">Wikipedia article on
* consistent hashing</a> for more information.
*/
public static int consistentHash(long input, int buckets) {
    checkArgument(buckets > 0, "buckets must be positive: %s", buckets);
    LinearCongruentialGenerator generator = new LinearCongruentialGenerator(input);
    int candidate = 0;
    int next;

    // Jump from bucket to bucket until we go out of range
    while (true) {
        next = (int) ((candidate + 1) / generator.nextDouble());
        if (next >= 0 && next < buckets) {
            candidate = next;
        } else {
            return candidate;
        }
    }
}
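Guava's consistentHash seeds a LinearCongruentialGenerator with the input and uses the doubles it produces to jump from bucket to bucket: each step computes next = (int) ((candidate + 1) / generator.nextDouble()), and the loop returns the last candidate once next falls outside [0, buckets). Since the generator is seeded only by the input, the same input and bucket count always produce the same bucket.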
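To see the property from the Javadoc in action without Guava on the classpath, here is a minimal standalone sketch of the same loop. ConsistentHashSketch and Lcg are hypothetical names; the generator constants are written from memory of Guava's private LinearCongruentialGenerator and should be treated as an assumption, so the bucket numbers this sketch prints are not guaranteed to match Guava's.

```java
final class ConsistentHashSketch {

    /** Plays the role of Guava's LinearCongruentialGenerator; constants are an assumption. */
    private static final class Lcg {
        private long state;

        Lcg(long seed) {
            this.state = seed;
        }

        double nextDouble() {
            state = 2862933555777941757L * state + 1;
            // Normally in (0, 1]; an int overflow can make it negative, which ends the loop below.
            return ((double) ((int) (state >>> 33) + 1)) / 0x1.0p31;
        }
    }

    static int consistentHash(long input, int buckets) {
        if (buckets <= 0) {
            throw new IllegalArgumentException("buckets must be positive: " + buckets);
        }
        Lcg generator = new Lcg(input);
        int candidate = 0;
        // Jump forward until the next bucket would be out of range, then keep the last one.
        while (true) {
            int next = (int) ((candidate + 1) / generator.nextDouble());
            if (next >= 0 && next < buckets) {
                candidate = next;
            } else {
                return candidate;
            }
        }
    }

    public static void main(String[] args) {
        String[] groups = {"order-1001", "order-1002", "order-1003"};
        for (String group : groups) {
            int with8 = consistentHash(group.hashCode(), 8);
            int with9 = consistentHash(group.hashCode(), 9);
            // with9 is either equal to with8 or is the newly added bucket 8.
            System.out.printf("%s -> queue %d of 8, queue %d of 9%n", group, with8, with9);
        }
    }
}
```

The jump sequence depends only on the input, so growing the bucket count from n to n + 1 either leaves a key where it was or moves it to the new bucket n. For the queue selector this suggests that appending a write queue remaps only a fraction of message groups, while removing or reordering entries in writeQueues can move groups to different queues, as the Javadoc's alpha/bravo/charlie example warns.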
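Summary
In RocketMQ 5, queue selection for ordered messages happens in the proxy module: SendMessageQueueSelector uses the message's messageGroup as the sharding key and calls Hashing.consistentHash(shardingKey.hashCode(), writeQueues.size()) to pick an index into writeQueues.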