HBase Compaction-(3)Compaction可以并发么

这个问题之前一直困扰着我。之前一直怀疑,如果可以并行的话,岂不是要乱套。可是如果不能并行的话,那效率岂不是很低。

后来猜测,是不同Region可以并行,相同Region是串行。

然后,事实证明是我错了。

环境

HBase rel-2.1.0

Git上并没有这个分支,需要用rel/2.1.0这个tag新建一个出来。

解析

在Compaction时,是有一个线程池的。我们来看一下:

ThreadPoolExecutor pool;
if (selectNow) {
    // compaction.get is safe as we will just return if selectNow is true but no compaction is
    // selected
    pool = store.throttleCompaction(compaction.getRequest().getSize()) ? longCompactions
            : shortCompactions;
} else {
    // We assume that most compactions are small. So, put system compactions into small
    // pool; we will do selection there, and move to large pool if necessary.
    pool = shortCompactions;
}
pool.execute(
        new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));

上面的这段代码,在CompactSplit中。

我们可以看到,每个HStore的Compaction,都会根据预估的需要执行的时间,分配到线程池里。所以,其实相同的HStore,在执行Compaction时,也是会并行的。

那我就有疑问了,如何保证Compaction并行的时候不会混乱?

其实很简单,因为Compaction在执行前,需要执行一个选择要进行Compact的HStoreFile的操作,后面就是针对这些HStoreFile进行合并。所以,其实我们只要保证在选择的时候,是线程安全的就好啦。

那怎么做呢?

DefaultStoreFileManager这个类中,我们能够看到。它有这么一个字段:

/**
 * List of store files inside this store. This is an immutable list that
 * is atomically replaced when its contents change.
 */
private volatile ImmutableList<HStoreFile> storefiles = ImmutableList.of();

它的含义,从注释中,就很容易看出来,就是这个HStore有哪些HStoreFiles。

另外,在HStore中,还有一个很重要的变量,是:

private final List<HStoreFile> filesCompacting = Lists.newArrayList();

这个变量,表示当前HStore中,有哪些HStoreFile正在被compact。

每次从storefiles中,选出来有资格进行compact的HStoreFile时,都会被加到这个filesCompacting中。

/**
 * Adds the files to compacting files. filesCompacting must be locked.
 */
private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
    if (CollectionUtils.isEmpty(filesToAdd)) {
        return;
    }
    // Check that we do not try to compact the same StoreFile twice.
    if (!Collections.disjoint(filesCompacting, filesToAdd)) {
        Preconditions.checkArgument(false, "%s overlaps with %s", filesToAdd, filesCompacting);
    }
    filesCompacting.addAll(filesToAdd);
    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
}

而这个方法被调用以前,是会进行同步的:

synchronized (filesCompacting) {
    ......
    addToCompactingFiles(selectedFiles);
}

这些代码都在HStore.requestCompaction(int priority,CompactionLifeCycleTracker tracker, User user)中。各位看官可以自行探索。

所以,我们可以看到,其实并不是选出来有资格作为compact的HStoreFile以后,就将它们从DefaultStoreFileManager.storefiles中移除,而是添加到HStore.filesCompacting中。

这样就有一个问题,就是如果DefaultStoreFileManager.storefiles中没有新增HStoreFile,或者即使新增了,前面的Compact没有完成,那由于选择进行Compact的HStoreFile很可能都是一样的,所以就会导致后面发出来的compact请求其实是无效的。

测试代码

package org.apache.hadoop.hbase.regionserver;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
import org.apache.hadoop.hbase.regionserver.compactions.MockStoreFileGenerator;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;

import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;

@Category({RegionServerTests.class, MediumTests.class})
public class TestConcurrentCompaction {

    @Rule
    public TestName name = new TestName();
    private static final byte[] COLUMN_FAMILY = Bytes.toBytes("fam1");
    private static final byte[] FAMILY = Bytes.toBytes("f1");
    private HTableDescriptor htd = null;
    private static final HBaseTestingUtility UTIL = HBaseTestingUtility.createLocalHTU();
    protected Configuration conf = UTIL.getConfiguration();

    private HRegion r = null;
    private CompactionRequester compactionRequester;


    public TestConcurrentCompaction() {
        super();

        // Local mode
        conf.setBoolean("hbase.testing.nocluster", true);
    }

