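Overview
RocketMQ ships with a console and a set of command-line commands that administrators use to manage topics, clusters, brokers, and other resources.
The bin directory of a RocketMQ installation contains the mqadmin script.
Running mqadmin without arguments lists every operation the command supports.
To see the detailed usage of a single operation, run:
mqadmin help <command name>
For example: mqadmin help updateTopic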
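Inspecting the mqadmin script
The mqadmin script delegates to the tools script (tools.sh) and sets the startup class to org.apache.rocketmq.tools.command.MQAdminStartup.
Structure of the tools module
The MQAdminStartup startup class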
public static void main(String[] args) {
    main0(args, null);
}

public static void main0(String[] args, RPCHook rpcHook) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    //PackageConflictDetect.detectFastjson();
    initCommand();
    try {
        initLogback();
        switch (args.length) {
            case 0:
                printHelp();
                break;
            case 2:
                if (args[0].equals("help")) {
                    SubCommand cmd = findSubCommand(args[1]);
                    if (cmd != null) {
                        Options options = ServerUtil.buildCommandlineOptions(new Options());
                        options = cmd.buildCommandlineOptions(options);
                        if (options != null) {
                            ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                        }
                    } else {
                        System.out.printf("The sub command %s not exist.%n", args[1]);
                    }
                    break;
                }
            case 1:
            default:
                SubCommand cmd = findSubCommand(args[0]);
                if (cmd != null) {
                    String[] subargs = parseSubArgs(args);
                    Options options = ServerUtil.buildCommandlineOptions(new Options());
                    final CommandLine commandLine =
                        ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                    if (null == commandLine) {
                        return;
                    }
                    if (commandLine.hasOption('n')) {
                        String namesrvAddr = commandLine.getOptionValue('n');
                        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                    }
                    cmd.execute(commandLine, options, rpcHook);
                } else {
                    System.out.printf("The sub command %s not exist.%n", args[0]);
                }
                break;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
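1. initCommand() is called first to register all commands.
2. Logging is initialized.
3. The arguments passed to main are inspected:
- 3.1 With no arguments, the general help is printed.
- 3.2 With exactly two arguments, the first being help and the second the name of a command registered by initCommand(), ServerUtil.printCommandLineHelp() prints the help for that command. If the name is unknown, a "not exist" message is printed instead.
- 3.3 In all other cases (one argument, two arguments whose first is not help, or more than two arguments), the first argument is looked up among the registered commands. If it is found, the remaining arguments are parsed and the execute() method of that command class is called: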
cmd.execute(commandLine, options, rpcHook);
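The three branches described above can be sketched in isolation. The following is a minimal sketch, not the RocketMQ code: DispatchSketch, KNOWN, and dispatch are hypothetical names, the command list is a three-entry stand-in for what initCommand() registers, and names are matched exactly as a simplification. It only reports which branch would be taken.

```java
import java.util.Arrays;
import java.util.List;

public class DispatchSketch {
    // Hypothetical stand-in for the command names registered by initCommand().
    static final List<String> KNOWN = Arrays.asList("updateTopic", "deleteTopic", "clusterList");

    /** Returns a description of the branch taken for the given arguments. */
    static String dispatch(String[] args) {
        switch (args.length) {
            case 0:
                return "print general help";
            case 2:
                if (args[0].equals("help")) {
                    return KNOWN.contains(args[1])
                        ? "print help for " + args[1]
                        : "unknown sub command " + args[1];
                }
                // Two arguments, first is not "help": fall through to the default branch.
            case 1:
            default:
                return KNOWN.contains(args[0])
                    ? "execute " + args[0]
                    : "unknown sub command " + args[0];
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {}));
        System.out.println(dispatch(new String[] {"help", "updateTopic"}));
        System.out.println(dispatch(new String[] {"updateTopic", "-t", "demo"}));
        System.out.println(dispatch(new String[] {"noSuchCommand"}));
    }
}
```

The deliberate fall-through from case 2 into the default branch mirrors the structure of main0: the break sits inside the help check, so a two-argument call whose first argument is not help is handled like any other command invocation.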
The initCommand() method
public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());
    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());
    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());
    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommad());
    initCommand(new ProducerConnectionSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());
    initCommand(new CloneGroupOffsetCommand());
    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());
    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());
    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());
    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());
    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());
    initCommand(new AllocateMQSubCommand());
    initCommand(new CheckMsgSendRTCommand());
    initCommand(new CLusterSendMsgRTCommand());
    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());
    initCommand(new QueryConsumeQueueCommand());
    initCommand(new SendMessageCommand());
    initCommand(new ConsumeMessageCommand());
}
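As the class names suggest, the command names printed by running mqadmin in the console correspond one-to-one to the classes registered here.
The initCommand(SubCommand) method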
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

public static void initCommand(SubCommand command) {
    subCommandList.add(command);
}
Each command instance is added to a List.
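The registry idea can be illustrated with a reduced, self-contained sketch. Everything here is hypothetical: NamedCommand is a one-method stand-in for SubCommand, and this findSubCommand is an assumed linear lookup with exact name matching, written only to be consistent with the null check on its result in main0. It is not the actual MQAdminStartup implementation.

```java
import java.util.ArrayList;
import java.util.List;

public class RegistrySketch {
    /** Reduced stand-in for SubCommand: only the name matters for lookup. */
    interface NamedCommand {
        String commandName();
    }

    static final List<NamedCommand> COMMANDS = new ArrayList<>();

    /** Registers a command by appending it to the list. */
    static void initCommand(NamedCommand command) {
        COMMANDS.add(command);
    }

    /** Assumed lookup: linear scan by exact name, null when nothing matches. */
    static NamedCommand findSubCommand(String name) {
        for (NamedCommand cmd : COMMANDS) {
            if (cmd.commandName().equals(name)) {
                return cmd;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        initCommand(() -> "updateTopic");
        initCommand(() -> "deleteTopic");
        System.out.println(findSubCommand("updateTopic") != null);
        System.out.println(findSubCommand("noSuchCommand") != null);
    }
}
```

A plain list with a linear scan is sufficient here because the set of commands is small and is searched once per invocation.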
The SubCommand interface
Every operation command implements the SubCommand interface:
public interface SubCommand {
    String commandName();

    String commandDesc();

    Options buildCommandlineOptions(final Options options);

    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}
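1. commandName(): the command name
2. commandDesc(): the command description
3. buildCommandlineOptions(): builds the command-line options used to parse the command's arguments
4. execute(): runs the command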
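Source analysis: creating a Topic
The create-Topic command serves as the example for analyzing how a command is implemented.
updateTopic is the command that creates a Topic.
Running mqadmin help updateTopic shows which parameters updateTopic supports.
The class behind it is UpdateTopicSubCommand, analyzed below.
UpdateTopicSubCommand walkthrough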
commandName()
@Override
public String commandName() {
    return "updateTopic";
}
The command name.
commandDesc()
@Override
public String commandDesc() {
    return "Update or create topic";
}
The command description.
buildCommandlineOptions()
@Override
public Options buildCommandlineOptions(Options options) {
    Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("c", "clusterName", true, "create topic to which cluster");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("t", "topic", true, "topic name");
    opt.setRequired(true);
    options.addOption(opt);

    opt = new Option("r", "readQueueNums", true, "set read queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("w", "writeQueueNums", true, "set write queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("o", "order", true, "set topic's order(true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("u", "unit", true, "is unit topic (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}
This method defines the command's options and their descriptions. Only -t (the topic name) is required.
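The -p description lists the values 2 (W), 4 (R), and 6 (RW), which read naturally as a bit mask. The sketch below decodes a value under that reading. PermSketch and describe are hypothetical names introduced for illustration and are not part of RocketMQ; the two constants are taken from the option description only.

```java
public class PermSketch {
    // Bit values taken from the -p option description: 2 = write, 4 = read.
    static final int WRITE = 2;
    static final int READ = 4;

    /** Hypothetical helper: renders a perm value as "R", "W", "RW", or "-" when neither bit is set. */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        if ((perm & READ) != 0) {
            sb.append('R');
        }
        if ((perm & WRITE) != 0) {
            sb.append('W');
        }
        return sb.length() == 0 ? "-" : sb.toString();
    }

    public static void main(String[] args) {
        for (int perm : new int[] {2, 4, 6}) {
            System.out.println(perm + " -> " + describe(perm));
        }
    }
}
```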
The execute() method
@Override
public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

        // readQueueNums
        if (commandLine.hasOption('r')) {
            topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
        }

        // writeQueueNums
        if (commandLine.hasOption('w')) {
            topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
        }

        // perm
        if (commandLine.hasOption('p')) {
            topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
        }

        boolean isUnit = false;
        if (commandLine.hasOption('u')) {
            isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
        }

        boolean isCenterSync = false;
        if (commandLine.hasOption('s')) {
            isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
        }

        int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
        topicConfig.setTopicSysFlag(topicCenterSync);

        boolean isOrder = false;
        if (commandLine.hasOption('o')) {
            isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
        }
        topicConfig.setOrder(isOrder);

        if (commandLine.hasOption('b')) {
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

            if (isOrder) {
                String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                    isOrder, orderConf.toString()));
            }
            System.out.printf("create topic to %s success.%n", addr);
            System.out.printf("%s", topicConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

            if (isOrder) {
                Set<String> brokerNameSet =
                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                StringBuilder orderConf = new StringBuilder();
                String splitor = "";
                for (String s : brokerNameSet) {
                    orderConf.append(splitor).append(s).append(":")
                        .append(topicConfig.getWriteQueueNums());
                    splitor = ";";
                }
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                    orderConf.toString(), true);
                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
            }

            System.out.printf("%s", topicConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}
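As the code shows, much of execute() is devoted to parsing the commandLine arguments.
The parsed values populate a TopicConfig object, whose read and write queue counts both default to 8.
DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) is then called to create the Topic.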
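The "defaults first, then override from the command line" pattern used for the queue counts can be sketched without any RocketMQ or commons-cli dependency. This is only an illustration under stated assumptions: TopicOptionsSketch, QueueNums, and resolve are hypothetical names, and a plain Map stands in for the parsed CommandLine.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicOptionsSketch {
    /** Hypothetical holder for the two queue counts; not the real TopicConfig. */
    static final class QueueNums {
        final int read;
        final int write;

        QueueNums(int read, int write) {
            this.read = read;
            this.write = write;
        }
    }

    /** Starts from the defaults seen in execute() (8 and 8) and overrides them when -r or -w is present. */
    static QueueNums resolve(Map<Character, String> opts) {
        int read = 8;
        int write = 8;
        if (opts.containsKey('r')) {
            read = Integer.parseInt(opts.get('r').trim());
        }
        if (opts.containsKey('w')) {
            write = Integer.parseInt(opts.get('w').trim());
        }
        return new QueueNums(read, write);
    }

    public static void main(String[] args) {
        Map<Character, String> opts = new HashMap<>();
        opts.put('r', " 4 "); // surrounding whitespace is trimmed, as in execute()
        QueueNums nums = resolve(opts);
        System.out.println("read=" + nums.read + ", write=" + nums.write);
    }
}
```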
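The code also shows that only one of -b and -c takes effect.
-b creates the topic on the specified broker.
-c creates the topic on every master broker of the specified cluster.
-b is checked first: if it is given, the topic is created on that broker only and any -c value is ignored. If neither option is given, the command prints its help and creates nothing.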
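The precedence between -b and -c can be condensed into a small selection function. This is a sketch under stated assumptions, not the RocketMQ code: TargetSketch and targets are hypothetical names, the cluster's master addresses are passed in directly instead of being fetched through CommandUtil, and the addresses in main are made up.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

public class TargetSketch {
    /**
     * Chooses the broker addresses a topic would be created on.
     * brokerAddr is the value of -b, or null when -b is absent.
     * clusterMasters holds the master addresses of the -c cluster, or null when -c is absent.
     * An empty result corresponds to the "print help" case.
     */
    static Set<String> targets(String brokerAddr, Set<String> clusterMasters) {
        if (brokerAddr != null) {
            // -b wins: a single broker, regardless of -c.
            return Collections.singleton(brokerAddr.trim());
        }
        if (clusterMasters != null) {
            return clusterMasters;
        }
        return Collections.emptySet();
    }

    public static void main(String[] args) {
        // Made-up addresses for illustration.
        Set<String> masters = new LinkedHashSet<>(Arrays.asList("10.0.0.1:10911", "10.0.0.2:10911"));
        System.out.println(targets("10.0.0.9:10911", masters));
        System.out.println(targets(null, masters));
        System.out.println(targets(null, null));
    }
}
```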
The other SubCommand implementations follow the same pattern, so they are not analyzed one by one here.