RocketMQ 命令行工具源码结构解析

概述

RocketMQ 提供有控制台及一系列控制台命令,用于管理员对主题,集群,broker 等信息的管理;

进入 RocketMQ 的bin 目录,可以看到 mqadmin 脚本文件。


执行 mqadmin 脚本显示如下:


显示了 mqadmin 命令支持的所有操作。

如果想具体查新某一个操作的详细命令,可以使用

mqadmin help 命令名称
比如:mqadmin help updateTopic

查看 mqadmin脚本

可以发现 mqadmin 的命令调用的是 tools 命令,设置的启动类为 org.apache.rocketmq.tools.command.MQAdminStartup 。

tools 模块结构

MQAdminStartup 启动类

public static void main(String[] args) {
    main0(args, null);
}

public static void main0(String[] args, RPCHook rpcHook) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //PackageConflictDetect.detectFastjson();

    initCommand();

    try {
        initLogback();
        switch (args.length) {
            case 0:
                printHelp();
                break;
            case 2:
                if (args[0].equals("help")) {
                    SubCommand cmd = findSubCommand(args[1]);
                    if (cmd != null) {
                        Options options = ServerUtil.buildCommandlineOptions(new Options());
                        options = cmd.buildCommandlineOptions(options);
                        if (options != null) {
                            ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                        }
                    } else {
                        System.out.printf("The sub command %s not exist.%n", args[1]);
                    }
                    break;
                }
            case 1:
            default:
                SubCommand cmd = findSubCommand(args[0]);
                if (cmd != null) {
                    String[] subargs = parseSubArgs(args);

                    Options options = ServerUtil.buildCommandlineOptions(new Options());
                    final CommandLine commandLine =
                        ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options),
                            new PosixParser());
                    if (null == commandLine) {
                        return;
                    }

                    if (commandLine.hasOption('n')) {
                        String namesrvAddr = commandLine.getOptionValue('n');
                        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                    }

                    cmd.execute(commandLine, options, rpcHook);
                } else {
                    System.out.printf("The sub command %s not exist.%n", args[0]);
               }
               break;
        }
    } catch (Exception e) {
         e.printStackTrace();
    }
}

1、首先调用initCommand() 方法加载所有的命令。
2、初始化日志
3、判断启动该类main 方法传入的参数。

  • 3.1 如果没有参数,则打印帮助信息。
  • 3.2 如果参数为2个,并且第一个是 help,第二个参数是initCommand() 加载的命令名称,则调用 ServerUtil.printCommandLineHelp() 方法打印指定命令的帮助信息。
  • 3.3 如果参赛为一个、或2个,并且第一个参数不为 help,或多个。并且第一个参赛为 initCommand() 加载的命令,则调用 该initCommand() 加载类中的 execute() 方法。
 cmd.execute(commandLine, options, rpcHook);

initCommand() 方法

public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());

    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());

    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());
        
    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommad());

    initCommand(new ProducerConnectionSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());     
    initCommand(new CloneGroupOffsetCommand());

    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());

    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());

    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());

    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());

    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());

    initCommand(new AllocateMQSubCommand());

    initCommand(new CheckMsgSendRTCommand());
    initCommand(new CLusterSendMsgRTCommand());

    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());

    initCommand(new QueryConsumeQueueCommand());
    initCommand(new SendMessageCommand());
    initCommand(new ConsumeMessageCommand());
}

丛类名中可以看出跟上面控制台 执行 mqadmin 指令输出命令的名字和这里的类名可以一一对应上。

initCommand 方法

protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

public static void initCommand(SubCommand command) {
    subCommandList.add(command);
}

把 init 加载到一个List集合中。

SubCommand 接口定义

所有的操作命令都实现了 SubCommand 接口

public interface SubCommand {
    String commandName();
    String commandDesc();
    Options buildCommandlineOptions(final Options options);
    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}

1、commandName() 命令名称
2、commandDesc()命令描述
3、buildCommandlineOptions() 构建命令解析器
4、execute() 执行命令

创建 Topic 源码分析

下面我们以创建 Topic 命令来分析实现原理。
updateTopic 命令是创建Topic的命令。


通过该命令可以查看 updateTopic 支持那么多参数。
下面我们来分析下 UpdateTopicPermSubCommand 类的实现

UpdateTopicPermSubCommand 解析

commandName()
@Override
public String commandName() {
    return "updateTopic";
}

命令名称

commandDesc()
@Override
public String commandDesc() {
    return "Update or create topic";
}

命令描述

buildCommandlineOptions()
@Override
public Options buildCommandlineOptions(Options options) {
    Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("c", "clusterName", true, "create topic to which cluster");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("t", "topic", true, "topic name");
    opt.setRequired(true);
    options.addOption(opt);

    opt = new Option("r", "readQueueNums", true, "set read queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("w", "writeQueueNums", true, "set write queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("o", "order", true, "set topic's order(true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("u", "unit", true, "is unit topic (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}

从该方法中可以看到定义的命令及其说明。

execute() 方法
@Override
public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

        // readQueueNums
        if (commandLine.hasOption('r')) {
            topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
        }

        // writeQueueNums
        if (commandLine.hasOption('w')) {
            topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
        }

        // perm
        if (commandLine.hasOption('p')) {
            topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
        }

        boolean isUnit = false;
        if (commandLine.hasOption('u')) {
            isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
        }

        boolean isCenterSync = false;
        if (commandLine.hasOption('s')) {
            isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
        }

        int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
        topicConfig.setTopicSysFlag(topicCenterSync);

        boolean isOrder = false;
        if (commandLine.hasOption('o')) {
            isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
        }
        topicConfig.setOrder(isOrder);

        if (commandLine.hasOption('b')) {
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

            if (isOrder) {
                String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                    isOrder, orderConf.toString()));
            }
            System.out.printf("create topic to %s success.%n", addr);
            System.out.printf("%s", topicConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

            if (isOrder) {
                Set<String> brokerNameSet =
                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                StringBuilder orderConf = new StringBuilder();
                String splitor = "";
                for (String s : brokerNameSet) {
                    orderConf.append(splitor).append(s).append(":")
                        .append(topicConfig.getWriteQueueNums());
                    splitor = ";";
                }
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                    orderConf.toString(), true);
                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
            }

            System.out.printf("%s", topicConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}

从上面代码中可以看出,很大一部分代码都是解析 commandLine 参数。
解析出来的参数来填充 TopicConfig 对象。
然后调用 DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) 方法来创建 Topic。

从上面的代码中可以看出 -b 和 -c 参数只能有一个生效。
-b 参数是在指定的 broker 上创建 topic
-c 是在指定的集群上每一个 broker 创建 topic。

优先判断的是 -b 参数,如果指定 -b 参数就会在指定的 broker 上创建,而不会在 -c 指定的集群上创建。

其它的 SubCommand 命令的实现方式都一样,就不一一解析了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容