目标
从建模到源码,梳理清楚罗根整个代码架构。
模块划分
罗根主要功能模块,其中 执行引擎 是分布式特性的主要模块。
Client(To be done):
Server入口:
添加的模块
// Server start-up sequence (excerpt): an Injector is obtained first, then each manager is
// fetched from it and its load/start method is invoked in the order listed below.
// NOTE: the catch/finally that belongs to this try is not part of the excerpt.
try {
Injector injector = app.strictConfig().initialize();
logLocation(log, "Working directory", Paths.get("."));
logLocation(log, "Etc directory", Paths.get("etc"));
// Plugins are loaded before any file-system, metastore or catalog loading happens.
injector.getInstance(PluginManager.class).loadPlugins();
FileSystemClientManager fileSystemClientManager =
injector.getInstance(FileSystemClientManager.class);
fileSystemClientManager.loadFactoryConfigs();
// The metastore loader receives the file-system client manager prepared just above.
injector.getInstance(HetuMetaStoreManager.class).loadHetuMetatstore(fileSystemClientManager);
injector.getInstance(HeuristicIndexerManager.class).buildIndexClient();
// Catalogs: the static store loads first, then the dynamic scanner is started.
injector.getInstance(StaticCatalogStore.class).loadCatalogs();
injector.getInstance(DynamicCatalogScanner.class).start();
injector.getInstance(SessionPropertyDefaults.class).loadConfigurationManager();
injector.getInstance(ResourceGroupManager.class).loadConfigurationManager();
injector.getInstance(AccessControlManager.class).loadSystemAccessControl();
injector.getInstance(PasswordAuthenticatorManager.class).loadPasswordAuthenticator();
injector.getInstance(EventListenerManager.class).loadConfiguredEventListener();
// Seed Store
loadSeedStore(injector.getInstance(HetuConfig.class), injector.getInstance(SeedStoreManager.class));
// State Store
launchEmbeddedStateStore(injector.getInstance(HetuConfig.class), injector.getInstance(StateStoreLauncher.class));
injector.getInstance(StateStoreProvider.class).loadStateStore();
// Last steps: start the announcer, mark start-up as complete, and log the banner.
injector.getInstance(Announcer.class).start();
injector.getInstance(ServerInfoResource.class).startupComplete();
log.info("======== SERVER STARTED ========");
}
PluginManager:
-
Load all plugins from jar:
/**
 * Loads plugins from three sources, in this order: every sub-directory of the installed-plugins
 * directory, every sub-directory of the external-functions plugins directory, and every entry
 * of the {@code plugins} collection.
 *
 * <p>Only the first invocation does any work; later calls return immediately because the
 * {@code pluginsLoading} flag has already been switched to {@code true}.
 *
 * @throws Exception propagated from resolving canonical paths or from {@code loadPlugin}
 */
public void loadPlugins()
        throws Exception
{
    boolean firstInvocation = pluginsLoading.compareAndSet(false, true);
    if (!firstInvocation) {
        return;
    }

    // Installed plugins: one directory per plugin; plain files are skipped.
    for (File candidate : listFiles(installedPluginsDir)) {
        if (!candidate.isDirectory()) {
            continue;
        }
        loadPlugin(candidate.getCanonicalPath());
    }

    // External function plugins use the two-argument overload with the flag set to true.
    for (File candidate : listFiles(this.externalFunctionsPluginsDir)) {
        if (!candidate.isDirectory()) {
            continue;
        }
        loadPlugin(candidate.getCanonicalPath(), true);
    }

    // Plugins named explicitly in the plugins collection.
    for (String configuredPlugin : plugins) {
        loadPlugin(configuredPlugin);
    }

    metadataManager.verifyComparableOrderableContract();
    pluginsLoaded.set(true);
}
-
Install all plugins:
private void loadPlugin(URLClassLoader pluginClassLoader, boolean onlyInstallFunctionsPlugin)
{
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
checkState(!plugins.isEmpty(), "No service providers of type %s", Plugin.class.getName());
for (Plugin plugin : plugins) {
String name = plugin.getClass().getName();
log.info("Installing %s", name);
if (onlyInstallFunctionsPlugin) {
** installFunctionsPlugin(plugin);**
return;
}
if (HIVE_FUNCTIONS_PLUGIN.equals(name)) {
plugin.setExternalFunctionsDir(this.config.getExternalFunctionsDir());
plugin.setMaxFunctionRunningTimeEnable(this.config.getMaxFunctionRunningTimeEnable());
plugin.setMaxFunctionRunningTimeInSec(this.config.getMaxFunctionRunningTimeInSec());
plugin.setFunctionRunningThreadPoolSize(this.config.getFunctionRunningThreadPoolSize());
}
**installPlugin(plugin);**
}
}
/**
 * Registers what the given plugin provides: block encodings, types and parametric types go to
 * {@code metadataManager}; connector factories go to {@code connectorManager}. Each
 * registration is logged first.
 *
 * @param plugin the plugin whose contributions are registered
 */
