Hbase Scan 主要流程分析.md

Hbase Scan 流程分析

公司在集群在从0.94.6升到0.98.6-cdh5.2.0后, 原来执行的hbase scan 任务出现很多问题.

表现在:

  • setBatch() 与filter 不兼容, 导致代码需要修改(删掉setBatch())

  • scan 效率变慢, 并且经常超时. mapper 报OutOfOrderException. 影响任务执行效率.

所以有必要在理解scan 流程的基础上, 进行优化.

从应用hbase角度来讲, 需要理解scan 几个配置. 包括setCaching() setBatch(), 以及scan的过程.

另外一个, 是过滤器如何工作. 在哪一步发挥作用.

1, Hbase MR

  • Hbase MR 主流程代码

Configurationconf= getConf();

Scanscan= buildScan(conf);

//初始化认证

TableMapReduceUtil.initCredentials(job);

// run job

TableMapReduceUtil.initTableMapperJob(tableName,scan, HbaseSearchCheckerMapper.class, Text.class, Text.class,job);

  • 创建Scan实例

Scanscan=newScan();

scan.setCaching(1000);

scan.setCacheBlocks(false);// no read cache

scan.addFamily(Bytes.toBytes("t"));

  • 和普通MR程序一样定义mapper类 map 函数

public void map(ImmutableBytesWritable key, Result value, Context context)

throws IOException, InterruptedException {

String rowKey = null;

for (Cell kv : value.rawCells()) {

if(rowKey == null) {

rowKey = Bytes.toString(CellUtil.cloneRow(kv));

}

count ++;

}

}

以上是Hbase MR 程序. 非常简单.

但是在执行过程中, scan 实例的设置影响最后的性能. 一般情况下, 可以通过过滤器filter, 来优化scan.

所以有必要了解scan 的流程.

2, Scan 流程

Hbase 结构
  • Hbase 通过Region Server 管理数据

  • Hbase 作为列式数据库, 不同种类数据通过CF管理 [ 分为不同目录] ,数据保存在HFile中. 内存中数据在flush前保留在MemStore中.

  • Hbase RowKey 结构. rowKey 是有序的.

Scan client 流程
  • Scan 类

hbase-client 下package:org.apache.hadoop.hbase.client

这个相当于是配置类, 作用是构造用户scan 的条件. 也就是用户buildScan时的配置.

比如用户需要scan的column, column会转化为如下结构.


Map> fams = scan.getFamilyMap();

注意下这俩函数.


publicScan addFamily(byte[]family) {

familyMap.remove(family);

familyMap.put(family,null);

returnthis;

}

publicScan addColumn(byte[]family,byte[]qualifier) {

NavigableSet set = familyMap.get(family);

if(set ==null) {

set =newTreeSet(Bytes.BYTES_COMPARATOR);

}

if(qualifier ==null) {

qualifier = HConstants.EMPTY_BYTE_ARRAY;

}

set.add(qualifier);

familyMap.put(family, set);

returnthis;

}

这里有个坑. 如果你调用addFamily, 然后在用addColumn, 那么显然addFamily等于是无效的. 需要注意.

  • ClientScanner 类

这个类是封装了scan请求, 以及返回结构.

在构造这个类的时候, 调用nextScanner 构造


ScannerCallable callable = getScannerCallable(localStartKey, nbRows);

发起scan请求:callable.call():


this.scannerId = openScanner();

request = RequestConverter.buildScanRequest(scannerId, caching,false, nextCallSeq);

response = getStub().scan(controller, request);

// Results are returned via controller

CellScanner cellScanner = controller.cellScanner();

rrs = ResponseConverter.getResults(cellScanner, response);

client 发起scan请求, 并接受返回的结果.

Scan Server 流程
  • Scanner 在server 结构

1), 请求到RegionServer 构造 RegionScanner

2), Region Server 管理一堆ColumnFamily. 构造StoreFileScanner, 包括MemStoreScanner

