Druid Coordinator源码分析

前言

Druid Coordinator 服务在整个Druid架构中的主要作用是协调Segments在历史节点之间的分布。其主要功能包括是segments在historical之间分布均衡,rules管理,segments清理合并等等。

源码分析

Druid 版本: druid-0.12.3

DruidCoordinator

DruidCoordinator是Coordinator服务的入口,Coordinator启动的流程都是在这个类中开始。首先看一下它的内部类:

DruidCoordinator Inner Class

DruidCoordinator 一共有三个内部类,即使不看类的定义,我们从类名也可以知道这三个内部类都实现了Runnable接口。在Coordiantor服务中应该是作为三个独立的线程异步执行。
我们先大致了解一下每个Runnable的完成的工作。

CoordinatorRunnable
查看这个类的源码,除了一些判断结构之外,其最主要的实现如下:

        // Do coordinator stuff.
        DruidCoordinatorRuntimeParams params =
                DruidCoordinatorRuntimeParams.newBuilder()
                        .withStartTime(startTime)
                        .withDatasources(metadataSegmentManager.getInventory())
                        .withDynamicConfigs(getDynamicConfigs())
                        .withEmitter(emitter)
                        .withBalancerStrategy(balancerStrategy)
                        .build();
        for (DruidCoordinatorHelper helper : helpers) {
          // Don't read state and run state in the same helper otherwise racy conditions may exist
          if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
            params = helper.run(params);
          }
        }

在这里,CoordinatorRunnable首先组装出了一个原始的DruidCoordinatorRuntimeParams 对象(后用params代替),遍历执行了helper中的run方法。这里是使用了装饰器模式,run方法返回的依然是一个params对象,只是对象又进行了一波加工。
查看helpers的定义,知道了这是一个List<DruidCoordinatorHelper>。该对象的初始化是通过构造器来完成的。想要明白helper做了什么,就需要找到这个类在什么地方被使用,对象是如何创建的。
查看类图,如下:

CoordinatorRunnable Class

CoordinatorRunnable作为一个抽象类,在工程中并没有方法直接创建CoordinatorRunnable对象,也就是说他的run方法是通过子类调用的。有意思的是,CoordinatorRunnable被声明成一个抽象类,但是在它的类定义中并没有声明抽象方法。我的印象中,抽象类多用于模板方法模式。但是乍一看这个类并没有使用这一种模式,唯一一个没有确定的方法就是helper的run方法。也就是说,什么样的helper决定了run方法做了什么样的事。helper是通过构造器创建的,而CoordinatorRunnable并没有直接通过new关键字创建对象,那么其构造器应该只有子类调用。下面到子类中找helper的定义。

CoordinatorHistoricalManagerRunnable
这个类继承了CoordinatorRunnable抽象类,且只定义了一个有参构造器,就是这个构造器决定了父类的run方法还要做些什么事。

super(
          ImmutableList.of(
              new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
              new DruidCoordinatorHelper()
              {
                @Override
                public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
                {
                      ……//省略方法实现
                }
              },
              new DruidCoordinatorRuleRunner(DruidCoordinator.this),
              new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
              new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
              new DruidCoordinatorBalancer(DruidCoordinator.this),
              new DruidCoordinatorLogger(DruidCoordinator.this)
          ),
          startingLeaderCounter
      );

在这个构造器中传入了一个装载DruidCoordinatorHelper的列表。这个列表中helper对象会在DruidCoordinator的run方法实现中被调用,运行每个helper的run方法。其父类方法的实现中部分步骤延伸至子类,由子类提供实践方式,其实用的还是模板方法。
DruidCoordinatorHelper并没有继承任何的接口或者类。也就是说CoordinatorRunnable的是在单个线程中调用helper的run方法。
从这个helper中我们可以看出,这个CoordinatorHistoricalManagerRunnable主要是实现了segments负责均衡以及Rules管理,segments清理等功能。

CoordinatorIndexingServiceRunnable
作为CoordinatorRunnable的另外一个子类,它的类定义就简单很多了。

public CoordinatorIndexingServiceRunnable(List<DruidCoordinatorHelper> helpers, final int startingLeaderCounter)
    {
      super(helpers, startingLeaderCounter);
    }

这个类中helpers也是通过其构造器赋值的。但是原理是一样的,传什么样的helper就做什么样的事。

start()

