HBase 1.2.0源码分析:Compact

Compact是指HBase表中HRegion上某个Column Family下,部分或全部HFiles的合并。由于数据持续写入的过程中,MemStore达到一定阈值,被flush到磁盘上,形成许多的小文件,这些文件如果不做处理,将会严重影响HBase数据读取的效率。所以,在HBase系统内部,需要定期在满足一定条件的情况下,或者由人为手动触发,将这许多文件合并成一个大文件,称为Compact。

  1. Compact通过RPC调用触发,RSRpcServices(RegionServer RPC Service),org.apache.hadoop.hbase.regionserver.RSRpcServices
@QosPriority(priority=HConstants.ADMIN_QOS)     // 表示服务的相对优先级,Provides a basic notion of quality of service (QOS).
public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
    
  try {
    checkOpen();        // 检查RegionServer的状态:isOnline,isAborted、isStopped、fsOk
    Region region = getRegion(request.getRegion());     // 获取要操作的Region
    ...
    boolean major = false;      // 是否执行major compact
    byte [] family = null;      // Request是否有column family信息
    Store store = null;         // Column family对应的Store
    if (request.hasFamily()) {  // 获取存储列族的Store
      family = request.getFamily().toByteArray();
      store = region.getStore(family);
      if (store == null) {
        throw new ServiceException(...);
      }
    }

    if (request.hasMajor()) { 
      major = request.getMajor();
    }

    if (major) {        // 如果有列族信息对列族的Store执行,否则对整个Region执行Major Compaction
    // 这里没有真正执行Compaction,只是设置 this.forceMajor = true;
      if (family != null) {
        store.triggerMajorCompaction(); 
      } else {
        region.triggerMajorCompaction();
      }
    }

    // 差别就是是否有Store
    if(family != null) {
      regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
    } else {
      regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
    }
    
    return CompactRegionResponse.newBuilder().build(); 
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}

获取Region

protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
  throws NotServingRegionException {
  Region region = this.onlineRegions.get(encodedRegionName);    // 从online的region列表中获取
  if (region == null) {
    throw new Excetion(....);
    // 不是online的region执行compact操作失败
    // 根据region的状态提示异常,例如:正在move的region
  }
  return region;
}

Map包含所有的因为move操作关闭的region维护在Map中

protected Map<String, MovedRegionInfo> movedRegions;
  1. 进入Compact逻辑
    org.apache.hadoop.hbase.regionserver.CompactSplitThread

如果没有传入Column family,遍历所有的store,执行requestCompactionInternal

private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
    int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
        throws IOException {
    
    for (Store s : r.getStores()) {
      CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
        ...
    }

}
  • HStore请求requestCompaction
    org.apache.hadoop.hbase.regionserver.HStore
@Override
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
    User user) throws IOException {
  if (!this.areWritesEnabled()) {   // 如果禁用写入,不进行压缩。
    return null;
  }
  
  removeUnneededFiles();    // 在进行压缩之前,试着去掉不需要的文件来简化事情。
  // 根据hbase.store.delete.expired.storefile判断是否删除过期的文件

  final CompactionContext compaction = storeEngine.createCompaction();
  CompactionRequest request = null;
  this.lock.readLock().lock();  // 只读锁
  try {
    synchronized (filesCompacting) {
      final Store thisStore = this;
      if (this.getCoprocessorHost() != null) {
        // Coprocessor是0.92之后引入的协处理器,实现一些特性:建立二次索引、复杂过滤器以及访问控制等,先不看这部分逻辑
      }

      // 通用情况
      if (!compaction.hasSelection()) {         // this.request != null;
        boolean isUserCompaction = priority == Store.PRIORITY_USER;     // true
        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
            offPeakCompactionTracker.compareAndSet(false, true);        // 判断是否是使用高峰
        try {
          compaction.select(this.filesCompacting, isUserCompaction,
            mayUseOffPeak, forceMajor && filesCompacting.isEmpty());    // 调用以选择用于压缩的文件
        } catch (IOException e) {
            ...
          throw e;
        }
        ...
      }

      if (baseRequest != null) {
        // 如果baseRequest不是null,比较baseReques和compaction的Request,判断哪些文件需要压缩
        ...
      }

      //得到结果文件列表
      request = compaction.getRequest();
      final Collection<StoreFile> selectedFiles = request.getFiles();
      if (selectedFiles.isEmpty()) {
        return null;
      }

      addToCompactingFiles(selectedFiles);      // 添加到filesCompacting

      // 根据request判断是否是major compacti
      this.forceMajor = this.forceMajor && !request.isMajor();

      // 设置公共请求属性,设置优先级
      request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());   
      request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
    }
  } finally {
    this.lock.readLock().unlock();
  }
  this.region.reportCompactionRequestStart(request.isMajor());      // 计数
  return compaction;
}

storeEngine根据hbase.hstore.engine.class配置获取,默认是DefaultStoreEngine
所以compaction默认是org.apache.hadoop.hbase.regionserver.DefaultCompactionContext实现