3),StoreFile 管理一堆HFile, 构造HFileScanner. 这个是实际读取数据的地方.

这里有一个问题. 过滤器是在哪一步来执行的?

  • RegionSanner

实际在HRegion中, 通过RegionScannerImpl构造

HRegion:


protectedRegionScanner getScanner(Scan scan,

List additionalScanners)throwsIOException {

startRegionOperation(Operation.SCAN);

try{

// Verify families are all valid

prepareScanner(scan);

if(scan.hasFamilies()) {

for(byte[] family : scan.getFamilyMap().keySet()) {

checkFamily(family);

}

}

returninstantiateRegionScanner(scan, additionalScanners);

}finally{

closeRegionOperation(Operation.SCAN);

}

}

//instantiateRegionScanner 函数里:

returnnewRegionScannerImpl(scan, additionalScanners,this);

  • StoreScanner 类:

RegionScannerImpl 构造StoreScanner:


for(Map.Entry> entry :

scan.getFamilyMap().entrySet()) {

Store store = stores.get(entry.getKey());

KeyValueScanner scanner = store.getScanner(scan, entry.getValue(),this.readPt);

if(this.filter ==null|| !scan.doLoadColumnFamiliesOnDemand()

||this.filter.isFamilyEssential(entry.getKey())) {

scanners.add(scanner);

}else{

joinedScanners.add(scanner);

}

}

  • Server Scan 处理流程

1), HRegionServer 获取到Scan的RPC请求

前面client提到:

response = getStub().scan(controller, request);

getStub() 返回的是ClientService.BlockingInterface

HRegionServer 的定义:

public class HRegionServer implements ClientProtos.ClientService.BlockingInterface,

所以HRegionServer 其实是继承自一个protobuf 类. 可以方便的交换数据. 并且定义了scan接口. 调用方式可能是通过序列化, 反射的方式来执行.这个后面再说.

2), scan 流程


// 初始化

region = getRegion(request.getRegion());

ClientProtos.Scan protoScan = request.getScan();

Scan scan = ProtobufUtil.toScan(protoScan);

region.prepareScanner(scan);

RegionScannerscanner = region.getScanner(scan);

// scan 函数:

region.startRegionOperation(Operation.SCAN);

while (i < rows) {

// Stop collecting results if maxScannerResultSize is set and we have exceeded it

if ((maxScannerResultSize < Long.MAX_VALUE) &&

(currentScanResultSize >= maxResultSize)) {

break;

}

// Collect values to be returned here

boolean moreRows = scanner.nextRaw(values);

if (!values.isEmpty()) {

for (Cell cell : values) {

KeyValue kv = KeyValueUtil.ensureKeyValue(cell);

currentScanResultSize += kv.heapSize();

totalKvSize += kv.getLength();

}

results.add(Result.create(values));

i++;

}

if (!moreRows) {

break;

}

values.clear();

}

注意上面的rows 就是scan.setCaching(rows)的设置的.

核心的代码是 RegionScannerImp 实现的nextRaw函数

nextRaw 调用nextInternal 函数


while (true) {

// First, check if we are at a stop row. If so, there are no more results.

if (stopRow) {

if (filter != null && filter.hasFilterRow()) {

filter.filterRowCells(results);

}

return false;

}

// Check if rowkey filter wants to exclude this row. If so, loop to next.

// Technically, if we hit limits before on this row, we don't need this call.

if (filterRowKey(currentRow, offset, length)) {

boolean moreRows = nextRow(currentRow, offset, length);

if (!moreRows) return false;

results.clear();

continue;

}

KeyValue nextKv = populateResult(results, this.storeHeap, limit, currentRow, offset,

length);

// Ok, we are good, let's try to get some results from the main heap.

if (nextKv == KV_LIMIT) {

if (this.filter != null && filter.hasFilterRow()) {

throw new IncompatibleFilterException(

"Filter whose hasFilterRow() returns true is incompatible with scan with limit!");

}

return true; // We hit the limit.

}

FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;

if (filter != null && filter.hasFilterRow()) {

ret = filter.filterRowCellsWithRet(results);

}

if ((isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE) || filterRow()) {

results.clear();

boolean moreRows = nextRow(currentRow, offset, length);

if (!moreRows) return false;

// This row was totally filtered out, if this is NOT the last row,

// we should continue on. Otherwise, nothing else to do.

if (!stopRow) continue;

return false;

}

} else {

// Populating from the joined heap was stopped by limits, populate some more.

populateFromJoinedHeap(results, limit);

}

}

