Hbase 故障恢复

Distributed log processing is enabled by default since HBase 0.92. The setting is controlled by the hbase.master.distributed.log.splitting property, which can be set to true or false, but defaults to true.

https://hbase.apache.org/1.4/book.html#wal

MasterFileSystem.splitLog

 public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
    long splitTime = 0, splitLogSize = 0;
    List<Path> logDirs = getLogDirs(serverNames);

    splitLogManager.handleDeadWorkers(serverNames);
    splitTime = EnvironmentEdgeManager.currentTime();
    splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
    splitTime = EnvironmentEdgeManager.currentTime() - splitTime;

    if (this.metricsMasterFilesystem != null) {
      if (filter == META_FILTER) {
        this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
      } else {
        this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
      }
    }
  }

hregion 的LOGDIR

  /** Used to construct the name of the log directory for a region server */
  public static final String HREGION_LOGDIR_NAME = "WALs";

  /** Used to construct the name of the splitlog directory for a region server */
  public static final String SPLIT_LOGDIR_NAME = "splitWAL";

  /** Like the previous, but for old logs that are about to be deleted */
  public static final String HREGION_OLDLOGDIR_NAME = "oldWALs";

  public static final String SPLITTING_EXT = "-splitting";

hdfs上的region Hlog

drwxr-xr-x   - mfw_hadoop supergroup          0 2018-04-08 20:12 /hbase/WALs/node007159,60020,1523189512357
drwxr-xr-x   - mfw_hadoop supergroup          0 2019-07-24 08:11 /hbase/WALs/node007159,60020,1562342975966-splitting
drwxr-xr-x   - mfw_hadoop supergroup          0 2019-11-20 20:35 /hbase/WALs/node007159,60020,1568352393035-splitting
drwxr-xr-x   - mfw_hadoop supergroup          0 2020-05-12 06:44 /hbase/WALs/node007159,60020,1578310694153-splitting
drwxr-xr-x   - mfw_hadoop supergroup          0 2021-01-08 14:52 /hbase/WALs/node007159,60020,1609309991739
  public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
      PathFilter filter) throws IOException {
    MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
      logDirs + " for serverName=" + serverNames);
    FileStatus[] logfiles = getFileList(logDirs, filter);    
    long totalSize = 0;
   //用于监听所有任务的状态
    TaskBatch batch = new TaskBatch();
    Boolean isMetaRecovery = (filter == null) ? null : false;
    for (FileStatus lf : logfiles) {
      totalSize += lf.getLen();
      String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
      //发布task,一个日志log文件 是一个task
      if (!enqueueSplitTask(pathToLog, batch)) {
        throw new IOException("duplicate log split scheduled for " + lf.getPath());
      }
    }
    waitForSplittingCompletion(batch, status);
    // remove recovering regions
    if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
      // we split meta regions and user regions separately therefore logfiles are either all for
      // meta or user regions but won't for both( we could have mixed situations in tests)
      isMetaRecovery = true;
    }
    // the function is only used in WALEdit direct replay mode(此处不会调用,cdh1.2当前版本是写死的false)
    removeRecoveringRegions(serverNames, isMetaRecovery);
    // 任务都完成
    if (batch.done != batch.installed) {
      batch.isDead = true;
      throw new IOException(msg);
    }
    //清理region日志
    for (Path logDir : logDirs) {
      status.setStatus("Cleaning up log directory...");
      final FileSystem fs = logDir.getFileSystem(conf);
      try {
        if (fs.exists(logDir) && !fs.delete(logDir, false)) {
          LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
        }
      }
      
    }
 
    return totalSize;
  }
  1. 将待切分日志文件夹重命名,为什么需要将文件夹重命名呢?这是因为在某些场景下RegionServer并没有真正宕机,但是HMaster会认为其已经宕机并进行故障恢复,比如最常见的RegionServer发生长时间Full GC,这种场景下用户并不知道RegionServer宕机,所有的写入更新操作还会继续发送到该RegionServer,而且由于该RegionServer自身还继续工作所以会接收用户的请求,此时如果不重命名日志文件夹,就会发生HMaster已经在使用HLog进行故障恢复了,但是RegionServer还在不断写入HLog
