基本概念
ApplicationConfiguration 应用配置信息
- 定义模块的ModuleConfiguration
- ModuleConfiguration定义ModuleProvider的ProviderConfiguration
ModuleManager 模块管理器
- 负责管理服务内模块实例
- 持有全部loaded的ModuleDefine信息
ModuleDefine 模块信息
- 模块的名称定义
- 关联ModuleProvider提供者
- 定义需要的Service接口类信息Class[]
ModuleProvider 提供者
- 负责模块具体功能的运行
- 关联ModuleDefine
- 提供ModuleConfig信息
- 定义依赖的模块名称信息String[]
- 通过prepare注册需要的Service接口实例
- 拥有ModuleManager信息
Service
抽象设计的接口类
启动的过程
从OAPServerStartUp
代码上了解sw的代码风格和设计
public class OAPServerStartUp {
private static final Logger logger = LoggerFactory.getLogger(OAPServerStartUp.class);
public static void main(String[] args) {
String mode = System.getProperty("mode");
RunningMode.setMode(mode);
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
// 配置信息的解析,读取application.yml的配置信息
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 关键步骤 容器初始化,各模块的启动
manager.init(applicationConfiguration);
// 添加启动时间的监控信息
manager.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class).createGauge("uptime",
"oap server start up time", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE)
// Set uptime to second
.setValue(System.currentTimeMillis() / 1000d);
if (RunningMode.isInitMode()) {
logger.info("OAP starts up in init mode successfully, exit now...");
System.exit(0);
}
} catch (Throwable t) {
logger.error(t.getMessage(), t);
System.exit(1);
}
}
}
main方法的代码比较简单就是加载配置文件,调用manager的init初始化
配置信息以coreModule为例说明
# ModuleConfiguration
core: # ModuleName
#PrividerConfiguration
default: #ProviderName
# Provider的Properties配置
role: ${SW_CORE_ROLE:Mixed}
restHost: ${SW_CORE_REST_HOST:0.0.0.0}
restPort: ${SW_CORE_REST_PORT:12800}
restContextPath: ${SW_CORE_REST_CONTEXT_PATH:/}
gRPCHost: ${SW_CORE_GRPC_HOST:0.0.0.0}
gRPCPort: ${SW_CORE_GRPC_PORT:11800}
downsampling:
- Hour
- Day
- Month
enableDataKeeperExecutor: ${SW_CORE_ENABLE_DATA_KEEPER_EXECUTOR:true}
dataKeeperExecutePeriod: ${SW_CORE_DATA_KEEPER_EXECUTE_PERIOD:5}
recordDataTTL: ${SW_CORE_RECORD_DATA_TTL:90}
minuteMetricsDataTTL: ${SW_CORE_MINUTE_METRIC_DATA_TTL:90}
hourMetricsDataTTL: ${SW_CORE_HOUR_METRIC_DATA_TTL:36}
dayMetricsDataTTL: ${SW_CORE_DAY_METRIC_DATA_TTL:45}
monthMetricsDataTTL: ${SW_CORE_MONTH_METRIC_DATA_TTL:18}
enableDatabaseSession: ${SW_CORE_ENABLE_DATABASE_SESSION:true}
继续跟踪manager.init
方法
public void init(
ApplicationConfiguration applicationConfiguration) throws ModuleNotFoundException, ProviderNotFoundException, ServiceNotProvidedException, CycleDependencyException, ModuleConfigException, ModuleStartException {
String[] moduleNames = applicationConfiguration.moduleList();
// 1. SPI机制查找所有模块实现类 ModuleDefine
ServiceLoader<ModuleDefine> moduleServiceLoader = ServiceLoader.load(ModuleDefine.class);
// SPI机制查实模块提供者实现类 ModuleProvider
ServiceLoader<ModuleProvider> moduleProviderLoader = ServiceLoader.load(ModuleProvider.class);
// 配置文件定义的模块
LinkedList<String> moduleList = new LinkedList<>(Arrays.asList(moduleNames));
// 检查配置文件中定义的模块,系统是是否存在
for (ModuleDefine module : moduleServiceLoader) {
for (String moduleName : moduleNames) {
if (moduleName.equals(module.name())) {
ModuleDefine newInstance;
try {
// 反射创建实例
newInstance = module.getClass().newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new ModuleNotFoundException(e);
}
// 2. 模块准备流程
newInstance.prepare(this, applicationConfiguration.getModuleConfiguration(moduleName), moduleProviderLoader);
loadedModules.put(moduleName, newInstance);
moduleList.remove(moduleName);
}
}
}
// Finish prepare stage
isInPrepareStage = false;
// 存在不知名module配置则抛异常
if (moduleList.size() > 0) {
throw new ModuleNotFoundException(moduleList.toString() + " missing.");
}
// 3. 模块加载完成 启动服务
BootstrapFlow bootstrapFlow = new BootstrapFlow(loadedModules);
bootstrapFlow.start(this);
bootstrapFlow.notifyAfterCompleted();
}
通过分析上面标注了信息的步骤来了解代码流程
1. SPI机制加载模块信息
通过JavaSPI机制加载META-INF/services
目录下定义的类和实例
以CoreModule为例可以看到META-INF/services
目录下定义的
org.apache.skywalking.oap.server.library.module.ModuleDefine
org.apache.skywalking.oap.server.library.module.ModuleProvider
文件
ModuleDefine
org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
org.apache.skywalking.oap.server.core.query.QueryModule
org.apache.skywalking.oap.server.core.alarm.AlarmModule
org.apache.skywalking.oap.server.core.exporter.ExporterModule
ModuleProvider
org.apache.skywalking.oap.server.core.CoreModuleProvider
加载完成后进行遍历,通过反射创建对象实例
2. ModuleDefine的prepare方法
void prepare(ModuleManager moduleManager, ApplicationConfiguration.ModuleConfiguration configuration,
ServiceLoader<ModuleProvider> moduleProviderLoader) throws ProviderNotFoundException, ServiceNotProvidedException, ModuleConfigException, ModuleStartException {
// 遍历provider
for (ModuleProvider provider : moduleProviderLoader) {
// 配置中包含provider才处理
if (!configuration.has(provider.name())) {
continue;
}
// 判断Provider定义的Module是否 当前的ModuleDefine相同。建立一对一的关系
// 这里避免多个provider关联同一个Module实例
if (provider.module().equals(getClass())) {
if (loadedProvider == null) {
loadedProvider = provider;
loadedProvider.setManager(moduleManager);
loadedProvider.setModuleDefine(this);
} else {
// ModuleName + " module has one " + ProviderName + "[" + ProviderClass + "] provider already, "
throw new DuplicateProviderException(this.name() + " module has one " + loadedProvider.name() + "[" + loadedProvider.getClass().getName() + "] provider already, "
+ provider.name() + "[" + provider.getClass().getName() + "] is defined as 2nd provider.");
}
}
}
if (loadedProvider == null) {
throw new ProviderNotFoundException(this.name() + " module no provider exists.");
}
logger.info("Prepare the {} provider in {} module.", loadedProvider.name(), this.name());
try {
// 加载ModuleConfig配置,由provider管理
copyProperties(loadedProvider.createConfigBeanIfAbsent(), configuration.getProviderConfiguration(loadedProvider.name()), this.name(), loadedProvider.name());
} catch (IllegalAccessException e) {
throw new ModuleConfigException(this.name() + " module config transport to config bean failure.", e);
}
// provider准备阶段
loadedProvider.prepare();
}
ModuleDefine
的准备阶段主要工作是关联一个Provider,有且必须只能关联到一个provider,否则报错。建立完关联关系后加载配置信息到Provider,再执行Provider的准备阶段。
继续跟踪Provider的prepare。ModuleProvider的prepare方法是抽象方法,由具体子类实现。以CoreModuleProvider为例
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
// 暂时不展开,省略代码
this.registerServiceImplementation(ConfigService.class, new ConfigService(moduleConfig));
this.registerServiceImplementation(DownsamplingConfigService.class, new DownsamplingConfigService(moduleConfig.getDownsampling()));
this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
this.registerServiceImplementation(JettyHandlerRegister.class, new JettyHandlerRegisterImpl(jettyServer));
this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());
this.registerServiceImplementation(SourceReceiver.class, receiver);
WorkerInstancesService instancesService = new WorkerInstancesService();
this.registerServiceImplementation(IWorkerInstanceGetter.class, instancesService);
this.registerServiceImplementation(IWorkerInstanceSetter.class, instancesService);
this.registerServiceImplementation(RemoteSenderService.class, new RemoteSenderService(getManager()));
this.registerServiceImplementation(IModelSetter.class, storageModels);
this.registerServiceImplementation(IModelGetter.class, storageModels);
this.registerServiceImplementation(IModelOverride.class, storageModels);
this.registerServiceImplementation(ServiceInventoryCache.class, new ServiceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInventoryRegister.class, new ServiceInventoryRegister(getManager()));
this.registerServiceImplementation(ServiceInstanceInventoryCache.class, new ServiceInstanceInventoryCache(getManager()));
this.registerServiceImplementation(IServiceInstanceInventoryRegister.class, new ServiceInstanceInventoryRegister(getManager()));
this.registerServiceImplementation(EndpointInventoryCache.class, new EndpointInventoryCache(getManager()));
this.registerServiceImplementation(IEndpointInventoryRegister.class, new EndpointInventoryRegister(getManager()));
this.registerServiceImplementation(NetworkAddressInventoryCache.class, new NetworkAddressInventoryCache(getManager()));
this.registerServiceImplementation(INetworkAddressInventoryRegister.class, new NetworkAddressInventoryRegister(getManager()));
this.registerServiceImplementation(TopologyQueryService.class, new TopologyQueryService(getManager()));
this.registerServiceImplementation(MetricQueryService.class, new MetricQueryService(getManager()));
this.registerServiceImplementation(TraceQueryService.class, new TraceQueryService(getManager()));
this.registerServiceImplementation(LogQueryService.class, new LogQueryService(getManager()));
this.registerServiceImplementation(MetadataQueryService.class, new MetadataQueryService(getManager()));
this.registerServiceImplementation(AggregationQueryService.class, new AggregationQueryService(getManager()));
this.registerServiceImplementation(AlarmQueryService.class, new AlarmQueryService(getManager()));
this.registerServiceImplementation(TopNRecordsQueryService.class, new TopNRecordsQueryService(getManager()));
this.registerServiceImplementation(CommandService.class, new CommandService(getManager()));
this.registerServiceImplementation(RemoteClientManager.class, remoteClientManager);
}
先不展开讲CoreModuleProvider的具体功能,对比其他Provider,prepare都执行了 this.registerServiceImplementation()方法用以注册Provider需要的Service实例。
当所有ModuleDefine执行完prepare,准备阶段结束。
3. BootstrapFlow的运行
BootstrapFlow的创建,关键看makeSequence
方法
BootstrapFlow(Map<String, ModuleDefine> loadedModules) throws CycleDependencyException {
this.loadedModules = loadedModules;
startupSequence = new LinkedList<>();
makeSequence();
}
makeSequence方法,实现的功能是通过自旋判断Provider需要的Modules是否已经准备完成,当出现循环依赖是抛出异常
private void makeSequence() throws CycleDependencyException {
List<ModuleProvider> allProviders = new ArrayList<>();
// 创建待启动提供者集合
loadedModules.forEach((moduleName, module) -> allProviders.add(module.provider()));
// 检测提供者模块依赖模块启动
do {
int numOfToBeSequenced = allProviders.size();
for (int i = 0; i < allProviders.size(); i++) {
ModuleProvider provider = allProviders.get(i);
// 模块提供者配置了依赖模块
String[] requiredModules = provider.requiredModules();
if (CollectionUtils.isNotEmpty(requiredModules)) {
boolean isAllRequiredModuleStarted = true;
for (String module : requiredModules) {
// find module in all ready existed startupSequence
boolean exist = false;
for (ModuleProvider moduleProvider : startupSequence) {
if (moduleProvider.getModuleName().equals(module)) {
exist = true;
break;
}
}
// 依赖模块未启动 跳出循环 下次再检测
if (!exist) {
isAllRequiredModuleStarted = false;
break;
}
}
if (isAllRequiredModuleStarted) {
startupSequence.add(provider);
// 启动完成移除
allProviders.remove(i);
i--;
}
} else {
// 无依赖 启动完成
startupSequence.add(provider);
allProviders.remove(i);
i--;
}
}
// 循环依赖检测 MA依赖MB,MB依赖MA,则allProviders永远无法remove,出现循环
if (numOfToBeSequenced == allProviders.size()) {
StringBuilder unSequencedProviders = new StringBuilder();
allProviders.forEach(provider -> unSequencedProviders.append(provider.getModuleName()).append("[provider=").append(provider.getClass().getName()).append("]\n"));
throw new CycleDependencyException("Exist cycle module dependencies in \n" + unSequencedProviders.substring(0, unSequencedProviders.length() - 1));
}
}
while (allProviders.size() != 0);
}
再看BootstFlow.start(),这里负责的是Provider的启动阶段,并做了module和provider中service关系的校验
void start(
ModuleManager moduleManager) throws ModuleNotFoundException, ServiceNotProvidedException, ModuleStartException {
for (ModuleProvider provider : startupSequence) {
String[] requiredModules = provider.requiredModules();
if (requiredModules != null) {
for (String module : requiredModules) {
if (!moduleManager.has(module)) {
throw new ModuleNotFoundException(module + " is required by " + provider.getModuleName()
+ "." + provider.name() + ", but not found.");
}
}
}
logger.info("start the provider {} in {} module.", provider.name(), provider.getModuleName());
// 判断关联的ModuleDefine定义的services class[] 实例是否已经注册进来
provider.requiredCheck(provider.getModule().services());
// 启动
provider.start();
}
}
provider的start是抽象方法,由具体实现类完成。
BootstrapFlow
执行完成start后,调用notifyAfterCompleted,遍历启动完成的provider执行notifyAfterCompleted方法,用于启动完成的回调。同样notifyAfterCompleted也是抽象方法,由具体实现类实现。
至此OAP Server启动流程完成。
可通过流程图辅助了解
文章基于6.4.0版本
有些术语凭自己理解定义可能不太严谨,欢迎指正。