hbase有两种Coprocessor：endpoint和observer。endpoint类似于存储过程，可以在hbase上实现一个类似于MapReduce的过程；observer实现起来比较简单，类似于触发器。具体架构和理论在这里就不再多说，直接上代码。下面代码实现了一个数据自增的功能：相同key的数据，每插入一条，后面的counts列自动加1。
package com.open01.hbase.coprocessor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.List;
/**
* Created by caolch on 2017/4/7.
*/
/**
 * RegionObserver coprocessor that maintains a running counter column.
 *
 * <p>For every {@link Put} that writes the configured {@code family:col}, this
 * observer reads the value currently stored for the same row/column, adds the
 * incoming value to it, and rewrites the column with the sum before the put is
 * applied. The target column is configured through the coprocessor table
 * attribute arguments {@code family} and {@code col}
 * (e.g. {@code family=f1,col=counts}).
 *
 * <p>Values are stored as decimal strings (e.g. {@code "3"}), matching how the
 * writers of this table encode the counter.
 */
public class PrePutSumObserver extends BaseRegionObserver {

    // Target column family and qualifier, resolved once in start().
    private byte[] family;
    private byte[] col;

    /**
     * Reads the target family/qualifier from the coprocessor configuration.
     *
     * @throws IOException if the required {@code family} or {@code col}
     *     coprocessor arguments are missing (fail fast with a clear message
     *     instead of an NPE inside {@code Bytes.toBytes(null)})
     */
    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        Configuration conf = e.getConfiguration();
        String familyName = conf.get("family");
        String colName = conf.get("col");
        if (familyName == null || colName == null) {
            throw new IOException(
                "PrePutSumObserver requires 'family' and 'col' coprocessor arguments");
        }
        family = Bytes.toBytes(familyName);
        col = Bytes.toBytes(colName);
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        super.stop(e);
    }

    /**
     * Before the put is applied, reads the stored counter for this row and
     * replaces the incoming value with {@code stored + incoming}.
     *
     * <p>NOTE(review): {@code addColumn} appends a second cell for the same
     * column to the Put (both at LATEST_TIMESTAMP); the summed cell is the one
     * expected to win. This matches the original behavior — confirm it holds
     * for this deployment's HBase version.
     */
    @Override
    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e,
                       Put put, WALEdit edit, Durability durability) throws IOException {
        // Only act on puts that actually touch the counter column.
        if (!put.has(family, col)) {
            return;
        }

        // Fetch only the counter column instead of the whole row.
        Get get = new Get(put.getRow());
        get.addColumn(family, col);
        Result rs = e.getEnvironment().getRegion().get(get);

        int oriCounts = 0;
        for (Cell cell : rs.rawCells()) {
            if (CellUtil.matchingColumn(cell, family, col)) {
                oriCounts = parseCount(cell);
            }
        }

        int incrCounts = 0;
        for (Cell cell : put.get(family, col)) {
            if (CellUtil.matchingColumn(cell, family, col)) {
                incrCounts = parseCount(cell);
            }
        }

        put.addColumn(family, col, Bytes.toBytes(String.valueOf(oriCounts + incrCounts)));
    }

    /**
     * Parses a cell's value as a decimal int.
     *
     * @throws IOException with the offending value if it is not numeric,
     *     instead of a bare NumberFormatException with no context
     */
    private static int parseCount(Cell cell) throws IOException {
        String value = Bytes.toString(CellUtil.cloneValue(cell));
        try {
            return Integer.parseInt(value);
        } catch (NumberFormatException nfe) {
            throw new IOException("Non-numeric counter value: " + value, nfe);
        }
    }
}
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Build descriptor for the observer jar that gets deployed to HDFS and
     loaded by the region servers (see the alter-table command below). -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>hbase-observer</groupId>
<artifactId>hbase-observer</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<!-- NOTE(review): the region servers already ship these HBase jars, so
     <scope>provided</scope> would keep the deployed jar small — verify the
     packaging step before changing. Versions must match the target cluster. -->
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.1.9</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.1.9</version>
</dependency>
</dependencies>
</project>
打好包，把jar包上传到hdfs目录，然后在hbase执行alter命令，修改要添加observer的table：
#!/bin/bash
# Installs the coprocessor on table 'observer_test' via the hbase shell.
# The coprocessor spec is: jar-path-on-HDFS | class | priority | key=value args
# (the family/col args are read by PrePutSumObserver.start()).
# The table must be disabled while its attributes are altered.
hbase shell <<EOF
disable 'observer_test'
alter 'observer_test', METHOD => 'table_att', 'Coprocessor'=>'hdfs://open009:9000/user/root/observerJar/hbase-observer-1.0-SNAPSHOT.jar|com.open01.hbase.coprocessor.PrePutSumObserver|1001|family=f1,col=counts'
enable 'observer_test'
EOF