微信公众号:大数据开发运维架构
关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;
文章从公众号复制过来,格式可能错乱,请关注公众号阅读原文
微信公众号:大数据开发运维架构
关注可了解更多大数据相关的资讯。问题或建议,请公众号留言;
如果您觉得“大数据开发运维架构”对你有帮助,欢迎转发朋友圈
从微信公众号拷贝过来,格式有些错乱,建议直接去公众号阅读
引言:
上篇文章“HBase1.x精通:详解HBase读缓存BlockCache(一)”主要讲解了HBase块缓存的机制和三种策略,我们生产环境当前用的版本是HBase1.2.5,它默认的读缓存策略是LruBlockCache,下面我就结合HBase1.2.5源码深入剖析LruBlockCache的实现。
1.BlockCache初始化
当每个HRegionserver线程通过函数run()启动时,调用函数handleReportForDutyResponse()运行初始化。设置WAL并启动所有服务器线程。
/** * The HRegionServer sticks in this loop until closed. */@Overridepublicvoid run() {try{//Regionserver线程注册初始化;zookeeper注册,租借线程,等等。. preRegistrationInitialization();}catch(Throwable e) {abort("Fatal exception during initialization", e); }try{if(!isStopped() && !isAborted()) {ShutdownHook.install(conf, fs,this, Thread.currentThread());// Set our ephemeral znode up in zookeeper now we have a name. createMyEphemeralNode();// 创建临时节点后,初始化RegionServerCoprocessorHost,// 以防任何协处理器想要使用ZooKeeperthis.rsHost = new RegionServerCoprocessorHost(this,this.conf); }//向HMaster申请登记;//如果服务器停止了,或者clusterup标志关闭了,或者hdfs出了问题,则关闭while(keepLooping()) { RegionServerStartupResponse w = reportForDuty();if(w ==null) {LOG.warn("reportForDuty failed; sleeping and then retrying.");this.sleeper.sleep();}else{//这里开始调用实现LruBlockCache的初始化工作 handleReportForDutyResponse(w);break; } } ..................................... }
接下来调用函数handleReportForDutyResponse(),依次调用函数startHeapMemoryManager(),调用变量HeapMemoryManager的函数create(),最后通过CacheConfig.instantiateBlockCache(conf)完成缓存的初始化。
HRegionServer有一个HeapMemoryManager类型的成员变量,用于管理RegionServer进程的堆内存,HeapMemoryManager中的blockCache就是RegionServer中的读缓存,它的初始化在CacheConfig的instantiateBlockCache方法中完成。
/** * Returns the block cache or <code>null</code> in case none should be used. * Sets GLOBAL_BLOCK_CACHE_INSTANCE **@paramconf The current configuration.*@returnThe block cache or null
. */publicstaticsynchronizedBlockCacheinstantiateBlockCache(Configuration conf){if(GLOBAL_BLOCK_CACHE_INSTANCE !=null)returnGLOBAL_BLOCK_CACHE_INSTANCE;if(blockCacheDisabled)returnnull;//一级缓存 LruBlockCache l1 = getL1(conf);// blockCacheDisabled is set as a side-effect of getL1Internal(), so check it again after the call.if(blockCacheDisabled)returnnull;//二级缓存 BlockCache l2 = getL2(conf);if(l2 ==null) { GLOBAL_BLOCK_CACHE_INSTANCE = l1;}else{booleanuseExternal = conf.getBoolean(EXTERNAL_BLOCKCACHE_KEY, EXTERNAL_BLOCKCACHE_DEFAULT);booleancombinedWithLru = conf.getBoolean(BUCKET_CACHE_COMBINED_KEY, DEFAULT_BUCKET_CACHE_COMBINED);//判断是否启用了外部二级缓存:hbase.blockcache.use.external=true启用if(useExternal) {GLOBAL_BLOCK_CACHE_INSTANCE =newInclusiveCombinedBlockCache(l1, l2);}else{if(combinedWithLru) {GLOBAL_BLOCK_CACHE_INSTANCE =newCombinedBlockCache(l1, l2);}else{// L1 and L2 are not 'combined'. They are connected via the LruBlockCache victimhandler// mechanism. It is a little ugly but works according to the following: when the// background eviction thread runs, blocks evicted from L1 will go to L2 AND when we get// a block from the L1 cache, if not in L1, we will search L2. GLOBAL_BLOCK_CACHE_INSTANCE = l1; } } l1.setVictimCache(l2); }returnGLOBAL_BLOCK_CACHE_INSTANCE; }}
上面有两行比较重要的代码:
LruBlockCache l1 = getL1(conf);
//获取一级缓存LruBlockCache默认为HRegionsever堆内存的40%,对应参数:hfile.block.cache.size
BlockCache l2 = getL2(conf);
//这是个什么东西呢?
从HBase-1.1.0起,在使用方式上可以单独使用LRUBlockCache,或者组合起来使用,多级缓存的方式。LruBlockCache为一级缓存,BucketCache或者ExternalBlockCache为二级缓存。HBase可以使用memcached作为外部BlockCache,这是一个在设备失效或者升级时不会发生完全的冷缓存的很好的特性。用句通俗的话讲,就是HBase出现故障或者升级时,缓存轻易不会丢失。
代码21行中对useExternal和combinedWithLru的判断,如果指定了useExternal为true,则结合memcached等外部缓存与BlockCache一起使用。如果指定了combinedWithLru,则结合bucketCache,也就是堆外内存与BlockCache一起使用。在上述两种情况下,BlockCache用于存放索引等元数据,真实的数据文件则缓存在memcached或bucketCache中。
二级缓存可通过配置以下两个参数启动:
hbase.blockcache.use.external为true 开启二级缓存
并配置hbase.cache.memcached.servers来指明memcached servers.
当 hbase.bucketcache.combinedcache.enable 为false。在这种模式下,当L1缓存内容被清除(置换)时,会将置换出的块放入L2。当一个块被缓存时,首先被缓存在L1。当我们去查询一个缓存块时,首先在L1查,若是没找到,则再搜索L2。我们将此部署方法称为Raw L1+L2。需要注意的是,这个L1+L2模式已经在hbase 2.0.0 以后被移除了。
2.BlockCache实现
BlockCache基于客户端对数据的访问频率,定义了三个不同的优先级,如下所示:
SINGLE:如果Block被第一次访问,则该Block被放在这一优先级队列中;
MULTI:如果一个Block被多次访问,则从single移到Multi中;
MEMORY:memory优先级由用户指定,一般不推荐,只用系统表才使用memory优先级;
LruBlockCache内部是通过一个ConcurrentHashMap来保存所有cache的block的
/** Concurrent map (the cache) */privatefinalMapmap;
Block块加入缓存的实现是在函数cacheBlock()中,
/** * Cache the block with the specified name and buffer. * <p> * It is assumed this will NOT be called on an already cached block. In rare cases (HBASE-8547) * this can happen, for which we compare the buffer contents.*@paramcacheKey block's cache key*@parambuf block buffer*@paraminMemory if block is in-memory*@paramcacheDataInL1 */@OverridepublicvoidcacheBlock(BlockCacheKey cacheKey, Cacheable buf,booleaninMemory,finalbooleancacheDataInL1){if(buf.heapSize() > maxBlockSize) {// If there are a lot of blocks that are too// big this can make the logs way too noisy.// So we log 2%if(stats.failInsert() %50==0) {LOG.warn("Trying to cache too large a block "+ cacheKey.getHfileName() +" @ " + cacheKey.getOffset()+" is "+ buf.heapSize()+" which is larger than "+ maxBlockSize); }return; } LruCachedBlock cb = map.get(cacheKey);if(cb !=null) {// compare the contents, if they are not equal, we are in big troubleif(compare(buf, cb.getBuffer()) !=0) {thrownewRuntimeException("Cached block contents differ, which should not have happened."+"cacheKey:"+ cacheKey); }String msg ="Cached an already cached block: "+ cacheKey +" cb:"+ cb.getCacheKey();msg +=". This is harmless and can happen in rare cases (see HBASE-8547)"; LOG.warn(msg);return; }longcurrentSize = size.get();longcurrentAcceptableSize = acceptableSize();longhardLimitSize = (long) (hardCapacityLimitFactor * currentAcceptableSize);if(currentSize >= hardLimitSize) { stats.failInsert();if(LOG.isTraceEnabled()) {LOG.trace("LruBlockCache current size "+ StringUtils.byteDesc(currentSize)+" has exceeded acceptable size "+ StringUtils.byteDesc(currentAcceptableSize) +" too many."+" the hard limit size is "+ StringUtils.byteDesc(hardLimitSize) +", failed to put cacheKey:"+ cacheKey +" into LruBlockCache."); }if(!evictionInProgress) { runEviction(); }return; }cb =newLruCachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);longnewSize = updateSizeMetrics(cb,false); map.put(cacheKey, cb);longval = elements.incrementAndGet();if(LOG.isTraceEnabled()) {longsize = map.size(); assertCounterSanity(size, val); }if(newSize > currentAcceptableSize && !evictionInProgress) { runEviction(); } }
代码大体逻辑如下:
1).这里假设不会对同一个已经被缓存的BlockCacheKey重复放入cache操作;
2).根据inMemory标志创建不同类别的CachedBlock对象:若inMemory为true则创建BlockPriority.MEMORY类型,否则创建BlockPriority.SINGLE;注意,这里只有这两种类型的Cache,因为BlockPriority.MULTI在Cache Block被重复访问时才进行创建。
3).将BlockCacheKey和创建的CachedBlock对象加入到前文说过的ConcurrentHashMap中,同时更新log&metrics上的计数;
4).最后判断如果加入新block后cache size大于设定的临界值且当前没有淘汰线程运行,则调用runEviction()方法启动LRU淘汰线程。
缓存数据的获取是在方法getBlock()中实现的
/** * Get the buffer of the block with the specified name.*@paramcacheKey block's cache key*@paramcaching true if the caller caches blocks on cache misses*@paramrepeat Whether this is a repeat lookup for the same block * (used to avoid double counting cache misses when doing double-check locking)*@paramupdateCacheMetrics Whether to update cache metrics or not*@returnbuffer of specified cache key, or null if not in cache */@OverridepublicCacheablegetBlock(BlockCacheKey cacheKey,booleancaching,booleanrepeat,booleanupdateCacheMetrics){ LruCachedBlock cb = map.get(cacheKey);if(cb ==null) {if(!repeat && updateCacheMetrics) stats.miss(caching, cacheKey.isPrimary());// If there is another block cache then try and read there.// However if this is a retry ( second time in double checked locking )// And it's already a miss then the l2 will also be a miss.if(victimHandler !=null&& !repeat) { Cacheable result = victimHandler.getBlock(cacheKey, caching, repeat, updateCacheMetrics);// Promote this to L1.if(result !=null&& caching) {cacheBlock(cacheKey, result,/* inMemory = */false,/* cacheData = */true); }returnresult; }returnnull; }if(updateCacheMetrics) stats.hit(caching, cacheKey.isPrimary()); cb.access(count.incrementAndGet());returncb.getBuffer(); }
代码逻辑大体如下:
1、首先,从LruBlockCache的map中直接获取;
2、如果map中没有,则在victimHandler存在且!repeat的情况下,通过victimHandler的getBlock()方法获取并缓存到LruBlockCache中,即综合考虑第二种缓存模式,并同步到第一种缓存中;
3、如果1或2能够获取到数据,更新统计数据,且通过缓存块的access方法,更新访问时间accessTime,将可能的BlockPriority.SINGLE升级为BlockPriority.MULTI;
BlockCache的LRU淘汰过程,主要是通过EvictionThread线程实现的
线程启动后调用wait被阻塞住,直到函数evict()调用notifyAl()才开始执行。envict()函数代码如下:
/** * Eviction method. */voidevict(){// Ensure only one eviction at a timeif(!evictionLock.tryLock())return;try{evictionInProgress =true;longcurrentSize =this.size.get();longbytesToFree = currentSize - minSize();if(LOG.isTraceEnabled()) {LOG.trace("Block cache LRU eviction started; Attempting to free "+StringUtils.byteDesc(bytesToFree) +" of total="+ StringUtils.byteDesc(currentSize)); }if(bytesToFree <=0)return;// Instantiate priority bucketsBlockBucket bucketSingle =newBlockBucket("single", bytesToFree, blockSize, singleSize());BlockBucket bucketMulti =newBlockBucket("multi", bytesToFree, blockSize, multiSize());BlockBucket bucketMemory =newBlockBucket("memory", bytesToFree, blockSize, memorySize());// Scan entire map putting into appropriate bucketsfor(LruCachedBlock cachedBlock : map.values()) {switch(cachedBlock.getPriority()) {caseSINGLE: {bucketSingle.add(cachedBlock);break; }caseMULTI: {bucketMulti.add(cachedBlock);break; }caseMEMORY: {bucketMemory.add(cachedBlock);break; } } }longbytesFreed =0;if(forceInMemory || memoryFactor >0.999f) {longs = bucketSingle.totalSize();longm = bucketMulti.totalSize();if(bytesToFree > (s + m)) {// this means we need to evict blocks in memory bucket to make room,// so the single and multi buckets will be emptied bytesFreed = bucketSingle.free(s); bytesFreed += bucketMulti.free(m);if(LOG.isTraceEnabled()) {LOG.trace("freed "+ StringUtils.byteDesc(bytesFreed) +" from single and multi buckets"); } bytesFreed += bucketMemory.free(bytesToFree - bytesFreed);if(LOG.isTraceEnabled()) {LOG.trace("freed "+ StringUtils.byteDesc(bytesFreed) +" total from all three buckets "); }}else{// this means no need to evict block in memory bucket,// and we try best to make the ratio between single-bucket and// multi-bucket is 1:2longbytesRemain = s + m - bytesToFree;if(3* s <= bytesRemain) {// single-bucket is small enough that no eviction happens for it// hence all eviction goes from multi-bucket bytesFreed = bucketMulti.free(bytesToFree);}elseif(3* m <=2* bytesRemain) {// multi-bucket is small enough that no eviction happens for it// hence all eviction goes from single-bucket bytesFreed = bucketSingle.free(bytesToFree);}else{// both buckets need to evict some blocksbytesFreed = bucketSingle.free(s - bytesRemain /3);if(bytesFreed < bytesToFree) { bytesFreed += bucketMulti.free(bytesToFree - bytesFreed); } } }}else{ PriorityQueue<BlockBucket> bucketQueue =newPriorityQueue(3);bucketQueue.add(bucketSingle);bucketQueue.add(bucketMulti);bucketQueue.add(bucketMemory);intremainingBuckets =3; BlockBucket bucket;while((bucket = bucketQueue.poll()) !=null) {longoverflow = bucket.overflow();if(overflow >0) {longbucketBytesToFree = Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets); bytesFreed += bucket.free(bucketBytesToFree); } remainingBuckets--; } }if(LOG.isTraceEnabled()) {longsingle = bucketSingle.totalSize();longmulti = bucketMulti.totalSize();longmemory = bucketMemory.totalSize();LOG.trace("Block cache LRU eviction completed; "+"freed="+ StringUtils.byteDesc(bytesFreed) +", "+"total="+ StringUtils.byteDesc(this.size.get()) +", "+"single="+ StringUtils.byteDesc(single) +", "+"multi="+ StringUtils.byteDesc(multi) +", "+"memory="+ StringUtils.byteDesc(memory)); }}finally{ stats.evict();evictionInProgress =false; evictionLock.unlock(); } }
具体逻辑如下:
1).先获取锁,保证同一时间只有一个线程执行;
2).计算当前缓存总大小currentSize,以及需要清理的缓存大小bytesToFree ,如果bytesToFree为0,直接返回;
3).创建三个队列,存放Single、Multi和InMemory类Block Cache,其中每个BlockBucket维护了一个CachedBlockQueue,按LRU淘汰算法维护该BlockBucket中的所有CachedBlock对象;
4).遍历记录所有Block Cache的全局ConcurrentHashMap,加入到相应的BlockBucket队列中;
5).将以上三个BlockBucket队列加入到一个优先级队列中,按照各个BlockBucket超出bucketSize的大小顺序排序(见BlockBucket的compareTo方法);
6) 遍历优先级队列,对于每个BlockBucket,通过Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)计算出需要释放的空间大小,这样做可以保证尽可能平均地从三个BlockBucket中释放指定的空间;具体实现过程详见BlockBucket的free方法,从其CachedBlockQueue中取出即将被淘汰掉的CachedBlock对象。