1. 问题
如下图:日志中有很多 KB 级别的 memstore flush ,为何没到配置的 256MB 就开始 flush ?
2. 结论
这是由后台定时刷新线程发起的刷新;是否刷新的依据:从 sequenceId 的增长和多久没刷新两个维度。
3. Memstore flush 被动触发机制
目前 HBase memstore flush 触发方式可以分为被动和主动两种方式,被动触发机制有:
- 达到单个 memstore 配置的阈值:参数为 hbase.hregion.memstore.flush.size 默认: 128M。
- 达到 regionserver memstore 配置的阈值 :参数为 hbase.regionserver.global.memstore.size 默认堆大小的 40%. regionServer 的全局 memstore 的大小,超过该大小会触发 flush 到磁盘的操作, regionserver 级别的 flush 会阻塞客户端读写,需要关注这个级别的刷新。
4. Memstore 主动 flush 机制
regionserver 会启动一个定时任务线程,定时判断是否需要 flush memstore,避免某些数据长时间停留在内存中。
定时刷新线程代码如下:
static class PeriodicMemstoreFlusher extends ScheduledChore {
final HRegionServer server;
final static int RANGE_OF_DELAY = 5 * 60 * 1000; // 5 min in milliseconds
final static int MIN_DELAY_TIME = 0; // millisec
public PeriodicMemstoreFlusher(int cacheFlushInterval, final HRegionServer server) {
super(server.getServerName() + "-MemstoreFlusherChore", server, cacheFlushInterval);
this.server = server;
}
@Override
protected void chore() {
final StringBuffer whyFlush = new StringBuffer();
for (Region r : this.server.onlineRegions.values()) {
if (r == null) continue;
if (((HRegion)r).shouldFlush(whyFlush)) {
FlushRequester requester = server.getFlushRequester();
if (requester != null) {
long randomDelay = RandomUtils.nextInt(RANGE_OF_DELAY) + MIN_DELAY_TIME;
LOG.info(getName() + " requesting flush of " +
r.getRegionInfo().getRegionNameAsString() + " because " +
whyFlush.toString() +
" after random delay " + randomDelay + "ms");
//Throttle the flushes by putting a delay. If we don't throttle, and there
//is a balanced write-load on the regions in a table, we might end up
//overwhelming the filesystem with too many flushes at once.
requester.requestDelayedFlush(r, randomDelay, false);
}
}
}
}
}
判断是否需要 flush
/**
* Should the memstore be flushed now
*/
boolean shouldFlush(final StringBuffer whyFlush) {
whyFlush.setLength(0);
// This is a rough measure.
if (this.maxFlushedSeqId > 0
&& (this.maxFlushedSeqId + this.flushPerChanges < this.mvcc.getReadPoint())) {
whyFlush.append("more than max edits, " + this.flushPerChanges + ", since last flush");
return true;
}
long modifiedFlushCheckInterval = flushCheckInterval;
if (getRegionInfo().isSystemTable() &&
getRegionInfo().getReplicaId() == HRegionInfo.DEFAULT_REPLICA_ID) {
modifiedFlushCheckInterval = SYSTEM_CACHE_FLUSH_INTERVAL;
}
if (modifiedFlushCheckInterval <= 0) { //disabled
return false;
}
long now = EnvironmentEdgeManager.currentTime();
//if we flushed in the recent past, we don't need to do again now
if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) {
return false;
}
//since we didn't flush in the recent past, flush now if certain conditions
//are met. Return true on first such memstore hit.
for (Store s : getStores()) {
if (s.timeOfOldestEdit() < now - modifiedFlushCheckInterval) {
// we have an old enough edit in the memstore, flush
whyFlush.append(s.toString() + " has an old edit so flush to free WALs");
return true;
}
}
return false;
}
解读下自动刷新的时机:
- 首先。上次flush之后。 sequenceId 的增长超过 flushPerChanges。即发起一次flush:
次数限制 flushPerChanges 是通过参数hbase.regionserver.flush.per.changes
配置,默觉得 30000000(3千万),这个 sequenceId 的增长该 Region 上数据的修改次数,不管增、删、改或者append、increment等,它是对HRegion数据变动的一个考虑,即便是 MemStore 不大。数据变动的频繁了,也须要进行 flush。以减少宕机后拆分日志的工作量; - 再看参数
hbase.regionserver.optionalcacheflushinterval
:
参数小于等于 0。不会触发 flush,时间间隔未超过参数配置的时间间隔的话,也不会触发 flush。这个参数默觉得 3600000ms,即1小时; - 当超过参数配置的时间间隔。再检测每一个列簇,当当中一个列簇超过 flushCheckInterval 没有 flush 时。发起 flush。也就是说它有足够久的数据没有被flush。
HBase 是列族式存储,一张表不同的列族存储的物理路径是不同的,例入 table 有两个列族 a 和 b ,他们的列族数据在 hdfs 的存储路径是这样的:
/hbase/data/table1/regionid/a
/hbase/data/table1/regionid/b
但 HBase 的 region 是一个逻辑视图,是 HBase 某一些行的数据块,包含所有的列族。而每个列族对应这一个 Store ,每个 store 包含一个 memstore 和多个 hfile。假设一个表有两个 column family ,那他的每个 region 有两个 store。
在上面针对每一个 HRegion 的循环,以及后面针对每一个 HStore 的推断,我们能够发现,flush 还是以 Region 为最小单位进行的。即便是某个列簇下 MemStore 过大或者过旧,另外一个 MemStore 还比较小或者比较新的话,它还是跟着那个过大或者过旧的列簇一起 flush。这也是 HBase 饱受诟病的列簇不能过多的原因之中的一个。
在 HBase1.1.2 版本号中,有对于 MemStore flush 的改进,改成了以HStore,即列簇为单位进行。
参考: