openLooKeng源码分析(keep updating)

目标

从建模到源码,梳理清楚罗根整个代码架构。

模块划分

罗根主要功能模块,其中 执行引擎 是分布式特性的主要模块。

image.png

Client(To be done):

Server入口:

image.png

添加的模块

try {            
            Injector injector = app.strictConfig().initialize();
            logLocation(log, "Working directory", Paths.get("."));
            logLocation(log, "Etc directory", Paths.get("etc"));
            injector.getInstance(PluginManager.class).loadPlugins();
            FileSystemClientManager fileSystemClientManager = 
            injector.getInstance(FileSystemClientManager.class);
            fileSystemClientManager.loadFactoryConfigs();
            injector.getInstance(HetuMetaStoreManager.class).loadHetuMetatstore(fileSystemClientManager);
            injector.getInstance(HeuristicIndexerManager.class).buildIndexClient();
            injector.getInstance(StaticCatalogStore.class).loadCatalogs();
            injector.getInstance(DynamicCatalogScanner.class).start();
            injector.getInstance(SessionPropertyDefaults.class).loadConfigurationManager();
            injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
            injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
            injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator();
            injector.getInstance(EventListenerManager.class).loadConfiguredEventListener();
            // Seed Store
            loadSeedStore(injector.getInstance(HetuConfig.class), injector.getInstance(SeedStoreManager.class));
            // State Store
            launchEmbeddedStateStore(injector.getInstance(HetuConfig.class), injector.getInstance(StateStoreLauncher.class));
            injector.getInstance(StateStoreProvider.class).loadStateStore();
            injector.getInstance(Announcer.class).start();
            injector.getInstance(ServerInfoResource.class).startupComplete();
            log.info("======== SERVER STARTED ========");
        }

PluginManager:

image.png
  • Load all plugins from jar:
public void loadPlugins() throws Exception{

if (!pluginsLoading.compareAndSet(false, true)) {

return;

}

 for (File file : listFiles(installedPluginsDir)) {

 if (file.isDirectory()) {

 loadPlugin(file.getCanonicalPath());

 }

 }

 for (File file : listFiles(this.externalFunctionsPluginsDir)) {

 if (file.isDirectory()) {

 loadPlugin(file.getCanonicalPath(), true);

 }

 }

 for (String plugin : plugins) {

     loadPlugin(plugin);

 }

 metadataManager.verifyComparableOrderableContract();

 pluginsLoaded.set(true);

}
  • Install all plugins:
private void loadPlugin(URLClassLoader pluginClassLoader, boolean onlyInstallFunctionsPlugin)

 {

 ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);

 List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);

 checkState(!plugins.isEmpty(), "No service providers of type %s", Plugin.class.getName());

 for (Plugin plugin : plugins) {

 String name = plugin.getClass().getName();

 log.info("Installing %s", name);

 if (onlyInstallFunctionsPlugin) {

  ** installFunctionsPlugin(plugin);**

 return;

 }

 if (HIVE_FUNCTIONS_PLUGIN.equals(name)) {

 plugin.setExternalFunctionsDir(this.config.getExternalFunctionsDir());

 plugin.setMaxFunctionRunningTimeEnable(this.config.getMaxFunctionRunningTimeEnable());

 plugin.setMaxFunctionRunningTimeInSec(this.config.getMaxFunctionRunningTimeInSec());

 plugin.setFunctionRunningThreadPoolSize(this.config.getFunctionRunningThreadPoolSize());

 }

  **installPlugin(plugin);**

 }

 }

