skywalking grpc 服务端处理

从 grpc 到 dao实现处理

协议定义

proto定义文件位于 apm-protocol/apm-network/src/main/java/proto/language-agent/Tracing.proto

定义的服务

service TraceSegmentReportService {
   
    rpc collect (stream SegmentObject) returns (Commands) {
    }

    // 异步批量收集
    rpc collectInSync (SegmentCollection) returns (Commands) {
    }
}

总体流程如下图(来自 skywalking 实战)


image.png

协议 gpc 实现

服务实现注册在 TraceModuleProvider#start 中, 完成 grpc service 服务注册

@Override
public void start() {
    // 读取 grpc 注册器
    GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME)
                                                          .provider()
                                                          .getService(GRPCHandlerRegister.class);
    ...
    // 注册服务
    TraceSegmentReportServiceHandler traceSegmentReportServiceHandler = new TraceSegmentReportServiceHandler(getManager());
    grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler);
    grpcHandlerRegister.addHandler(new TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler));

    ...
}

可以看到实际调用服务为 TraceSegmentReportServiceHandler, 调用 ISegmentParserService, 目前实现类只有 SegmentParserServiceImpl 进行解析

public class TraceSegmentReportServiceHandler extends TraceSegmentReportServiceGrpc.TraceSegmentReportServiceImplBase implements GRPCHandler {
    ...
    private ISegmentParserService segmentParserService;
    ...
     @Override
    public StreamObserver<SegmentObject> collect(StreamObserver<Commands> responseObserver) {
        return new StreamObserver<SegmentObject>() {
            @Override
            public void onNext(SegmentObject segment) {
                ...
                try {
                    segmentParserService.send(segment);
                } catch (Exception e) {
                    errorCounter.inc();
                    log.error(e.getMessage(), e);
                } finally {
                    timer.finish();
                }
            }

           ...
        };
    }

SegmentParserServiceImpl 只是个包装类, 实际调用 TranceAnalyzer 进行具体解析

@RequiredArgsConstructor
public class SegmentParserServiceImpl implements ISegmentParserService {
    private final ModuleManager moduleManager;
    private final AnalyzerModuleConfig config;
    // 
    @Setter
    private SegmentParserListenerManager listenerManager;

    @Override
    public void send(SegmentObject segment) {
        final TraceAnalyzer traceAnalyzer = new TraceAnalyzer(moduleManager, listenerManager, config);
        traceAnalyzer.doAnalysis(segment);
    }
}


public class TraceAnalyzer {
    ...
    private List<AnalysisListener> analysisListeners = new ArrayList<>();
    
    public void doAnalysis(SegmentObject segmentObject) {
        if (segmentObject.getSpansList().size() == 0) {
            return;
        }

        createSpanListeners();

        notifySegmentListener(segmentObject);
        // 根据 Segment 的 span 类型, 调用不同的 Listener 进行处理
        segmentObject.getSpansList().forEach(spanObject -> {
            // 判断 span 是否为首次进入, 
            if (spanObject.getSpanId() == 0) {
                notifyFirstListener(spanObject, segmentObject);
            }

            if (SpanType.Exit.equals(spanObject.getSpanType())) {
                notifyExitListener(spanObject, segmentObject);
            } else if (SpanType.Entry.equals(spanObject.getSpanType())) {
                notifyEntryListener(spanObject, segmentObject);
            } else if (SpanType.Local.equals(spanObject.getSpanType())) {
                notifyLocalListener(spanObject, segmentObject);
            } else {
                log.error("span type value was unexpected, span type name: {}", spanObject.getSpanType()
                                                                                          .name());
            }
        });

        // 通知 Listener 执行构建, 保存db 等操作
        notifyListenerToBuild();
    }
    
    private void notifyListenerToBuild() {
        // 调用 sourceReceiver#receive 进行消息处理
        analysisListeners.forEach(AnalysisListener::build);
    }
}

AnalysisListener的实现为 MultiScopesAnalysisListener, NetworkAddressAliasMappingListener, SegmentAnalysisListener

