Elasticsearch源码分析-启动过程浅析

1、启动命令及启动类

首先,来看一下启动elasticsearch的Java命令,其中es.pidfile是pid文件路径,es.path.home是es的安装目录,logs、data、work和conf分别是存储日志、数据、工作和配置的目录。

${JAVA_HOME}/bin/java  \
            -Des.pidfile=/path/xxx.pid \
            -Des.default.path.home=/path/xxx \
            -Des.default.path.logs=/path/logs \
            -Des.default.path.data=/path/data \
            -Des.default.path.work=/path/work \
            -Des.default.path.conf=/path/config \
            -Des.path.home=/path/xxx \
            -cp :/path/xxx.jar \
            org.elasticsearch.bootstrap.Elasticsearch

elasticsearch启动类有两个,分别是Elasticsearch和ElasticsearchF,其中F代表foreground,区别是在前台进程运行还是后台进程运行,以及日志是存储在日志文件中还是显示在控制台中,System.setProperty("es.foreground", "yes")用来指定foreground。
两个启动类最终都是调用Bootstrap的静态main方法来启动elasticsearch。

public class Elasticsearch extends Bootstrap {
    public static void close(String[] args) {
        Bootstrap.close(args);
    }
    public static void main(String[] args) {
        Bootstrap.main(args);
    }
}
public class ElasticsearchF {
    public static void close(String[] args) {
        Bootstrap.close(args);
    }
    public static void main(String[] args) {
        System.setProperty("es.foreground", "yes");
        Bootstrap.main(args);
    }
}

2、环境初始化

在Bootstrap的main中,首先根据es.pidfile或者es-pidfile获取pid文件路径,并将运行当前elasticsearch的jvm进程号写入pid文件中,并调用fPidFile.deleteOnExit()在jvm进程结束时删除pid文件。

public class Bootstrap {
    public static void main(String[] args) {
        // pid文件路径, 启动参数 -Des.pidfile=/opt/elasticsearch-1.6.0/run/elasticsearch.pid
        final String pidFile = System.getProperty("es.pidfile", System.getProperty("es-pidfile"));
        if (pidFile != null) {
            try {
                File fPidFile = new File(pidFile);
                // 将jvm进程号写入fPidFile文件
                // ...

                // 当虚拟机terminate时,删除pid文件
                fPidFile.deleteOnExit();
            } catch (Exception e) {
                // ...
            }
        }
    }
}

接下来使用initialSettings()加载环境变量和配置文件,主要的逻辑在InternalSettingsPreparer.prepareSettings()中

public class Bootstrap {
    public static void main(String[] args) {
        // ...
        Tuple<Settings, Environment> tuple = null;
        try {
            tuple = initialSettings(foreground);
            setupLogging(tuple);
        }catch (Exception e) {
            // ...
        }
        // ...
    }
}

大致的流程如下:
①首先从System.getProperties()加载前缀为elasticsearch.和es.的变量

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        // just create enough settings to build the environment
        ImmutableSettings.Builder settingsBuilder = settingsBuilder().put(pSettings);
        if (useSystemProperties) { // 优先加载default系统属性
            settingsBuilder.putProperties("elasticsearch.default.", System.getProperties())
                    .putProperties("es.default.", System.getProperties())
                    .putProperties("elasticsearch.", System.getProperties(), ignorePrefixes) // 加载相同前缀的系统属性,但忽略es.default.和elasticsearch.default.前缀
                    .putProperties("es.", System.getProperties(), ignorePrefixes);
        }
        settingsBuilder.replacePropertyPlaceholders();
        // 获取环境变量,包括path.home=home、path.data=data、path.logs=logs、path.conf=config和path.work=work
        Environment environment = new Environment(settingsBuilder.build()); //如果path.conf为空,则为path.home(default:user.dir)/config
        // ...
    }
}

②然后初始化Environment,主要是设置elasticsearch的path.home、path.conf、path.plugins、path.work、path.data、path.repo和path.logs变量,如果path.home没有设置,则置为
System.getProperty("user.dir");如果其他变量为空,则为path.home下面对应的目录(conf为{path.home}/config,data为{path.home}/data/集群名)

