HBase Read/Write Source Code Walkthrough

Write path, with put as the example
Client-side flow
(1) When a user submits a put request, the HBase client adds it to a local buffer; once certain conditions are met, the buffered requests are submitted asynchronously in batches through AsyncProcess. HBase defaults to autoflush=true, which means each put is submitted to the server immediately. Setting autoflush=false makes puts go into the local buffer first, and they are only submitted once the buffer exceeds a threshold (2MB by default, configurable). The latter is a group-commit mechanism and can greatly improve write throughput, but there is no protection mechanism: if the client crashes, the buffered-but-unsubmitted requests are lost.
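
For illustration, here is a minimal client-side sketch of the buffered path (standard HBase 1.x client API; the table name and buffer size are made up for the example):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
         BufferedMutator mutator = conn.getBufferedMutator(
             new BufferedMutatorParams(TableName.valueOf("test"))
                 .writeBufferSize(2 * 1024 * 1024))) { // 2MB group-commit threshold
      Put put = new Put(Bytes.toBytes("row1"));
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
      mutator.mutate(put); // buffered locally; nothing is sent yet
      mutator.flush();     // explicit flush; otherwise triggered by buffer size
    }
  }
}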

// first add the put to the buffer
  @Override
  public void put(final Put put) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    getBufferedMutator().mutate(put);
    if (autoFlush) {
      flushCommits();
    }
  }
  private void doMutate(Mutation m) throws InterruptedIOException,
      RetriesExhaustedWithDetailsException {
    if (closed) {
      throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
    }
    if (!(m instanceof Put) && !(m instanceof Delete)) {
      throw new IllegalArgumentException("Pass a Delete or a Put");
    }

    // This behavior is highly non-intuitive... it does not protect us against
    // 94-incompatible behavior, which is a timing issue because hasError, the below code
    // and setter of hasError are not synchronized. Perhaps it should be removed.
    if (ap.hasError()) {
      writeAsyncBuffer.add(m);
      backgroundFlushCommits(true);
    }

    if (m instanceof Put) {
      validatePut((Put) m);
    }

    currentWriteBufferSize += m.heapSize();
    writeAsyncBuffer.add(m);
    // while the buffer size exceeds the threshold, flush in the background
    // (backgroundFlushCommits(false) does not wait; passing true, as in the
    // error path above, makes the flush synchronous)
    while (currentWriteBufferSize > writeBufferSize) {
      backgroundFlushCommits(false);
    }
  }

(2) Before submitting, HBase looks up which region server each rowkey belongs to in the meta table (hbase:meta); this location step goes through HConnection's locateRegion method. For a batch request, the rowkeys are additionally grouped by HRegionLocation, and each group corresponds to one RPC request.

(3) HBase constructs one remote RPC request, a MultiServerCallable<Row>, for each HRegionLocation, and executes the call through rpcCallerFactory.<MultiResponse> newCaller(). Leaving aside retry-on-failure and error handling, the client-side submission ends here.
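
A hedged sketch of steps (2) and (3) combined (the method and variable names below are illustrative; the real logic lives inside AsyncProcess and also handles retries and errors):

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.Row;

// bucket the batched actions by the region location that owns each rowkey;
// each bucket then becomes one MultiServerCallable<Row> RPC executed through
// rpcCallerFactory.<MultiResponse> newCaller()
static Map<HRegionLocation, List<Row>> groupByLocation(
    HConnection connection, TableName tableName, List<Row> actions) throws IOException {
  Map<HRegionLocation, List<Row>> actionsByServer = new HashMap<>();
  for (Row action : actions) {
    HRegionLocation loc = connection.locateRegion(tableName, action.getRow());
    List<Row> group = actionsByServer.get(loc);
    if (group == null) {
      group = new ArrayList<>();
      actionsByServer.put(loc, group);
    }
    group.add(action);
  }
  return actionsByServer;
}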

Server-side flow analysis
First, a quick look at the various locks inside HRegion on the server side:
(1) scannerReadPointsLock: an increment needs the smallest readPoint across all scanners to guarantee atomicity; this lock controls the concurrency between new RegionScannerImpl and getSmallestReadPoint.
(2) lock: ensures the region shuts down cleanly.
(3) updatesLock: taken when updating shared resources such as mvcc/memstore/wal.
(4) RowLockContext: the row lock; a put must lock the row it writes.


Server-side flow

HRegion contains many replay checks. A replay happens during region server failover and log splitting, when data in the WAL is re-persisted to a region server; because many of these operations were already completed before the WAL was written, they do not need to be repeated here.
The main work is in HRegion.doMiniBatchMutation; following the source code step by step:
(1) First, acquire the row lock

// a read lock is taken here: mvcc already guarantees consistency, so a write lock is unnecessary for a plain put; increment/append/batch need the write lock to guarantee atomicity
rowLock = getRowLock(mutation.getRow(), true);

(2) Update cell timestamps: a cell that already carries a timestamp is left alone; otherwise it is stamped with the current time

  public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
    if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
      setTimestamp(cell, ts, tsOffset);
      return true;
    }
    return false;
  }
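
In client terms (an illustrative fragment using the standard client API):

// no timestamp given: the cell carries HConstants.LATEST_TIMESTAMP and gets
// stamped with the server's current time by updateLatestStamp in step (2)
Put p1 = new Put(Bytes.toBytes("row1"));
p1.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));

// explicit timestamp: updateLatestStamp returns false and leaves it untouched
Put p2 = new Put(Bytes.toBytes("row1"));
p2.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), 1478000000000L, Bytes.toBytes("v"));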

(3) Write the changes into the WAL edit buffer

      // a read lock (on updatesLock) is taken
      walEdit = new WALEdit(cellCount, isInReplay);
      lock(this.updatesLock.readLock(), numReadyToWrite);
      .......
      // write into the buffer
      addFamilyMapToWALEdit(familyMaps[i], walEdit);

(4) Write the final edit into the WAL buffer, but do not persist it yet

          // the WAL key produced here carries the sequenceID and the mvcc WriteEntry
          walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
              this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
              mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
          // important: txid is a transaction ID used later when the WAL is persisted; the HDFS write model is covered in a separate chapter
          txid =
              this.wal.append(this.getRegionInfo(), walKey, walEdit, true);

(5) Write the data into the memstore. Because the WAL has not been synced, the write is not yet complete, and mvcc keeps the data invisible to readers

        applyFamilyMapToMemstore(familyMaps[i], memstoreSize);

(6) Release the row locks and the updates lock

     if (locked) {
        this.updatesLock.readLock().unlock();
        locked = false;
      }
      releaseRowLocks(acquiredRowLocks);

(7) Sync the WAL
(8) Advance the mvcc read point

        mvcc.completeMemstoreInsert(writeEntry);
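
The mvcc handshake around a write, sketched below. API names vary across versions (completeMemstoreInsert above is the older form; newer branches use begin/completeAndWait on MultiVersionConcurrencyControl), so treat this as a sketch of the pattern rather than exact code:

// begin() allocates a WriteEntry whose sequence id is stamped into the WALKey;
// the write stays invisible to readers until the entry completes
MultiVersionConcurrencyControl.WriteEntry writeEntry = mvcc.begin();
try {
  // ... append to the WAL and apply to the memstore with this sequence id ...
} finally {
  // advances the global read point once all earlier writes have completed,
  // making this write visible to new scanners
  mvcc.completeAndWait(writeEntry);
}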

(9) Finally, some coprocessor work
After the put finishes, a flush request may, depending on conditions, be placed into a task queue; a dedicated thread executes those requests and eventually calls HRegion.flush (see the sketch below).
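
A hedged sketch of that request/worker pattern, heavily simplified from MemStoreFlusher (the class, queue type, and loop below are illustrative):

import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.hbase.regionserver.HRegion;

class FlushWorkerSketch {
  // flush requests queued by the write path, drained by one dedicated thread
  private final BlockingQueue<HRegion> flushQueue = new LinkedBlockingQueue<>();

  void requestFlush(HRegion region) {
    flushQueue.offer(region); // called from the write path after a put
  }

  void start() {
    Thread flusher = new Thread(() -> {
      while (!Thread.currentThread().isInterrupted()) {
        try {
          HRegion region = flushQueue.take(); // blocks until a request arrives
          region.flush(false);                // lands in HRegion.flush -> internalFlushcache
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        } catch (IOException ioe) {
          // the real MemStoreFlusher requeues the request or aborts; omitted
        }
      }
    }, "MemStoreFlusher-sketch");
    flusher.setDaemon(true);
    flusher.start();
  }
}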

flush:
When the memstore exceeds a threshold, or a compaction needs it, the in-memory data is flushed to disk. mvcc is used during flush as well: data in the memstore may not yet be written to the WAL, so a transaction is inserted to guarantee that all earlier transactions have completed before flushing, which ensures that everything flushed to disk has already been fully written to the WAL.
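
Expressed with the newer MultiVersionConcurrencyControl API (an assumption; the exact call differs across branches), that barrier boils down to:

// start a fresh mvcc transaction and wait for it; since the read point only
// advances past an entry when every earlier entry has completed, returning
// from completeAndWait proves all prior writes have finished
MultiVersionConcurrencyControl.WriteEntry barrier = mvcc.begin();
mvcc.completeAndWait(barrier);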

// flush happens in two phases: a prepare phase and a commit phase
  protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
      final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
          throws IOException {
    PrepareFlushResult result
      = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
    if (result.result == null) {
      return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
    } else {
      return result.result; // early exit due to failure from prepare stage
    }
  }

// First, the memstore structure: a memstore holds two segments, a MutableSegment that absorbs writes and an ImmutableSegment used for flushing. Each flush first turns the mutable segment into an immutable segment to be flushed, and creates a fresh mutable segment for incoming writes
public abstract class AbstractMemStore implements MemStore {
  // active segment absorbs write operations
  protected volatile MutableSegment active;
  // Snapshot of memstore.  Made for flusher.
  protected volatile ImmutableSegment snapshot;
......................
}
// Each column family corresponds to one HStore, and each HStore has one StoreFlushContext; the prepare phase calls StoreFlushContext.prepare()
    @Override
    public void prepare() {
      // passing the current sequence number of the wal - to allow bookkeeping in the memstore
      this.snapshot = memstore.snapshot();
      this.cacheFlushCount = snapshot.getCellsCount();
      this.cacheFlushSize = snapshot.getDataSize();
      committedFiles = new ArrayList<Path>(1);
    }
// The memstore snapshot source is below: it simply turns the current data into an ImmutableSegment and returns it:
  @Override
  public MemStoreSnapshot snapshot() {
    // If snapshot currently has entries, then flusher failed or didn't call
    // cleanup.  Log a warning.
    if (!this.snapshot.isEmpty()) {
      LOG.warn("Snapshot called again without clearing previous. " +
          "Doing nothing. Another ongoing flush or did we fail last attempt?");
    } else {
      this.snapshotId = EnvironmentEdgeManager.currentTime();
      if (!this.active.isEmpty()) {
        ImmutableSegment immutableSegment = SegmentFactory.instance().
            createImmutableSegment(this.active);
        this.snapshot = immutableSegment;
        resetActive();
      }
    }
    return new MemStoreSnapshot(this.snapshotId, this.snapshot);
  }
// the prepare phase runs under updatesLock.writeLock(), so writes are blocked while it is in progress

(1) Prepare phase: iterate over every memstore in the region, take a snapshot of each memstore's current data set (kvset), and create a new empty kvset. All subsequent writes go into the new kvset; reads during the flush first traverse the kvset and the snapshot, and only fall back to HFiles if the key is not found there. The prepare phase takes the updatesLock write lock, blocking write requests, and releases it when it finishes. Since nothing time-consuming happens in this phase, the lock is held only very briefly.

       // call flush on every HStore in a loop
      for (StoreFlushContext flush : storeFlushCtxs.values()) {
        flush.flushCache(status);
      }
// there are two concrete implementations, DefaultStoreFlusher and StripeStoreFlusher; DefaultStoreFlusher is shown here
  @Override
  public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
      MonitoredTask status, ThroughputController throughputController) throws IOException {
    ArrayList<Path> result = new ArrayList<Path>();
    int cellsCount = snapshot.getCellsCount();
    if (cellsCount == 0) return result; // don't flush if there are no entries

    // Use a store scanner to find which rows to flush.
    // the smallest readPoint across all current scanners is used, which guarantees the flushed file is fully readable; the flush itself just scans the KVs out of the snapshot and writes them into a temporary HFile
    long smallestReadPoint = store.getSmallestReadPoint();
    InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
    if (scanner == null) {
      return result; // NULL scanner returned from coprocessor hooks means skip normal processing
    }

    StoreFile.Writer writer;
    try {
      // TODO:  We can fail in the below block before we complete adding this flush to
      //        list of store files.  Add cleanup of anything put on filesystem if we fail.
      synchronized (flushLock) {
        status.setStatus("Flushing " + store + ": creating writer");
        // Write the map out to the disk
        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
            /* isCompaction = */ false,
            /* includeMVCCReadpoint = */ true,
            /* includesTags = */ snapshot.isTagsPresent(),
            /* shouldDropBehind = */ false,
            snapshot.getTimeRangeTracker());
        IOException e = null;
        try {
          performFlush(scanner, writer, smallestReadPoint, throughputController);
        } catch (IOException ioe) {
          e = ioe;
          // throw the exception out
          throw ioe;
        } finally {
          if (e != null) {
            writer.close();
          } else {
            finalizeWriter(writer, cacheFlushId, status);
          }
        }
      }
    } finally {
      scanner.close();
    }
    LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
        + StringUtils.humanReadableInt(snapshot.getDataSize()) +
        ", hasBloomFilter=" + writer.hasGeneralBloom() +
        ", into tmp file " + writer.getPath());
    result.add(writer.getPath());
    return result;
  }

(2) Flush phase: iterate over all memstores and persist the snapshots taken in the prepare phase into temporary files, which are collected under the .tmp directory. This step does disk I/O, so it is comparatively slow.

 /*
   * Change storeFiles adding into place the Reader produced by this new flush.
   * @param sfs Store files
   * @param snapshotId
   * @throws IOException
   * @return Whether compaction is required.
   */
  private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
      throws IOException {
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
      if (snapshotId > 0) {
        this.memstore.clearSnapshot(snapshotId);
      }
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }

    // Tell listeners of the change in readers.
    notifyChangedReadersObservers();

    if (LOG.isTraceEnabled()) {
      long totalSize = 0;
      for (StoreFile sf : sfs) {
        totalSize += sf.getReader().length();
      }
      String traceMessage = "FLUSH time,count,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
          + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
// after moving the files into place, check whether a compaction is needed
    return needsCompaction();
  }

(3) Commit phase: iterate over all memstores, move the temporary files produced in the flush phase into the column family directory, create the corresponding storefile and Reader for each HFile, add the storefiles to the HStore's storefiles list, and finally clear the snapshot taken in the prepare phase.
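
A hedged sketch of the per-store commit (method names borrowed loosely from HStore/HRegionFileSystem; treat them as illustrative, not exact signatures):

// move the tmp HFile into the column family directory (an HDFS rename)
Path dstPath = fs.commitStoreFile(getColumnFamilyName(), tmpPath);
// open a StoreFile and its Reader over the committed file
StoreFile sf = createStoreFileAndReader(dstPath);
// add it to the store's file list and drop the prepare-phase snapshot;
// this is the updateStorefiles(...) shown above
updateStorefiles(Collections.singletonList(sf), snapshotId);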

Read path, with scan as the example
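
For context, a scan enters from the client roughly as below (standard client API; the table and start row are made up) before reaching the server-side instantiateRegionScanner:

try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
     Table table = conn.getTable(TableName.valueOf("test"));
     ResultScanner rs = table.getScanner(new Scan(Bytes.toBytes("row1")))) {
  for (Result r : rs) { // each iteration ultimately drives RegionScannerImpl.nextRaw
    System.out.println(r);
  }
}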

  protected RegionScanner instantiateRegionScanner(Scan scan,
      List<KeyValueScanner> additionalScanners) throws IOException {
    // reversed scan
    if (scan.isReversed()) {
      if (scan.getFilter() != null) {
        scan.getFilter().setReversed(true);
      }
// reverse scans are supported; the walkthrough below assumes a forward scan
      return new ReversedRegionScannerImpl(scan, additionalScanners, this);
    }
    return new RegionScannerImpl(scan, additionalScanners, this);
  }

// In the RegionScannerImpl constructor: a region has multiple stores, and each store gets its own KeyValueScanner
      try {
        for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
          Store store = stores.get(entry.getKey());
          KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
          instantiatedScanners.add(scanner);
          // add to scanners when: 1. there is no filter, 2. on-demand column
          // family loading is disabled, or 3. the filter marks this CF as
          // essential (isFamilyEssential returns true)
          if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
              || this.filter.isFamilyEssential(entry.getKey())) {
            scanners.add(scanner);
          } else {
            joinedScanners.add(scanner);
          }
        }

......
   // return the next row
    @Override
    public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
        throws IOException {
      if (storeHeap == null) {
        // scanner is closed
        throw new UnknownScannerException("Scanner was closed");
      }
      boolean moreValues = false;
      if (outResults.isEmpty()) {
        // Usually outResults is empty. This is true when next is called
        // to handle scan or get operation.
        moreValues = nextInternal(outResults, scannerContext);
      } else {
        List<Cell> tmpList = new ArrayList<Cell>();
        moreValues = nextInternal(tmpList, scannerContext);
        outResults.addAll(tmpList);
      }

      // If the size limit was reached it means a partial Result is being
      // returned. Returning a partial Result means that we should not reset
      // the filters; filters should only be reset in between rows
      if (!scannerContext.midRowResultFormed())
        resetFilters();

      if (isFilterDoneInternal()) {
        moreValues = false;
      }
      return moreValues;
    }