Compact refers to merging some or all of the HFiles under one column family of an HRegion in an HBase table. As data is continuously written, the MemStore is flushed to disk whenever it reaches a threshold, producing many small files; left alone, these files severely degrade HBase read performance. So HBase periodically, when certain conditions are met, or when triggered manually, merges these many files into one large file. This process is called Compact (compaction).
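For context, here is a minimal client-side sketch of triggering a compaction manually through the Admin API (assuming an HBase 1.x client on the classpath; the table name test_table and family cf are hypothetical). Requests like these eventually arrive at the compactRegion RPC analyzed below.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

public class TriggerCompaction {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create(); // reads hbase-site.xml from the classpath
    try (Connection conn = ConnectionFactory.createConnection(conf);
         Admin admin = conn.getAdmin()) {
      TableName table = TableName.valueOf("test_table");   // hypothetical table name
      admin.compact(table);                                 // request a (minor) compaction for all column families
      admin.majorCompact(table, Bytes.toBytes("cf"));       // request a major compaction for one column family
    }
  }
}
The HBase shell equivalents are compact 'test_table' and major_compact 'test_table'.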
- Compact is triggered through an RPC call, handled by RSRpcServices (RegionServer RPC Services):
org.apache.hadoop.hbase.regionserver.RSRpcServices
@QosPriority(priority=HConstants.ADMIN_QOS) // relative priority of the call; provides a basic notion of quality of service (QOS)
public CompactRegionResponse compactRegion(final RpcController controller,
    final CompactRegionRequest request) throws ServiceException {
  try {
    checkOpen(); // check the RegionServer state: isOnline, isAborted, isStopped, fsOk
    Region region = getRegion(request.getRegion()); // get the Region to operate on
    ...
    boolean major = false; // whether to run a major compaction
    byte [] family = null; // column family carried by the request, if any
    Store store = null; // the Store for that column family
    if (request.hasFamily()) { // look up the Store that holds the column family
      family = request.getFamily().toByteArray();
      store = region.getStore(family);
      if (store == null) {
        throw new ServiceException(...);
      }
    }
    if (request.hasMajor()) {
      major = request.getMajor();
    }
    if (major) { // with a column family, mark only that Store; otherwise mark the whole Region for a major compaction
      // no compaction runs here; this only sets this.forceMajor = true
      if (family != null) {
        store.triggerMajorCompaction();
      } else {
        region.triggerMajorCompaction();
      }
    }
    // the only difference is whether a specific Store is passed
    if (family != null) {
      regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
    } else {
      regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
    }
    return CompactRegionResponse.newBuilder().build();
  } catch (IOException ie) {
    throw new ServiceException(ie);
  }
}
Getting the Region:
protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
    throws NotServingRegionException {
  Region region = this.onlineRegions.get(encodedRegionName); // look it up in the map of online regions
  if (region == null) {
    throw new NotServingRegionException(....);
    // a region that is not online cannot be compacted;
    // the exception thrown reflects the region's state, e.g. a region that is currently being moved
  }
  return region;
}
All regions that were closed because of a move operation are tracked in a Map:
protected Map<String, MovedRegionInfo> movedRegions;
- Entering the compact logic
org.apache.hadoop.hbase.regionserver.CompactSplitThread
If no column family was passed in, iterate over all of the Region's stores and call requestCompactionInternal for each:
private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
    int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
    throws IOException {
  for (Store s : r.getStores()) {
    CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
    ...
  }
}
- HStore handles requestCompaction
org.apache.hadoop.hbase.regionserver.HStore
@Override
public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
    User user) throws IOException {
  if (!this.areWritesEnabled()) { // do not compact if writes are disabled
    return null;
  }
  removeUnneededFiles(); // before compacting, try to simplify things by removing files that are no longer needed
  // hbase.store.delete.expired.storefile decides whether expired store files are deleted
  final CompactionContext compaction = storeEngine.createCompaction();
  CompactionRequest request = null;
  this.lock.readLock().lock(); // read lock only
  try {
    synchronized (filesCompacting) {
      final Store thisStore = this;
      if (this.getCoprocessorHost() != null) {
        // Coprocessors were introduced in 0.92 to support features such as secondary indexes,
        // complex filters and access control; this path is skipped here
      }
      // the common case
      if (!compaction.hasSelection()) { // this.request != null;
        boolean isUserCompaction = priority == Store.PRIORITY_USER; // true
        boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
            offPeakCompactionTracker.compareAndSet(false, true); // check whether we are currently in off-peak hours
        try {
          compaction.select(this.filesCompacting, isUserCompaction,
              mayUseOffPeak, forceMajor && filesCompacting.isEmpty()); // select the files to compact
        } catch (IOException e) {
          ...
          throw e;
        }
        ...
      }
      if (baseRequest != null) {
        // if baseRequest is not null, merge baseRequest with the request selected by compaction to decide which files to compact
        ...
      }
      // the resulting list of files
      request = compaction.getRequest();
      final Collection<StoreFile> selectedFiles = request.getFiles();
      if (selectedFiles.isEmpty()) {
        return null;
      }
      addToCompactingFiles(selectedFiles); // add them to filesCompacting
      // decide from the request whether this is a major compaction
      this.forceMajor = this.forceMajor && !request.isMajor();
      // set common request properties, including the priority
      request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
      request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
    }
  } finally {
    this.lock.readLock().unlock();
  }
  this.region.reportCompactionRequestStart(request.isMajor()); // bump the counters
  return compaction;
}
storeEngine is obtained from the hbase.hstore.engine.class setting and defaults to DefaultStoreEngine, so by default compaction is an org.apache.hadoop.hbase.regionserver.DefaultCompactionContext:
this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
CompactionContext compaction = storeEngine.createCompaction();
compactionPolicy is obtained from hbase.hstore.defaultengine.compactionpolicy.class and defaults to ExploringCompactionPolicy. Its selectCompaction is implemented by the parent class org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy and is not covered in depth here:
request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
    filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
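As a minimal sketch, the engine and policy can be overridden through configuration; the two property keys are the ones read above, and the class names shown are simply the defaults spelled out:
Configuration conf = HBaseConfiguration.create();
// hbase.hstore.engine.class selects the StoreEngine implementation (DefaultStoreEngine by default)
conf.set("hbase.hstore.engine.class",
    "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine");
// hbase.hstore.defaultengine.compactionpolicy.class selects the policy used by DefaultStoreEngine
// (ExploringCompactionPolicy by default)
conf.set("hbase.hstore.defaultengine.compactionpolicy.class",
    "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy");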
- requestCompactionInternal
// request = null, selectNow = true
private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
    final String why, int priority, CompactionRequest request, boolean selectNow, User user)
    throws IOException {
  if (this.server.isStopped()
      || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
    // if the Region's table has COMPACTION_ENABLED=false, no compaction runs at all
    return null;
  }
  CompactionContext compaction = null;
  if (selectNow) {
    compaction = selectCompaction(r, s, priority, request, user); // build the CompactionContext for the Store, including the files to compact
    // CompactionContext is the compaction's context class; it carries all the "physical" details needed to run one compaction
    if (compaction == null) return null;
  }
  // We assume here that most compactions are small. So, put system compactions into the small pool
  // and move them to the large pool when necessary.
  // throttleCompaction checks compactionSize > comConf.getThrottlePoint(),
  // which is configured via hbase.regionserver.thread.compaction.throttle
  ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
      ? longCompactions : shortCompactions;
  pool.execute(new CompactionRunner(s, r, compaction, pool, user)); // the compaction runs on a thread pool; the logic lives in CompactionRunner
  return selectNow ? compaction.getRequest() : null;
}
- Running the compaction on a thread
private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
  @Override
  public void run() {
    if (server.isStopped()
        || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
      return; // check the RegionServer state and whether the table has compactions enabled
    }
    doCompaction(user);
  }
  private void doCompaction(User user) {
    // common path: a system compaction arrives here without file selection having been done
    if (this.compaction == null) {
      // re-check whether the Store's priority has changed, to avoid blocking potentially higher-priority work
      this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
      // only reached for system compactions whose selection was deferred; user compactions
      // (selectNow = true) already selected their files in requestCompactionInternal
    }
    // now the compaction can actually run
    this.compaction.getRequest().beforeExecute(); // currently a no-op
    try {
      boolean completed = region.compact(compaction, store, compactionThroughputController, user);
      if (completed) {
        if (store.getCompactPriority() <= 0) {
          // degenerate case: enqueue another compaction via requestCompactionInternal
          requestSystemCompaction(region, store, "Recursive enqueue");
        } else {
          // check whether the compaction pushed the region over the maximum region size and a Split is needed; see the Split write-up
          requestSplit(region);
        }
      }
    } catch (Exception ex) {
      ...
    } finally {
      LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
    }
    this.compaction.getRequest().afterExecute(); // currently a no-op
  }
}
- The actual compact logic
org.apache.hadoop.hbase.regionserver.HRegion
public boolean compact(CompactionContext compaction, Store store,
    CompactionThroughputController throughputController, User user) throws IOException {
  if (this.closing.get() || this.closed.get()) { // bail out if the Region is closing or closed
    store.cancelRequestedCompaction(compaction);
    return false;
  }
  MonitoredTask status = null;
  boolean requestNeedsCancellation = true;
  lock.readLock().lock();
  try {
    byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
    if (stores.get(cf) != store) { // if, for whatever reason, the Store looked up by cf is no longer the Store we were given, abort the compact
      return false;
    }
    if (this.closed.get()) { // check again whether the Region is closed
      return false;
    }
    try {
      // from here on Store#compact cleans up unconditionally, so the request no longer needs cancelling on the way out
      requestNeedsCancellation = false;
      store.compact(compaction, throughputController, user); // the Store performs the compact
    } catch (InterruptedIOException iioe) {
      ...
    }
    return true;
  } finally {
    try {
      if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
      if (status != null) status.cleanup();
    } finally {
      lock.readLock().unlock();
    }
  }
}
- Compacting the files
org.apache.hadoop.hbase.regionserver.HStore
@Override
public List<StoreFile> compact(CompactionContext compaction,
    CompactionThroughputController throughputController, User user) throws IOException {
  List<StoreFile> sfs = null;
  CompactionRequest cr = compaction.getRequest();
  try {
    // Do all sanity checking here if we have a valid CompactionRequest,
    // because we need to clean up after it in the finally block below.
    Collection<StoreFile> filesToCompact = cr.getFiles();
    synchronized (filesCompacting) {
      // sanity check: these are the files currently being compacted for this Store
      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
    }
    // run the compaction
    List<Path> newFiles = compaction.compact(throughputController, user);
    long outputBytes = 0L;
    if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
      // handle the case where the compaction output is not to be made live (hbase.hstore.compaction.complete=false)
      ...
    }
    // steps required to complete the compaction
    sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
    writeCompactionWalRecord(filesToCompact, sfs);
    replaceStoreFiles(filesToCompact, sfs);
    ...
    // from this point on the Store uses all of the new files
    completeCompaction(filesToCompact, true); // archive the old files and update the store size
    if (region.getRegionServerServices() != null
        && region.getRegionServerServices().getMetrics() != null) {
      region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
          now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
          outputBytes);
    }
    return sfs;
  } finally {
    finishCompactionRequest(cr);
  }
}