public class Environment {
    public Environment(Settings settings) {
        this.settings = settings;
        if (settings.get("path.home") != null) {
            homeFile = new File(cleanPath(settings.get("path.home")));
        } else {
            homeFile = new File(System.getProperty("user.dir"));
        }
        if (settings.get("path.conf") != null) {
            configFile = new File(cleanPath(settings.get("path.conf")));
        } else {
            configFile = new File(homeFile, "config");
        }
        // ......
    }
}

③按顺序加载es.default.config、es.config和elasticsearch.config变量对应的配置文件,若为空,则忽略。当加载的配置文件不是es.config和elasticsearch.config变量对应的配置文件时,则继续加载{path.home}/config/elasticsearch.yml, .yaml, .json, .properties配置文件

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (loadConfigSettings) {
            boolean loadFromEnv = true;
            if (useSystemProperties) {// 默认为true
                // if its default, then load it, but also load form env
                if (Strings.hasText(System.getProperty("es.default.config"))) { // 从系统属性中加载默认config配置
                    loadFromEnv = true;
                    settingsBuilder.loadFromUrl(environment.resolveConfig(System.getProperty("es.default.config")));
                }
                // ...
            }
            // 从es.default.config加载配置后需要从.yml, .yaml, .json, .properties中继续加载配置
            if (loadFromEnv) { 
                for (String allowedSuffix : ALLOWED_SUFFIXES) {
                    try {
                        // config目录下的elasticsearch.yml文件
                        settingsBuilder.loadFromUrl(environment.resolveConfig("elasticsearch" + allowedSuffix)); 
                    } catch (FailedToResolveConfigException e) {
                        // ignore
                    }
                }
            }
        }
        // ...
    }
}

④使用相同前缀的系统属性覆盖已设置的前缀为es.和elasticsearch.变量,并忽略前缀为es.default., elasticsearch.default.的变量

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        settingsBuilder.put(pSettings);
        // 除了es.default., elasticsearch.default. ,使用相同前缀的系统属性覆盖settingsBuilder
        if (useSystemProperties) {
            settingsBuilder.putProperties("elasticsearch.", System.getProperties(), ignorePrefixes)
                    .putProperties("es.", System.getProperties(), ignorePrefixes);
        }
        settingsBuilder.replacePropertyPlaceholders();
    }
}

⑤如果配置文件中没有设置name,则从系统属性中读取,如果不为空则为节点名,若依然为空,则从config/names.txt随机选择一个字符串作为节点名

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (settingsBuilder.get("name") == null) { 
            String name = System.getProperty("name");
            if (name != null) {
                settingsBuilder.put("name", name);
            }
        }
        Settings settings = replacePromptPlaceholders(settingsBuilder.build(), terminal);
        if (settings.get("name") == null) {
            final String name = settings.get("node.name");
            if (name == null || name.isEmpty()) {
                settings = settingsBuilder().put(settings)
                    .put("name", Names.randomNodeName(environment.resolveConfig("names.txt"))) 
                                .build();
            } else {
                settings = settingsBuilder().put(settings)
                    .put("name", name)
                    .build();
            }
        }
        // ...
    }
}

⑥如果没有设置集群名变量cluster.name,则设置为默认值"elasticsearch"

public class InternalSettingsPreparer {
    public static Tuple<Settings, Environment> prepareSettings(Settings pSettings, boolean loadConfigSettings, Terminal terminal) {
        // ...
        if (settingsBuilder.get(ClusterName.SETTING) == null) {
            settingsBuilder.put(ClusterName.SETTING, ClusterName.DEFAULT.value());
        }
        // ...
    }
}

3、集群设置及启动

设置完环境变量,开始使用bootstrap.setup(true, tuple)进行集群的初始化,并使用bootstrap.start()启动集群

public class Bootstrap {
    public static void main(String[] args) {
        // ...
        try {
            if (!foreground) {
                Loggers.disableConsoleLogging();
                System.out.close();
            }
            // fail if using broken version
            JVMCheck.check();
            bootstrap.setup(true, tuple);
            stage = "Startup";
            bootstrap.start();
        } catch (Throwable e) {

        }
        // ...
    }
}

在初始化集群前,进行jvm检验,出现以下二者之一情况将会抛异常并终止启动过程:
①JVM供应商为IBM Corporation
②JVM供应商为Oracle Corporation,版本为21.0-b17、24.0-b56、24.45-b08和24.51-b03,且运行时没有加对应的-XX:-UseLoopPredicate或者-XX:-UseSuperWord参数

public class JVMCheck {
    static final Map<String,HotspotBug> JVM_BROKEN_HOTSPOT_VERSIONS;
    
    static {
        Map<String,HotspotBug> bugs = new HashMap<>();
        
        // 1.7.0: loop optimizer bug
        bugs.put("21.0-b17",  new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-7070134", "-XX:-UseLoopPredicate"));
        // register allocation issues (technically only x86/amd64). This impacted update 40, 45, and 51
        bugs.put("24.0-b56",  new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        bugs.put("24.45-b08", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        bugs.put("24.51-b03", new HotspotBug("https://bugs.openjdk.java.net/browse/JDK-8024830", "-XX:-UseSuperWord"));
        
        JVM_BROKEN_HOTSPOT_VERSIONS = Collections.unmodifiableMap(bugs);
    }
    static void check() {
        if (Boolean.parseBoolean(System.getProperty(JVM_BYPASS))) {
            // ...
        } else if ("Oracle Corporation".equals(Constants.JVM_VENDOR)) {
            HotspotBug bug = JVM_BROKEN_HOTSPOT_VERSIONS.get(Constants.JVM_VERSION);
            if (bug != null) { // wordAround为-XX:-UseLoopPredicate或者-XX:-UseSuperWord
                if (bug.workAround != null && ManagementFactory.getRuntimeMXBean().getInputArguments().contains(bug.workAround)) {
                    Loggers.getLogger(JVMCheck.class).warn(bug.getWarningMessage());
                } else {
                    throw new RuntimeException(bug.getErrorMessage());
                }
            }
        } else if ("IBM Corporation".equals(Constants.JVM_VENDOR)) {
            // currently any JVM from IBM will easily result in index corruption.
            // ...
            throw new RuntimeException(sb.toString());
        }
    }
}

若JVM可用,则执行bootstrap.setup()进入集群初始化阶段,主要使用InternalNode类的构造方法创建节点对象node。
最后添加钩子方法,在虚拟机结束前执行node.close()关闭node

public class Bootstrap {
    private void setup(boolean addShutdownHook, Tuple<Settings, Environment> tuple) throws Exception {
        // ...
        Settings nodeSettings = ImmutableSettings.settingsBuilder()
                .put(settings)
                .put(InternalSettingsPreparer.IGNORE_SYSTEM_PROPERTIES_SETTING, true)
                .build();
        NodeBuilder nodeBuilder = NodeBuilder.nodeBuilder().settings(nodeSettings).loadConfigSettings(false);
        node = nodeBuilder.build();
        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() { //虚拟机关闭前执行
                @Override
                public void run() {
                    node.close();
                }
            });
        }
    }
}

构造node对象时,主要流程是创建nodeEnvironment,并执行modules.add()方法添加elasticsearch各部分模块,在添加模块时,会执行模块对应的module.spawnModules(),最后创建注入对象,执行每个模块的configure()方法,将实现和接口进行绑定。

public final class InternalNode implements Node {
    public InternalNode(Settings preparedSettings, boolean loadConfigSettings) throws ElasticsearchException {
        // ...
        final NodeEnvironment nodeEnvironment;
        try {
            nodeEnvironment = new NodeEnvironment(this.settings, this.environment);
        } catch (IOException ex) {
            throw new ElasticsearchIllegalStateException("Failed to created node environment", ex);
        }
        boolean success = false;
        try {
            ModulesBuilder modules = new ModulesBuilder();
            modules.add(new NodeEnvironmentModule(nodeEnvironment));
            modules.add(new DiscoveryModule(settings));
            //...
            // 创建injector完成注入
            injector = modules.createInjector();
            //获取Client的绑定实现
            client = injector.getInstance(Client.class);
            threadPool.setNodeSettingsService(injector.getInstance(NodeSettingsService.class));
            success = true;
        }finally {
            if (!success) {
                nodeEnvironment.close();
                ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
            }
        }
        logger.info("initialized");
    }
}

对Client类进行绑定

public class NodeClientModule extends AbstractModule {

    @Override
    protected void configure() {
        // ...
        bind(Client.class).to(NodeClient.class).asEagerSingleton();
    }
}

以DiscoveryModule模块为例,用node.local和node.mode参数控制是本地发现还是Zen发现,然后初始化对应Discovery模块

public class DiscoveryModule extends AbstractModule implements SpawnModules {
    @Override
    public Iterable<? extends Module> spawnModules() {
        Class<? extends Module> defaultDiscoveryModule;
        if (DiscoveryNode.localNode(settings)) {
            defaultDiscoveryModule = LocalDiscoveryModule.class;
        } else {
            defaultDiscoveryModule = ZenDiscoveryModule.class;
        }
        return ImmutableList.of(Modules.createModule(settings.getAsClass(DISCOVERY_TYPE_KEY, defaultDiscoveryModule, "org.elasticsearch.discovery.",                 "DiscoveryModule"), settings));
    }
    @Override
    protected void configure() {
        bind(DiscoveryService.class).asEagerSingleton();
    }
}

节点发现的两种方式Local和Network,分别对应LocalDiscovery和ZenDiscovery

public class DiscoveryNode implements Streamable, Serializable {
    public static boolean localNode(Settings settings) {
        if (settings.get("node.local") != null) {
            return settings.getAsBoolean("node.local", false);
        }
        if (settings.get("node.mode") != null) {
            String nodeMode = settings.get("node.mode");
            if ("local".equals(nodeMode)) {
                return true;
            } else if ("network".equals(nodeMode)) {
                return false;
            } else {
                throw new ElasticsearchIllegalArgumentException("unsupported node.mode [" + nodeMode + "]. Should be one of [local, network].");
            }
        }
        return false;
    }
}

在完成节点初始化后,调用bootstrap.start()来启动节点,其实是调用的node.start(),与钩子函数的node.close()相对应
启动节点的过程,其实是各个模块的启动过程,调用各模块的start方法

public final class InternalNode implements Node {
    public Node start() {
        if (!lifecycle.moveToStarted()) {
            return this;
        }
        // 开启Tcp服务
        injector.getInstance(TransportService.class).start();
    
        // 节点发现及master选举
        DiscoveryService discoService = injector.getInstance(DiscoveryService.class).start(); // ZenDiscovery.doStart
        discoService.waitForInitialState();
        
        // 应该在DiscoveryService启动之后,开启网关服务
        injector.getInstance(GatewayService.class).start();

        // 开启Http服务
        if (settings.getAsBoolean("http.enabled", true)) {
            injector.getInstance(HttpServer.class).start();
        }
    }
}

4、设置keepAliveThread

node完成启动后,创建用户线程keepAliveThread,值为1,并添加一个钩子方法,在JVM关闭前执行countDown()。
然后继续创建用户线程keepAliveThread,在keepAliveLatch执行countDown()之前一直阻塞,以此来保证elasticsearch一直存活

public class Bootstrap {
    public static void main(String[] args) {
        keepAliveLatch = new CountDownLatch(1);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });

        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        keepAliveThread.start();
    }
}

elasticsearch在启动过程中会注入TransportModule和HttpServerModule模块,并且在启动时会启动TransportService和HttpServer,最终都是通过Netty监听Http和Tcp消息,完成客户端请求处理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,923评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,154评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,775评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,960评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,976评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,972评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,893评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,709评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,159评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,400评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,552评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,265评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,876评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,528评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,701评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,552评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,451评论 2 352