  • MultiScopesAnalysisListener: 用于 Service, Endpoint, ServiceRelation, EndpointRelation, DbSlowStatement, ServiceInstace 的收集 , 支持 Entry, Exit, Local (对应sapn 的类型)切入点
  • SegmentAnalysisListener, 包含 sgement的处理, 通过 #containsPoint 方法可以看到支持 Entry,First,Segment 的切入点
    • parseSegment: 主要完成 TraceId 的设置
    • parseFirst: 执行了Segment初始化, 并分配了 serviceId, endpointId , 用于首次
    • parseEntry: 分配 serviceId, endpointId
    • build: 用于 segment 的构建和发布, 调用 SourceReceiver 接口进行处理
  • NetworkAddressAliasMappingListener: 负责 NetworkAddressAliasSetup 的初始化, 在 Entry 时进行处理

上述Listener完成基础对象构建之后, 就会交给 SourceReceiver 进行处理数据的dao分发和保存

SourceReceiver

SourceReceiver 的核心为 DispatcherManager, DispatcherManager 的流程分为

  • 初始化 scan, DispatcherManager#scan, 处理流程为
    1. 扫描包为 org.apache.skywalking 中 SourceDispatcher 实现类
    2. 提取接口中定义的泛型参数, 例如 SourceDispatcher<ServiceMeta> 中的 ServiceMeta, 这个类实现了 ISource, 可以提取到类型的 scope, scope 定义在 DefaultScopeDefine 中
    3. 最后生成 Map<int(DefaultScopeDefine), SourceDispatcher> 的map
  • 分发处理: 根据传入的 scope 值, 分发到匹配的 SourceDispatcher 中进行处理, 例如 ServiceMeta 对应 ServiceMetaDispatcher, Endpoint 对应 EndpointMetaDispatcher 等等

DefaultScopeDefine 定义了所有的 metrics IDs , 例如 SERVICE = 1; SERVICE_INSTANCE = 2; 等等

public class SourceReceiverImpl implements SourceReceiver {
    @Getter
    private final DispatcherManager dispatcherManager;

    public SourceReceiverImpl() {
        this.dispatcherManager = new DispatcherManager();
    }

    @Override
    public void receive(ISource source) {
        // 调用实际的 SourceDispatcher 进行处理
        dispatcherManager.forward(source);
    }

    @Override
    public DispatcherDetectorListener getDispatcherDetectorListener() {
        return getDispatcherManager();
    }

    public void scan() throws IOException, InstantiationException, IllegalAccessException {
        // , 添加到 DispatcherManager中, 
        //DefaultScopeDefine 定义了所有 source 实现的常量ID, ID 用于查询, 以及钩子等多种场景
        dispatcherManager.scan();
    }
}

最终存储 XXXXStreamProcessor

下面以 Segment 为例说明存储流程, 由上述代码可以知道, 最终Segment 会调用到 SourceDispatcher<Segment>, 具体实现如下

public class SegmentDispatcher implements SourceDispatcher<Segment> {

    @Override
    public void dispatch(Segment source) {
        SegmentRecord segment = new SegmentRecord();
        segment.setSegmentId(source.getSegmentId());
        segment.setTraceId(source.getTraceId());
        // setXXX ...
        
        // 将 segment 添加到 worker 进行执行
        RecordStreamProcessor.getInstance().in(segment);
    }
}

RecordStreamProcessor 的处理流程


public class RecordStreamProcessor implements StreamProcessor<Record> {
    // 单例实现
    private final static RecordStreamProcessor PROCESSOR = new RecordStreamProcessor();
    public static RecordStreamProcessor getInstance() {
        return PROCESSOR;
    }

    // 缓存所有 RecordPersistentWorker 用于数据处理
    private Map<Class<? extends Record>, RecordPersistentWorker> workers = new HashMap<>();


    // 执行数据 worker 入队
    @Override
    public void in(Record record) {
        RecordPersistentWorker worker = workers.get(record.getClass());
        if (worker != null) {
            worker.in(record);
        }
    }

    // 创建 RecordPersistentWorker 并添加到 RecordStreamProcessor 中进行处理
    @Override
    @SuppressWarnings("unchecked")
    public void create(ModuleDefineHolder moduleDefineHolder, Stream stream, Class<? extends Record> recordClass) throws StorageException {
        // 找到存储模块, 以及对应 StorageDAO
        final StorageBuilderFactory storageBuilderFactory = moduleDefineHolder.find(StorageModule.NAME)
                                                                              .provider()
                                                                              .getService(StorageBuilderFactory.class);
        // 根据 @Stream 注解, 以及当前类 生成 builder, 对应 es 则生成 RecordEsDAO
        final Class<? extends StorageBuilder> builder = storageBuilderFactory.builderOf(recordClass, stream.builder());

        StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
        IRecordDAO recordDAO;
        try {
            recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
        } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
        }

        // 创建对应存储模型 Model
        ModelCreator modelSetter = moduleDefineHolder.find(CoreModule.NAME).provider().getService(ModelCreator.class);
        // Record stream doesn't read data from database during the persistent process. Keep the timeRelativeID == false always.
        Model model = modelSetter.add(
            recordClass, stream.scopeId(), new Storage(stream.name(), false, DownSampling.Second), true);
        // 创建持久化 worker, 并保存
        RecordPersistentWorker persistentWorker = new RecordPersistentWorker(moduleDefineHolder, model, recordDAO);

        workers.put(recordClass, persistentWorker);
    }
}

RecordPersistentWorker#in 的操作只是将要保存的对象放入一个 list 中, 再由 PersistenceTimer#extractDataAndSave 定时执行, 执行频道根据配置core.persistentPeriod(默认值25), 25秒执行一次

RecordStreamProcessor 的初始化流程

初始化最开始始于 CoreModuleProvider#prepare, 并在 #start 方法中执行, 这时 AnnotationScan 会扫描所有带 @Stream 的注解用于初始化

// CoreModuleProvider#prepare
annotationScan.registerListener(new StreamAnnotationListener(getManager()));

// CoreModuleProvider#start
annotationScan.scan();

AnnotationScan{
    private final List<AnnotationListenerCache> listeners;
    
    public void scan() throws IOException, StorageException {
        // 扫描符合 listener 的注解,并添加
        ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
        ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
        for (ClassPath.ClassInfo classInfo : classes) {
            Class<?> aClass = classInfo.load();

            for (AnnotationListenerCache listener : listeners) {
                if (aClass.isAnnotationPresent(listener.annotation())) {
                    listener.addMatch(aClass);
                }
            }
        }

        // 完成扫描后触发执行
        for (AnnotationListenerCache listener : listeners) {
            listener.complete();
        }
    }
    
    // 扫描完成后会保存的以下部分
    private class AnnotationListenerCache {
            private AnnotationListener listener;
            private List<Class<?>> matchedClass;
            
            // 调用每个类进行执行处理
            private void complete() throws StorageException {
                matchedClass.sort(Comparator.comparing(Class::getName));
                for (Class<?> aClass : matchedClass) {
                    listener.notify(aClass);
                }
            }
    }
}

StreamAnnotationListener 完成对 RecordStreamProcessor, MetricsStreamProcessor 等处理器的初始化

public class StreamAnnotationListener implements AnnotationListener {

    private final ModuleDefineHolder moduleDefineHolder;

    public StreamAnnotationListener(ModuleDefineHolder moduleDefineHolder) {
        this.moduleDefineHolder = moduleDefineHolder;
    }

    @Override
    public Class<? extends Annotation> annotation() {
        return Stream.class;
    }

