hbase读写源码

write以put为例
客户端流程解析
(1)用户提交put请求后,HBase客户端会将put请求添加到本地buffer中,符合一定条件就会通过AsyncProcess异步批量提交。HBase默认设置autoflush=true,表示put请求直接会提交给服务器进行处理;用户可以设置autoflush=false,这样的话put请求会首先放到本地buffer,等到本地buffer大小超过一定阈值(默认为2M,可以通过配置文件配置)之后才会提交。很显然,后者采用group commit机制提交请求,可以极大地提升写入性能,但是因为没有保护机制,如果客户端崩溃的话会导致提交的请求丢失。

// 先将put加入到buffer中
  @Override
  public void put(final Put put) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    getBufferedMutator().mutate(put);
    if (autoFlush) {
      flushCommits();
    }
  }
  private void doMutate(Mutation m) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    if (closed) {
      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
    }
    if (!(m instanceof Put) && !(m instanceof Delete)) {
      throw new IllegalArgumentException("Pass a Delete or a Put");
    }

    // This behavior is highly non-intuitive... it does not protect us against
    // 94-incompatible behavior, which is a timing issue because hasError, the below code
    // and setter of hasError are not synchronized. Perhaps it should be removed.
    if (ap.hasError()) {
      writeAsyncBuffer.add(m);
      backgroundFlushCommits(true);
    }

    if (m instanceof Put) {
      validatePut((Put) m);
    }

    currentWriteBufferSize += m.heapSize();
    writeAsyncBuffer.add(m);
    // size没有超过阈值异步flush,否则同步flush
    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
  }

(2)在提交之前,HBase会在元数据表.meta.中根据rowkey找到它们归属的region server,这个定位的过程是通过HConnection的locateRegion方法获得的。如果是批量请求的话还会把这些rowkey按照HRegionLocation分组,每个分组可以对应一次RPC请求。

(3)HBase会为每个HRegionLocation构造一个远程RPC请求MultiServerCallable<Row>,然后通过rpcCallerFactory.<MultiResponse> newCaller()执行调用,忽略掉失败重新提交和错误处理,客户端的提交操作到此结束

服务器端流程解析
先简单介绍一下服务端HRegion中的各种lock:
(1)scannerReadPointsLock:increment操作的时候需要拿到所有scanner中最小的readPoint以保证原子性,这个lock用来控制控制 new RegionScannerImpl和 getSmallestReadPoint的并发
(2)lock:确保正常关闭
(3)updatesLock:update公共资源如mvcc/memstore/wal加锁
(4)RowLockContext:行锁put操作时需要对一行加锁


服务端流程

在HRegion中有许多replay的判断,replay是在rs failover和log split时重新将wal中数据持久化到rs时的动作,由于很多操作都是在写wal前已经完成了,所以这里不需要重复处理
主要操作在HRegion.doMiniBatchMutation这个函数中,以源码步骤为例:
(1)首先获取行锁

// 这里加的是读锁,因为mvcc保证了consistence所以这里没必要加写锁,数据一致性由mvcc保证即可,在increment/append/batch中由于需要保证原子性需要加写锁
rowLock = getRowLock(mutation.getRow(), true);

(2)更新cell时间戳,如果cell中设置了时间则不需要更新,否则更新为当前时间

  public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
    if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
      setTimestamp(cell, ts, tsOffset);
      return true;
    }
    return false;
  }

(3)将变更wal写入buffer中

      // 加了一把读锁
      walEdit = new WALEdit(cellCount, isInReplay);
      lock(this.updatesLock.readLock(), numReadyToWrite);
      .......
      // 写入到buffer中
      addFamilyMapToWALEdit(familyMaps[i], walEdit);

(4)将最后一个改到写入到wal buffer中,但是没有持久化

          // 生产的一个写入wal的key包含sequenceID和mvcc WriteEntry
          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
              mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
          // 重要txid是一个transactionID用于后续wal 持久化,hdfs写入模型单独开另一章节再介绍
          txid =
              this.wal.append(this.getRegionInfo(), walKey, walEdit, true);

(5)将数据写入到memstore中,由于wal没有sync写入过程没有complete,由mvcc保证数据不可见

        applyFamilyMapToMemstore(familyMaps[i], memstoreSize);

(6)释放行锁和更新锁

     if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      releaseRowLocks(acquiredRowLocks);

(7)sync wal
(8)更新mvcc版本号

        mvcc.completeMemstoreInsert(writeEntry);

(9)后续一些coprocessor工作
put完之后,会根据条件讲一个flush request放入到taskqueue中,有一个单独线程执行request,最终调用HRegion.flush这个函数

flush:
如果内存超过一定阈值或者需要compact时,我们将内存中数据flush到磁盘上面,在flush的时候也用到了mvcc,这里是因为memstore中的数据有可能没有写到wal上面,我们插入一个事务保证flush时所有前面的事务已经完成了,保证我们flush到磁盘上面的数据都已经完全写入到wal上面

// flush 分两个阶段准备阶段和commit阶段
  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
          throws IOException {
    PrepareFlushResult result
      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
    if (result.result == null) {
      return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
    } else {
      return result.result; // early exit due to failure from prepare stage
    }
  }

