Distributed log processing is enabled by default since HBase 0.92. The setting is controlled by the hbase.master.distributed.log.splitting property, which can be set to true or false, but defaults to true.
MasterFileSystem.splitLog
public void splitLog(final Set<ServerName> serverNames, PathFilter filter) throws IOException {
long splitTime = 0, splitLogSize = 0;
List<Path> logDirs = getLogDirs(serverNames);
splitLogManager.handleDeadWorkers(serverNames);
splitTime = EnvironmentEdgeManager.currentTime();
splitLogSize = splitLogManager.splitLogDistributed(serverNames, logDirs, filter);
splitTime = EnvironmentEdgeManager.currentTime() - splitTime;
if (this.metricsMasterFilesystem != null) {
if (filter == META_FILTER) {
this.metricsMasterFilesystem.addMetaWALSplit(splitTime, splitLogSize);
} else {
this.metricsMasterFilesystem.addSplit(splitTime, splitLogSize);
}
}
}
hregion 的LOGDIR
/** Used to construct the name of the log directory for a region server */
public static final String HREGION_LOGDIR_NAME = "WALs";
/** Used to construct the name of the splitlog directory for a region server */
public static final String SPLIT_LOGDIR_NAME = "splitWAL";
/** Like the previous, but for old logs that are about to be deleted */
public static final String HREGION_OLDLOGDIR_NAME = "oldWALs";
public static final String SPLITTING_EXT = "-splitting";
hdfs上的region Hlog
drwxr-xr-x - mfw_hadoop supergroup 0 2018-04-08 20:12 /hbase/WALs/node007159,60020,1523189512357
drwxr-xr-x - mfw_hadoop supergroup 0 2019-07-24 08:11 /hbase/WALs/node007159,60020,1562342975966-splitting
drwxr-xr-x - mfw_hadoop supergroup 0 2019-11-20 20:35 /hbase/WALs/node007159,60020,1568352393035-splitting
drwxr-xr-x - mfw_hadoop supergroup 0 2020-05-12 06:44 /hbase/WALs/node007159,60020,1578310694153-splitting
drwxr-xr-x - mfw_hadoop supergroup 0 2021-01-08 14:52 /hbase/WALs/node007159,60020,1609309991739
public long splitLogDistributed(final Set<ServerName> serverNames, final List<Path> logDirs,
PathFilter filter) throws IOException {
MonitoredTask status = TaskMonitor.get().createStatus("Doing distributed log split in " +
logDirs + " for serverName=" + serverNames);
FileStatus[] logfiles = getFileList(logDirs, filter);
long totalSize = 0;
//用于监听所有任务的状态
TaskBatch batch = new TaskBatch();
Boolean isMetaRecovery = (filter == null) ? null : false;
for (FileStatus lf : logfiles) {
totalSize += lf.getLen();
String pathToLog = FSUtils.removeWALRootPath(lf.getPath(), conf);
//发布task,一个日志log文件 是一个task
if (!enqueueSplitTask(pathToLog, batch)) {
throw new IOException("duplicate log split scheduled for " + lf.getPath());
}
}
waitForSplittingCompletion(batch, status);
// remove recovering regions
if (filter == MasterFileSystem.META_FILTER /* reference comparison */) {
// we split meta regions and user regions separately therefore logfiles are either all for
// meta or user regions but won't for both( we could have mixed situations in tests)
isMetaRecovery = true;
}
// the function is only used in WALEdit direct replay mode(此处不会调用,cdh1.2当前版本是写死的false)
removeRecoveringRegions(serverNames, isMetaRecovery);
// 任务都完成
if (batch.done != batch.installed) {
batch.isDead = true;
throw new IOException(msg);
}
//清理region日志
for (Path logDir : logDirs) {
status.setStatus("Cleaning up log directory...");
final FileSystem fs = logDir.getFileSystem(conf);
try {
if (fs.exists(logDir) && !fs.delete(logDir, false)) {
LOG.warn("Unable to delete log src dir. Ignoring. " + logDir);
}
}
}
return totalSize;
}
- 将待切分日志文件夹重命名,为什么需要将文件夹重命名呢?这是因为在某些场景下RegionServer并没有真正宕机,但是HMaster会认为其已经宕机并进行故障恢复,比如最常见的RegionServer发生长时间Full GC,这种场景下用户并不知道RegionServer宕机,所有的写入更新操作还会继续发送到该RegionServer,而且由于该RegionServer自身还继续工作所以会接收用户的请求,此时如果不重命名日志文件夹,就会发生HMaster已经在使用HLog进行故障恢复了,但是RegionServer还在不断写入HLog
MasterFileSystem.getLogDirs
for (ServerName serverName : serverNames) {
Path logDir = new Path(this.walRootDir,
DefaultWALProvider.getWALDirectoryName(serverName.toString()));
Path splitDir = logDir.suffix(DefaultWALProvider.SPLITTING_EXT);
if (walFs.exists(logDir)) {
if (!this.walFs.rename(logDir, splitDir)) {
throw new IOException("Failed fs.rename for log split: " + logDir);
}
logDir = splitDir;
}
logDirs.add(splitDir);
如果任务没有重复发布过,将 task entry 发布到 SplitLogManagerCoordinatio(ZKSplitLogManagerCoordination)
boolean enqueueSplitTask(String taskname, TaskBatch batch) {
lastTaskCreateTime = EnvironmentEdgeManager.currentTime();
String task =
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().prepareTask(taskname);
Task oldtask = createTaskIfAbsent(task, batch);
if (oldtask == null) {
// publish the task in the coordination engine
((BaseCoordinatedStateManager) server.getCoordinatedStateManager())
.getSplitLogManagerCoordination().submitTask(task);
return true;
}
return false;
}
- Master会将待切分日志路径发布到Zookeeper节点上(/hbase/splitWAL),每个日志作为一个任务,每个任务都会有对应状态,起始状态为TASK_UNASSIGNED
SplitLogManager.submitTask
ZKSplitLogManagerCoordination.createNode
public void submitTask(String path) {
createNode(path, zkretries);
}
//切分日志路径发布到Zookeeper节点上
private void createNode(String path, Long retry_count) {
SplitLogTask slt = new SplitLogTask.Unassigned(details.getServerName(), getRecoveryMode());
ZKUtil.asyncCreate(this.watcher, path, slt.toByteArray(), new CreateAsyncCallback(),
retry_count);
SplitLogCounters.tot_mgr_node_create_queued.incrementAndGet();
return;
}
- 所有RegionServer启动之后都注册在这个节点上等待新任务,一旦Master发布任务之后,RegionServer就会抢占该任务
# HRegionServer.run
while (keepLooping()) {
//告诉Hmaster 我可以干活了
RegionServerStartupResponse w = reportForDuty();
if (w == null) {
LOG.warn("reportForDuty failed; sleeping and then retrying.");
this.sleeper.sleep();
} else {
/
handleReportForDutyResponse(w);
break;
}
}
/如果像master 报告成功,做初始化. 方法内调用startServiceThreads();
其中包括 SplitLogWorker
# HRegionServer.startServiceThreads
this.splitLogWorker = new SplitLogWorker(this, sinkConf, this, this, walFactory);
splitLogWorker.start();
在/hbase/splitlog zknode上等待任务可用。一次只处理一个任务。这
策略对在集群中可能发生同时日志拆分数量设置了上限在
ZkSplitLogWorkerCoordination.taskLoop
尝试获取任务zk节点上的“锁”,以拥有并执行任务。
ZkSplitLogWorkerCoordination.grabTask
- 抢占任务实际上首先去查看任务状态,如果是TASK_UNASSIGNED状态,说明当前没有人占有,此时就去修改该节点状态为TASK_OWNED。如果修改失败,说明其他RegionServer也在抢占,修改成功表明任务抢占成功。
attemptToOwnTask.attemptToOwnTask - RegionServer抢占任务成功之后会分发给相应线程处理,如果处理成功,会将该任务对应zk节点状态修改为TASK_DONE,一旦失败会修改为TASK_ERR
public void process() throws IOException {
try {
//splitTaskExecutor的内部类调用exec执行
Status status = this.splitTaskExecutor.exec(splitTaskDetails.getWALFile(), mode, reporter);
switch (status) {
case DONE:
coordination.endTask(new SplitLogTask.Done(this.serverName,this.mode),
SplitLogCounters.tot_wkr_task_done, splitTaskDetails);
break;
case PREEMPTED:
SplitLogCounters.tot_wkr_preempt_task.incrementAndGet();
break;
case ERR:
if (server != null && !server.isStopped()) {
coordination.endTask(new SplitLogTask.Err(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_err, splitTaskDetails);
break;
}
case RESIGNED:
if (server != null && server.isStopped()) {
coordination.endTask(new SplitLogTask.Resigned(this.serverName, this.mode),
SplitLogCounters.tot_wkr_task_resigned, splitTaskDetails);
break;
}
}
}
-
Master会一直监听在该ZK节点上,一旦发生状态修改就会得到通知。任务状态变更为TASK_ERR的话,Master会重新发布该任务,而变更为TASK_DONE的话,Master会将对应的节点删除
下图是RegionServer抢占任务以及抢占任务之后的工作流程:
1.1 假设Master当前发布了4个任务,即当前需要回放4个日志文件,分别为hlog1、hlog2、hlog3和hlog4
1.2 RegionServer1抢占到了hlog1和hlog2日志,RegionServer2抢占到了hlog3日志,RegionServer3抢占到了hlog4日志
1.3 以RegionServer1为例,其抢占到hlog1和hlog2日志之后会分别分发给两个HLogSplitter线程进行处理,HLogSplitter负责对日志文件执行具体的切分,切分思路还是首先读出日志中每一个<HLogKey, WALEdit>数据对,根据HLogKey所属Region写入不同的Region Buffer
WALSplitter.splitLogFile
1.4 每个Region Buffer都会有一个对应的写线程,将buffer中的日志数据写入hdfs中,写入路径为/hbase/table/region2/seqenceid.temp,其中seqenceid是一个日志中某个region对应的最大sequenceid
LogRecoveredEditsOutputSink的工作是直接按照region,把相对应的log写到hdfs的 hbase.rootdir/data/namespace(比如test)/table_name/region_encoded_name/recovered.edits下。后续region被其他region server open时,会来这看是不是有需要回放的hlog.
hdfs dfs -ls /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b
Found 4 items
-rw-r--r-- 3 mfw_hadoop supergroup 122 2020-02-13 06:38 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/.regioninfo
drwxr-xr-x - mfw_hadoop supergroup 0 2021-01-08 18:00 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/.tmp
drwxr-xr-x - mfw_hadoop supergroup 0 2021-01-08 18:00 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/info
drwxr-xr-x - mfw_hadoop supergroup 0 2020-12-30 14:56 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits
hdfs dfs -ls /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits
Found 1 items
-rw-r--r-- 3 mfw_hadoop supergroup 0 2020-12-30 14:56 /hbase/data/default/MQL_SAVE_RESULT/e5206427b8b58601ade809081f2f625b/recovered.edits/492005.seqid
1.5 后续region被其他region server open时,针对某一region回放日志只需要将该region对应的所有文件按照sequenceid由小到大依次进行回放即可
这种Distributed Log Splitting方式可以很大程度上加快整个故障恢复的进程,正常故障恢复时间可以降低到分钟级别。然而,这种方式会产生很多日志小文件,产生的文件数将会是M * N,其中M是待切分的总hlog数量,N是一个宕机RegionServer上的Region个数。假如一个RegionServer上有200个Region,并且有90个hlog日志,一旦该RegionServer宕机,那这种方式的恢复过程将会创建 90 * 200 = 18000个小文件。这还只是一个RegionServer宕机的情况,如果是整个集群宕机小文件将会更多!!!
#WALSplitter.writeRegionSequenceIdFile
// write a new seqId file
Path newSeqIdFile = new Path(editsdir, newSeqId + SEQUENCE_ID_FILE_SUFFIX);
znode : splitWAL
znode :recovering-regions
从日志恢复
在region打开的时候,我们从HRegionServer的openRegion方法一路跟踪,中间历经OpenMetaHandler,再到HRegion.openHRegion方法,终于在initializeRegionStores方法里面找到了那么一句话。
RSRpcServices.openRegion
if (region.isMetaRegion()) {
regionServer.service.submit(new OpenMetaHandler(
regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
} else {
regionServer.updateRegionFavoredNodesMapping(region.getEncodedName(),
regionOpenInfo.getFavoredNodesList());
regionServer.service.submit(new OpenRegionHandler(
regionServer, regionServer, region, htd, masterSystemTime, coordination, ord));
}
打开region
HRegion.openHRegion
protected HRegion openHRegion(final CancelableProgressable reporter)
throws IOException {
// Refuse to open the region if we are missing local compression support
checkCompressionCodecs();
// Refuse to open the region if encryption configuration is incorrect or
// codec support is missing
checkEncryption();
// Refuse to open the region if a required class cannot be loaded
checkClassLoading();
this.openSeqNum = initialize(reporter);
this.mvcc.advanceTo(openSeqNum);
if (wal != null && getRegionServerServices() != null && !writestate.readOnly
&& !recovering) {
// Only write the region open event marker to WAL if (1) we are not read-only
// (2) dist log replay is off or we are not recovering. In case region is
// recovering, the open event will be written at setRecovering(false)
writeRegionOpenMarker(wal, openSeqNum);
}
return this;
}
HRegion.initialize
HRegion.initializeRegionInternals
ServerRegionReplicaUtil.shouldReplayRecoveredEdits
// 如果recovered.edits有日志的话,就恢复日志
maxSeqId = Math.max(maxSeqId,
replayRecoveredEditsIfAny(this.fs.getRegionDir(), maxSeqIdInStores, reporter, status));
读取recovered.edits下面的日志,符合条件的就加到MemStore里面去,完成之后,就把这些文件删掉。大家也看到了,这里通篇讲到一个logSeqNum,哪里都有它的身影,它实际上是FSHLog当中的一个递增的AtomicLong,每当往FSLog里面写入一条日志的时候,它都会加一,然后MemStore请求flush的时候,会调用FSLog的startCacheFlush方法,获取(logSeqNum+1)回来,然后写入到StoreFile的sequenceid字段,再次拿出来的时候,就遍历这个HStore下面的StoreFile的logSeqNum,取出来最大的跟它比较,小于它的都已经写过了,没必要再写了。
#replayRecoveredEditsIfAny
HLog.Reader reader = null;
try {
//创建reader读取hlog
reader = HLogFactory.createReader(fs, edits, conf);
long currentEditSeqId = -1;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
long intervalEdits = 0;
HLog.Entry entry;
Store store = null;
boolean reported_once = false;
try {//逐个读取
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
//实例化firstSeqIdInLog
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// 从WALEdits里面取出kvs
if (kv.matchingFamily(WALEdit.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(),
this.getRegionInfo().getEncodedNameAsBytes())) {//是meta表的kv就有compaction
CompactionDescriptor compaction = WALEdit.getCompaction(kv);
if (compaction != null) {
//完成compaction未完成的事情,校验输入输出文件,完成文件替换等操作
completeCompactionMarker(compaction);
}
skippedEdits++;
continue;
}
// 获得kv对应的store
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// 应该不会发生,缺少它对应的列族
skippedEdits++;
continue;
}
// seq id小,呵呵,说明已经被处理过了这个日志
if (key.getLogSeqNum() <= maxSeqIdInStores.get(store.getFamily().getName())) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
// 这个就是我们要处理的日志,添加到MemStore里面就ok了
flush = restoreEdit(store, kv);
editsCount++;
}
//MemStore太大了,需要flush掉
if (flush) internalFlushcache(null, currentEditSeqId, status);
}
} catch (IOException ioe) {
// 就是把名字改了,然后在后面加上".时间戳",这个有毛意思?
if (ioe.getCause() instanceof ParseException) {
Path p = HLogUtil.moveAsideBadEditsFile(fs, edits);
msg = "File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p;
} else {// 不知道是啥错误,抛错误吧,处理不了
throw ioe;
}
}
status.markComplete(msg);
return currentEditSeqId;
} finally {
status.cleanup();
if (reader != null) {
reader.close();
}
}
参考自:https://zhuanlan.zhihu.com/p/27885715
https://blog.csdn.net/qq_26803795/article/details/79152808?utm_source=blogxgwz6