    @SuppressWarnings("unchecked")
    @Override
    public void notify(Class aClass) throws StorageException {
        if (aClass.isAnnotationPresent(Stream.class)) {
            Stream stream = (Stream) aClass.getAnnotation(Stream.class);

            if (DisableRegister.INSTANCE.include(stream.name())) {
                return;
            }

            if (stream.processor().equals(RecordStreamProcessor.class)) {
                RecordStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(MetricsStreamProcessor.class)) {
                MetricsStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(TopNStreamProcessor.class)) {
                TopNStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(NoneStreamProcessor.class)) {
                NoneStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else if (stream.processor().equals(ManagementStreamProcessor.class)) {
                ManagementStreamProcessor.getInstance().create(moduleDefineHolder, stream, aClass);
            } else {
                throw new UnexpectedException("Unknown stream processor.");
            }
        } else {
            throw new UnexpectedException(
                    "Stream annotation listener could only parse the class present stream annotation.");
        }
    }
}

ListenerManager 的初始化

TraceModule 的依赖为 TelemetryModule, CoreModule, AnalyzerModule, ConfigurationModule, SharingServerModule, 其中重点是 AnalyzerModule, 完成了 ISegmentParserService 初始化, 其中最重要的就是完成了各个 ListenerManager 的初始化

AnalyzerModuleProvider 的重要源码为

public class AnalyzerModuleProvider extends ModuleProvider {
    
    @Getter
    private SegmentParserServiceImpl segmentParserService;
    ...

    private List<MeterConfig> meterConfigs;

    public AnalyzerModuleProvider() {
        this.moduleConfig = new AnalyzerModuleConfig();
    }
    ...

    @Override
    public void prepare() throws ServiceNotProvidedException, ModuleStartException {

        segmentParserService = new SegmentParserServiceImpl(getManager(), moduleConfig);
        this.registerServiceImplementation(ISegmentParserService.class, segmentParserService);
    }

    @Override
    public void start() throws ModuleStartException {
        // load official analysis
        getManager().find(CoreModule.NAME)
                    .provider()
                    .getService(OALEngineLoaderService.class)
                    .load(CoreOALDefine.INSTANCE);

        ...
        // 初始化解析的监听器
        segmentParserService.setListenerManager(listenerManager());
    }
    ...

    // 此处是重点 .Factory 工厂用于创建实际的 AnalysisListener 用于消息处理
    private SegmentParserListenerManager listenerManager() {
        SegmentParserListenerManager listenerManager = new SegmentParserListenerManager();
        if (moduleConfig.isTraceAnalysis()) {
            // 多模块之间的解析
            listenerManager.add(new MultiScopesAnalysisListener.Factory(getManager()));
            listenerManager.add(new NetworkAddressAliasMappingListener.Factory(getManager()));
        }
        listenerManager.add(new SegmentAnalysisListener.Factory(getManager(), moduleConfig));

        return listenerManager;
    }
}

SegmentParserListenerManager 中完成了服务的注册或获取

public class SegmentParserListenerManager implements ISegmentParserListenerManager {
    @Getter
    @Setter
    private final List<AnalysisListenerFactory> spanListenerFactories;

    public SegmentParserListenerManager() {
        this.spanListenerFactories = new LinkedList<>();
    }

}

@Stream

这个模块最重要的东西就是 @Stream 注解, 标注了 @Stream 注解的类为需要收集的类, 注解包含以下几个部分

  1. name: 存储表或索引名称
  2. scopeId: 所属的范围id
  3. builder: 对应类为 StorageBuilder,dbmap 到bean对象的相互转换器, 包含以下两个方法
    • storage2Entity: dbMap -> T
    • entity2Storage: T -> dbMap
  4. processor: 对应的处理器

例如: LogRecord

@Stream(name = LogRecord.INDEX_NAME, scopeId = DefaultScopeDefine.LOG, builder = LogRecord.Builder.class, processor = RecordStreamProcessor.class)
public class LogRecord extends AbstractLogRecord {

    public static final String INDEX_NAME = "log";

    public static final String UNIQUE_ID = "unique_id";