this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
CompactionContext compaction = storeEngine.createCompaction();

compactionPolicy根据hbase.hstore.defaultengine.compactionpolicy.class获取,默认是ExploringCompactionPolicy
selectCompaction由父类org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy实现,不深入介绍

request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
        filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
  • requestCompactionInternal
// request = null, selectNow = true
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow, User user)
        throws IOException {
        
  if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
    // 如果Region所属的table设置了COMPACTION_ENABLED=false,不会执行任何Compaction
    return null;
  }

  CompactionContext compaction = null;
  if (selectNow) {
    compaction = selectCompaction(r, s, priority, request, user);   // 对Store获取CompactionContext,包含需要压缩的文件
    // CompactionContext是合并的上下文类。该类含有运行一个合并所必需的全部“物理”细节
    if (compaction == null) return null; 
  }

  //这里假设大多数压缩是小的。因此,将系统压缩放入小池中,在必要时移动到大型池中。
  // throttleCompaction判断compactionSize > comConf.getThrottlePoint(); 
  // hbase.regionserver.thread.compaction.throttle参数设置
  ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
    ? longCompactions : shortCompactions;
  pool.execute(new CompactionRunner(s, r, compaction, pool, user));    // 多线程执行Compact,执行逻辑在CompactionRunner
  return selectNow ? compaction.getRequest() : null;
}
  1. 线程执行Compact
private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
  @Override
  public void run() {
    if (server.isStopped()
        || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
        return;         //判断RegionServer状态和Tabel是否开启Compaction
      }
      doCompaction(user);
    } 

    private void doCompaction(User user) {
      // 通用逻辑,系统compaction,不包含file selection
      if (this.compaction == null) {
        // 这里判断Store的优先级是否改变,以避免阻塞潜在的更高优先级。
      }

      this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
      // 这里重复执行,之前selectNow=true时已经执行过一次,可能为了防止数据改变
    }
        
    // 接下来可以进行压缩
    this.compaction.getRequest().beforeExecute();     //现在没有做任何操作
    try {
      boolean completed = region.compact(compaction, store,   compactionThroughputController, user);
      if (completed) {
        if (store.getCompactPriority() <= 0) {
          // 退化情况:重新执行requestCompactionInternal
          requestSystemCompaction(region, store, "Recursive enqueue");
        } else {
          // 查看压缩后是否导致超出最大区域大小,需要进行Split。参考Split操作
          requestSplit(region);
        }
      }
    } catch (Exception ex) {
    ...
    } finally {
      LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
    }
    this.compaction.getRequest().afterExecute();   //现在没有做任何操作
  }
}
  • 真正的compact逻辑
    org.apache.hadoop.hbase.regionserver.HRegion
public boolean compact(CompactionContext compaction, Store store,
    CompactionThroughputController throughputController, User user) throws IOException {
  if (this.closing.get() || this.closed.get()) {    //判断Region是否close
    store.cancelRequestedCompaction(compaction);
    return false;
  }

  MonitoredTask status = null;
  boolean requestNeedsCancellation = true;
  lock.readLock().lock();       
  try {
    byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
    if (stores.get(cf) != store) {  // 如果因为各种情况导致:根据cf获取的store和之前获取的store已经不一样了,退出compact
      return false;
    }

    if (this.closed.get()) {        //再判断Region是否close
      return false;
    }

      try {
        store.compact(compaction, throughputController, user);      // Store执行compact
      } catch (InterruptedIOException iioe) {
        ...
      }
   
    return true;
  } finally {
    try {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    } finally {
      lock.readLock().unlock();
    }
  }
}
  • 文件Compact
    org.apache.hadoop.hbase.regionserver.HStore
@Override
public List<StoreFile> compact(CompactionContext compaction,
  CompactionThroughputController throughputController, User user) throws IOException {
  List<StoreFile> sfs = null;
  CompactionRequest cr = compaction.getRequest();
  try {
    // 如果有一个有效的压缩请求,在这里做所有明智性检查(sanity check),因为我们需要在下面的最后一个块中清除它之后的清理。
    Collection<StoreFile> filesToCompact = cr.getFiles();
    synchronized (filesCompacting) {
      // sanity check:正在压缩这个Store的文件
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }

    // 开始压缩
    List<Path> newFiles = compaction.compact(throughputController, user);

    long outputBytes = 0L;
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
        // 压缩被停止的处理
        ...
    }

    // 完成压缩所需的步骤。
    sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
    writeCompactionWalRecord(filesToCompact, sfs);
    replaceStoreFiles(filesToCompact, sfs);
    ...
    // 这时候Store将使用所有新的文件。
    completeCompaction(filesToCompact, true);       // 存档旧文件和更新存储大小。

    if (region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null) {
      region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
        now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
        outputBytes);
    }

    return sfs;
  } finally {
    finishCompactionRequest(cr);
  }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,185评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,445评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,684评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,564评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,681评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,874评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,025评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,761评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,217评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,545评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,694评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,351评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,988评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,778评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,007评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,427评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,580评论 2 349

推荐阅读更多精彩内容