    @Before
    public void setUp() throws Exception {
        this.htd = UTIL.createTableDescriptor(name.getMethodName());
        if (name.getMethodName().equals("testCompactionSeqId")) {
            UTIL.getConfiguration().set("hbase.hstore.compaction.kv.max", "10");
            UTIL.getConfiguration().set(
                    DefaultStoreEngine.DEFAULT_COMPACTOR_CLASS_KEY,
                    TestCompaction.DummyCompactor.class.getName());
            HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
            hcd.setMaxVersions(65536);
            this.htd.addFamily(hcd);
        }
        prepareConf();
        this.r = UTIL.createLocalHRegion(htd, null, null);
    }

    @After
    public void tearDown() throws Exception {
        this.r.close();
    }

    @Test
    public void testConcurrentCompaction() throws IOException, KeeperException, InterruptedException {

        addHStoresToHRegion(r);

        // >>>>>>>>>>>>>>>>>>>
        HStore hStore = r.getStore(COLUMN_FAMILY);
        System.out.println(hStore);
        for (HStoreFile hStoreFile : hStore.getStorefiles()) {
            System.out.println(hStoreFile);
        }
        // <<<<<<<<<<<<<<<<<<<

        initCompactionRequestor(r);
        CountDownLatch latch = new CountDownLatch(2);
        TestCompaction.Tracker tracker = new TestCompaction.Tracker(latch);
        compactionRequester.requestCompaction(r, "No reason", PRIORITY_USER, tracker, null);

        Thread.sleep(5 * 1000);

        compactionRequester.requestCompaction(r, "No reason", PRIORITY_USER, tracker, null);

    }

    private void prepareConf() {
        conf.set("hbase.hstore.compaction.ratio", "1.0");
        conf.set("hbase.hstore.compaction.min", "3");
        conf.set("hbase.hstore.compaction.max", "5");
        conf.set("hbase.hstore.compaction.min.size", "10");
        conf.set("hbase.hstore.compaction.max.size", "1000");
        conf.set("hbase.hstore.defaultengine.compactionpolicy.class",
                "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy");
    }

    private void initCompactionRequestor(HRegion hRegion) throws IOException, KeeperException {
        HRegionServer hRegionServer = new HMaster(conf);
        compactionRequester = new CompactSplit(hRegionServer);
    }

    private void addHStoresToHRegion(HRegion hRegion) throws IOException {

        MockStoreFileGenerator mockStoreFileGenerator = new MockStoreFileGenerator(TestCompaction.class);

        ConcurrentSkipListMap<byte[], HStore> stores = new ConcurrentSkipListMap<>(Bytes.BYTES_RAWCOMPARATOR);
        ColumnFamilyDescriptor columnFamilyDescriptor = ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY);
        HStore hStore = new HStore(hRegion, columnFamilyDescriptor, conf);

        List<HStoreFile> storeFiles = new ArrayList<>();
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(7));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(6));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(5));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(4));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(3));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(2));
        storeFiles.add(mockStoreFileGenerator.createMockStoreFileBytes(1));
        stores.put(COLUMN_FAMILY, hStore);

        hStore.addStoreFiles(storeFiles);

        hRegion.addStores(stores);
    }
}

为了保证一个Compact不会很快就完成,导致实际上这两次compact是串行的。我在HRegion.compact(CompactionContext compaction, HStore store, ThroughputController throughputController, User user)中加了这么一段代码:

// >>>>>>>>>>>>>>>>>>>
try {
    for (HStoreFile storeFile : compaction.getRequest().getFiles()) {
        System.out.println(storeFile);
    }
    Thread.sleep(5 * 60 * 1000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
// <<<<<<<<<<<<<<<<<<<

我们可以看一下日志输出:

第一个compact的日志:
2019-01-11 09:04:05,681 DEBUG [main] compactions.SortedCompactionPolicy(80): Selecting compaction from 7 store files, 0 compacting, 7 eligible, 16 blocking
2019-01-11 09:04:05,686 DEBUG [main] compactions.ExploringCompactionPolicy(130): Exploring compaction algorithm has selected 5  files of size 15 starting at candidate #15 after considering 12 permutations with 12 in ratio
第二个compact的日志:
2019-01-11 09:04:10,700 DEBUG [main] compactions.SortedCompactionPolicy(80): Selecting compaction from 7 store files, 5 compacting, 0 eligible, 16 blocking
2019-01-11 09:04:10,701 DEBUG [main] compactions.ExploringCompactionPolicy(130): Exploring compaction algorithm has selected 0  files of size 0 starting at candidate #0 after considering 0 permutations with 0 in ratio
2019-01-11 09:04:10,701 DEBUG [main] compactions.SortedCompactionPolicy(258): Not compacting files because we only have 0 files ready for compaction. Need 3 to initiate.
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容