Dispatcher类

Balancer.runOneIteration()--》Dispatcher.dispatchAndCheckContinue()
Dispatcher.dispatchAndCheckContinue()--》dispatchBlockMoves()

1. dispatchBlockMoves()

 对每个source进行block移动的处理,相应的线程会选择要移动的block,向proxy source发送请求来进行block移动的初始化操作。这个过程是流式控制的。如果有太多un-confirmed block要移动,block选择的操作会被锁住。

final long bytesLastMoved = getBytesMoved();
    //java.util.concurrent.Future可以获取任务的执行结果
    final Future<?>[] futures = new Future<?>[sources.size()];

    //sources是HashSet<Source>集合类对象
    final Iterator<Source> i = sources.iterator();
    for (int j = 0; j < futures.length; j++) {
      final Source s = i.next();
      //dispatchExecutor是一个ExecutorService对象,ExecutorService.submit()会返回一个对象
      futures[j] = dispatchExecutor.submit(new Runnable() {
        @Override
        public void run() {
          s.dispatchBlocks();
        }
      });
    }
2. dispatchBlocks()

 这个方法会迭代地进行以下步骤:首先选取要移动的block,然后向proxy source发送请求

      //Time.monotonicNow()会调用System.nanoTime(),以毫微秒为单位
      final long startTime = Time.monotonicNow();
      this.blocksToReceive = 2 * getScheduledSize();
      boolean isTimeUp = false;
      int noPendingMoveIteration = 0;
      while (!isTimeUp && getScheduledSize() > 0
          && (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
        final PendingMove p = chooseNextMove();
        if (p != null) {
          // Reset no pending move counter
          noPendingMoveIteration=0;
          executePendingMove(p);
          continue;
        }
3. chooseNextMove()
      for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
        final Task task = i.next();
        final DDatanode target = task.target.getDDatanode();
        final PendingMove pendingBlock = new PendingMove(this, task.target);
        if (target.addPendingBlock(pendingBlock)) {
          // target is not busy, so do a tentative block allocation
          if (pendingBlock.chooseBlockAndProxy()) {
            long blockSize = pendingBlock.block.getNumBytes();
            incScheduledSize(-blockSize);
            task.size -= blockSize;
            if (task.size == 0) {
              i.remove();
            }
            return pendingBlock;
          } else {
            // cancel the tentative move
            target.removePendingBlock(pendingBlock);
          }
        }
      }
      return null;
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,273评论 19 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 11,310评论 6 13
  • 关于Mongodb的全面总结 MongoDB的内部构造《MongoDB The Definitive Guide》...
    中v中阅读 32,099评论 2 89
  • 对不起,对你也是对我。 你今天告诉我不出来了,想去跆拳道。最后又告诉我练完陪我玩。结果就是我傻了吧唧的跑到地方等了...
    想做梦的大黑狗阅读 278评论 0 0
  • 本应该是一片漆黑的夜空宛若白昼,照亮了白晋的脸,莫名的心悸让他加快步伐往家跑去。 轰隆一声炸响,让他忍不住缩了一下...
    酒生公子阅读 238评论 0 1