From gRPC to the DAO implementation
Protocol definition
The proto definition file is located at apm-protocol/apm-network/src/main/java/proto/language-agent/Tracing.proto
The service it defines:
service TraceSegmentReportService {
rpc collect (stream SegmentObject) returns (Commands) {
}
// collect segments in batch, synchronously
rpc collectInSync (SegmentCollection) returns (Commands) {
}
}
The overall flow is shown in the figure below (from the book SkyWalking 实战).
gRPC implementation of the protocol
The service implementation is registered in TraceModuleProvider#start, which completes the gRPC service registration:
@Override
public void start() {
// obtain the gRPC handler register
GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
.provider()
.getService(GRPCHandlerRegister.class);
...
// register the service handlers
TraceSegmentReportServiceHandler traceSegmentReportServiceHandler = new TraceSegmentReportServiceHandler(getManager());
grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler);
grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler));
...
}
As you can see, the actual service is TraceSegmentReportServiceHandler, which calls ISegmentParserService; its only current implementation, SegmentParserServiceImpl, performs the parsing:
public class TraceSegmentReportServiceHandler extends TraceSegmentReportServiceGrpc.TraceSegmentReportServiceImplBase implements GRPCHandler {
...
private ISegmentParserService segmentParserService;
...
@Override
public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
return new StreamObserver<SegmentObject>() {
@Override
public void onNext(SegmentObject segment) {
...
try {
segmentParserService.send(segment);
} catch (Exception e) {
errorCounter.inc();
log.error(e.getMessage(), e);
} finally {
timer.finish();
}
}
...
};
}
SegmentParserServiceImpl is just a wrapper; the actual parsing is delegated to TraceAnalyzer:
@RequiredArgsConstructor
public class SegmentParserServiceImpl implements ISegmentParserService {
private final ModuleManager moduleManager;
private final AnalyzerModuleConfig config;
@Setter
private SegmentParserListenerManager listenerManager;
@Override
public void send(SegmentObject segment) {
final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
traceAnalyzer.doAnalysis(segment);
}
}
public class TraceAnalyzer {
...
private List<AnalysisListener> analysisListeners = new ArrayList<>();
public void doAnalysis(SegmentObject segmentObject) {
if (segmentObject.getSpansList().size() == 0) {
return;
}
createSpanListeners();
notifySegmentListener(segmentObject);
// invoke different listeners depending on each span's type
segmentObject.getSpansList().forEach(spanObject -> {
// spanId == 0 means this is the first span of the segment
if (spanObject.getSpanId() == 0) {
notifyFirstListener(spanObject, segmentObject);
}
if (SpanType.Exit.equals(spanObject.getSpanType())) {
notifyExitListener(spanObject, segmentObject);
} else if (SpanType.Entry.equals(spanObject.getSpanType())) {
notifyEntryListener(spanObject, segmentObject);
} else if (SpanType.Local.equals(spanObject.getSpanType())) {
notifyLocalListener(spanObject, segmentObject);
} else {
log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
.name());
}
});
// notify the listeners to build and persist (e.g. save to DB)
notifyListenerToBuild();
}
private void notifyListenerToBuild() {
// each listener eventually calls SourceReceiver#receive to process the data
analysisListeners.forEach(AnalysisListener::build);
}
}
The implementations of AnalysisListener are MultiScopesAnalysisListener, NetworkAddressAliasMappingListener, and SegmentAnalysisListener:
- MultiScopesAnalysisListener: collects Service, Endpoint, ServiceRelation, EndpointRelation, DbSlowStatement, and ServiceInstance data; supports the Entry, Exit, and Local hook points (matching the span types)
- SegmentAnalysisListener: handles the segment itself; its #containsPoint method shows it supports the Entry, First, and Segment hook points
  - parseSegment: mainly sets the TraceId
  - parseFirst: initializes the Segment and assigns serviceId and endpointId; used on first entry
  - parseEntry: assigns serviceId and endpointId
  - build: builds and publishes the segment, calling the SourceReceiver interface for processing
- NetworkAddressAliasMappingListener: handles NetworkAddressAliasSetup initialization, triggered at Entry
Once these listeners have built the base objects, they hand them to SourceReceiver for DAO dispatching and persistence.
SourceReceiver
The core of SourceReceiver is DispatcherManager, whose workflow has two parts:
- Initialization scan (DispatcherManager#scan):
  - scan the org.apache.skywalking package for SourceDispatcher implementations
  - extract the generic type parameter declared on the interface, e.g. ServiceMeta in SourceDispatcher<ServiceMeta>; this class implements ISource, so its scope can be read, and scopes are defined in DefaultScopeDefine
  - finally build a Map<int (DefaultScopeDefine), SourceDispatcher>
- Dispatching: based on the incoming scope value, forward the source to the matching SourceDispatcher, e.g. ServiceMeta to ServiceMetaDispatcher, Endpoint to EndpointMetaDispatcher, and so on
DefaultScopeDefine defines all the metrics IDs, e.g. SERVICE = 1; SERVICE_INSTANCE = 2; etc.
public class SourceReceiverImpl implements SourceReceiver {
@Getter
private final DispatcherManager dispatcherManager;
public SourceReceiverImpl() {
this.dispatcherManager = new DispatcherManager();
}
@Override
public void receive(ISource source) {
// forward to the matching SourceDispatcher
dispatcherManager.forward(source);
}
@Override
public DispatcherDetectorListener getDispatcherDetectorListener() {
return getDispatcherManager();
}
public void scan() throws IOException, InstantiationException, IllegalAccessException {
// scan all SourceDispatcher implementations and add them to the DispatcherManager;
// DefaultScopeDefine defines the constant IDs of every source implementation, used for queries, hooks, and other scenarios
dispatcherManager.scan();
}
}
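The scan-and-forward mechanics above can be exercised in isolation. The sketch below is minimal and self-contained, not SkyWalking's actual code: ISource, SourceDispatcher, ServiceMeta, and ServiceMetaDispatcher are simplified stand-ins, and only two ideas are reproduced, reading the generic parameter of a SourceDispatcher implementation via reflection and dispatching by scope ID:

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DispatcherSketch {
    // simplified stand-ins for SkyWalking's ISource / SourceDispatcher
    public interface ISource { int scope(); }
    public interface SourceDispatcher<S extends ISource> { void dispatch(S source); }

    public static final int SERVICE = 1; // like DefaultScopeDefine.SERVICE

    public static class ServiceMeta implements ISource {
        public int scope() { return SERVICE; }
    }

    public static class ServiceMetaDispatcher implements SourceDispatcher<ServiceMeta> {
        public int received = 0;
        public void dispatch(ServiceMeta source) { received++; }
    }

    // extract the concrete ISource type bound on the SourceDispatcher interface,
    // the way DispatcherManager#scan reads the generic parameter via reflection
    public static Class<?> sourceTypeOf(Class<?> dispatcherClass) {
        for (Type t : dispatcherClass.getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType pt = (ParameterizedType) t;
                if (pt.getRawType() == SourceDispatcher.class) {
                    return (Class<?>) pt.getActualTypeArguments()[0];
                }
            }
        }
        throw new IllegalArgumentException("not a SourceDispatcher implementation");
    }

    // scope id -> dispatchers, like DispatcherManager's Map<Integer, List<SourceDispatcher>>
    private final Map<Integer, List<SourceDispatcher<?>>> dispatcherMap = new HashMap<>();

    public void register(SourceDispatcher<?> dispatcher) {
        Class<?> sourceType = sourceTypeOf(dispatcher.getClass());
        try {
            // instantiate the source type once to read its scope constant
            int scope = ((ISource) sourceType.getDeclaredConstructor().newInstance()).scope();
            dispatcherMap.computeIfAbsent(scope, k -> new ArrayList<>()).add(dispatcher);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    public void forward(ISource source) {
        for (SourceDispatcher d : dispatcherMap.getOrDefault(source.scope(), List.of())) {
            d.dispatch(source);
        }
    }
}
```

The real DispatcherManager resolves the scope from the DefaultScopeDefine constant attached to the source class rather than instantiating it, but the map shape and lookup are the same.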
Final persistence: XXXXStreamProcessor
Taking Segment as an example of the storage flow: from the code above, a Segment ultimately reaches SourceDispatcher<Segment>, whose implementation is:
public class SegmentDispatcher implements SourceDispatcher<Segment> {
@Override
public void dispatch(Segment source) {
SegmentRecord segment = new SegmentRecord();
segment.setSegmentId(source.getSegmentId());
segment.setTraceId(source.getTraceId());
// setXXX ...
// hand the segment to a worker for processing
RecordStreamProcessor.getInstance().in(segment);
}
}
The RecordStreamProcessor flow:
public class RecordStreamProcessor implements StreamProcessor<Record> {
// singleton
private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();
public static RecordStreamProcessor getInstance() {
return PROCESSOR;
}
// caches every RecordPersistentWorker used for data processing
private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();
// enqueue the record into its worker
@Override
public void in(Record record) {
RecordPersistentWorker worker = workers.get(record.getClass());
if (worker != null) {
worker.in(record);
}
}
// create a RecordPersistentWorker and register it in this RecordStreamProcessor
@Override
@SuppressWarnings("unchecked")
public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
// locate the storage module and its StorageDAO
final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
.provider()
.getService(StorageBuilderFactory.class);
// derive the builder from the @Stream annotation and the current class; for ES storage this yields a RecordEsDAO
final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(recordClass, stream.builder());
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
}
// create the corresponding storage Model
ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
// Record stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
Model model = modelSetter.add(
recordClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
// create the persistence worker and cache it
RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);
workers.put(recordClass, persistentWorker);
}
}
RecordPersistentWorker#in merely adds the object to a list; the actual save is performed periodically by PersistenceTimer#extractDataAndSave, at the interval set by the core.persistentPeriod config (default 25, i.e. once every 25 seconds).
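The buffer-then-periodic-flush behavior can be sketched as follows. This is a hypothetical miniature of RecordPersistentWorker plus PersistenceTimer; the real worker uses a DataCarrier queue and a batch DAO, neither of which is modeled here:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PersistentWorkerSketch {
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushedBatches = new ArrayList<>();

    // in(): only enqueue, no I/O on the caller's thread
    public synchronized void in(String record) {
        buffer.add(record);
    }

    // called periodically, like PersistenceTimer#extractDataAndSave
    public synchronized void flush() {
        if (buffer.isEmpty()) return;
        flushedBatches.add(new ArrayList<>(buffer)); // stand-in for the batch DAO write
        buffer.clear();
    }

    public synchronized List<List<String>> flushedBatches() {
        return flushedBatches;
    }

    // schedule flush() at a fixed period, as core.persistentPeriod configures
    public ScheduledExecutorService startTimer(long periodSeconds) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        timer.scheduleAtFixedRate(this::flush, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return timer;
    }
}
```

The point of the split is that the gRPC thread only pays for a list append; all storage I/O happens in batches on the timer thread.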
RecordStreamProcessor initialization
Initialization begins in CoreModuleProvider#prepare and is executed in the #start method, at which point AnnotationScan scans every class annotated with @Stream:
// CoreModuleProvider#prepare
annotationScan.registerListener(new StreamAnnotationListener(getManager()));
// CoreModuleProvider#start
annotationScan.scan();
public class AnnotationScan {
private final List<AnnotationListenerCache> listeners;
public void scan() throws IOException, StorageException {
// scan classes for each listener's annotation and record the matches
ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) {
Class<?> aClass = classInfo.load();
for (AnnotationListenerCache listener : listeners) {
if (aClass.isAnnotationPresent(listener.annotation())) {
listener.addMatch(aClass);
}
}
}
// after scanning, notify each listener
for (AnnotationListenerCache listener : listeners) {
listener.complete();
}
}
// matched classes are cached in the following inner class
private class AnnotationListenerCache {
private AnnotationListener listener;
private List<Class<?>> matchedClass;
// notify the listener for each matched class
private void complete() throws StorageException {
matchedClass.sort(Comparator.comparing(Class::getName));
for (Class<?> aClass : matchedClass) {
listener.notify(aClass);
}
}
}
}
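The core of the scan, filtering candidate classes by annotation presence and collecting the matches, can be reduced to a self-contained sketch. The @DemoStream annotation and candidate classes below are hypothetical; the real code walks the whole classpath via Guava's ClassPath instead of taking a candidate list:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.List;

public class AnnotationScanSketch {
    // hypothetical marker annotation standing in for @Stream
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    public @interface DemoStream { String name(); }

    @DemoStream(name = "demo_record")
    public static class DemoRecord { }

    public static class Plain { }

    // keep only classes carrying the annotation, as AnnotationScan#scan does per listener
    public static List<Class<?>> matching(List<Class<?>> candidates) {
        List<Class<?>> matched = new ArrayList<>();
        for (Class<?> c : candidates) {
            if (c.isAnnotationPresent(DemoStream.class)) {
                matched.add(c);
            }
        }
        return matched;
    }
}
```

As in AnnotationScan, the annotation must have RUNTIME retention or isAnnotationPresent would never see it.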
StreamAnnotationListener initializes the processors such as RecordStreamProcessor and MetricsStreamProcessor:
public class StreamAnnotationListener implements AnnotationListener {
private final ModuleDefineHolder moduleDefineHolder;
public StreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
this.moduleDefineHolder = moduleDefineHolder;
}
@Override
public Class<? extends Annotation> annotation() {
return Stream.class;
}
@SuppressWarnings("unchecked")
@Override
public void notify(Class aClass) throws StorageException {
if (aClass.isAnnotationPresent(Stream.class)) {
Stream stream = (Stream) aClass.getAnnotation(Stream.class);
if (DisableRegister.INSTANCE.include(stream.name())) {
return;
}
if (stream.processor().equals(RecordStreamProcessor.class)) {
RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(MetricsStreamProcessor.class)) {
MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(TopNStreamProcessor.class)) {
TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(NoneStreamProcessor.class)) {
NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else if (stream.processor().equals(ManagementStreamProcessor.class)) {
ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
} else {
throw new UnexpectedException("Unknown stream processor.");
}
} else {
throw new UnexpectedException(
"Stream annotation listener could only parse the class present stream annotation.");
}
}
}
ListenerManager initialization
TraceModule depends on TelemetryModule, CoreModule, AnalyzerModule, ConfigurationModule, and SharingServerModule. The key one is AnalyzerModule, which initializes ISegmentParserService; most importantly, it initializes each ListenerManager.
The relevant parts of AnalyzerModuleProvider:
public class AnalyzerModuleProvider extends ModuleProvider {
@Getter
private SegmentParserServiceImpl segmentParserService;
...
private List<MeterConfig> meterConfigs;
public AnalyzerModuleProvider() {
this.moduleConfig = new AnalyzerModuleConfig();
}
...
@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
segmentParserService = new SegmentParserServiceImpl(getManager(), moduleConfig);
this.registerServiceImplementation(ISegmentParserService.class, segmentParserService);
}
@Override
public void start() throws ModuleStartException {
// load official analysis
getManager().find(CoreModule.NAME)
.provider()
.getService(OALEngineLoaderService.class)
.load(CoreOALDefine.INSTANCE);
...
// initialize the parsing listeners
segmentParserService.setListenerManager(listenerManager());
}
...
// key point: the .Factory classes create the actual AnalysisListener instances used for processing
private SegmentParserListenerManager listenerManager() {
SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
if (moduleConfig.isTraceAnalysis()) {
// cross-scope analysis
listenerManager.add(new MultiScopesAnalysisListener.Factory(getManager()));
listenerManager.add(new NetworkAddressAliasMappingListener.Factory(getManager()));
}
listenerManager.add(new SegmentAnalysisListener.Factory(getManager(), moduleConfig));
return listenerManager;
}
}
SegmentParserListenerManager is where the listener factories are registered and retrieved:
public class SegmentParserListenerManager implements ISegmentParserListenerManager {
@Getter
@Setter
private final List<AnalysisListenerFactory> spanListenerFactories;
public SegmentParserListenerManager() {
this.spanListenerFactories = new LinkedList<>();
}
}
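The factory pattern here can be sketched minimally. AnalysisListener and AnalysisListenerFactory below are simplified stand-ins; the point is that the manager stores factories, and each TraceAnalyzer asks them for fresh listener instances per segment:

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class ListenerManagerSketch {
    public interface AnalysisListener { void build(); }
    public interface AnalysisListenerFactory { AnalysisListener create(); }

    private final List<AnalysisListenerFactory> spanListenerFactories = new LinkedList<>();

    public void add(AnalysisListenerFactory factory) {
        spanListenerFactories.add(factory);
    }

    // each TraceAnalyzer instantiates a fresh set of listeners per segment via the factories
    public List<AnalysisListener> createListeners() {
        List<AnalysisListener> listeners = new ArrayList<>();
        for (AnalysisListenerFactory f : spanListenerFactories) {
            listeners.add(f.create());
        }
        return listeners;
    }
}
```

Creating listeners per segment keeps them stateful and single-use, which is why SegmentParserServiceImpl builds a new TraceAnalyzer on every send.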
@Stream
The most important piece of this module is the @Stream annotation: classes annotated with @Stream are the ones whose data is collected. The annotation has the following members:
- name: the storage table or index name
- scopeId: the scope ID the class belongs to
- builder: a StorageBuilder class converting between the db map and the bean, with two methods:
  - storage2Entity: dbMap -> T
  - entity2Storage: T -> dbMap
- processor: the stream processor to use
For example, LogRecord:
@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class LogRecord extends AbstractLogRecord {
public static final String INDEX_NAME = "log";
public static final String UNIQUE_ID = "unique_id";
@Setter
@Getter
@Column(columnName = UNIQUE_ID)
private String uniqueId;
@Override
public String id() {
return uniqueId;
}
public static class Builder extends AbstractLogRecord.Builder<LogRecord> {
@Override
public LogRecord storage2Entity(final Map<String, Object> dbMap) {
LogRecord record = new LogRecord();
map2Data(record, dbMap);
record.setUniqueId((String) dbMap.get(UNIQUE_ID));
return record;
}
@Override
public Map<String, Object> entity2Storage(final LogRecord record) {
Map<String, Object> dbMap = new HashMap<>();
data2Map(dbMap, record);
dbMap.put(UNIQUE_ID, record.getUniqueId());
return dbMap;
}
}
}
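The builder contract can be exercised on its own. The DemoRecord/DemoBuilder pair below is hypothetical, but it mirrors the shape of LogRecord.Builder above: entity2Storage and storage2Entity must round-trip:

```java
import java.util.HashMap;
import java.util.Map;

public class StorageBuilderSketch {
    // simplified stand-in for SkyWalking's StorageBuilder contract
    public interface StorageBuilder<T> {
        T storage2Entity(Map<String, Object> dbMap);
        Map<String, Object> entity2Storage(T entity);
    }

    public static class DemoRecord {
        public String uniqueId;
    }

    public static class DemoBuilder implements StorageBuilder<DemoRecord> {
        public static final String UNIQUE_ID = "unique_id";

        // dbMap -> entity
        public DemoRecord storage2Entity(Map<String, Object> dbMap) {
            DemoRecord r = new DemoRecord();
            r.uniqueId = (String) dbMap.get(UNIQUE_ID);
            return r;
        }

        // entity -> dbMap
        public Map<String, Object> entity2Storage(DemoRecord r) {
            Map<String, Object> dbMap = new HashMap<>();
            dbMap.put(UNIQUE_ID, r.uniqueId);
            return dbMap;
        }
    }
}
```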
The @Stream scan also initializes the DAOs. For example, in this snippet from RecordStreamProcessor#create, the IRecordDAO is created from a new instance of the StorageBuilder declared in @Stream:
StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
}
Summary
==Initialization phase==: the following caches are built, per module
- StreamProcessor: scans for the @Stream annotation and, once the scan completes, registers each matched class into the processor's Map<Class<? extends XXX>, XXXPersistentWorker> workers, where XXX can be Record, Metrics, etc.
  - each XXXPersistentWorker holds a DataCarrier used for enqueueing and consuming data; on consumption it calls the XXXDao it holds to persist the batch
- SourceReceiverImpl.dispatcherManager: scans all classes implementing SourceDispatcher and registers them into Map<Integer, List<SourceDispatcher>> dispatcherMap
  - the Integer key is a constant defined in DefaultScopeDefine, representing each scope's value
Here the @Stream-annotated classes describe the metrics/record data to collect and store, while the SourceDispatcher implementations convert those sources into the objects to be persisted.
==Execution phase==: object conversion, plus looking up the matching handler in the caches above
- When gRPC receives a SegmentObject, it calls ISegmentParserService#send, the first layer of interface separation; the actual implementation invoked is SegmentParserServiceImpl
- SegmentParserServiceImpl#send creates a TraceAnalyzer to do the actual analysis; the TraceAnalyzer holds a registered List<AnalysisListener> analysisListeners
- TraceAnalyzer drives multiple AnalysisListeners. AnalysisListener is the second layer of interface separation, allowing different analysis listeners to hook into different stages of a segment:
  - SegmentListener: segment-level hook (invoked for every segment)
  - FirstAnalysisListener: first-entry hook (first span only)
  - EntryAnalysisListener: spans of type Entry
  - ExitAnalysisListener: spans of type Exit
  - LocalAnalysisListener: spans of type Local
- In its final stage, TraceAnalyzer calls AnalysisListener#build, which invokes SourceReceiver to dispatch and build the data
- SourceReceiver, the third layer of abstraction, looks up the SourceDispatcher matching each ISource; only ISources that need to be persisted have a SourceDispatcher implementation, e.g. EndpointTraffic has EndpointTrafficDispatcher
- The SourceDispatcher finally calls the concrete StreamProcessor to enqueue the message, and batches are persisted periodically; Metrics messages go through MetricsStreamProcessor#in, Record messages through RecordStreamProcessor#in
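The whole execution path can be compressed into a toy pipeline. Everything below is a hypothetical stand-in; it only shows the shape: receive, scope lookup, dispatcher converts, worker buffers:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PipelineSketch {
    static final int SEGMENT_SCOPE = 2; // hypothetical stand-in for a DefaultScopeDefine constant

    public interface ISource { int scope(); }

    public static class Segment implements ISource {
        public final String segmentId;
        public Segment(String segmentId) { this.segmentId = segmentId; }
        public int scope() { return SEGMENT_SCOPE; }
    }

    // worker: buffers records until the persistence timer flushes them
    public static class Worker {
        public final List<String> buffer = new ArrayList<>();
        public void in(String record) { buffer.add(record); }
    }

    public final Worker worker = new Worker();
    private final Map<Integer, Consumer<ISource>> dispatcherMap = new HashMap<>();

    public PipelineSketch() {
        // like SegmentDispatcher#dispatch: convert the source and enqueue it into the worker
        dispatcherMap.put(SEGMENT_SCOPE, s -> worker.in(((Segment) s).segmentId));
    }

    // like SourceReceiverImpl#receive -> DispatcherManager#forward
    public void receive(ISource source) {
        dispatcherMap.get(source.scope()).accept(source);
    }
}
```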