// nextRow 代码:

protected boolean nextRow(byte [] currentRow, int offset, short length) throws IOException {

assert this.joinedContinuationRow == null: "Trying to go to next row during joinedHeap read.";

KeyValue next;

while ((next = this.storeHeap.peek()) != null &&

next.matchingRow(currentRow, offset, length)) {

this.storeHeap.next(MOCKED_LIST);

}

resetFilters();

// Calling the hook in CP which allows it to do a fast forward

return this.region.getCoprocessorHost() == null

|| this.region.getCoprocessorHost()

.postScannerFilterRow(this, currentRow, offset, length);

}

注意, 这里的limit 是setBatch(N)设置的.

也就是说, 这里的limit是不能设置的. 否则也会报错.

但是如果不设置, scan 会找到一个rowKey 一行所有的列. 对于某些较大的数据, 就会非常慢.

如上代码里, 过滤器在哪一步执行也非常明显了.

那么之前提到的StoreScanner 在哪里呢, 也就是真正去读HFile文件的地方呢?

答案来了:


// scanners 就是之前定义的CF scanner

this.storeHeap =newKeyValueHeap(scanners, region.comparator);

//this.storeHeap.peek():

我们知道, StoreScanner 下管理很多的HFile. 这相当于是一个多路归并拉数据的算法.

这块的调用比较复杂. 但是看StoreScanner下next() 函数

这里是个递归的调用:


LOOP: while((kv = this.heap.peek()) != null) {

if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.

checkScanOrder(prevKV, kv, comparator);

prevKV = kv;

ScanQueryMatcher.MatchCode qcode = matcher.match(kv);

switch(qcode) {

case INCLUDE:

case INCLUDE_AND_SEEK_NEXT_ROW:

case INCLUDE_AND_SEEK_NEXT_COL:

this.countPerRow++;

if (storeLimit > -1 &&

this.countPerRow > (storeLimit + storeOffset)) {

// do what SEEK_NEXT_ROW does.

if (!matcher.moreRowsMayExistAfter(kv)) {

return false;

}

seekToNextRow(kv);

break LOOP;

}

// add to results only if we have skipped #storeOffset kvs

// also update metric accordingly

if (this.countPerRow > storeOffset) {

outResult.add(kv);

count++;

}

if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {

if (!matcher.moreRowsMayExistAfter(kv)) {

return false;

}

seekToNextRow(kv);

} else if (qcode == ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {

seekAsDirection(matcher.getKeyForNextColumn(kv));

} else {

this.heap.next();

}

if (limit > 0 && (count == limit)) {

break LOOP;

}

continue;

default:

throw new RuntimeException("UNEXPECTED");

}

}

注意storeLimit变量


java:

// set storeLimit

this.storeLimit = scan.getMaxResultsPerColumnFamily();

也就是, 在这里, 可以通过这个给函数. 在实际scan table的时候, 对那些column 非常多的行, 做过滤. 实际上不需要所有的行都读.

这样可以近似的加快数据的统计.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,287评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,346评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,277评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,132评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,147评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,106评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,019评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,862评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,301评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,521评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,682评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,405评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,996评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,651评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,803评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,674评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,563评论 2 352

推荐阅读更多精彩内容