MasterFileSystem.getLogDirs  
   for (ServerName serverName : serverNames) {
        Path logDir = new Path(this.walRootDir,
            DefaultWALProvider.getWALDirectoryName(serverName.toString()));
        Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
        if (walFs.exists(logDir)) {
          if (!this.walFs.rename(logDir, splitDir)) {
            throw new IOException("Failed fs.rename for log split: " + logDir);
          }
          logDir = splitDir;
        } 
        logDirs.add(splitDir);

如果任务没有重复发布过,将 task entry 发布到 SplitLogManagerCoordinatio(ZKSplitLogManagerCoordination)

  boolean enqueueSplitTask(String taskname, TaskBatch batch) {
    lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
    String task =
        ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
            .getSplitLogManagerCoordination().prepareTask(taskname);
    Task oldtask = createTaskIfAbsent(task, batch);
    if (oldtask == null) {
      // publish the task in the coordination engine
      ((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
          .getSplitLogManagerCoordination().submitTask(task);
      return true;
    }
    return false;
  }
image.png
  1. Master会将待切分日志路径发布到Zookeeper节点上(/hbase/splitWAL),每个日志作为一个任务,每个任务都会有对应状态,起始状态为TASK_UNASSIGNED
    SplitLogManager.submitTask
ZKSplitLogManagerCoordination.createNode
 public void submitTask(String path) {
   createNode(path, zkretries);
 }
   //切分日志路径发布到Zookeeper节点上
private void createNode(String path, Long retry_count) {
   SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
   ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
     retry_count);
   SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
   return;
 }
  1. 所有RegionServer启动之后都注册在这个节点上等待新任务,一旦Master发布任务之后,RegionServer就会抢占该任务
  # HRegionServer.run
  while (keepLooping()) {
        //告诉Hmaster  我可以干活了
        RegionServerStartupResponse w = reportForDuty();
        if (w == null) {
          LOG.warn("reportForDuty failed; sleeping and then retrying.");
          this.sleeper.sleep();
        } else {
         /
          handleReportForDutyResponse(w);
          break;
        }
      }

/如果像master 报告成功,做初始化. 方法内调用startServiceThreads();
其中包括 SplitLogWorker

  # HRegionServer.startServiceThreads 
   this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
    splitLogWorker.start();

在/hbase/splitlog zknode上等待任务可用。一次只处理一个任务。这
策略对在集群中可能发生同时日志拆分数量设置了上限在

ZkSplitLogWorkerCoordination.taskLoop

尝试获取任务zk节点上的“锁”,以拥有并执行任务。

ZkSplitLogWorkerCoordination.grabTask
  1. 抢占任务实际上首先去查看任务状态,如果是TASK_UNASSIGNED状态,说明当前没有人占有,此时就去修改该节点状态为TASK_OWNED。如果修改失败,说明其他RegionServer也在抢占,修改成功表明任务抢占成功。
    attemptToOwnTask.attemptToOwnTask
  2. RegionServer抢占任务成功之后会分发给相应线程处理,如果处理成功,会将该任务对应zk节点状态修改为TASK_DONE,一旦失败会修改为TASK_ERR
public void process() throws IOException {
  try {
    //splitTaskExecutor的内部类调用exec执行
    Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
    switch (status) {
    case DONE:
      coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
        SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
      break;
    case PREEMPTED:
      SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
      break;
    case ERR:
      if (server != null && !server.isStopped()) {
        coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
          SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
        break;
      }
    case RESIGNED:
      if (server != null && server.isStopped()) {
      coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
        SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
      break;
    }
  } 
}
  1. Master会一直监听在该ZK节点上,一旦发生状态修改就会得到通知。任务状态变更为TASK_ERR的话,Master会重新发布该任务,而变更为TASK_DONE的话,Master会将对应的节点删除
    下图是RegionServer抢占任务以及抢占任务之后的工作流程:


    image.png

1.1 假设Master当前发布了4个任务,即当前需要回放4个日志文件,分别为hlog1、hlog2、hlog3和hlog4

1.2 RegionServer1抢占到了hlog1和hlog2日志,RegionServer2抢占到了hlog3日志,RegionServer3抢占到了hlog4日志

1.3 以RegionServer1为例,其抢占到hlog1和hlog2日志之后会分别分发给两个HLogSplitter线程进行处理,HLogSplitter负责对日志文件执行具体的切分,切分思路还是首先读出日志中每一个<HLogKey, WALEdit>数据对,根据HLogKey所属Region写入不同的Region Buffer

WALSplitter.splitLogFile

1.4 每个Region Buffer都会有一个对应的写线程,将buffer中的日志数据写入hdfs中,写入路径为/hbase/table/region2/seqenceid.temp,其中seqenceid是一个日志中某个region对应的最大sequenceid

LogRecoveredEditsOutputSink的工作是直接按照region,把相对应的log写到hdfs的 hbase.rootdir/data/namespace(比如test)/table_name/region_encoded_name/recovered.edits下。后续region被其他region server open时,会来这看是不是有需要回放的hlog.

hdfs dfs -ls /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b
Found 4 items
-rw-r--r--   3 mfw_hadoop supergroup        122 2020-02-13 06:38 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/.regioninfo
drwxr-xr-x   - mfw_hadoop supergroup          0 2021-01-08 18:00 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/.tmp
drwxr-xr-x   - mfw_hadoop supergroup          0 2021-01-08 18:00 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/info
drwxr-xr-x   - mfw_hadoop supergroup          0 2020-12-30 14:56 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits

hdfs dfs -ls /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits
Found 1 items
-rw-r--r--   3 mfw_hadoop supergroup          0 2020-12-30 14:56 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits/492005.seqid

1.5 后续region被其他region server open时,针对某一region回放日志只需要将该region对应的所有文件按照sequenceid由小到大依次进行回放即可

这种Distributed Log Splitting方式可以很大程度上加快整个故障恢复的进程,正常故障恢复时间可以降低到分钟级别。然而,这种方式会产生很多日志小文件,产生的文件数将会是M * N,其中M是待切分的总hlog数量,N是一个宕机RegionServer上的Region个数。假如一个RegionServer上有200个Region,并且有90个hlog日志,一旦该RegionServer宕机,那这种方式的恢复过程将会创建 90 * 200 = 18000个小文件。这还只是一个RegionServer宕机的情况,如果是整个集群宕机小文件将会更多!!!

#WALSplitter.writeRegionSequenceIdFile
// write a new seqId file
 Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);

znode : splitWAL
znode :recovering-regions

从日志恢复

在region打开的时候,我们从HRegionServer的openRegion方法一路跟踪,中间历经OpenMetaHandler,再到HRegion.openHRegion方法,终于在initializeRegionStores方法里面找到了那么一句话。

RSRpcServices.openRegion
if (region.isMetaRegion()) {
            regionServer.service.submit(new OpenMetaHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          } else {
            regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
              regionOpenInfo.getFavoredNodesList());
            regionServer.service.submit(new OpenRegionHandler(
              regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
          }

打开region

HRegion.openHRegion
protected HRegion openHRegion(final CancelableProgressable reporter)
  throws IOException {
    // Refuse to open the region if we are missing local compression support
    checkCompressionCodecs();
    // Refuse to open the region if encryption configuration is incorrect or
    // codec support is missing
    checkEncryption();
    // Refuse to open the region if a required class cannot be loaded
    checkClassLoading();
    this.openSeqNum = initialize(reporter);
    this.mvcc.advanceTo(openSeqNum);
    if (wal != null && getRegionServerServices() != null && !writestate.readOnly
        && !recovering) {
      // Only write the region open event marker to WAL if (1) we are not read-only
      // (2) dist log replay is off or we are not recovering. In case region is
      // recovering, the open event will be written at setRecovering(false)
      writeRegionOpenMarker(wal, openSeqNum);
    }
    return this;
  }

HRegion.initialize
HRegion.initializeRegionInternals
ServerRegionReplicaUtil.shouldReplayRecoveredEdits

    // 如果recovered.edits有日志的话,就恢复日志
      maxSeqId = Math.max(maxSeqId,
        replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));

读取recovered.edits下面的日志,符合条件的就加到MemStore里面去,完成之后,就把这些文件删掉。大家也看到了,这里通篇讲到一个logSeqNum,哪里都有它的身影,它实际上是FSHLog当中的一个递增的AtomicLong,每当往FSLog里面写入一条日志的时候,它都会加一,然后MemStore请求flush的时候,会调用FSLog的startCacheFlush方法,获取(logSeqNum+1)回来,然后写入到StoreFile的sequenceid字段,再次拿出来的时候,就遍历这个HStore下面的StoreFile的logSeqNum,取出来最大的跟它比较,小于它的都已经写过了,没必要再写了。

#replayRecoveredEditsIfAny
HLog.Reader reader = null;
    try {
      //创建reader读取hlog
      reader = HLogFactory.createReader(fs, edits, conf);
      long currentEditSeqId = -1;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      HLog.Entry entry;
      Store store = null;
      boolean reported_once = false;

      try {//逐个读取
        while ((entry = reader.next()) != null) {
          HLogKey key = entry.getKey();
          WALEdit val = entry.getEdit();
          //实例化firstSeqIdInLog
          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getLogSeqNum();
          }
          boolean flush = false;
          for (KeyValue kv: val.getKeyValues()) {
            // 从WALEdits里面取出kvs
            if (kv.matchingFamily(WALEdit.METAFAMILY) ||
                !Bytes.equals(key.getEncodedRegionName(),
                  this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
              CompactionDescriptor compaction = WALEdit.getCompaction(kv);
              if (compaction != null) {
                //完成compaction未完成的事情,校验输入输出文件,完成文件替换等操作
                completeCompactionMarker(compaction);
              }

              skippedEdits++;
              continue;
            }
            // 获得kv对应的store
            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
              store = this.stores.get(kv.getFamily());
            }
            if (store == null) {
              // 应该不会发生,缺少它对应的列族
              skippedEdits++;
              continue;
            }
            // seq id小,呵呵,说明已经被处理过了这个日志
            if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
              skippedEdits++;
              continue;
            }
            currentEditSeqId = key.getLogSeqNum();
            // 这个就是我们要处理的日志,添加到MemStore里面就ok了
            flush = restoreEdit(store, kv);
            editsCount++;
          }
          //MemStore太大了,需要flush掉
          if (flush) internalFlushcache(null, currentEditSeqId, status);

         }
      } catch (IOException ioe) {
        // 就是把名字改了,然后在后面加上".时间戳",这个有毛意思?
        if (ioe.getCause() instanceof ParseException) {
          Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
          msg = "File corruption encountered!  " +
              "Continuing, but renaming " + edits + " as " + p;
        } else {// 不知道是啥错误,抛错误吧,处理不了
          throw ioe;
        }
      }
      status.markComplete(msg);
      return currentEditSeqId;
    } finally {
      status.cleanup();
      if (reader != null) {
         reader.close();
      }
    }

参考自:https://zhuanlan.zhihu.com/p/27885715
https://blog.csdn.net/qq_26803795/article/details/79152808?utm_source=blogxgwz6

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,383评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,522评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,852评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,621评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,741评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,929评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,076评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,803评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,265评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,582评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,716评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,395评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,039评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,027评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,488评论 2 361
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,612评论 2 350

推荐阅读更多精彩内容

  • 目录 HBase的故障恢复有哪三种不同模式? HBase日志切分方法? Distributed Log Repla...
    尼小摩阅读 893评论 0 1
  • HBase检测宕机是通过Zookeeper实现的, 正常情况下RegionServer会周期性向Zookeeper...
    GK_斯皮利特阅读 3,135评论 1 0
  • Hbase 每一次对数据的修改都会写入到memorystore 中,写入成功后,Hbase 便会将这条记录写入到...
    Ivan_030c阅读 5,267评论 1 0
  • 1、Master负载并不很高,基本采用热备的方式来实现Master高可用 2、RegionServer宕机的恢复主...
    loukey_j阅读 515评论 0 0
  • 参考:https://www.jianshu.com/p/569106a3008f 最近在逐步跟进Hbase的相关...
    博弈史密斯阅读 849评论 1 1