一、协处理器的介绍
Hbase可以让用户的部分逻辑在数据存放端及Hbase服务端进行计算的机制(框架)。协处理器允许用户在Hbase服务端上运行自己的代码。
二、协处理器的分类
加载角度分类—— 系统协处理器、 表协处理器(用户可以指定某一张表使用协处理器 )
功能角度分类—— Observer协处理器(相当于关系型数据库中的触发器 )、Endpoint协处理器(动态终端,类似于一个存储过程 )
2.1Observer分类
RegionObserver协处理器——允许处理region上的事件
RegionServerObserver协处理器——处理RegionServer上的事件
MasterObserver协处理器——专门处理HMaster上的一些事件,比如创建删除表等操作。
WalObserver协处理器——允许处理日志上的事件
2.2 Observer的执行流程
2.3Endpoint协处理器
实现代码被部署在HBase服务器服务端,需要自己写一个客户端,去调用服务端上的实现。
三、演示endpoint服务端编写
1.创建endpoint.proto文件,生成java文件
option java_pakage = “com.jkb.coprocessor.endpoint”;
option java_outer_classname = “Sum”;
option java_generic_service = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
message SumRequence{
required string family = 1;
required string column = 2;
}
message SumResponse{
required int64 sum = 1 [default = 0];
}
//定义rpc服务器类的定义
service SumService{
rpc getSum(SumRequest)
returns (SumResponse);
}
2.执行命令,将.proto文件生成java代码
protoc endpoint.proto --java_out=./
3.将java文件放入eclipse中相应的工程中相应的目录下,然后加载Hbase中的jar包
4.编写SumEndPoint.java
Package com.jkb.coprocessor.endpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import edu.endpoint.Sum.SumRequest;
import edu.endpoint.Sum.SumResponse;
import edu.endpoint.Sum.SumService;
public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService{
private RegionCoprocessorEnvironment env;
@Override
public void getSum(RpcController controller, SumRequest request,
RpcCallback<SumResponse> done) {
Scan scan = new Scan();
scan.addFamily(Bytes.toBytes(request.getFamily()));
scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
SumResponse response = null;
InternalScanner scanner = null;
try{
scanner = env.getRegion().getScanner(scan);
List<Cell> results = new ArrayList<Cell>();
boolean hasMore = false;
Long sum = 0L;
do{
hasMore = scanner.next(results);
for(Cell cell: results){
sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
}
results.clear();
}while(hasMore);
response = SumResponse.newBuilder().setSum(sum).build();
}catch(IOException e){
ResponseConverter.setControllerException(controller, e);
}finally{
if(scanner!=null){
try {
scanner.close();
} catch (IOException e) {
}
}
}
done.run(response);
}
@Override
public Service getService() {
return this;
}
@Override
public void start(CoprocessorEnvironment env) throws IOException {
if(env instanceof RegionCoprocessorEnvironment){
this.env = (RegionCoprocessorEnvironment)env;
}else{
throw new CoprocessorException("no load region");
}
}
@Override
public void stop(CoprocessorEnvironment arg0) throws IOException {
}
}