该方法由Lifecycle进行管理,注解为@LifecycleStart。在Coordinator启动后首先会在同步代码快中判断是Coordinator服务是有已经启动过了(可能配置了HA),如实没有启动则会去竞争leader。服务是否已经启动 是通过同步状态started来表示的。如果当前服务未启动,则将started置为true,然后就调用becomeLeader()方法来完成后续的操作。

 public void start()
  {
    synchronized (lock) {
      if (started) {
        return;
      }
      started = true;

      coordLeaderSelector.registerListener(
          new DruidLeaderSelector.Listener()
          {
            @Override
            public void becomeLeader()
            {
              DruidCoordinator.this.becomeLeader();
            }

            @Override
            public void stopBeingLeader()
            {
              DruidCoordinator.this.stopBeingLeader();
            }
          }
      );
    }
  }

Coordiantor在宣布服务之前会先去对MetadataStorage中的一些元信息进行操作。

//扫描segment
 metadataSegmentManager.start();
//
 metadataRuleManager.start();

SQLMetadataSegmentManager之start方法
在这个方法中,主要的实现是在这个类的poll()方法中,而poll()方法是通过单个线程调度的,调度方式为scheduleWithFixedDelay。通过这种方式调度就必须要等当前的线程工作执行完毕之后,才会在指定的间隔之后进行下一次调度。也就是说单个线程的执行时间对整个的调度周期有较大的影响。
这种方式有两个明显的目的:
1.当前线程的工作不会阻塞主线程的执行,即二者的操作是异步的;
2.线程的工作可以周期性的执行。

因为必须要等当前的线程执行完之后才会进行下一次调度,所以这个地方它创建了只有一个线程的线程池。

      //单个线程的线程池
      exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
      final Duration delay = config.get().getPollDuration().toStandardDuration();
     //对单个线程进行调度
      exec.scheduleWithFixedDelay(
          new Runnable()
          {
            @Override
            public void run()
            {
              readLock.lock();
              try {
                if (localStartOrder == currentStartOrder) {
                  poll();
                }
              }
              catch (Exception e) {
                log.makeAlert(e, "uncaught exception in segment manager polling thread").emit();

              }
              finally {
                readLock.unlock();
              }
            }
          },
          0,
          delay.getMillis(),
          TimeUnit.MILLISECONDS
      );

SQLMetadataRuleManager之start方法
调度方式与前面的start()方法相似,只是poll()方法中扫描的数据不一样了。前者是更新segments,这里是更新rules。

完成上述两个操作之后,Coordinator开始宣布服务。
宣布服务成功之后,Coordiantor就开始真正执行对segment和历史节点的管理。首先,它创建了一个list,用来存放要干的事以及调度周期。

final List<Pair<? extends CoordinatorRunnable, Duration>> coordinatorRunnables = Lists.newArrayList();  

在list中Coordinator放入了CoordinatorRunnable的两个子类——CoordinatorHistoricalManagerRunnableCoordinatorIndexingServiceRunnable。我们先了解一下这个list如何被使用。

      for (final Pair<? extends CoordinatorRunnable, Duration> coordinatorRunnable : coordinatorRunnables) {
        ScheduledExecutors.scheduleWithFixedDelay(
            exec,
            config.getCoordinatorStartDelay(),
            coordinatorRunnable.rhs,
            new Callable<ScheduledExecutors.Signal>()
            {
              private final CoordinatorRunnable theRunnable = coordinatorRunnable.lhs;

              @Override
              public ScheduledExecutors.Signal call()
              {
                if (coordLeaderSelector.isLeader() && startingLeaderCounter == coordLeaderSelector.localTerm()) {
                  theRunnable.run();
                }
                if (coordLeaderSelector.isLeader()
                    && startingLeaderCounter == coordLeaderSelector.localTerm()) { // (We might no longer be leader)
                  return ScheduledExecutors.Signal.REPEAT;
                } else {
                  return ScheduledExecutors.Signal.STOP;
                }
              }
            }
        );
      }

这里也是使用了scheduleWithFixedDelay调度,也就是说每个CoordinatorRunnable都是要等上一次执行结束之后才会进行下一次的调度。而且这里需要注意到是调度方法中的exec变量是一个容量为1的线程池。也就是说这里的调度相当于从主线程又单独创建一个线程来执行这些CoordinatorRunnable。这里采用单个线程池的线程进行调度一是为了保证主线程不会阻塞,另外是为确保数据操作的安全性。

至此我们知道了,Coordinator在启动的时候会先尝试去竞争主节点资格,然后扫描元数据库,这里是异步的。接着会宣布服务。服务宣布后他会去将一些工作封装成CoordinatorRunnable,由另外一个线程去周期性的调度执行这些Runnable的run方法。而这里使用模板方法,在父类的run方法中会调用helper的run方法,而helper是通过子类来确定。
所以,为了探究Coordinator接下来会做什么,我们就需要确认在创建其子类对象是,helper是如何实现的。