public void installPlugin(Plugin plugin)
{
for (BlockEncoding blockEncoding : plugin.getBlockEncodings()) {
log.info("Registering block encoding %s", blockEncoding.getName());
metadataManager.addBlockEncoding(blockEncoding);
}
for (Type type : plugin.getTypes()) {
log.info("Registering type %s", type.getTypeSignature());
metadataManager.addType(type);
}
for (ParametricType parametricType : plugin.getParametricTypes()) {
log.info("Registering parametric type %s", parametricType.getName());
metadataManager.addParametricType(parametricType);
}
// Connector factories registered here are what catalog loading later looks up by name.
for (ConnectorFactory connectorFactory : plugin.getConnectorFactories()) {
log.info("Registering connector %s", connectorFactory.getName());
connectorManager.addConnectorFactory(connectorFactory);
}
// NOTE: the excerpt stops here - the rest of the method, including its closing brace, is not shown.
这里的Plugin是所有catalog类型和一些其他类型的插件的公共接口:
FileSystemClientManager:
读取预先定义在 etc/filesystem 目录中的配置,用于操作不同的文件系统。
支持HDFS和local
HetuFileSystemClient负责提供统一的操作接口。
HetuMetaStoreManager:
一个 HetuMetastore 类负责实现对元数据的所有操作(create/drop/alter catalog/schema/table)
public void loadHetuMetatstore(FileSystemClientManager fileSystemClientManager)
throws Exception
{
LOG.info("-- Loading Hetu Metastore --");
if (HETUMETASTORE_CONFIG_FILE.exists()) {
// load configuration
Map<String, String> config = new HashMap<>(loadProperties(HETUMETASTORE_CONFIG_FILE));
// create hetu metastore
hetuMetastoreType = config.getOrDefault(HETU_METASTORE_TYPE_PROPERTY_NAME, HETU_METASTORE_TYPE_DEFAULT_VALUE);
config.remove(HETU_METASTORE_TYPE_PROPERTY_NAME);
HetuMetaStoreFactory hetuMetaStoreFactory = hetuMetastoreFactories.get(hetuMetastoreType);
checkState(hetuMetaStoreFactory != null, "hetuMetaStoreFactory %s is not registered", hetuMetaStoreFactory);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(HetuMetaStoreFactory.class.getClassLoader())) {
HetuFileSystemClient client = null;
if (HETU_METASTORE_TYPE_HETU_FILE_SYSTEM.equals(hetuMetastoreType)) {
String profileName = config.get(HETU_METASTORE_HETU_FILE_SYSTEM_PROFILE_NAME);
// get system client according to config**
client = fileSystemClientManager.getFileSystemClient(profileName, Paths.get("/"));
}
// produce a way to store metadata.
hetuMetastore = hetuMetaStoreFactory.create(hetuMetastoreType, ImmutableMap.copyOf(config), client);
}
LOG.info("-- Loaded Hetu Metastore %s --", hetuMetastoreType);
}
HeuristicIndexerManager:(先跳过)
StaticCatalogStore:
这个构造函数没找到显式调用的地方;从启动代码中的 injector.getInstance(StaticCatalogStore.class) 来看,应该是由依赖注入容器(Injector)负责创建实例,并把全局的 connectorManager、catalogConfig 传进来。
public void loadCatalogs()
throws Exception
{
if (!catalogsLoading.compareAndSet(false, true)) {
return;
}
for (File file : listFiles(catalogConfigurationDir)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
** loadCatalog(file);**
}
}
// For dynamic catalog, we will put the catalog properties files in the dynamic directory.
// when setup, we need to load these files.
File dynamicCatalogsDir = getCatalogBasePath(catalogConfigurationDir.getPath()).toFile();
for (File file : listFiles(dynamicCatalogsDir)) {
if (file.isFile() && file.getName().endsWith(".properties")) {
** loadCatalog(file);**
}
}
connectorManager.updateConnectorIds();
}
/**
 * Creates the connection for the catalog described by one properties file.
 *
 * <p>The catalog name is the file name without its extension. Catalogs listed in
 * {@code disabledCatalogs} are skipped. The {@code connector.name} property is mandatory; it is
 * taken out of the property map, and the remaining properties are passed on to
 * {@code connectorManager.createConnection}.
 *
 * @param file the catalog properties file
 * @throws Exception propagated from reading, decrypting or connection creation
 */
