ThingsBoard 二次开发之源码分析2-启动分析

以下的分析环境基于内存消息队列和无注册中心配置以及按照默认配置

Clustering mode

官网对集群模式有一部分介绍这可以帮助我们理解代码为什么会这么做:

ThingsBoard adopts consistent hashing to ensure scalability and availability. Message from Device A that is received on a particular node may be forwarded to the other node based on the hash of the device ID. Although this introduces certain networking overhead, it allows to process all messages from a particular device using corresponding device actor on a determined server, which introduces the following advantages:
improve cache hit rate. Device attributes and other device related data are fetched by device actor on a specific server.
avoid race conditions. All messages for a particular device are processed on a determined server.
allows targeting server-side api calls based on the device id.

TbServiceInfoProvider

DefaultTbServiceInfoProvider init()方法由@PostConstruct标记,spring启动的时候会自动调用该方法:

public void init() {
        if (StringUtils.isEmpty(serviceId)) {
            try {
                //获取本机的HostName作为serviceId
                serviceId = InetAddress.getLocalHost().getHostName();
            } catch (UnknownHostException e) {
                serviceId = org.apache.commons.lang3.RandomStringUtils.randomAlphabetic(10);
            }
        }
        log.info("Current Service ID: {}", serviceId);
              //serviceType是配置文件下service.type的值,默认为monolith
            //serviceTypes将会是一个List包含TB_CORE, TB_RULE_ENGINE, TB_TRANSPORT, JS_EXECUTOR ①
        if (serviceType.equalsIgnoreCase("monolith")) { 
            serviceTypes = Collections.unmodifiableList(Arrays.asList(ServiceType.values()));
        } else {
            serviceTypes = Collections.singletonList(ServiceType.of(serviceType));
        }
        ServiceInfo.Builder builder = ServiceInfo.newBuilder()
                .setServiceId(serviceId)
                .addAllServiceTypes(serviceTypes.stream().map(ServiceType::name).collect(Collectors.toList()));
        UUID tenantId;
            //tenantIdStr是配置文件中service.tenant_id的值,默认情况下为空,isolatedTenant也就为空了
        if (!StringUtils.isEmpty(tenantIdStr)) {
            tenantId = UUID.fromString(tenantIdStr);
            isolatedTenant = new TenantId(tenantId);
        } else {
            tenantId = TenantId.NULL_UUID;
        }
            //返回此 uuid 的 128 位值中的最高有效 64 位和最低64位
        builder.setTenantIdMSB(tenantId.getMostSignificantBits());
        builder.setTenantIdLSB(tenantId.getLeastSignificantBits());
                //ruleEngineSettings是一个TbQueueRuleEngineSettings的一个实例,读取queue.rule-engine下的值
            //ruleEngineSettings包含topic是tb_rule_engine,queue队列有三个分别是②:
            // 1. name: Main topic: tb_rule_engine.main partition: 10
            // 2. name: HighPriority topic: tb_rule_engine.hp partition: 10
              // 3. name: SequentialByOriginator topic: tb_rule_engine.sq partition: 10
        if (serviceTypes.contains(ServiceType.TB_RULE_ENGINE) && ruleEngineSettings != null) {
            for (TbRuleEngineQueueConfiguration queue : ruleEngineSettings.getQueues()) {
                TransportProtos.QueueInfo queueInfo = TransportProtos.QueueInfo.newBuilder()
                        .setName(queue.getName())
                        .setTopic(queue.getTopic())
                        .setPartitions(queue.getPartitions()).build();
                builder.addRuleEngineQueues(queueInfo);
            }
        }
        serviceInfo = builder.build();
    }

PartitionService

