OAL概念
用户自定义的描述分析过程的可扩展, 轻量级的编译型语言, 在运行时编译成 class 文件, 用于 skywalking 流计算
支持两种
- 硬编码定义
- OAL 定义, 用于指标数据, 针对特定服务, 服务实例等进行统计数据聚合计算
skywalking 流计算, 目前存在以下4中数据类型
- Record: 明细数据, Trace, 日志
- Metrics: 指标数据, OAL 一般生成这个类型
- TopN: 周期采样数据, SQL 周期性采集
- ManagementData: 管理数据, 目前只有 UITemplate用于配置管理后台的页面模板
- NoneStream: 非流数据保存, 目前用于 ProfileTaskRecord
对应的有以下处理器, 处理器的职责为将数据添加到处理队列, 后续用 DAO 批量保存
- RecordStreamProcessor
- MetricsStreamProcessor
- TopNStreamProcessor
- ManagementStreamProcessor
- NoneStreamStreamProcessor: 主要支撑页面交互, 直接对db操作, 类似crud
OAL 主要用于指标部分的自动化生成, 对应 Metrics 和 MetricsStreamProcessor, 采用运行时编译成class, 因此 OAL 不会影响执行效率
OAL 基本语法
8.0.0 版本后在 /config/*.oal 中可以直接调整配置
基础语法类似于 lambda 表达式
// 声明 Metrics
METRICS_NAME = from(CAST SCOPE.(* | [FIELD][,FIELD ...]))
[.filter(CAST FIELD OP [INT | STRING])]
.FUNCTION([PARAM][, PARAM ...])
// 禁用 Metrics
disable(METRICS_NAME);
- METRICS_NAME: 指标名称
- from: 定义数据源
- SCOPE: 定义为: All(全局访问), Service(服务), ServiceInstance(服务实例), Endpoint, ServiceRelation, ServiceInstanceRelation, and EndpointRelation
- (* | [FIELD][,FIELD ...]): 需要提取的字段
- CAST: 类型转换, 类似 lambda 的map, 例如: from((str->long)Service.tag["transmission.latency"]) 将 transmission.latency 字段转换为 long 类型, filter, function 中都可以使用
- filter(可选): 通过定义的字段来过滤, 可以是多个, 也可以没有, 多个时为 AND 关系
- 多级过滤: service_2xx = from(Service.*).filter(responseCode >= 200).filter(responseCode < 400).cpm()
- FUNCTION: 聚集函数定义, 聚合生成新的指标, 例如: 百分比, longAvg, percent, rate, count, histogram(热力图), apdex 等等
官方文档
OAL 工作阶段
词法和语法分析
通过 Antlr 定义, 源码语法定义在 oap-server/oal-grammar 中, 分为 OALLexer.g4 和 OALParser.g4
- OALLexer: 定义词法树
- OALParser: 定义语法树
基本语法结构如下
// 定义最上级语法, Metrics 声明 或是 禁用 Metrics
root
: (aggregationStatement | disableStatement)*
;
// Metrics 声明则包含 "变量 = metricStatement 注释" 的机构
aggregationStatement
: variable (SPACE)? EQUAL (SPACE)? metricStatement DelimitedComment? LineComment? (SEMI|EOF)
;
// 声明 disable语法, disable(METRICS_NAME);
disableStatement
: DISABLE LR_BRACKET disableSource RR_BRACKET DelimitedComment? LineComment? (SEMI|EOF)
;
// 定义 from(...).filter(...)+.function(...) 的结构
metricStatement
: FROM LR_BRACKET source (sourceAttributeStmt+) RR_BRACKET (filterStatement+)? DOT aggregateFunction
;
动态代码生成
动态代码生成通过 Javassist 辅助生成运行时代码, 直接将生成好代码注入 JVM, 代码模板位于 oap-server/oal-rt 项目的 resouces 目录中, 模板通过freemarker处理成真正的代码
生成代码的目标为
- 生成带 @Stream 注解的 XXXMetrics(抽象类) 的实现类, 此实现类和使用的 function 对应, 有 AvgFunction, AvgHistogramFunction 等等
- 生成 XXXMetrics 的 StorageBuilder , 用于 具体 Metrics 实现到 Map<String, Object> 相互转换
- 生成 XXXDispather, 用于 metrics 的基本信息注册, 并调用
MetricsStreamProcessor#in
入队,稍后进行批量处理
触发的流程
- CoreModuleProvider#prepare 中注册 OALEngineLoaderService
- CoreModuleProvider#start 加载 oal/disable.oal
- MeshReceiverProvider#start 加载 oal/core.oal
- ...
加载代码为 OALEngineLoaderService#load
// 加载引擎
OALEngine engine = loadOALEngine(define);
StreamAnnotationListener streamAnnotationListener = new StreamAnnotationListener(moduleManager);
engine.setStreamListener(streamAnnotationListener);
// 设置用于Metrics分配服务 DispatcherManager
engine.setDispatcherListener(moduleManager.find(CoreModule.NAME)
.provider()
.getService(SourceReceiver.class)
.getDispatcherDetectorListener());
// 设置dao存储模块
engine.setStorageBuilderFactory(moduleManager.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class));
// 开始解析
engine.start(OALEngineLoaderService.class.getClassLoader());
engine.notifyAllListeners();
oalDefineSet.add(define);
OALEngine 目前实现类为 OALRuntime, OALRuntime 实现如下, 主要生成 Metrics 数据, 以及 MetricsBuilder( Metrics 对象到 Map<String,Object>
的映射), 以下 XXX 对应 metrics name
- 初始化进行 freemarker 的 Configuration 初始化, 加载 /code-templates 下模板
- 读取配置文件 *.oal
- 通过语法分析ScriptParser 将 oal 中的配置解析成 OALScripts
- 将 OALScripts 处理成 XXXMetrics 对应的 Class, 具体方法通过 OALRuntime#generateMetricsClass, 通过 javassit 生成 (OALRuntime#generateMetricsClass)
- 构建空的构造函数
- 添加 field
- 通过 freemarker 生成 method, 定义好的 method 有 id, hashCode, equals, serialize, deserialize, getMeta, toHour, toDay, 每个方法在 /code-templates 下都有对应模板
- 添加 Stream 注解,
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class)
- 将 OALScripts 处理成对应的 XXXMetricsBuilder, 实现 entity2Storage 和 storage2Entity 方法, 用于 dao 层到 实体的转换
- 将 OALScripts 处理成对应的 ${scopeName}Dispatcher
生成器可以通过环境变量配置 SW_OAL_ENGINE_DEBUG 类设置是否生成对应的
.class
文件
实例
oal 配置如下, from 定义了 Scope, function 定义了对应的 Metrics 实现类
- Scope 相同的将合并到同一个 SourceDispather 中进行分发
- function 则通过不同的实现类进行分发
all_percentile = from(All.latency).percentile(10); // Multiple values including p50, p75, p90, p95, p99
all_heatmap = from(All.latency).histogram(100, 20);
对应生成代码, 上述两条oal 都对应到同一个 Scope 模块, 都对应到 ALL 上
/**
* SourceDispatcher 分发器
*/
public class AllDispatcher implements SourceDispatcher<All> {
private void doAllPercentile(All var1) {
AllPercentileMetrics var2 = new AllPercentileMetrics();
var2.setTimeBucket(var1.getTimeBucket());
var2.combine(var1.getLatency(), 10);
MetricsStreamProcessor.getInstance().in(var2);
}
private void doAllHeatmap(All var1) {
AllHeatmapMetrics var2 = new AllHeatmapMetrics();
var2.setTimeBucket(var1.getTimeBucket());
var2.combine(var1.getLatency(), 100, 20);
MetricsStreamProcessor.getInstance().in(var2);
}
public void dispatch(ISource var1) {
All var2 = (All)var1;
this.doAllPercentile(var2);
this.doAllHeatmap(var2);
}
public AllDispatcher() {
}
}
/**
* 带 function 的Metrics
*/
@Stream(
name = "all_heatmap",
scopeId = DefaultScopeDefine.ALL,
builder = AllHeatmapMetricsBuilder.class,
processor = MetricsStreamProcessor.class
)
public class AllHeatmapMetrics extends HistogramMetrics implements WithMetadata {
public AllHeatmapMetrics() {
}
protected String id0() {
StringBuilder var1 = new StringBuilder(String.valueOf(this.getTimeBucket()));
return var1.toString();
}
public int hashCode() {
byte var1 = 17;
int var2 = 31 * var1 + (int)this.getTimeBucket();
return var2;
}
public int remoteHashCode() {
byte var1 = 17;
return var1;
}
public boolean equals(Object var1) {
if (this == var1) {
return true;
} else if (var1 == null) {
return false;
} else if (this.getClass() != var1.getClass()) {
return false;
} else {
AllHeatmapMetrics var2 = (AllHeatmapMetrics)var1;
return this.getTimeBucket() == var2.getTimeBucket();
}
}
public Builder serialize() {
Builder var1 = RemoteData.newBuilder();
var1.addDataLongs(this.getTimeBucket());
var1.addDataObjectStrings(this.getDataset().toStorageData());
return var1;
}
public void deserialize(RemoteData var1) {
this.setTimeBucket(var1.getDataLongs(0));
this.setDataset(new DataTable(var1.getDataObjectStrings(0)));
}
public MetricsMetaInfo getMeta() {
return new MetricsMetaInfo("all_heatmap", 0);
}
public Metrics toHour() {
AllHeatmapMetrics var1 = new AllHeatmapMetrics();
DataTable var2 = new DataTable();
var2.copyFrom(this.getDataset());
var1.setDataset(var2);
var1.setTimeBucket(this.toTimeBucketInHour());
return var1;
}
public Metrics toDay() {
AllHeatmapMetrics var1 = new AllHeatmapMetrics();
DataTable var2 = new DataTable();
var2.copyFrom(this.getDataset());
var1.setDataset(var2);
var1.setTimeBucket(this.toTimeBucketInDay());
return var1;
}
}
/**
* StorageBuilder 实现
*/
public class AllHeatmapMetricsBuilder implements StorageHashMapBuilder {
public AllHeatmapMetricsBuilder() {
}
public Map entity2Storage(StorageData var1) {
AllHeatmapMetrics var2 = (AllHeatmapMetrics)var1;
HashMap var3 = new HashMap();
var3.put((Object)"dataset", var2.getDataset());
var3.put((Object)"time_bucket", new Long(var2.getTimeBucket()));
return var3;
}
public StorageData storage2Entity(Map var1) {
AllHeatmapMetrics var2 = new AllHeatmapMetrics();
var2.setDataset(new DataTable((String)var1.get("dataset")));
var2.setTimeBucket(((Number)var1.get("time_bucket")).longValue());
return var2;
}
}
@Stream(
name = "all_percentile",
scopeId = DefaultScopeDefine.ALL,
builder = AllPercentileMetricsBuilder.class,
processor = MetricsStreamProcessor.class
)
public class AllPercentileMetrics extends PercentileMetrics implements WithMetadata {
public AllPercentileMetrics() {
}
...
}
grpc 的模块依赖
Trace 模块依赖如下, AnalyzerModule 会依赖 AnalyzerModule, AnalyzerModule 加载时会对 core.oal 进行加载, 因此在 grpc 接口调用时 oal 已生成完成
@Override
public String[] requiredModules() {
return new String[] {
TelemetryModule.NAME,
CoreModule.NAME,
AnalyzerModule.NAME,
SharingServerModule.NAME,
ConfigurationModule.NAME
};
}