HBase1.x精通:详解HBase读缓存BlockCache(二、源码剖析)

微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

文章从公众号复制过来,格式可能错乱,请关注公众号阅读原文


微信公众号:大数据开发运维架构

关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;

如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈

从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读


引言:

上篇文章“HBase1.x精通:详解HBase读缓存BlockCache(一)”主要讲解了HBase块缓存的机制和三种策略,我们生产环境当前用的版本是HBase1.2.5,它默认的读缓存策略是LruBlockCache,下面我就结合HBase1.2.5源码深入剖析LruBlockCache的实现。

1.BlockCache初始化

当每个HRegionserver线程通过函数run()启动时,调用函数handleReportForDutyResponse()运行初始化。设置WAL并启动所有服务器线程。

/** * The HRegionServer sticks in this loop until closed. */@Overridepublicvoid run() {try{//Regionserver线程注册初始化;zookeeper注册,租借线程,等等。. preRegistrationInitialization();}catch(Throwable e) {abort("Fatal exception during initialization", e);    }try{if(!isStopped() && !isAborted()) {ShutdownHook.install(conf, fs,this, Thread.currentThread());// Set our ephemeral znode up in zookeeper now we have a name. createMyEphemeralNode();// 创建临时节点后,初始化RegionServerCoprocessorHost,// 以防任何协处理器想要使用ZooKeeperthis.rsHost = new RegionServerCoprocessorHost(this,this.conf);      }//向HMaster申请登记;//如果服务器停止了,或者clusterup标志关闭了,或者hdfs出了问题,则关闭while(keepLooping()) { RegionServerStartupResponse w = reportForDuty();if(w ==null) {LOG.warn("reportForDuty failed; sleeping and then retrying.");this.sleeper.sleep();}else{//这里开始调用实现LruBlockCache的初始化工作 handleReportForDutyResponse(w);break; }      }      ..................................... }


 接下来调用函数handleReportForDutyResponse(),依次调用函数startHeapMemoryManager(),调用变量HeapMemoryManager的函数create(),最后通过CacheConfig.instantiateBlockCache(conf)完成缓存的初始化。

    HRegionServer有一个HeapMemoryManager类型的成员变量,用于管理RegionServer进程的堆内存,HeapMemoryManager中的blockCache就是RegionServer中的读缓存,它的初始化在CacheConfig的instantiateBlockCache方法中完成。

/**  * Returns the block cache or <code>null</code> in case none should be used.  * Sets GLOBAL_BLOCK_CACHE_INSTANCE  **@paramconf  The current configuration.*@returnThe block cache or null.  */publicstaticsynchronizedBlockCacheinstantiateBlockCache(Configuration conf){if(GLOBAL_BLOCK_CACHE_INSTANCE !=null)returnGLOBAL_BLOCK_CACHE_INSTANCE;if(blockCacheDisabled)returnnull;//一级缓存    LruBlockCache l1 = getL1(conf);// blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call.if(blockCacheDisabled)returnnull;//二级缓存    BlockCache l2 = getL2(conf);if(l2 ==null) {      GLOBAL_BLOCK_CACHE_INSTANCE = l1;}else{booleanuseExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);booleancombinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY,        DEFAULT_BUCKET_CACHE_COMBINED);//判断是否启用了外部二级缓存:hbase.blockcache.use.external=true启用if(useExternal) {GLOBAL_BLOCK_CACHE_INSTANCE =newInclusiveCombinedBlockCache(l1, l2);}else{if(combinedWithLru) {GLOBAL_BLOCK_CACHE_INSTANCE =newCombinedBlockCache(l1, l2);}else{// L1 and L2 are not 'combined'.  They are connected via the LruBlockCache victimhandler// mechanism.  It is a little ugly but works according to the following: when the// background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get// a block from the L1 cache, if not in L1, we will search L2.          GLOBAL_BLOCK_CACHE_INSTANCE = l1;        }      }      l1.setVictimCache(l2);    }returnGLOBAL_BLOCK_CACHE_INSTANCE;  }}

上面有两行比较重要的代码:

LruBlockCache l1 = getL1(conf);

    //获取一级缓存LruBlockCache默认为HRegionsever堆内存的40%,对应参数:hfile.block.cache.size

BlockCache l2 = getL2(conf); 

//这是个什么东西呢?

    从HBase-1.1.0起,在使用方式上可以单独使用LRUBlockCache,或者组合起来使用,多级缓存的方式。LruBlockCache为一级缓存,BucketCache或者ExternalBlockCache为二级缓存。HBase可以使用memcached作为外部BlockCache,这是一个在设备失效或者升级时不会发生完全的冷缓存的很好的特性。用句通俗的话讲,就是HBase出现故障或者升级时,缓存轻易不会丢失。

    代码21行中对useExternal和combinedWithLru的判断,如果指定了useExternal为true,则结合memcached等外部缓存与BlockCache一起使用。如果指定了combinedWithLru,则结合bucketCache,也就是堆外内存与BlockCache一起使用。在上述两种情况下,BlockCache用于存放索引等元数据,真实的数据文件则缓存在memcached或bucketCache中。

    二级缓存可通过配置以下两个参数启动:

        hbase.blockcache.use.external为true  开启二级缓存

并配置hbase.cache.memcached.servers来指明memcached servers.

    当 hbase.bucketcache.combinedcache.enable 为false。在这种模式下,当L1缓存内容被清除(置换)时,会将置换出的块放入L2。当一个块被缓存时,首先被缓存在L1。当我们去查询一个缓存块时,首先在L1查,若是没找到,则再搜索L2。我们将此部署方法称为Raw L1+L2。需要注意的是,这个L1+L2模式已经在hbase 2.0.0 以后被移除了。

2.BlockCache实现

BlockCache基于客户端对数据的访问频率,定义了三个不同的优先级,如下所示:

    SINGLE:如果Block被第一次访问,则该Block被放在这一优先级队列中;

    MULTI:如果一个Block被多次访问,则从single移到Multi中;

    MEMORY:memory优先级由用户指定,一般不推荐,只用系统表才使用memory优先级;


LruBlockCache内部是通过一个ConcurrentHashMap来保存所有cache的block的

/** Concurrent map (the cache) */privatefinalMapmap;

Block块加入缓存的实现是在函数cacheBlock()中,

/**  * Cache the block with the specified name and buffer.  * <p>  * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547)  * this can happen, for which we compare the buffer contents.*@paramcacheKey block's cache key*@parambuf block buffer*@paraminMemory if block is in-memory*@paramcacheDataInL1  */@OverridepublicvoidcacheBlock(BlockCacheKey cacheKey, Cacheable buf,booleaninMemory,finalbooleancacheDataInL1){if(buf.heapSize() > maxBlockSize) {// If there are a lot of blocks that are too// big this can make the logs way too noisy.// So we log 2%if(stats.failInsert() %50==0) {LOG.warn("Trying to cache too large a block "+ cacheKey.getHfileName() +" @ "            + cacheKey.getOffset()+" is "+ buf.heapSize()+" which is larger than "+ maxBlockSize);      }return;    }    LruCachedBlock cb = map.get(cacheKey);if(cb !=null) {// compare the contents, if they are not equal, we are in big troubleif(compare(buf, cb.getBuffer()) !=0) {thrownewRuntimeException("Cached block contents differ, which should not have happened."+"cacheKey:"+ cacheKey);      }String msg ="Cached an already cached block: "+ cacheKey +" cb:"+ cb.getCacheKey();msg +=". This is harmless and can happen in rare cases (see HBASE-8547)";      LOG.warn(msg);return;    }longcurrentSize = size.get();longcurrentAcceptableSize = acceptableSize();longhardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);if(currentSize >= hardLimitSize) {      stats.failInsert();if(LOG.isTraceEnabled()) {LOG.trace("LruBlockCache current size "+ StringUtils.byteDesc(currentSize)+" has exceeded acceptable size "+ StringUtils.byteDesc(currentAcceptableSize) +"  too many."+" the hard limit size is "+ StringUtils.byteDesc(hardLimitSize) +", failed to put cacheKey:"+ cacheKey +" into LruBlockCache.");      }if(!evictionInProgress) {        runEviction();      }return;    }cb =newLruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);longnewSize = updateSizeMetrics(cb,false);    map.put(cacheKey, cb);longval = elements.incrementAndGet();if(LOG.isTraceEnabled()) {longsize = map.size();      assertCounterSanity(size, val);    }if(newSize > currentAcceptableSize && !evictionInProgress) {      runEviction();    }  }

代码大体逻辑如下:

1).这里假设不会对同一个已经被缓存的BlockCacheKey重复放入cache操作;

2).根据inMemory标志创建不同类别的CachedBlock对象:若inMemory为true则创建BlockPriority.MEMORY类型,否则创建BlockPriority.SINGLE;注意,这里只有这两种类型的Cache,因为BlockPriority.MULTI在Cache Block被重复访问时才进行创建。

3).将BlockCacheKey和创建的CachedBlock对象加入到前文说过的ConcurrentHashMap中,同时更新log&metrics上的计数;

4).最后判断如果加入新block后cache size大于设定的临界值且当前没有淘汰线程运行,则调用runEviction()方法启动LRU淘汰线程。

 缓存数据的获取是在方法getBlock()中实现的

/**  * Get the buffer of the block with the specified name.*@paramcacheKey block's cache key*@paramcaching true if the caller caches blocks on cache misses*@paramrepeat Whether this is a repeat lookup for the same block  *        (used to avoid double counting cache misses when doing double-check locking)*@paramupdateCacheMetrics Whether to update cache metrics or not*@returnbuffer of specified cache key, or null if not in cache  */@OverridepublicCacheablegetBlock(BlockCacheKey cacheKey,booleancaching,booleanrepeat,booleanupdateCacheMetrics){    LruCachedBlock cb = map.get(cacheKey);if(cb ==null) {if(!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());// If there is another block cache then try and read there.// However if this is a retry ( second time in double checked locking )// And it's already a miss then the l2 will also be a miss.if(victimHandler !=null&& !repeat) {        Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);// Promote this to L1.if(result !=null&& caching) {cacheBlock(cacheKey, result,/* inMemory = */false,/* cacheData = */true);        }returnresult;      }returnnull;    }if(updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary());    cb.access(count.incrementAndGet());returncb.getBuffer();  }

   代码逻辑大体如下:

        1、首先,从LruBlockCache的map中直接获取;

        2、如果map中没有,则在victimHandler存在且!repeat的情况下,通过victimHandler的getBlock()方法获取并缓存到LruBlockCache中,即综合考虑第二种缓存模式,并同步到第一种缓存中;

        3、如果1或2能够获取到数据,更新统计数据,且通过缓存块的access方法,更新访问时间accessTime,将可能的BlockPriority.SINGLE升级为BlockPriority.MULTI;

BlockCache的LRU淘汰过程,主要是通过EvictionThread线程实现的

    线程启动后调用wait被阻塞住,直到函数evict()调用notifyAl()才开始执行。envict()函数代码如下:

/**  * Eviction method.  */voidevict(){// Ensure only one eviction at a timeif(!evictionLock.tryLock())return;try{evictionInProgress =true;longcurrentSize =this.size.get();longbytesToFree = currentSize - minSize();if(LOG.isTraceEnabled()) {LOG.trace("Block cache LRU eviction started; Attempting to free "+StringUtils.byteDesc(bytesToFree) +" of total="+          StringUtils.byteDesc(currentSize));      }if(bytesToFree <=0)return;// Instantiate priority bucketsBlockBucket bucketSingle =newBlockBucket("single", bytesToFree, blockSize,          singleSize());BlockBucket bucketMulti =newBlockBucket("multi", bytesToFree, blockSize,          multiSize());BlockBucket bucketMemory =newBlockBucket("memory", bytesToFree, blockSize,          memorySize());// Scan entire map putting into appropriate bucketsfor(LruCachedBlock cachedBlock : map.values()) {switch(cachedBlock.getPriority()) {caseSINGLE: {bucketSingle.add(cachedBlock);break;          }caseMULTI: {bucketMulti.add(cachedBlock);break;          }caseMEMORY: {bucketMemory.add(cachedBlock);break;          }        }      }longbytesFreed =0;if(forceInMemory || memoryFactor >0.999f) {longs = bucketSingle.totalSize();longm = bucketMulti.totalSize();if(bytesToFree > (s + m)) {// this means we need to evict blocks in memory bucket to make room,// so the single and multi buckets will be emptied          bytesFreed = bucketSingle.free(s);          bytesFreed += bucketMulti.free(m);if(LOG.isTraceEnabled()) {LOG.trace("freed "+ StringUtils.byteDesc(bytesFreed) +" from single and multi buckets");          }          bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);if(LOG.isTraceEnabled()) {LOG.trace("freed "+ StringUtils.byteDesc(bytesFreed) +" total from all three buckets ");          }}else{// this means no need to evict block in memory bucket,// and we try best to make the ratio between single-bucket and// multi-bucket is 1:2longbytesRemain = s + m - bytesToFree;if(3* s <= bytesRemain) {// single-bucket is small enough that no eviction happens for it// hence all eviction goes from multi-bucket            bytesFreed = bucketMulti.free(bytesToFree);}elseif(3* m <=2* bytesRemain) {// multi-bucket is small enough that no eviction happens for it// hence all eviction goes from single-bucket            bytesFreed = bucketSingle.free(bytesToFree);}else{// both buckets need to evict some blocksbytesFreed = bucketSingle.free(s - bytesRemain /3);if(bytesFreed < bytesToFree) {              bytesFreed += bucketMulti.free(bytesToFree - bytesFreed);            }          }        }}else{        PriorityQueue<BlockBucket> bucketQueue =newPriorityQueue(3);bucketQueue.add(bucketSingle);bucketQueue.add(bucketMulti);bucketQueue.add(bucketMemory);intremainingBuckets =3;        BlockBucket bucket;while((bucket = bucketQueue.poll()) !=null) {longoverflow = bucket.overflow();if(overflow >0) {longbucketBytesToFree = Math.min(overflow,                (bytesToFree - bytesFreed) / remainingBuckets);            bytesFreed += bucket.free(bucketBytesToFree);          }          remainingBuckets--;        }      }if(LOG.isTraceEnabled()) {longsingle = bucketSingle.totalSize();longmulti = bucketMulti.totalSize();longmemory = bucketMemory.totalSize();LOG.trace("Block cache LRU eviction completed; "+"freed="+ StringUtils.byteDesc(bytesFreed) +", "+"total="+ StringUtils.byteDesc(this.size.get()) +", "+"single="+ StringUtils.byteDesc(single) +", "+"multi="+ StringUtils.byteDesc(multi) +", "+"memory="+ StringUtils.byteDesc(memory));      }}finally{      stats.evict();evictionInProgress =false;      evictionLock.unlock();    }  }

具体逻辑如下:

1).先获取锁,保证同一时间只有一个线程执行;

    2).计算当前缓存总大小currentSize,以及需要清理的缓存大小bytesToFree ,如果bytesToFree为0,直接返回;

    3).创建三个队列,存放Single、Multi和InMemory类Block Cache,其中每个BlockBucket维护了一个CachedBlockQueue,按LRU淘汰算法维护该BlockBucket中的所有CachedBlock对象;

    4).遍历记录所有Block Cache的全局ConcurrentHashMap,加入到相应的BlockBucket队列中;

    5).将以上三个BlockBucket队列加入到一个优先级队列中,按照各个BlockBucket超出bucketSize的大小顺序排序(见BlockBucket的compareTo方法);

6) 遍历优先级队列,对于每个BlockBucket,通过Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)计算出需要释放的空间大小,这样做可以保证尽可能平均地从三个BlockBucket中释放指定的空间;具体实现过程详见BlockBucket的free方法,从其CachedBlockQueue中取出即将被淘汰掉的CachedBlock对象。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358

推荐阅读更多精彩内容