聊聊sentinel的SimpleHttpCommandCenter

本文主要研究一下sentinel的SimpleHttpCommandCenter

SimpleHttpCommandCenter

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/SimpleHttpCommandCenter.java

/***
 * The simple command center provides service to exchange information.
 *
 * @author youji.zj
 */
public class SimpleHttpCommandCenter implements CommandCenter {

    private static final int PORT_UNINITIALIZED = -1;

    private static final int DEFAULT_SERVER_SO_TIMEOUT = 3000;
    private static final int DEFAULT_PORT = 8719;

    private static final Map<String, CommandHandler> handlerMap = new HashMap<String, CommandHandler>();

    private ExecutorService executor = Executors.newSingleThreadExecutor(
        new NamedThreadFactory("sentinel-command-center-executor"));
    private ExecutorService bizExecutor;

    private ServerSocket socketReference;

    @Override
    public void beforeStart() throws Exception {
        // Register handlers
        Map<String, CommandHandler> handlers = CommandHandlerProvider.getInstance().namedHandlers();
        registerCommands(handlers);
    }

    @Override
    public void start() throws Exception {
        int nThreads = Runtime.getRuntime().availableProcessors();
        this.bizExecutor = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(10),
            new NamedThreadFactory("sentinel-command-center-service-executor"),
            new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    CommandCenterLog.info("EventTask rejected");
                    throw new RejectedExecutionException();
                }
            });

        Runnable serverInitTask = new Runnable() {
            int port;

            {
                try {
                    port = Integer.parseInt(TransportConfig.getPort());
                } catch (Exception e) {
                    port = DEFAULT_PORT;
                }
            }

            @Override
            public void run() {
                int repeat = 0;

                int tmpPort = port;
                boolean success = false;
                while (true) {
                    ServerSocket serverSocket = null;
                    try {
                        serverSocket = new ServerSocket(tmpPort);
                    } catch (IOException ex) {
                        CommandCenterLog.info(
                            String.format("IO error occurs, port: %d, repeat times: %d", tmpPort, repeat), ex);

                        tmpPort = adjustPort(repeat);
                        try {
                            TimeUnit.SECONDS.sleep(5);
                        } catch (InterruptedException e1) {
                            break;
                        }
                        repeat++;
                    }

                    if (serverSocket != null) {
                        CommandCenterLog.info("[CommandCenter] Begin listening at port " + serverSocket.getLocalPort());
                        socketReference = serverSocket;
                        executor.submit(new ServerThread(serverSocket));
                        success = true;
                        break;
                    }
                }

                if (!success) {
                    tmpPort = PORT_UNINITIALIZED;
                }
                TransportConfig.setRuntimePort(tmpPort);
                executor.shutdown();
            }

            /**
             * Adjust the port to settle down.
             */
            private int adjustPort(int repeat) {
                int mod = repeat / 5;
                return port + mod;
            }
        };

        new Thread(serverInitTask).start();
    }

    @Override
    public void stop() throws Exception {
        if (socketReference != null) {
            try {
                socketReference.close();
            } catch (IOException e) {
                CommandCenterLog.warn("Error when releasing the server socket", e);
            }
        }
        bizExecutor.shutdownNow();
        executor.shutdownNow();
        TransportConfig.setRuntimePort(PORT_UNINITIALIZED);
        handlerMap.clear();
    }

    /**
     * Get the name set of all registered commands.
     */
    public static Set<String> getCommands() {
        return handlerMap.keySet();
    }

    public static CommandHandler getHandler(String commandName) {
        return handlerMap.get(commandName);
    }

    public static void registerCommands(Map<String, CommandHandler> handlerMap) {
        if (handlerMap != null) {
            for (Entry<String, CommandHandler> e : handlerMap.entrySet()) {
                registerCommand(e.getKey(), e.getValue());
            }
        }
    }

    public static void registerCommand(String commandName, CommandHandler handler) {
        if (StringUtil.isEmpty(commandName)) {
            return;
        }

        if (handlerMap.containsKey(commandName)) {
            CommandCenterLog.info("Register failed (duplicate command): " + commandName);
            return;
        }

        handlerMap.put(commandName, handler);
    }

    /**
     * Avoid server thread hang, 3 seconds timeout by default.
     */
    private void setSocketSoTimeout(Socket socket) throws SocketException {
        if (socket != null) {
            socket.setSoTimeout(DEFAULT_SERVER_SO_TIMEOUT);
        }
    }
}
  • 这里直接使用的是java的ServerSocket(bio)构建的tcp服务
  • 启动之前先调用CommandHandlerProvider.getInstance().namedHandlers()获取支持的命令及handler,然后调用registerCommands进行注册
  • 之后创建bizExecutor以及异步serverInitTask线程,里头再异步启动ServerThread

