模块
模块定义 ModuleDefine
ModuleDefine 模块定义需要包含以下信息
- 模块名称: 全局唯一
- 对外开放的API 服务列表, 需要实现
org.apache.skywalking.oap.server.library.module.Service
, 此 Service 仅为标识接口
例如 集群协调器模块定义如下
public class ClusterModule extends ModuleDefine {
public static final String NAME = "cluster";
public ClusterModule() {
// 定义模块名称
super(NAME);
}
@Override
public Class[] services() {
return new Class[] {
// 用于节点注册,
ClusterRegister.class,
// 用于节点查询
ClusterNodesQuery.class
};
}
}
public interface ClusterRegister extends Service {
void registerRemote(RemoteInstance remoteInstance) throws ServiceRegisterException;
}
public interface ClusterNodesQuery extends Service {
List<RemoteInstance> queryRemoteNodes();
}
模块实现 ModuleProvider
ModuleProvider 依附在 ModuleDefine 下, 一对一关系, 需要有以下实现
- 模块实现名称在模块下唯一
- 所属模块
- 相关配置
- 依赖模块
- 对外服务接口列表的实现类注册, 这块一般在 prepare 方法完成
- 模块实现还提供 start, notifyAfterCompleted 用于扩展
prepare 阶段无法调用其他模块
ClusterModule 的 zookeeper 实现如下
// 模块实现名称
@Override
public String name() {
return "zookeeper";
}
// 所属模块
@Override
public Class module() {
return ClusterModule.class;
}
// 模块配置
@Override
public ModuleConfig createConfigBeanIfAbsent() {
return config;
}
// 准备阶段
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().retryPolicy(retryPolicy)
.connectString(config.getHostPort());
...
try {
client.start();
client.blockUntilConnected();
serviceDiscovery.start();
coordinator = new ZookeeperCoordinator(getManager(), config, serviceDiscovery);
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
throw new ModuleStartException(e.getMessage(), e);
}
// 注册 ModuleDefine 中开放api服务列表
this.registerServiceImplementation(ClusterRegister.class, coordinator);
this.registerServiceImplementation(ClusterNodesQuery.class, coordinator);
}
// 启动阶段
@Override
public void start() {
}
// 完成
@Override
public void notifyAfterCompleted() {
}
// 依赖的模块, 这里是CORE
@Override
public String[] requiredModules() {
return new String[]{CoreModule.NAME};
}
模块配置
模块化内核从 config/application.yml 配置中驱动, 启动的模块和实现由此文件驱动和管理
OAP server 的主入口为: OAPServerBootstrap
public static void start() {
// 加载 application.yml 进行解析
ApplicationConfigLoader configLoader = new ApplicationConfigLoader();
ModuleManager manager = new ModuleManager();
try {
// 读取配置
ApplicationConfiguration applicationConfiguration = configLoader.load();
// 初始化各个 ModuleProvider
manager.init(applicationConfiguration);
...
}
ApplicationConfigLoader 的加载流程如下, 最终完成 ApplicationConfiguration 对象初始化
- 初始化 ApplicationConfiguration 对象
- 将 application.yml 转换为 ModuleName, Map<String, Object> 的结构
- 读取每个 ModuleName 下的 Map, 先读取到 selector 属性, 如果不存在就跳过此端配置
- 对 selector 属性值(例如: ${SW_CONFIGURATION:none} )处理, 按照 系统属性> 环境变量 > 默认值 的顺序提取变量值
- 移除 Map 中 key 值不为 selector 的属性, 如果为空则跳过这部分
- 此时结构为 ModuleName, Map<String, Object>(size = 1) 的Map, 遍历向 ApplicationConfiguration 中添加模块配置,添加过程中并对 ${xxx:xxx} 进行解析提取完成配置加载, 此处没有读取系统属性
- 读取系统属性对值进行覆盖处理, 使用的是 ApplicationConfigLoader#overrideConfigBySystemEnv
selector 值设置 - , 可以屏蔽模块, 例如:
selector: ${SW_HEALTH_CHECKER:-}
public class ApplicationConfiguration {
// 模块名, 模块配置的map
private HashMap<String, ModuleConfiguration> modules = new HashMap<>();
public static class ModuleConfiguration {
private HashMap<String, ProviderConfiguration> providers = new HashMap<>();
}
public static class ProviderConfiguration {
private Properties properties;
}
}
模块初始化
ApplicationConfiguration 对象初始化完成后, 进行模块初始化, 通过 ModuleManager#init 执行, 执行流程如下
- 加载所有的ModuleDefine
ServiceLoader.load(ModuleDefine.class)
进行遍历 - 判断 ModuleDefine.name 是否在 ApplicationConfiguration 配置中存在, 不存在跳过
- 存在时, 分别调用每个 ModuleDefine#prepare 进行初始化, 此处主要工作为配置初始化, 完成后放入 loadedModules 中
- 读取所有 ModuleProvider 实例, 找到和当前 ModuleDefine 匹配的 Provider
- 在 ModuleProvider 注入 ModuleDefine实例和 moduleManager 实例, 并将 provider 设置到当前 ModuleDefine 中
- 执行 ModuleProvider 配置初始化, 通过
ModuleDefine#copyProperties
方法 从 ApplicationConfiguration 中复制属性到当前 provider中
- 使用loadedModules 对 BootstrapFlow 进行初始化, BootstrapFlow 会对模块的依赖关系进行排序, 并且调用每个模块的 ModuleProvider#start
- 每个 provider start 之前会检查ModuleDefine 定义的服务类是否已经注册
- start完成后, 调用执行每个 ModuleProvider#notifyAfterCompleted 方法, 至此, 所有模块都已经加载完成
- 对 telemetry 模块进行特殊处理, 添加 uptime 的 Gauge, 用于标识服务开始运行
- 如果是 Init 模式, 现在就直接退出, 否则就继续运行
public class ModuleManager implements ModuleDefineHolder {
private boolean isInPrepareStage = true;
private final Map<String, ModuleDefine> loadedModules = new HashMap<>();
}
main 方法跑完了为什么还能继续运行?? 通过设置 GRPCServer 中 ThreadPoolExecutor 的 CustomThreadFactory, 设置线程为非守护线程, 这样 grpc 没关闭情况下不会退出
在 ModuleProvider 中注册的服务可以通过以下方式对外提供
manager.find(moduleName).provider().getService(XXXX.class)
core模块加载
通过 SPI 机制, ServiceLoader 对 ModuleDefine 和 ModuleProvider 进行加载, 例如: server-core 对应配置文件为
- /META-INF.services/org.apache.skywalking.oap.server.library.module.ModuleDefine
- /META-INF.services/org.apache.skywalking.oap.server.library.module.ModuleProvider
server-core 包含模块有
org.apache.skywalking.oap.server.core.storage.StorageModule
org.apache.skywalking.oap.server.core.cluster.ClusterModule
org.apache.skywalking.oap.server.core.CoreModule
org.apache.skywalking.oap.server.core.query.QueryModule
org.apache.skywalking.oap.server.core.alarm.AlarmModule
org.apache.skywalking.oap.server.core.exporter.ExporterModule
对应provider 配置
org.apache.skywalking.oap.server.core.CoreModuleProvider
core 模块依赖模块在 CoreModuleProvider, 包含 TelemetryModule(目前只提供 prometheus 遥测收集), ConfigurationModule(用于cds)
模块一般拆分为 api 和 各种实现包, 例如 ConfigurationModule 中api包 中包含 NoneConfigurationProvider 用于注册空实现, 而实现包包含 apollo, zk, consul, etcd 等具体实现
由上文可知, 模块加载在 ModuleManager init后, 配置已经完全注入 ModuleProvider, ModuleProvider 再通过 prepare -> start -> notifyAfterCompleted 执行, 下面以 CoreModuleProvider 说明
prepare
prepare 阶段主要完成 ModuleDefine#services
定义类的注册工作, core 模块中最重要的定义为 CoreModule
, 包含以下几个模块
- oap 服务基础定义: ConfigService, 用于定义 oap 的端口, host, 以及 search 可以用的Tag
- 下采样定义: DownSamplingConfigService, 定义是否进行 '小时' 和 '天' 维度聚合, 生成新指标
- 指标系统服务定义: MeterSystem, 服务用于定义指标名称, 以及对处理类关系,
meterSystem.create("test_meter", "avg", ScopeType.SERVICE)
- 服务接口定义:
CoreModule#addServerInterface
用于对接 agent 端和 oap 之间通信, 包含 GRPCHandlerRegister (grpc服务: 11800), JettyHandlerRegister (rest 服务: 12800) - 内部服务定义:
CoreModule#addServerInterface
包含用于存储服务 ModelCreator,IModelManager,ModelManipulator, 以及用于 OAP 间通信的 RemoteClientManager, RemoteSenderService - 查询服务定义:
CoreModule#addQueryService
用于ui端 graphql 查询服务, 包含 trace, log, alarm 等查询 - OAL 服务定义: OALEngineLoaderService 用于定义skywalking 后端运行模式, 分析指标的类型和参数, 生成指标数据, 用于告警和存储
- 性能剖析服务定义:
CoreModule#addProfileService
, 定义了 ProfileTask 的缓存, 查询服务, 修改服务, 以及具体执行命令的 CommandService
start
start 工作阶段包含以下几块
- grpcServer 添加远程服务 RemoteServiceHandler, 以及心跳检查处理 HealthCheckServiceHandler
- 判断是否未 MIX 或 AGGREGATOR 节点, 是的话注册 grpc 服务信息到 cluster 模块
- 配置发现服务部分, 增加对应的 监听服务配置改变, 包含 apdex(服务性能指数)配置, endpoint名称分组规则配置, 日志配置
notifyAfterCompleted
包含
- grpcServer 服务启动
- jetty 服务启动
- persistence 指标收集启动
- dataTTLKeeper 启动: 用于删除存储过期的数据, record 3天, metrics 类 7天
- cache 更新启动, 包含 网络地址加载(一般从es读取), 以及 ProfileTask(性能剖析任务) 的更新