//首先我们看看memestore的结构,memestore包含两个segement用于写入的mutableSegement和用于flush的immutableSegement,每次flush是先将mutable变为immutableSegement将这个segement用于flush,新生成一个mutableSegement用于新来的写入
public abstract class AbstractMemStore implements MemStore {
  // active segment absorbs write operations
  protected volatile MutableSegment active;
  // Snapshot of memstore.  Made for flusher.
  protected volatile ImmutableSegment snapshot;
......................
}
// 每一个cf对应一个Hstore,每一个Hstore对应一个StoreFlushContext,prepare时调用StoreFlushContext.prepare()
    @Override
    public void prepare() {
      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getDataSize();
      committedFiles = new ArrayList<Path>(1);
    }
//memstore做snapshot时具体源码如下直接将现有数据变为immutableSegement然后返回:
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      this.snapshotId = EnvironmentEdgeManager.currentTime();
      if (!this.active.isEmpty()) {
        ImmutableSegment immutableSegment = SegmentFactory.instance().
            createImmutableSegment(this.active);
        this.snapshot = immutableSegment;
        resetActive();
      }
    }
    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
  }
// prepare阶段对updatesLock.writeLock()进行操作所以此时是禁止写入的

(1)prepare阶段:遍历当前Region中的所有Memstore,将Memstore中当前数据集kvset做一个快照snapshot,然后再新建一个新的kvset。后期的所有写入操作都会写入新的kvset中,而整个flush阶段读操作会首先分别遍历kvset和snapshot,如果查找不到再会到HFile中查找。prepare阶段需要加一把updateLock对写请求阻塞,结束之后会释放该锁。因为此阶段没有任何费时操作,因此持锁时间很短。

       // 循环调用HStore flush
      for (StoreFlushContext flush : storeFlushCtxs.values()) {
        flush.flushCache(status);
      }
// 具体实现类有两个DefaultStoreFlusher和StripeStoreFlusher,以DefaultStoreFlusher为例
  @Override
  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController) throws IOException {
    ArrayList<Path> result = new ArrayList<Path>();
    int cellsCount = snapshot.getCellsCount();
    if (cellsCount == 0) return result; // don't flush if there are no entries

    // Use a store scanner to find which rows to flush.
    // 这里使用的是当前所有scanner中最小的readPoint,这样保证flush到磁盘上的文件一定可读的,flush思路就是从snapshot中scan出kv写到临时 hfile中
    long smallestReadPoint = store.getSmallestReadPoint();
    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
    if (scanner == null) {
      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
    }

    StoreFile.Writer writer;
    try {
      // TODO:  We can fail in the below block before we complete adding this flush to
      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + store + ": creating writer");
        // Write the map out to the disk
        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
            /* isCompaction = */ false,
            /* includeMVCCReadpoint = */ true,
            /* includesTags = */ snapshot.isTagsPresent(),
            /* shouldDropBehind = */ false,
            snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint, throughputController);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
      }
    } finally {
      scanner.close();
    }
    LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
        + StringUtils.humanReadableInt(snapshot.getDataSize()) +
        ", hasBloomFilter=" + writer.hasGeneralBloom() +
        ", into tmp file " + writer.getPath());
    result.add(writer.getPath());
    return result;
  }

(2)flush阶段:遍历所有Memstore,将prepare阶段生成的snapshot持久化为临时文件,临时文件会统一放到目录.tmp下。这个过程因为涉及到磁盘IO操作,因此相对比较耗时。

 /*
   * Change storeFiles adding into place the Reader produced by this new flush.
   * @param sfs Store files
   * @param snapshotId
   * @throws IOException
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
      throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();

    if (LOG.isTraceEnabled()) {
      long totalSize = 0;
      for (StoreFile sf : sfs) {
        totalSize += sf.getReader().length();
      }
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
// move 完文件后判断一下需不需compact
    return needsCompaction();
  }

(3)commit阶段:遍历所有的Memstore,将flush阶段生成的临时文件移到指定的ColumnFamily目录下,针对HFile生成对应的storefile和Reader,把storefile添加到HStore的storefiles列表中,最后再清空prepare阶段生成的snapshot。

read以scan为例

  protected RegionScanner instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    // scan逆序的
    if (scan.isReversed()) {
      if (scan.getFilter() != null) {
        scan.getFilter().setReversed(true);
      }
// 支持逆序scan,以下以正序scan为基础
      return new ReversedRegionScannerImpl(scan, additionalScanners, this);
    }
    return new RegionScannerImpl(scan, additionalScanners, this);
  }

// 构造RegionScanScannerImpl中,一个region有多个store,针对每一个store我们有一个对应的KeyValueScanner
      try {
        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
          Store store = stores.get(entry.getKey());
          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
          instantiatedScanners.add(scanner);
          // 1.没有filter 2.??? 3.filter对该cf永远返回true
          if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
              || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
          } else {
            joinedScanners.add(scanner);
          }
        }

......
   // 返回下一行
    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
        throws IOException {
      if (storeHeap == null) {
        // scanner is closed
        throw new UnknownScannerException("Scanner was closed");
      }
      boolean moreValues = false;
      if (outResults.isEmpty()) {
        // Usually outResults is empty. This is true when next is called
        // to handle scan or get operation.
        moreValues = nextInternal(outResults, scannerContext);
      } else {
        List<Cell> tmpList = new ArrayList<Cell>();
        moreValues = nextInternal(tmpList, scannerContext);
        outResults.addAll(tmpList);
      }

      // If the size limit was reached it means a partial Result is being
      // returned. Returning a
      // partial Result means that we should not reset the filters; filters
      // should only be reset in
      // between rows
      if (!scannerContext.midRowResultFormed())
        resetFilters();

      if (isFilterDoneInternal()) {
        moreValues = false;
      }
      return moreValues;
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352