PartitionService的默认实现是HashPartitionService:

 @PostConstruct
    public void init() {
        //根据queue.partitions.hash_function_name的配置选择以后做partition的hash方法,默认值是murmur3_128
        this.hashFunction = forName(hashFunctionName);
        //ConcurrentMap<ServiceQueue, Integer> partitionSizes
        //ServiceQueue 类成员ServiceType和字符串类型的queue name,构造函数如果不提供queue name或者queue name是null的话,ServiceQueue对象的的queue name是"Main"
      //corePartitions 是queue.core.partitions默认值10
        partitionSizes.put(new ServiceQueue(ServiceType.TB_CORE), corePartitions);
      //coreTopic对应的配置文件键是queue.core.topic,默认值tb_core
        partitionTopics.put(new ServiceQueue(ServiceType.TB_CORE), coreTopic);
      //根据DefaultTbServiceInfoProvider②的分析可以得出partitionTopics,partitionSizes的具体内容
        tbQueueRuleEngineSettings.getQueues().forEach(queueConfiguration -> {
            partitionTopics.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()), queueConfiguration.getTopic());
            partitionSizes.put(new ServiceQueue(ServiceType.TB_RULE_ENGINE, queueConfiguration.getName()),          queueConfiguration.getPartitions());
        });
    }

DiscoveryService

因为没有使用Zookeeper做注册中心,DiscoveryService的实现由DummyDiscoveryService实现,在收到Spring发送的ApplicationReadyEvent事件后,调用partitionService.recalculatePartitions方法:

public void recalculatePartitions(ServiceInfo currentService, List<ServiceInfo> otherServices) {
    //日志记录
    logServiceInfo(currentService);
    //dummy Discovery将不包含otherService,Zookeeper注册中心的实现将会有otherService
    otherServices.forEach(this::logServiceInfo);
    Map<ServiceQueueKey, List<ServiceInfo>> queueServicesMap = new HashMap<>();
    //展开ServiceInfo的serviceTypes和RuleEngineQueue,并添加到queueServicesMap
    addNode(queueServicesMap, currentService);
    for (ServiceInfo other : otherServices) {
        addNode(queueServicesMap, other);
    }
    queueServicesMap.values().forEach(list -> list.sort((a, b) -> a.getServiceId().compareTo(b.getServiceId())));

    ConcurrentMap<ServiceQueueKey, List<Integer>> oldPartitions = myPartitions;
    TenantId myIsolatedOrSystemTenantId = getSystemOrIsolatedTenantId(currentService);
    myPartitions = new ConcurrentHashMap<>();
   //创建了ServiceQueueKey和partitionIndex的list组合
    partitionSizes.forEach((serviceQueue, size) -> {
        ServiceQueueKey myServiceQueueKey = new ServiceQueueKey(serviceQueue, myIsolatedOrSystemTenantId);
        for (int i = 0; i < size; i++) {
            ServiceInfo serviceInfo = resolveByPartitionIdx(queueServicesMap.get(myServiceQueueKey), i);
            if (currentService.equals(serviceInfo)) {
                ServiceQueueKey serviceQueueKey = new ServiceQueueKey(serviceQueue, getSystemOrIsolatedTenantId(serviceInfo));
                myPartitions.computeIfAbsent(serviceQueueKey, key -> new ArrayList<>()).add(i);
            }
        }
    });

    oldPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!myPartitions.containsKey(serviceQueueKey)) {
            log.info("[{}] NO MORE PARTITIONS FOR CURRENT KEY", serviceQueueKey);
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, Collections.emptySet()));
        }
    });
   //发送PartitionChangeEvent,创建的TopicPartitionInfo的topic是partitionTopics的topic,fullTopicName是topic+Index, DummyDiscovery每次tpiList包含了所有的partitionIndex, 此例0-9.例:tpiList其中的fullTopicName是tb_core.9,tb_rule_engine.main.3
    myPartitions.forEach((serviceQueueKey, partitions) -> {
        if (!partitions.equals(oldPartitions.get(serviceQueueKey))) {
            log.info("[{}] NEW PARTITIONS: {}", serviceQueueKey, partitions);
            Set<TopicPartitionInfo> tpiList = partitions.stream()
                    .map(partition -> buildTopicPartitionInfo(serviceQueueKey, partition))
                    .collect(Collectors.toSet());
            applicationEventPublisher.publishEvent(new PartitionChangeEvent(this, serviceQueueKey, tpiList));
        }
    });
    tpiCache.clear();

    if (currentOtherServices == null) {
        currentOtherServices = new ArrayList<>(otherServices);
    } else {
        Set<ServiceQueueKey> changes = new HashSet<>();
        Map<ServiceQueueKey, List<ServiceInfo>> currentMap = getServiceKeyListMap(currentOtherServices);
        Map<ServiceQueueKey, List<ServiceInfo>> newMap = getServiceKeyListMap(otherServices);
        currentOtherServices = otherServices;
        currentMap.forEach((key, list) -> {
            if (!list.equals(newMap.get(key))) {
                changes.add(key);
            }
        });
        currentMap.keySet().forEach(newMap::remove);
        changes.addAll(newMap.keySet());
        if (!changes.isEmpty()) {
            applicationEventPublisher.publishEvent(new ClusterTopologyChangeEvent(this, changes));
        }
    }
}