ServerThread

    class ServerThread extends Thread {

        private ServerSocket serverSocket;

        ServerThread(ServerSocket s) {
            this.serverSocket = s;
            setName("sentinel-courier-server-accept-thread");
        }

        @Override
        public void run() {
            while (true) {
                Socket socket = null;
                try {
                    socket = this.serverSocket.accept();
                    setSocketSoTimeout(socket);
                    HttpEventTask eventTask = new HttpEventTask(socket);
                    bizExecutor.submit(eventTask);
                } catch (Exception e) {
                    CommandCenterLog.info("Server error", e);
                    if (socket != null) {
                        try {
                            socket.close();
                        } catch (Exception e1) {
                            CommandCenterLog.info("Error when closing an opened socket", e1);
                        }
                    }
                    try {
                        // In case of infinite log.
                        Thread.sleep(10);
                    } catch (InterruptedException e1) {
                        // Indicates the task should stop.
                        break;
                    }
                }
            }
        }
    }
  • ServerThread采取的是worker线程池模式,接收一个请求之后,包装为HttpEventTask,然后提交给bizExecutor

HttpEventTask

sentinel-transport-simple-http-0.1.1-sources.jar!/com/alibaba/csp/sentinel/transport/command/http/HttpEventTask.java

public class HttpEventTask implements Runnable {

    private final Socket socket;

    private boolean writtenHead = false;

    public HttpEventTask(Socket socket) {
        this.socket = socket;
    }

    public void close() throws Exception {
        socket.close();
    }

    @Override
    public void run() {
        if (socket == null) {
            return;
        }

        BufferedReader in = null;
        PrintWriter printWriter = null;
        try {
            long start = System.currentTimeMillis();
            in = new BufferedReader(new InputStreamReader(socket.getInputStream(), SentinelConfig.charset()));
            OutputStream outputStream = socket.getOutputStream();

            printWriter = new PrintWriter(
                new OutputStreamWriter(outputStream, Charset.forName(SentinelConfig.charset())));

            String line = in.readLine();
            CommandCenterLog.info("[CommandCenter] socket income:" + line
                + "," + socket.getInetAddress());
            CommandRequest request = parseRequest(line);

            // Validate the target command.
            String commandName = HttpCommandUtils.getTarget(request);
            if (StringUtil.isBlank(commandName)) {
                badRequest(printWriter, "Invalid command");
                return;
            }

            // Find the matching command handler.
            CommandHandler<?> commandHandler = SimpleHttpCommandCenter.getHandler(commandName);
            if (commandHandler != null) {
                CommandResponse<?> response = commandHandler.handle(request);
                handleResponse(response, printWriter, outputStream);
            } else {
                // No matching command handler.
                badRequest(printWriter, "Unknown command `" + commandName + '`');
            }
            printWriter.flush();

            long cost = System.currentTimeMillis() - start;
            CommandCenterLog.info("[CommandCenter] deal a socket task:" + line
                + "," + socket.getInetAddress() + ", time cost=" + cost + " ms");
        } catch (Throwable e) {
            CommandCenterLog.info("CommandCenter error", e);
            try {
                if (printWriter != null) {
                    String errorMessage = SERVER_ERROR_MESSAGE;
                    if (!writtenHead) {
                        internalError(printWriter, errorMessage);
                    } else {
                        printWriter.println(errorMessage);
                    }
                    printWriter.flush();
                }
            } catch (Exception e1) {
                CommandCenterLog.info("CommandCenter close serverSocket failed", e);
            }
        } finally {
            closeResource(in);
            closeResource(printWriter);
            closeResource(socket);
        }
    }

