聊聊skywalking的RemoteClientManager

本文主要研究一下skywalking的RemoteClientManager

RemoteClientManager

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java

public class RemoteClientManager implements Service {

    private static final Logger logger = LoggerFactory.getLogger(RemoteClientManager.class);

    private final ModuleDefineHolder moduleDefineHolder;
    private ClusterNodesQuery clusterNodesQuery;
    private volatile List<RemoteClient> usingClients;
    private GaugeMetrics gauge;
    private int remoteTimeout;

    /**
     * Initial the manager for all remote communication clients.
     *
     * @param moduleDefineHolder for looking up other modules
     * @param remoteTimeout      for cluster internal communication, in second unit.
     */
    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int remoteTimeout) {
        this.moduleDefineHolder = moduleDefineHolder;
        this.usingClients = ImmutableList.of();
        this.remoteTimeout = remoteTimeout;
    }

    public void start() {
        Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(this::refresh, 1, 5, TimeUnit.SECONDS);
    }

    /**
     * Query OAP server list from the cluster module and create a new connection for the new node. Make the OAP server
     * orderly because of each of the server will send stream data to each other by hash code.
     */
    void refresh() {
        if (gauge == null) {
            gauge = moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
                .createGauge("cluster_size", "Cluster size of current oap node",
                    MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        }
        try {
            if (Objects.isNull(clusterNodesQuery)) {
                synchronized (RemoteClientManager.class) {
                    if (Objects.isNull(clusterNodesQuery)) {
                        this.clusterNodesQuery = moduleDefineHolder.find(ClusterModule.NAME).provider().getService(ClusterNodesQuery.class);
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Refresh remote nodes collection.");
            }

            List<RemoteInstance> instanceList = clusterNodesQuery.queryRemoteNodes();
            instanceList = distinct(instanceList);
            Collections.sort(instanceList);

            gauge.setValue(instanceList.size());

            if (logger.isDebugEnabled()) {
                instanceList.forEach(instance -> logger.debug("Cluster instance: {}", instance.toString()));
            }

            if (!compare(instanceList)) {
                if (logger.isDebugEnabled()) {
                    logger.debug("ReBuilding remote clients.");
                }
                reBuildRemoteClients(instanceList);
            }

            printRemoteClientList();
        } catch (Throwable t) {
            logger.error(t.getMessage(), t);
        }
    }

    //......
}
  • RemoteClientManager提供了getRemoteClient方法获取usingClients,它还提供了start方法,该方法注册一个定时任务每隔5秒执行一次refresh;refresh方法通过clusterNodesQuery.queryRemoteNodes()获取instanceList列表,然后根据Address去重一下再排序,然后跟本地的RemoteClient列表进行对比,如果有发现变更则触发reBuildRemoteClients操作,最后在执行printRemoteClientList

reBuildRemoteClients

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java

public class RemoteClientManager implements Service {

    //......

    private void reBuildRemoteClients(List<RemoteInstance> remoteInstances) {
        final Map<Address, RemoteClientAction> remoteClientCollection = this.usingClients.stream()
            .collect(Collectors.toMap(RemoteClient::getAddress, client -> new RemoteClientAction(client, Action.Close)));

        final Map<Address, RemoteClientAction> latestRemoteClients = remoteInstances.stream()
            .collect(Collectors.toMap(RemoteInstance::getAddress, remote -> new RemoteClientAction(null, Action.Create)));

        final Set<Address> unChangeAddresses = Sets.intersection(remoteClientCollection.keySet(), latestRemoteClients.keySet());

        unChangeAddresses.stream()
            .filter(remoteClientCollection::containsKey)
            .forEach(unChangeAddress -> remoteClientCollection.get(unChangeAddress).setAction(Action.Unchanged));

        // make the latestRemoteClients including the new clients only
        unChangeAddresses.forEach(latestRemoteClients::remove);
        remoteClientCollection.putAll(latestRemoteClients);

        final List<RemoteClient> newRemoteClients = new LinkedList<>();
        remoteClientCollection.forEach((address, clientAction) -> {
            switch (clientAction.getAction()) {
                case Unchanged:
                    newRemoteClients.add(clientAction.getRemoteClient());
                    break;
                case Create:
                    if (address.isSelf()) {
                        RemoteClient client = new SelfRemoteClient(moduleDefineHolder, address);
                        newRemoteClients.add(client);
                    } else {
                        RemoteClient client = new GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                        client.connect();
                        newRemoteClients.add(client);
                    }
                    break;
            }
        });

        //for stable ordering for rolling selector
        Collections.sort(newRemoteClients);
        this.usingClients = ImmutableList.copyOf(newRemoteClients);

        remoteClientCollection.values()
            .stream()
            .filter(remoteClientAction -> remoteClientAction.getAction().equals(Action.Close))
            .forEach(remoteClientAction -> remoteClientAction.getRemoteClient().close());
    }

    //......
}
  • reBuildRemoteClients方法先构建remoteClientCollection及latestRemoteClients,然后取交集得到unChangeAddresses,然后从latestRemoteClients移除unChangeAddresses,最后再把latestRemoteClients添加到remoteClientCollection;之后遍历remoteClientCollection,对于action为Create的区分为SelfRemoteClient及GRPCRemoteClient,对于GRPCRemoteClient的还执行一下connect操作;最后对newRemoteClients进行排序,然后重新赋值给usingClients;最后对于action为close的remoteClient执行close操作

RemoteSenderService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/RemoteSenderService.java

public class RemoteSenderService implements Service {
    private static final Logger logger = LoggerFactory.getLogger(RemoteSenderService.class);

    private final ModuleManager moduleManager;
    private final HashCodeSelector hashCodeSelector;
    private final ForeverFirstSelector foreverFirstSelector;
    private final RollingSelector rollingSelector;

    public RemoteSenderService(ModuleManager moduleManager) {
        this.moduleManager = moduleManager;
        this.hashCodeSelector = new HashCodeSelector();
        this.foreverFirstSelector = new ForeverFirstSelector();
        this.rollingSelector = new RollingSelector();
    }

    public void send(String nextWorkName, StreamData streamData, Selector selector) {
        RemoteClientManager clientManager = moduleManager.find(CoreModule.NAME).provider().getService(RemoteClientManager.class);
        RemoteClient remoteClient = null;

        List<RemoteClient> clientList = clientManager.getRemoteClient();
        if (clientList.size() == 0) {
            logger.warn("There is no available remote server for now, ignore the streaming data until the cluster metadata initialized.");
            return;
        }
        switch (selector) {
            case HashCode:
                remoteClient = hashCodeSelector.select(clientList, streamData);
                break;
            case Rolling:
                remoteClient = rollingSelector.select(clientList, streamData);
                break;
            case ForeverFirst:
                remoteClient = foreverFirstSelector.select(clientList, streamData);
                break;
        }
        remoteClient.push(nextWorkName, streamData);
    }
}
  • RemoteSenderService提供了send方法,该方法从clientManager.getRemoteClient()获取clientList,然后根据selector类型从中选取一个remoteClient执行remoteClient.push(nextWorkName, streamData)

RemoteClientSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RemoteClientSelector.java

public interface RemoteClientSelector {
    RemoteClient select(List<RemoteClient> clients, StreamData streamData);
}
  • RemoteClientSelector定义了select方法

HashCodeSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/HashCodeSelector.java

public class HashCodeSelector implements RemoteClientSelector {

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        int selectIndex = Math.abs(streamData.remoteHashCode()) % size;
        return clients.get(selectIndex);
    }
}
  • HashCodeSelector实现了RemoteClientSelector接口,它通过Math.abs(streamData.remoteHashCode()) % size来选择selectIndex

RollingSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/RollingSelector.java

public class RollingSelector implements RemoteClientSelector {

    private int index = 0;

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        int size = clients.size();
        index++;
        int selectIndex = Math.abs(index) % size;

        if (index == Integer.MAX_VALUE) {
            index = 0;
        }
        return clients.get(selectIndex);
    }
}
  • RollingSelector实现了RemoteClientSelector接口,它通过每次递增index然后根据Math.abs(index) % size选择selectIndex

ForeverFirstSelector

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/selector/ForeverFirstSelector.java

public class ForeverFirstSelector implements RemoteClientSelector {

    private static final Logger logger = LoggerFactory.getLogger(ForeverFirstSelector.class);

    @Override public RemoteClient select(List<RemoteClient> clients, StreamData streamData) {
        if (logger.isDebugEnabled()) {
            logger.debug("clients size: {}", clients.size());
        }
        return clients.get(0);
    }
}
  • ForeverFirstSelector实现了RemoteClientSelector接口,它始终返回第一个client

小结

RemoteClientManager提供了getRemoteClient方法获取usingClients,它还提供了start方法,该方法注册一个定时任务每隔5秒执行一次refresh;refresh方法通过clusterNodesQuery.queryRemoteNodes()获取instanceList列表,然后根据Address去重一下再排序,然后跟本地的RemoteClient列表进行对比,如果有发现变更则触发reBuildRemoteClients操作,最后在执行printRemoteClientList

doc

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容