public void installPlugin(Plugin plugin)

 {

 for (BlockEncoding blockEncoding : plugin.getBlockEncodings()) {

 log.info("Registering block encoding %s", blockEncoding.getName());

 metadataManager.addBlockEncoding(blockEncoding);

 }

 for (Type type : plugin.getTypes()) {

 log.info("Registering type %s", type.getTypeSignature());

 metadataManager.addType(type);

 }

 for (ParametricType parametricType : plugin.getParametricTypes()) {

 log.info("Registering parametric type %s", parametricType.getName());

 metadataManager.addParametricType(parametricType);

 }

   for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {

 log.info("Registering connector %s", connectorFactory.getName());

 connectorManager.addConnectorFactory(connectorFactory);

 }

这里的Plugin是所有catalog类型和一些其他类型的插件的公共接口:

image.png

FileSystemClientManager:

读取预先定义在etc filesystem中的配置,用于操作不同的文件系统。

image.png

支持HDFS和local

HetuFileSystemClient负责提供统一的操作接口。

HetuMetaStoreManager:

image.png

一个 HetuMetastore 类负责实现对元数据的所有操作(create/drop/alter catalog/schema/table)

public void loadHetuMetatstore(FileSystemClientManager fileSystemClientManager)

 throws Exception

 {

 LOG.info("-- Loading Hetu Metastore --");

 if (HETUMETASTORE_CONFIG_FILE.exists()) {

 // load configuration

 Map<String, String> config = new HashMap<>(loadProperties(HETUMETASTORE_CONFIG_FILE));

 // create hetu metastore

 hetuMetastoreType = config.getOrDefault(HETU_METASTORE_TYPE_PROPERTY_NAME, HETU_METASTORE_TYPE_DEFAULT_VALUE);

 config.remove(HETU_METASTORE_TYPE_PROPERTY_NAME);

 HetuMetaStoreFactory hetuMetaStoreFactory = hetuMetastoreFactories.get(hetuMetastoreType);

 checkState(hetuMetaStoreFactory != null, "hetuMetaStoreFactory %s is not registered", hetuMetaStoreFactory);

 try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(HetuMetaStoreFactory.class.getClassLoader())) {

 HetuFileSystemClient client = null;

 if (HETU_METASTORE_TYPE_HETU_FILE_SYSTEM.equals(hetuMetastoreType)) {

 String profileName = config.get(HETU_METASTORE_HETU_FILE_SYSTEM_PROFILE_NAME);

 // get system client according to config**

  client = fileSystemClientManager.getFileSystemClient(profileName, Paths.get("/"));                

 }

  // produce a way to store metadata.

  hetuMetastore = hetuMetaStoreFactory.create(hetuMetastoreType, ImmutableMap.copyOf(config), client);

 }

 LOG.info("-- Loaded Hetu Metastore %s --", hetuMetastoreType);

 }

HeuristicIndexerManager:(先跳过)

StaticCatalogStore:

image.png

这个构造函数没找到调用的地方,里面应该有个注册创建的机制把全局的connectorManager catalogConfig传进来。

public void loadCatalogs()

 throws Exception

 {

 if (!catalogsLoading.compareAndSet(false, true)) {

 return;

 }

 for (File file : listFiles(catalogConfigurationDir)) {

 if (file.isFile() && file.getName().endsWith(".properties")) {

  ** loadCatalog(file);**

 }

 }

 // For dynamic catalog, we will put the catalog properties files in the dynamic directory.

 // when setup, we need to load these files.

 File dynamicCatalogsDir = getCatalogBasePath(catalogConfigurationDir.getPath()).toFile();

 for (File file : listFiles(dynamicCatalogsDir)) {

 if (file.isFile() && file.getName().endsWith(".properties")) {

  ** loadCatalog(file);**

 }

 }

 connectorManager.updateConnectorIds();

 }
    private void loadCatalog(File file)
            throws Exception
    {
        String catalogName = Files.getNameWithoutExtension(file.getName());
        if (disabledCatalogs.contains(catalogName)) {
            log.info("Skipping disabled catalog %s", catalogName);
            return;
        }

        log.info("-- Loading catalog %s --", file);

        Map<String, String> properties = new HashMap<>(loadProperties(file));
        catalogStoreUtil.decryptEncryptedProperties(catalogName, properties);

        String connectorName = properties.remove("connector.name");
        checkState(connectorName != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());

        connectorManager.createConnection(catalogName, connectorName, ImmutableMap.copyOf(properties));
        log.info("-- Added catalog %s using connector %s --", catalogName, connectorName);
    }

这里开始创建connector, 之前在PluginManager类中已经将所有connectorFactory加载到了进程中,这里用配置文件里的connector.name来索引相应的connectorFactory。(创建Connector的代码还挺厚的,后面有时间再细看)

DynamicCatalogScanner:

单独启一个线程来观察是否有动态添加的catalog配置

image.png

SessionPropertyDefaults:

ResourceGroupManager:

AccessControlManager:

PasswordAuthenticatorManager:

EventListenerManager:

HetuConfig:

SeedStoreManager:

StateStoreLauncher:

StateStoreProvider:

State store is used to store states that are shared between coordinators and workers.


image.png
public void loadStateStore()
            throws Exception
    {
        // if config file exists
        if (STATE_STORE_CONFIGURATION.exists()) {
            Map<String, String> properties = new HashMap<>(loadProperties(STATE_STORE_CONFIGURATION));
           // get store type,only support hazelcast
            String stateStoreType = properties.remove(STATE_STORE_TYPE_PROPERTY_NAME);
            setStateStore(stateStoreType, properties);
            createStateCollections();
        }
        else {
            log.info("No configuration file found, skip loading state store client");
        }
    }

Announcer:

ServerInfoResource:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352