    private void closeResource(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception e) {
                CommandCenterLog.info("CommandCenter close resource failed", e);
            }
        }
    }

    private void handleResponse(CommandResponse response, /*@NonNull*/ final PrintWriter printWriter,
        /*@NonNull*/ final OutputStream rawOutputStream) throws Exception {
        if (response.isSuccess()) {
            if (response.getResult() == null) {
                writeOkStatusLine(printWriter);
                return;
            }
            // Write 200 OK status line.
            writeOkStatusLine(printWriter);
            // Here we directly use `toString` to encode the result to plain text.
            byte[] buffer = response.getResult().toString().getBytes(SentinelConfig.charset());
            rawOutputStream.write(buffer);
            rawOutputStream.flush();
        } else {
            String msg = SERVER_ERROR_MESSAGE;
            if (response.getException() != null) {
                msg = response.getException().getMessage();
            }
            badRequest(printWriter, msg);
        }
    }

    /**
     * Write `400 Bad Request` HTTP response status line and message body, then flush.
     */
    private void badRequest(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 400 Bad Request\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `500 Internal Server Error` HTTP response status line and message body, then flush.
     */
    private void internalError(/*@NonNull*/ final PrintWriter out, String message) {
        out.print("HTTP/1.1 500 Internal Server Error\r\n"
            + "Connection: close\r\n\r\n");
        out.print(message);
        out.flush();
        writtenHead = true;
    }

    /**
     * Write `200 OK` HTTP response status line and flush.
     */
    private void writeOkStatusLine(/*@NonNull*/ final PrintWriter out) {
        out.print("HTTP/1.1 200 OK\r\n"
            + "Connection: close\r\n\r\n");
        out.flush();
        writtenHead = true;
    }

    /**
     * Parse raw HTTP request line to a {@link CommandRequest}.
     *
     * @param line HTTP request line
     * @return parsed command request
     */
    private CommandRequest parseRequest(String line) {
        CommandRequest request = new CommandRequest();
        if (StringUtil.isBlank(line)) {
            return request;
        }
        int start = line.indexOf('/');
        int ask = line.indexOf('?') == -1 ? line.lastIndexOf(' ') : line.indexOf('?');
        int space = line.lastIndexOf(' ');
        String target = line.substring(start != -1 ? start + 1 : 0, ask != -1 ? ask : line.length());
        request.addMetadata(HttpCommandUtils.REQUEST_TARGET, target);
        if (ask == -1 || ask == space) {
            return request;
        }
        String parameterStr = line.substring(ask != -1 ? ask + 1 : 0, space != -1 ? space : line.length());
        for (String parameter : parameterStr.split("&")) {
            if (StringUtil.isBlank(parameter)) {
                continue;
            }

            String[] keyValue = parameter.split("=");
            if (keyValue.length != 2) {
                continue;
            }

            String value = StringUtil.trim(keyValue[1]);
            try {
                value = URLDecoder.decode(value, SentinelConfig.charset());
            } catch (UnsupportedEncodingException e) {
            }

            request.addParam(StringUtil.trim(keyValue[0]), value);
        }
        return request;
    }

    private static final String SERVER_ERROR_MESSAGE = "Command server error";

}
  • 这里直接readLine,然后解析为CommandRequest,然后调用SimpleHttpCommandCenter.getHandler获取处理程序,执行返回结果
  • 可以看到这里是手工解析http协议

CommandHandlerProvider

sentinel-transport-common-0.1.1-sources.jar!/com/alibaba/csp/sentinel/command/CommandHandlerProvider.java

/**
 * Provides and filters command handlers registered via SPI.
 *
 * @author Eric Zhao
 */
public class CommandHandlerProvider implements Iterable<CommandHandler> {

    private final ServiceLoader<CommandHandler> serviceLoader = ServiceLoader.load(CommandHandler.class);

    /**
     * Get all command handlers annotated with {@link CommandMapping} with command name.
     *
     * @return list of all named command handlers
     */
    public Map<String, CommandHandler> namedHandlers() {
        Map<String, CommandHandler> map = new HashMap<String, CommandHandler>();
        for (CommandHandler handler : serviceLoader) {
            String name = parseCommandName(handler);
            if (!StringUtil.isEmpty(name)) {
                map.put(name, handler);
            }
        }
        return map;
    }

    private String parseCommandName(CommandHandler handler) {
        CommandMapping commandMapping = handler.getClass().getAnnotation(CommandMapping.class);
        if (commandMapping != null) {
            return commandMapping.name();
        } else {
            return null;
        }
    }

    @Override
    public Iterator<CommandHandler> iterator() {
        return serviceLoader.iterator();
    }

    private static final CommandHandlerProvider INSTANCE = new CommandHandlerProvider();

    public static CommandHandlerProvider getInstance() {
        return INSTANCE;
    }
}
  • 这个类采用单例模式,以及SPI机制,来动态加载CommandHandler的实现,然后namedHandlers方法提供一个命令名及handler的map

com.alibaba.csp.sentinel.command.CommandHandler

sentinel-transport-common-0.1.1.jar!/META-INF/services/com.alibaba.csp.sentinel.command.CommandHandler

com.alibaba.csp.sentinel.command.handler.BasicInfoCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchActiveRuleCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeByIdCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchClusterNodeHumanCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchJsonTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchOriginCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSimpleClusterNodeCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchSystemStatusCommandHandler
com.alibaba.csp.sentinel.command.handler.FetchTreeCommandHandler
com.alibaba.csp.sentinel.command.handler.ModifyRulesCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffGetCommandHandler
com.alibaba.csp.sentinel.command.handler.OnOffSetCommandHandler
com.alibaba.csp.sentinel.command.handler.SendMetricCommandHandler
com.alibaba.csp.sentinel.command.handler.VersionCommandHandler
  • 这里在META-INF的services目录,使用SPI机制,在com.alibaba.csp.sentinel.command.CommandHandler文件下记录了各种实现

小结

  • sentinel的transport部分,有一个common基础包,然后就是simple-http以及netty的实现。
  • simple-http采用的是bio外加工作线程池模式,来一个请求,往线程池丢一个HttpEventTask。
  • HttpEventTask解析请求的命令然后获取相应的handler执行返回
  • commandHandler采用的是SPI的模式进行动态加载

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容