private void loadCatalog(File file)
        throws Exception
{
    String catalog = Files.getNameWithoutExtension(file.getName());

    boolean disabled = disabledCatalogs.contains(catalog);
    if (disabled) {
        log.info("Skipping disabled catalog %s", catalog);
        return;
    }

    log.info("-- Loading catalog %s --", file);

    // Work on a mutable copy so that entries can be decrypted and removed in place.
    Map<String, String> catalogProperties = new HashMap<>(loadProperties(file));
    catalogStoreUtil.decryptEncryptedProperties(catalog, catalogProperties);

    // The connector name selects which connector to use and is not forwarded as a property.
    String connector = catalogProperties.remove("connector.name");
    checkState(connector != null, "Catalog configuration %s does not contain connector.name", file.getAbsoluteFile());

    connectorManager.createConnection(catalog, connector, ImmutableMap.copyOf(catalogProperties));
    log.info("-- Added catalog %s using connector %s --", catalog, connector);
}
这里开始创建connector, 之前在PluginManager类中已经将所有connectorFactory加载到了进程中,这里用配置文件里的connector.name来索引相应的connectorFactory。(创建Connector的代码还挺厚的,后面有时间再细看)
DynamicCatalogScanner:
单独启一个线程来观察是否有动态添加的catalog配置
SessionPropertyDefaults:
ResourceGroupManager:
AccessControlManager:
PasswordAuthenticatorManager:
EventListenerManager:
HetuConfig:
SeedStoreManager:
StateStoreLauncher:
StateStoreProvider:
State store is used to store states that are shared between coordinators and workers.
/**
 * Sets up the state store from {@code STATE_STORE_CONFIGURATION} when that file exists;
 * otherwise only logs that loading is skipped.
 *
 * @throws Exception propagated from reading the configuration or setting up the state store
 */
public void loadStateStore()
        throws Exception
{
    // No configuration file: report it and return without loading anything.
    if (!STATE_STORE_CONFIGURATION.exists()) {
        log.info("No configuration file found, skip loading state store client");
        return;
    }

    Map<String, String> storeConfig = new HashMap<>(loadProperties(STATE_STORE_CONFIGURATION));

    // The type entry is taken out of the map, so setStateStore receives only the remaining
    // properties. (Original note: only hazelcast is supported - not verifiable from this excerpt.)
    String storeType = storeConfig.remove(STATE_STORE_TYPE_PROPERTY_NAME);
    setStateStore(storeType, storeConfig);
    createStateCollections();
}