    @Setter
    @Getter
    @Column(columnName = UNIQUE_ID)
    private String uniqueId;

    @Override
    public String id() {
        return uniqueId;
    }

    public static class Builder extends AbstractLogRecord.Builder<LogRecord> {

        @Override
        public LogRecord storage2Entity(final Map<String, Object> dbMap) {
            LogRecord record = new LogRecord();
            map2Data(record, dbMap);
            record.setUniqueId((String) dbMap.get(UNIQUE_ID));
            return record;
        }

        @Override
        public Map<String, Object> entity2Storage(final LogRecord record) {
            Map<String, Object> dbMap = new HashMap<>();
            data2Map(dbMap, record);
            dbMap.put(UNIQUE_ID, record.getUniqueId());
            return dbMap;
        }
    }

}

Stream 扫描的同时会进行 dao 初始化, 例如 RecordStreamProcessor#create 中代码片段 IRecordDAO 通过 @Stream 中定义的 StorageBuilder.newInstance 来初始化 dao 对象

StorageDAO storageDAO = moduleDefineHolder.find(StorageModule.NAME).provider().getService(StorageDAO.class);
IRecordDAO recordDAO;
try {
    recordDAO = storageDAO.newRecordDao(builder.getDeclaredConstructor().newInstance());
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
    throw new UnexpectedException("Create " + stream.builder().getSimpleName() + " record DAO failure.", e);
}

小结

==初始化阶段==, 分以下模块块进行缓存

  1. StreamProcessor, 负责@Stream注解的扫描, 扫描完成后, 注册到当前 StreamProcessor 中的 Map<Class<? extends XXX>, XXXPersistentWorker> workers 中, XXX 可以是 record, metrics
    • 其中 XXXPersistentWorker 是持有 DataCarrier 的 worker, 用于数据入队和消费, 消费时调用持有的 xxxDao 进行持久化
  2. SourceReceiverImpl.dispatcherManager 的初始化, 这块负责扫描所有实现 SourceDispatcher 的类, 并注册到 Map<Integer, List<SourceDispatcher>> dispatcherMap,
    • 此处 Integer 为 DefaultScopeDefine 中定义的常量, 表示各种Scope 的值

此处 @Stream 注解定义的类为收集并存储的 metrics, record 数据, SourceDispatcher 实现类的功能是将定义的 metrics, record 转为需要存储的对象

==执行阶段==, 就是对象的转换, 以及从 cache 中寻找对应处理类的过程

  1. grpc 收到 segmentObject 后, 调用 ISegmentParserService#send , 做了第一层的接口分离, 实际调用 SegmentParserServiceImpl
  2. SegmentParserServiceImpl#send 初始化 TraceAnalyzer 进行具体数据分析, TraceAnalyzer 中注册了List<AnalysisListener > analysisListeners
  3. TraceAnalyzer 定义了多个 AnalysisListener, AnalysisListener 是第二层的接口分离, 用于不同分析监听器的实现, 提供了不同阶段的 Segment 的监听, 接口有, 对应
    • SegmentListener: segment 进入监听(每次都会进入)
    • FirstAnalysisListener: 首次进入监听(首次)
    • EntryAnalysisListener: Span 为 Entry 类型
    • ExitAnalysisListener: Span 为 Exit 类型
    • LocalAnalysisListener: Span 为 Local 类型
  4. TraceAnalyzer 最后阶段调用 AnalysisListener#build 调用 SourceReiver 进行数据的分发构建
  5. SourceReceiver 第三层抽象, 中寻找 ISource 对应的 SourceDispatcher, 此处只有需要保存的 ISource 才有SourceDispatcher实现, 像 EndpointTraffic 对应有 EndpointTrafficDispatcher
  6. SourceDispatcher 最后调用具体的 StreamProcessor 进行消息入队, 定期进行消息的批量持久化, 例如 Metrics 消息调用 MetricsStreamProcessor#in, Record 消息调用 RecordStreamProcessor#In
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容