DefaultTbCoreConsumerService

image.png
  • DefaultTbCoreConsumerService的父类是AbstractConsumerService, 而父类接收到ApplicationReadyEvent时,会调用子类的launchMainConsumers()应用设计模式模板方法模式,启动了消费者线;其作用是:消费者线程按照固定的延时,无限循环去消息队列提取消息,由于此时并未subscribed,也即未订阅,将暂时不从消息队列里取消息;
  • 前面讨论到DiscoveryService发布了PartitionChangeEvent,DefaultTbCoreConsumerService实现了ApplicationListener<PartitionChangeEvent>接口,在接收到PartitionChangeEvent时,会做出相应的反应,调用onApplicationEvent,订阅了的fullTopicName是tb_core.0-9 每次从消息队列进行poll的时候,都会检查这些fullTopicName里面是否有消息准备就绪;

DefaultTbRuleEngineConsumerService

image.png
  • @PostConstruct注解初始化了ruleEngine的消费者,按照TbServiceInfoProviderbean的初始化②,可以知道,初始化了三个consumer, 放到了consumers的map变量里,key是queue的name;

  • DefaultTbRuleEngineConsumerService的父类也是AbstractConsumerService, 跟DefaultTbCoreConsumerService一样,父类接收到ApplicationReadyEvent时,调用该类的launchMainConsumers()方法,启动了三个消费者线程,由于还未订阅,所以暂时不从消息队列里面获取消息;

  • 同DefaultTbCoreConsumerService一样,收到PartitionChangeEvent时,启动订阅主题tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9:

public void onApplicationEvent(PartitionChangeEvent partitionChangeEvent) {
    //此时serviceType是ServiceType.TB_RULE_ENGINE,过滤了其他的type
    if (partitionChangeEvent.getServiceType().equals(getServiceType())) {
        ServiceQueue serviceQueue = partitionChangeEvent.getServiceQueueKey().getServiceQueue();
        log.info("[{}] Subscribing to partitions: {}", serviceQueue.getQueue(), partitionChangeEvent.getPartitions());
        //根绝queueName做区分,让三个consumer分别订阅了三个主题的列表tb_rule_engine.main.0-9,tb_rule_engine.sq.0-9,tb_rule_engine.hp.0-9
        consumers.get(serviceQueue.getQueue()).subscribe(partitionChangeEvent.getPartitions());
    }
}

通过上面类的初始化一系列过程,我们有了大概的印象:

  • 创建了ServiceInfo对象,并根据配置文件我们得知了tb_core,和tb_rule_engine的一些配置信息;

  • DiscoveryService根据注册中心,和其他的服务提供者,会计算相应的partition index,并发布PartitionChangeEvent;

  • AbstractConsumerService的两个实现类在接收到PartitionChangeEvent之前,都启动了一个或多个线程,在接收到此消息的时候, 都会使自己的consumer订阅相应的主题等待消息的到来。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容