rocketmq版本4.3.2
1.自动创建流程
我们初步可以看一个流程图
我们可以在broker.conf中通过配置 autoCreateTopicEnable=true开启自动创建Topic功能(默认是开启的),那么它是怎么用的呢?
我们从源码上逐步分析,首先从官网的例子入手
/**
 * Minimal producer example: starts a producer, sends one message to "TopicTest",
 * prints the send result and shuts the producer down.
 */
public class Producer {
    /**
     * Sends a single message through the configured name server.
     *
     * @param args unused
     * @throws Exception if the producer cannot start, the payload cannot be encoded,
     *                   or the send fails (the client API declares several checked exceptions)
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        producer.setNamesrvAddr("10.91.124.150:9876");
        producer.start();
        try {
            Message msg = new Message("TopicTest",
                ("Today Hello1 RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } finally {
            // Always release the client's resources, even when the send fails.
            producer.shutdown();
        }
    }
}
1)从客户端发送消息上查看
DefaultMQProducer#send(msg);//先从发送消息开始看起
//然后会调用
DefaultMQProducerImpl#sendDefaultImpl()
/**
 * Excerpt of the producer-side send routine; everything after the route lookup is elided.
 * Visible steps: calls makeSureStateOK(), validates the message via Validators.checkMessage,
 * records start timestamps, then resolves the publish info for the message's topic.
 */
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// Key step for auto-creation: look up the publish info of the message's topic
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
....
}
/**
 * Resolves the publish info for {@code topic}, refreshing it from the name server when the
 * locally cached entry is missing or not ok.
 *
 * <p>If the refreshed entry still carries no route info and is not ok, a second lookup is made
 * with {@code isDefault = true} and this producer, and whatever is then cached is returned.
 *
 * @param topic topic to resolve
 * @return the publish info currently cached for the topic
 */
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
    // Start from the client's local cache.
    TopicPublishInfo cached = this.topicPublishInfoTable.get(topic);
    final boolean usable = cached != null && cached.ok();
    if (!usable) {
        // Seed an empty entry, then ask the name server for this topic's route.
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        cached = this.topicPublishInfoTable.get(topic);
    }

    if (!cached.isHaveTopicRouterInfo() && !cached.ok()) {
        // Nothing found for the topic itself: retry using the default-topic lookup
        // (the article identifies the default topic as TBW102).
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        return this.topicPublishInfoTable.get(topic);
    }
    return cached;
}
//看看从远程nameserver获取topic信息
/**
 * Excerpt: fetches route data from the name server while holding lockNamesrv.
 * With isDefault set and a producer supplied, the route of the producer's create-topic key is
 * fetched instead of the route of {@code topic}. The handling of a changed route is elided.
 */
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer) {
try {
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
TopicRouteData topicRouteData;
// Default-topic path: taken only when isDefault is true and a producer was passed in
if (isDefault && defaultMQProducer!= null) {
// The article states getCreateTopicKey() is "TBW102" by default -- confirm against the client defaults
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null) {
for (QueueData data : topicRouteData.getQueueDatas()) {
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
// Cap both queue counts at the producer's defaultTopicQueueNums (4 by default, per the article)
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
}
}
} else {
// Normal path: fetch the route of the requested topic itself
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
boolean changed = topicRouteDataIsChange(old, topicRouteData);
...
// Elided: local cache updates and notifications performed when the route changed
return false;
}
总结:上面其实就是在获取不到topic信息时,去远程拉取Topic "TBW102"的信息。个人认为这就是一个模板Topic,没有的时候就拉取下来克隆一份给自己;不过此时只是客户端本地缓存了这份信息,实际上nameserver和broker上都还没有这个topic的信息。
2)发送消息到broker上
客户端发送消息,这里就是组装需要的信息
底层用netty通信,这里细节不展开,后面分析producer发送消息时再看相关源码
NettyRemotingServer#NettyServerHandler#channelRead0()
NettyRemotingAbstract#processRequestCommand()
这里就是在做创建topic信息了 在处理消息前创建了topic信息
/**
 * Excerpt: looks up the processor/executor pair registered for the command's code (falling back
 * to defaultRequestProcessor), wraps the processing in a Runnable and submits it to the pair's
 * executor as a RequestTask. Most of the body is elided.
 */
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
final int opaque = cmd.getOpaque();
if (pair != null) {
Runnable run = new Runnable() {
@Override
public void run() {
try {
...
// Hand the request to the matched processor (SendMessageProcessor for send requests, per the article)
final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
}
...
}
// ... (elided)
try {
// Wrap the Runnable in a RequestTask and submit it to the pair's executor
final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
pair.getObject2().submit(requestTask);
// ... (elided)
}
//SendMessageProcessor#processRequest()然后调用如下方法
/**
 * Excerpt of the broker-side send handler: logs the request, presets the response code to -1
 * and runs the inherited msgCheck before the (elided) message handling.
 */
private RemotingCommand sendMessage(final ChannelHandlerContext ctx,
final RemotingCommand request,
final SendMessageContext sendMessageContext,
final SendMessageRequestHeader requestHeader) throws RemotingCommandException {
// ... (elided)
log.debug("receive SendMessage request command, {}", request);
response.setCode(-1);
// Validate the request; the article points to this call as where a missing topic gets created
super.msgCheck(ctx, requestHeader, response);
// ... (remainder elided)
}
//完整的看一下消息的check信息
/**
 * Excerpt: validates a send request against broker and topic state and, when the topic has no
 * config on this broker, asks the TopicConfigManager to create it from the request's default
 * topic. The tail of the method is elided.
 */
protected RemotingCommand msgCheck(final ChannelHandlerContext ctx,
final SendMessageRequestHeader requestHeader, final RemotingCommand response) {
// Reject when the broker permission is not writeable AND the topic is an order topic
if (!PermName.isWriteable(this.brokerController.getBrokerConfig().getBrokerPermission())
&& this.brokerController.getTopicConfigManager().isOrderTopic(requestHeader.getTopic())) {
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark("the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending message is forbidden");
return response;
}
// Reject topics that isTopicCanSendMessage() refuses (per the article this covers the "TBW102" template topic)
if (!this.brokerController.getTopicConfigManager().isTopicCanSendMessage(requestHeader.getTopic())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
// Look up the topic's config on this broker
TopicConfig topicConfig =
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
// No config found for the topic
if (null == topicConfig) {
....
log.warn("the topic {} not exist, producer: {}", requestHeader.getTopic(), ctx.channel().remoteAddress());
// Create the topic config locally, based on the default topic named in the request header
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
requestHeader.getTopic(),
requestHeader.getDefaultTopic(),
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
// ... (elided)
}
TopicConfigManager#createTopicInSendMessageMethod()
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) {
TopicConfig topicConfig = null;
boolean createNew = false;
try {
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
topicConfig = this.topicConfigTable.get(topic);
if (topicConfig != null)
return topicConfig;
//这里有获取默认的TBW102来赋值topic相关信息
TopicConfig defaultTopicConfig = this.topicConfigTable.get(defaultTopic);
if (defaultTopicConfig != null) { //如果不为空做下面的赋值操作
if (defaultTopic.equals(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
if (!this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
defaultTopicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE);
}
}
if (PermName.isInherited(defaultTopicConfig.getPerm())) {
topicConfig = new TopicConfig(topic);
int queueNums =
clientDefaultTopicQueueNums > defaultTopicConfig.getWriteQueueNums() ? defaultTopicConfig
.getWriteQueueNums() : clientDefaultTopicQueueNums;
if (queueNums < 0) {
queueNums = 0;
}
topicConfig.setReadQueueNums(queueNums);
topicConfig.setWriteQueueNums(queueNums);
int perm = defaultTopicConfig.getPerm();
perm &= ~PermName.PERM_INHERIT;
topicConfig.setPerm(perm);
topicConfig.setTopicSysFlag(topicSysFlag);
topicConfig.setTopicFilterType(defaultTopicConfig.getTopicFilterType());
} else {
} else {
}
if (topicConfig != null) {
log.info("Create new topic by default topic:[{}] config:[{}] producer:[{}]",
defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
//持久化操作
this.persist();
}
} finally {
this.lockTopicConfigTable.unlock();
}
}
} catch (InterruptedException e) {
log.error("createTopicInSendMessageMethod exception", e);
}
if (createNew) {
//通知其他broker信息
this.brokerController.registerBrokerAll(false, true,true);
}
return topicConfig;
}
这里其实我们都有一个疑问,这个TBW102是什么时候创建的,为什么我们一开始可以从nameserver上获取到这个topic信息。答案就是下面的代码,在broker启动的时候默认创建了几个系统级的topic信息。
/**
 * Creates the manager and pre-registers the broker's built-in topics: the self-test topic,
 * the auto-create template topic (only when auto-creation is enabled), the benchmark topic,
 * the cluster-name topic, the broker-name topic and the offset-moved-event topic.
 * Each one is added to systemTopicList and stored in topicConfigTable.
 *
 * @param brokerController controller whose broker config drives the built-in topics
 */
public TopicConfigManager(BrokerController brokerController) {
    this.brokerController = brokerController;

    // MixAll.SELF_TEST_TOPIC: one read queue, one write queue.
    final String selfTestTopic = MixAll.SELF_TEST_TOPIC;
    final TopicConfig selfTestConfig = new TopicConfig(selfTestTopic);
    this.systemTopicList.add(selfTestTopic);
    selfTestConfig.setReadQueueNums(1);
    selfTestConfig.setWriteQueueNums(1);
    this.topicConfigTable.put(selfTestConfig.getTopicName(), selfTestConfig);

    // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC: the template topic exists only when auto-creation
    // is enabled; without it no topic can be auto-created from it.
    if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) {
        final String templateTopic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
        final TopicConfig templateConfig = new TopicConfig(templateTopic);
        this.systemTopicList.add(templateTopic);
        // Queue counts come from the broker config (the original note says 8 by default).
        templateConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums());
        templateConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
            .getDefaultTopicQueueNums());
        // Readable, writable and inheritable.
        templateConfig.setPerm(PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE);
        this.topicConfigTable.put(templateConfig.getTopicName(), templateConfig);
    }

    // MixAll.BENCHMARK_TOPIC: 1024 read and write queues.
    final String benchmarkTopic = MixAll.BENCHMARK_TOPIC;
    final TopicConfig benchmarkConfig = new TopicConfig(benchmarkTopic);
    this.systemTopicList.add(benchmarkTopic);
    benchmarkConfig.setReadQueueNums(1024);
    benchmarkConfig.setWriteQueueNums(1024);
    this.topicConfigTable.put(benchmarkConfig.getTopicName(), benchmarkConfig);

    // Topic named after the cluster: inheritable, plus read/write when cluster topics are enabled.
    final String clusterTopic = this.brokerController.getBrokerConfig().getBrokerClusterName();
    final TopicConfig clusterConfig = new TopicConfig(clusterTopic);
    this.systemTopicList.add(clusterTopic);
    final int clusterPerm = this.brokerController.getBrokerConfig().isClusterTopicEnable()
        ? PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE
        : PermName.PERM_INHERIT;
    clusterConfig.setPerm(clusterPerm);
    this.topicConfigTable.put(clusterConfig.getTopicName(), clusterConfig);

    // Topic named after the broker: same permission rule, with a single read/write queue.
    final String brokerTopic = this.brokerController.getBrokerConfig().getBrokerName();
    final TopicConfig brokerTopicConfig = new TopicConfig(brokerTopic);
    this.systemTopicList.add(brokerTopic);
    final int brokerPerm = this.brokerController.getBrokerConfig().isBrokerTopicEnable()
        ? PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE
        : PermName.PERM_INHERIT;
    brokerTopicConfig.setReadQueueNums(1);
    brokerTopicConfig.setWriteQueueNums(1);
    brokerTopicConfig.setPerm(brokerPerm);
    this.topicConfigTable.put(brokerTopicConfig.getTopicName(), brokerTopicConfig);

    // MixAll.OFFSET_MOVED_EVENT: one read queue, one write queue.
    final String offsetMovedTopic = MixAll.OFFSET_MOVED_EVENT;
    final TopicConfig offsetMovedConfig = new TopicConfig(offsetMovedTopic);
    this.systemTopicList.add(offsetMovedTopic);
    offsetMovedConfig.setReadQueueNums(1);
    offsetMovedConfig.setWriteQueueNums(1);
    this.topicConfigTable.put(offsetMovedConfig.getTopicName(), offsetMovedConfig);
}
总结:
客户端会两次向nameServer获取topic信息:当前topic信息没有拉到,就拉TBW102的信息,然后再向broker发送消息;
broker在msgCheck的时候会根据默认的TBW102创建当前topic;
TBW102的读写队列分别是8个,支持读、写、继承,perm=7;
根据TBW102创建的Topic的读写队列是4个,支持读写,perm=6
2.后台创建流程
如果我们使用了rocketmq-externals(rocketmq-console)来查看rocketmq的相关信息,可以很容易发现它是依赖rocketmq-tools来实现外部创建topic的。
//POST /topic/createOrUpdate.do
TopicController#topicCreateOrUpdateRequest()
=> TopicServiceImpl#createOrUpdate()
=> MQAdminExeImpl#createAndUpdateTopicConfig()
=> DefaultMQAdminExt#createAndUpdateTopicConfig()
=> DefaultMQAdminExtImpl#createAndUpdateTopicConfig()
=> MQClientAPIImpl#createTopic()