系列
开篇
- 这个系列主要用以分析mqadmin常见的比较核心的几个命令,主要包括订阅分组和topic的创建和删除、Topic的权限变更。
- 这篇文章主要是用来分析Topic的创建和删除。
创建Topic
Topic创建的核心步骤如下
- 1、mqadmin向broker发起创建Topic的命令。
- 2、broker生成Topic对应的topicConfig配置保存在broker的TopicConfigManager中。
- 3、broker向所有的namesrv上报topicConfig信息。
- 4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。
- 5、broker会通过定时任务定期向namesrv发送心跳信息更新topic配置。
updateTopic
usage: mqadmin updateTopic -b <arg> | -c <arg> [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
<arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg> create topic to which broker
-c,--clusterName <arg> create topic to which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg> set topic's order(true|false)
-p,--perm <arg> set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg> set read queue nums
-s,--hasUnitSub <arg> has unit sub (true|false)
-t,--topic <arg> topic name
-u,--unit <arg> is unit topic (true|false)
-w,--writeQueueNums <arg> set write queue nums
- 通过 --brokerAddr在指定的broker创建topic。
- 通过 --clusterName在整个集群创建topic。
- 通过 --namesrvAddr指定namesrv地址。
- 通过 --topic来指定topic名称。
- 通过 --perm来指定Topic的权限管理。
UpdateTopicSubCommand
public class UpdateTopicSubCommand implements SubCommand {
@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
// 默认的topic的读写队列为8个
TopicConfig topicConfig = new TopicConfig();
topicConfig.setReadQueueNums(8);
topicConfig.setWriteQueueNums(8);
topicConfig.setTopicName(commandLine.getOptionValue('t').trim());
// readQueueNums
if (commandLine.hasOption('r')) {
topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
}
// writeQueueNums
if (commandLine.hasOption('w')) {
topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
}
// perm
if (commandLine.hasOption('p')) {
topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
}
boolean isUnit = false;
if (commandLine.hasOption('u')) {
isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
}
boolean isCenterSync = false;
if (commandLine.hasOption('s')) {
isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
}
int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
topicConfig.setTopicSysFlag(topicCenterSync);
boolean isOrder = false;
if (commandLine.hasOption('o')) {
isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
}
topicConfig.setOrder(isOrder);
if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();
defaultMQAdminExt.start();
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
if (isOrder) {
String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
isOrder, orderConf.toString()));
}
System.out.printf("create topic to %s success.%n", addr);
System.out.printf("%s", topicConfig);
return;
} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
defaultMQAdminExt.start();
Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
System.out.printf("create topic to %s success.%n", addr);
}
if (isOrder) {
Set<String> brokerNameSet =
CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
StringBuilder orderConf = new StringBuilder();
String splitor = "";
for (String s : brokerNameSet) {
orderConf.append(splitor).append(s).append(":")
.append(topicConfig.getWriteQueueNums());
splitor = ";";
}
defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
orderConf.toString(), true);
System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
}
System.out.printf("%s", topicConfig);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}
- Topic的默认的读写队列为8.
- 针对指定broker的场景,只在指定的broker机器创建Topic。
- 针对指定cluster的场景,获取集群下的所有broker并向全部的broker机器创建Topic。
mqadmin MQClientAPIImpl
public class MQClientAPIImpl {
public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig,
final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topicConfig.getTopicName());
requestHeader.setDefaultTopic(defaultTopic);
requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
requestHeader.setPerm(topicConfig.getPerm());
requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
requestHeader.setOrder(topicConfig.isOrder());
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
- 创建Topic的RequestCode为UPDATE_AND_CREATE_TOPIC。
broker AdminBrokerProcessor
public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements NettyRequestProcessor {
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final CreateTopicRequestHeader requestHeader =
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
if (requestHeader.getTopic().equals(this.brokerController.getBrokerConfig().getBrokerClusterName())) {
String errorMsg = "the topic[" + requestHeader.getTopic() + "] is conflict with system reserved words.";
log.warn(errorMsg);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(errorMsg);
return response;
}
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
return response;
}
try {
response.setCode(ResponseCode.SUCCESS);
response.setOpaque(request.getOpaque());
response.markResponseType();
response.setRemark(null);
ctx.writeAndFlush(response);
} catch (Exception e) {
log.error("Failed to produce a proper response", e);
}
// TopicConfig的创建配置
TopicConfig topicConfig = new TopicConfig(requestHeader.getTopic());
topicConfig.setReadQueueNums(requestHeader.getReadQueueNums());
topicConfig.setWriteQueueNums(requestHeader.getWriteQueueNums());
topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
topicConfig.setPerm(requestHeader.getPerm());
topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
// broker保存Topic的配置信息
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
// broker负责向namesrv上报topic信息。
this.brokerController.registerIncrementBrokerData(topicConfig, this.brokerController.getTopicConfigManager().getDataVersion());
return null;
}
}
- 创建TopicConfig对象并通过TopicConfigManager来进行保存。
- 通过brokerController#registerIncrementBrokerData来上报topic信息到namesrv。
broker TopicConfigManager
public class TopicConfigManager extends ConfigManager {
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private transient final Lock lockTopicConfigTable = new ReentrantLock();
private final ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>(1024);
private final DataVersion dataVersion = new DataVersion();
private final Set<String> systemTopicList = new HashSet<String>();
private transient BrokerController brokerController;
public void updateTopicConfig(final TopicConfig topicConfig) {
TopicConfig old = this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
if (old != null) {
log.info("update topic config, old:[{}] new:[{}]", old, topicConfig);
} else {
log.info("create new topic [{}]", topicConfig);
}
this.dataVersion.nextVersion();
this.persist();
}
}
- 通过topicConfigTable来保存Topic和对应的TopicConfig。
broker BrokerController
public class BrokerController {
public synchronized void registerIncrementBrokerData(TopicConfig topicConfig, DataVersion dataVersion) {
TopicConfig registerTopicConfig = topicConfig;
// 如果broker本身的存在不可读和不可写的权限,那么就以broker的读写权限为准
if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
|| !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
registerTopicConfig =
new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(),
this.brokerConfig.getBrokerPermission());
}
ConcurrentMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>();
topicConfigTable.put(topicConfig.getTopicName(), registerTopicConfig);
TopicConfigSerializeWrapper topicConfigSerializeWrapper = new TopicConfigSerializeWrapper();
topicConfigSerializeWrapper.setDataVersion(dataVersion);
topicConfigSerializeWrapper.setTopicConfigTable(topicConfigTable);
// 将topic信息注册到namesrv的逻辑
doRegisterBrokerAll(true, false, topicConfigSerializeWrapper);
}
private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
TopicConfigSerializeWrapper topicConfigWrapper) {
// 向所有的namesrv发送topic的注册信息
List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
this.brokerConfig.getBrokerClusterName(),
this.getBrokerAddr(),
this.brokerConfig.getBrokerName(),
this.brokerConfig.getBrokerId(),
this.getHAServerAddr(),
topicConfigWrapper,
this.filterServerManager.buildNewFilterServerList(),
oneway,
this.brokerConfig.getRegisterBrokerTimeoutMills(),
this.brokerConfig.isCompressedRegister());
if (registerBrokerResultList.size() > 0) {
RegisterBrokerResult registerBrokerResult = registerBrokerResultList.get(0);
if (registerBrokerResult != null) {
if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) {
this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr());
}
this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr());
if (checkOrderConfig) {
this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable());
}
}
}
}
}
- registerIncrementBrokerData首先会检查broker本身的读写权限重新生成新的topicConfigSerializeWrapper。
- 通过brokerOuterAPI#registerBrokerAll向namesrv注册最新的topic信息。
broker BrokerOuterAPI
public class BrokerOuterAPI {
private RegisterBrokerResult registerBroker(
final String namesrvAddr,
final boolean oneway,
final int timeoutMills,
final RegisterBrokerRequestHeader requestHeader,
final byte[] body
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
InterruptedException {
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
request.setBody(body);
// oneway=false,这个分支不会执行
if (oneway) {
try {
this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
} catch (RemotingTooMuchRequestException e) {
// Ignore
}
return null;
}
// 向namesrv上报
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
// 处理上报结果并同步主从
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
RegisterBrokerResponseHeader responseHeader =
(RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
RegisterBrokerResult result = new RegisterBrokerResult();
result.setMasterAddr(responseHeader.getMasterAddr());
result.setHaServerAddr(responseHeader.getHaServerAddr());
if (response.getBody() != null) {
result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
}
return result;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
- 向namesrv注册topic的RequestCode为REGISTER_BROKER。
- broker向namesrv发起topic注册。
namesrv DefaultRequestProcessor
public class DefaultRequestProcessor implements NettyRequestProcessor {
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(RegisterBrokerResponseHeader.class);
final RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.readCustomHeader();
final RegisterBrokerRequestHeader requestHeader =
(RegisterBrokerRequestHeader) request.decodeCommandCustomHeader(RegisterBrokerRequestHeader.class);
if (!checksum(ctx, request, requestHeader)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("crc32 not match");
return response;
}
RegisterBrokerBody registerBrokerBody = new RegisterBrokerBody();
if (request.getBody() != null) {
try {
registerBrokerBody = RegisterBrokerBody.decode(request.getBody(), requestHeader.isCompressed());
} catch (Exception e) {
throw new RemotingCommandException("Failed to decode RegisterBrokerBody", e);
}
} else {
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setCounter(new AtomicLong(0));
registerBrokerBody.getTopicConfigSerializeWrapper().getDataVersion().setTimestamp(0);
}
// namesrv同时注册broker信息和TopicConfig信息
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
- getRouteInfoManager().registerBroker()负责向namesrv注册broker信息。
namesrv RouteInfoManager
public class RouteInfoManager {
private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
public RouteInfoManager() {
this.topicQueueTable = new HashMap<String, List<QueueData>>(1024);
this.brokerAddrTable = new HashMap<String, BrokerData>(128);
this.clusterAddrTable = new HashMap<String, Set<String>>(32);
this.brokerLiveTable = new HashMap<String, BrokerLiveInfo>(256);
this.filterServerTable = new HashMap<String, List<String>>(256);
}
public RegisterBrokerResult registerBroker(
final String clusterName,
final String brokerAddr,
final String brokerName,
final long brokerId,
final String haServerAddr,
final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,
final Channel channel) {
RegisterBrokerResult result = new RegisterBrokerResult();
try {
try {
this.lock.writeLock().lockInterruptibly();
Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
if (null == brokerNames) {
brokerNames = new HashSet<String>();
this.clusterAddrTable.put(clusterName, brokerNames);
}
brokerNames.add(brokerName);
boolean registerFirst = false;
BrokerData brokerData = this.brokerAddrTable.get(brokerName);
if (null == brokerData) {
registerFirst = true;
brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
this.brokerAddrTable.put(brokerName, brokerData);
}
Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
//Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
//The same IP:PORT must only have one record in brokerAddrTable
Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
while (it.hasNext()) {
Entry<Long, String> item = it.next();
if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
it.remove();
}
}
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
registerFirst = registerFirst || (null == oldAddr);
// 保存topicConfig的信息
if (null != topicConfigWrapper
&& MixAll.MASTER_ID == brokerId) {
if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
|| registerFirst) {
ConcurrentMap<String, TopicConfig> tcTable =
topicConfigWrapper.getTopicConfigTable();
if (tcTable != null) {
for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
this.createAndUpdateQueueData(brokerName, entry.getValue());
}
}
}
}
BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
new BrokerLiveInfo(
System.currentTimeMillis(),
topicConfigWrapper.getDataVersion(),
channel,
haServerAddr));
if (null == prevBrokerLiveInfo) {
log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
}
if (filterServerList != null) {
if (filterServerList.isEmpty()) {
this.filterServerTable.remove(brokerAddr);
} else {
this.filterServerTable.put(brokerAddr, filterServerList);
}
}
if (MixAll.MASTER_ID != brokerId) {
String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
if (masterAddr != null) {
BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
if (brokerLiveInfo != null) {
result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
result.setMasterAddr(masterAddr);
}
}
}
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("registerBroker Exception", e);
}
return result;
}
}
- RouteInfoManager是namesrv的核心数据管理中心,broker上报Topic的过程中会同时更新broker的保活信息等。
- 核心的createAndUpdateQueueData负责注册topic配置信息。
namesrv createAndUpdateQueueData
public class RouteInfoManager {
private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
QueueData queueData = new QueueData();
queueData.setBrokerName(brokerName);
queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
queueData.setReadQueueNums(topicConfig.getReadQueueNums());
queueData.setPerm(topicConfig.getPerm());
queueData.setTopicSynFlag(topicConfig.getTopicSysFlag());
List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
if (null == queueDataList) {
queueDataList = new LinkedList<QueueData>();
queueDataList.add(queueData);
this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
} else {
boolean addNewOne = true;
Iterator<QueueData> it = queueDataList.iterator();
while (it.hasNext()) {
QueueData qd = it.next();
if (qd.getBrokerName().equals(brokerName)) {
if (qd.equals(queueData)) {
addNewOne = false;
} else {
log.info("topic changed, {} OLD: {} NEW: {}", topicConfig.getTopicName(), qd,
queueData);
it.remove();
}
}
}
if (addNewOne) {
queueDataList.add(queueData);
}
}
}
}
- 负责创建QueueData并保存到topicQueueTable当中。
- createAndUpdateQueueData会处理所有broker的上报,所以在处理QueueData过程中需要判断brokerName以处理对应的broker的QueueData。
定时上报 BrokerController
public class BrokerController {
public void start() throws Exception {
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
}
- broker会定时向namesrv上报broker的信息并且包括topic的注册信息。
删除Topic
删除Topic的核心逻辑如下
- 1、mqadmin负责通知所有的broker删除topic对应的配置和消息文件。
- 2、mqadmin负责通知所有的namesrv删除topic对应的配置。
deleteTopic
usage: mqadmin deleteTopic -c <arg> [-h] [-n <arg>] -t <arg>
-c,--clusterName <arg> delete topic from which cluster
-h,--help Print help
-n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-t,--topic <arg> topic name
- --topic指定待删除的topic信息。
- --clusterName指定待删除Topic所在的集群信息。
mqadmin DeleteTopicSubCommand
public class DeleteTopicSubCommand implements SubCommand {
public static void deleteTopic(final DefaultMQAdminExt adminExt,
final String clusterName,
final String topic
) throws InterruptedException, MQBrokerException, RemotingException, MQClientException {
Set<String> brokerAddressSet = CommandUtil.fetchMasterAndSlaveAddrByClusterName(adminExt, clusterName);
adminExt.deleteTopicInBroker(brokerAddressSet, topic);
System.out.printf("delete topic [%s] from cluster [%s] success.%n", topic, clusterName);
Set<String> nameServerSet = null;
if (adminExt.getNamesrvAddr() != null) {
String[] ns = adminExt.getNamesrvAddr().trim().split(";");
nameServerSet = new HashSet(Arrays.asList(ns));
}
adminExt.deleteTopicInNameServer(nameServerSet, topic);
System.out.printf("delete topic [%s] from NameServer success.%n", topic);
}
@Override
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt adminExt = new DefaultMQAdminExt(rpcHook);
adminExt.setInstanceName(Long.toString(System.currentTimeMillis()));
try {
String topic = commandLine.getOptionValue('t').trim();
if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();
adminExt.start();
deleteTopic(adminExt, clusterName, topic);
return;
}
ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
adminExt.shutdown();
}
}
}
- deleteTopicInBroker负责当前集群的所有broker上的Topic信息。
- deleteTopicInNameServer负责删除namesrv中的topic信息
mqadmin DefaultMQAdminExtImpl
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
public void deleteTopicInBroker(Set<String> addrs,
String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInBroker(addr, topic, timeoutMillis);
}
}
}
- 遍历所有broker依次执行deleteTopicInBroker操作。
mqadmin MQClientAPIImpl
public class MQClientAPIImpl {
public void deleteTopicInBroker(final String addr, final String topic, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_BROKER, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
- deleteTopicInBroker的RequestCode为DELETE_TOPIC_IN_BROKER。
broker AdminBrokerProcessor
public class AdminBrokerProcessor implements NettyRequestProcessor {
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
DeleteTopicRequestHeader requestHeader =
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
// 删除配置信息
this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
// 删除消息存储文件
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
public class TopicConfigManager extends ConfigManager {
public void deleteTopicConfig(final String topic) {
// 从topicConfigTable中删除topic的配置信息
TopicConfig old = this.topicConfigTable.remove(topic);
if (old != null) {
log.info("delete topic config OK, topic: {}", old);
this.dataVersion.nextVersion();
this.persist();
} else {
log.warn("delete topic config failed, topic: {} not exists", topic);
}
}
- deleteTopic操作包括删除broker上topic的配置信息和topic对应的消息存储文件。
mqadmin DefaultMQAdminExtImpl
public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
@Override
public void deleteTopicInNameServer(Set<String> addrs,
String topic) throws RemotingException, MQBrokerException, InterruptedException,
MQClientException {
if (addrs == null) {
String ns = this.mqClientInstance.getMQClientAPIImpl().fetchNameServerAddr();
addrs = new HashSet(Arrays.asList(ns.split(";")));
}
for (String addr : addrs) {
this.mqClientInstance.getMQClientAPIImpl().deleteTopicInNameServer(addr, topic, timeoutMillis);
}
}
}
public class MQClientAPIImpl {
public void deleteTopicInNameServer(final String addr, final String topic, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_TOPIC_IN_NAMESRV, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, timeoutMillis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQClientException(response.getCode(), response.getRemark());
}
}
- 遍历所有的namesrv并依次执行topic的删除。
- requestCode为DELETE_TOPIC_IN_NAMESRV。
namesrv DefaultRequestProcessor
public class DefaultRequestProcessor implements NettyRequestProcessor {
private RemotingCommand deleteTopicInNamesrv(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
final DeleteTopicInNamesrvRequestHeader requestHeader =
(DeleteTopicInNamesrvRequestHeader) request.decodeCommandCustomHeader(DeleteTopicInNamesrvRequestHeader.class);
this.namesrvController.getRouteInfoManager().deleteTopic(requestHeader.getTopic());
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}
}
public class RouteInfoManager {
public void deleteTopic(final String topic) {
try {
try {
this.lock.writeLock().lockInterruptibly();
this.topicQueueTable.remove(topic);
} finally {
this.lock.writeLock().unlock();
}
} catch (Exception e) {
log.error("deleteTopic Exception", e);
}
}
}
- namesrv的topic删除负责从RouteInfoManager移除topic对应的QueueData。