Classes

查看CoordinatorHistoricalManagerRunnable的构造器实现:

   public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
    {
      super(
          ImmutableList.of(
              new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
            //使用匿名内部类创建了一个Helper对象
              new DruidCoordinatorHelper()
              {
                @Override
                public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
                {
                  ……//省略方法的实现
                }
              },
              new DruidCoordinatorRuleRunner(DruidCoordinator.this),
              new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
              new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
              new DruidCoordinatorBalancer(DruidCoordinator.this),
              new DruidCoordinatorLogger(DruidCoordinator.this)
          ),
          startingLeaderCounter
      );
    }

查看源码易知,这里实际上是创建了7个DruidCoordinatorHelper,我们可以理解成,Coordinator接下来会串行地完成这七件事。这里为了代码的易读性和整洁度,可以把这个匿名内部类拎出来单独实现一下。
CoordinatorHistoricalManagerRunnable中完成的工作基本上是与segments管理及历史节点负载均衡相关的。具体的实现可以查看每个Runnable的run方法。

关于CoordinatorIndexingServiceRunnable,我们前面说过,它的helpers是通过调用方传入参数决定的。

 if (indexingServiceClient != null) {
        coordinatorRunnables.add(
            Pair.of(
                new CoordinatorIndexingServiceRunnable(
                    makeIndexingServiceHelpers(),
                    startingLeaderCounter
                ),
                config.getCoordinatorIndexingPeriod()
            )
        );


//makeIndexingServiceHelpersde的实现如下: 

 private List<DruidCoordinatorHelper> makeIndexingServiceHelpers()
  {
    List<DruidCoordinatorHelper> helpers = Lists.newArrayList();
    helpers.add(new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this));
    helpers.addAll(indexingServiceHelpers);

    log.info("Done making indexing service helpers [%s]", helpers);
    return ImmutableList.copyOf(helpers);
  }

实际上是传入了两个helper,一个是DruidCoordinatorSegmentInfoLoader,用来加载segments信息。另一个是indexingServiceHelpers,负责完成segment的一些合并工作。
最后调用LookupCoordinatorManager的start()方法,这里暂时先不讨论。

现在的问题是,列表中的CoordinatorRunnable是通过单个线程来进行调度的,而且helpers也是通过串行的方式实现的。那么,如果单次调度时间过长的话,则会导致很长时间后才会进行下一次的调度。
这里我们考虑一下,Druid为什么会选择使用只有一个线程资源的线程池来调度所有的CoordinatorRunnable
在单个线程中他们实际上是串行的。串行的好处是,在进程中变量对于内存的可见性得到了保证,坏处是执行效率差。
如果需要对这里进行改造,串行改并行,我们首先要确定的就是共享内存中的变量有哪些,对共享变量的操作有哪些。
首先查看父类的run方法,再查看run方法调用的各个helper。细看代码我们不难发现,这里其实就是一个装饰器模式。每个help的run方法都会对params对象进行读写,且大部分都是读操作,只有在返回的时候进行了写操作。

备注:把匿名内部类抽出来,单独实现,然后再作为参数传递,会让代码看起来整洁很多,如下:
更改前:

   public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
    {
      super(
          ImmutableList.of(
              new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
            //使用匿名内部类创建了一个Helper对象
              new DruidCoordinatorHelper()
              {
                @Override
                public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
                {
                  ……//省略方法的实现
                }
              },
              new DruidCoordinatorRuleRunner(DruidCoordinator.this),
              new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
              new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
              new DruidCoordinatorBalancer(DruidCoordinator.this),
              new DruidCoordinatorLogger(DruidCoordinator.this)
          ),
          startingLeaderCounter
      );
    }

更改后:

public CoordinatorHistoricalManagerRunnable(final int startingLeaderCounter)
    {
      super(
          ImmutableList.of(
              new DruidCoordinatorSegmentInfoLoader(DruidCoordinator.this),
              new DruidCoordinatorPeonRunner(),
              new DruidCoordinatorRuleRunner(DruidCoordinator.this),
              new DruidCoordinatorCleanupUnneeded(DruidCoordinator.this),
              new DruidCoordinatorCleanupOvershadowed(DruidCoordinator.this),
              new DruidCoordinatorBalancer(DruidCoordinator.this),
              new DruidCoordinatorLogger(DruidCoordinator.this)
          ),
          startingLeaderCounter
      );
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容