Hbase二级索引(BaseRegionObserver 协处理器)

概述

HBase 是一款基于 Hadoop 的 key-value 数据库,它提供了对 HDFS 上数据的高效随机读写服务,完美地填补了 Hadoop MapReduce 仅适于批处理的缺陷,正在被越来越多的用户使用。作为 HBase 的一项重要特性,Coprocessor 在 HBase 0.92 版本中被加入,并广受欢迎

利用协处理器,用户可以编写运行在 HBase Server 端的代码。HBase 支持两种类型的协处理器,Endpoint 和 Observer。Endpoint 协处理器类似传统数据库中的存储过程,客户端可以调用这些 Endpoint 协处理器执行一段 Server 端代码,并将 Server 端代码的结果返回给客户端进一步处理,最常见的用法就是进行聚集操作。如果没有协处理器,当用户需要找出一张表中的最大数据,即 max 聚合操作,就必须进行全表扫描,在客户端代码内遍历扫描结果,并执行求最大值的操作。这样的方法无法利用底层集群的并发能力,而将所有计算都集中到 Client 端统一执行,势必效率低下。利用 Coprocessor,用户可以将求最大值的代码部署到 HBase Server 端,HBase 将利用底层 cluster 的多个节点并发执行求最大值的操作。即在每个 Region 范围内执行求最大值的代码,将每个 Region 的最大值在 Region Server 端计算出,仅仅将该 max 值返回给客户端。在客户端进一步将多个 Region 的最大值进一步处理而找到其中的最大值。这样整体的执行效率就会提高很多。

另外一种协处理器叫做 Observer Coprocessor,这种协处理器类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Server 端调用。Observer Coprocessor 就是一些散布在 HBase Server 端代码中的 hook 钩子,在固定的事件发生时被调用。比如:put 操作之前有钩子函数 prePut,该函数在 put 操作执行前会被 Region Server 调用;在 put 操作之后则有 postPut 钩子函数。

开发环境

  • maven-3.3.9
  • jdk 1.7
  • cdh-hbase-1.2.0
  • myeclipse 10

hbase协处理器加载

进入hbase命令行

# hbase shell

hbase(main):> disable 'test'    

hbase(main):> alter 'test',CONFIGURATION => {'hbase.table.sanity.checks'=>'false'}         //-----》建立表后,执行一次就行

hbase(main):> alter 'test','coprocessor'=>'hdfs:///code/jars/regionObserver-put5.jar|com.hbase.observer.App|1001'   //----》加载jar包

hbase(main):> alter 'test', METHOD => 'table_att_unset', NAME => 'coprocessor$1'  //--------》卸载jar包

hbase(main):> desc 'test'    //-------》查看表的属性描述

hbase(main):> enable 'test'

完整工程代码

package com.hbase.observer;

/**
 * hbase 二级索引
 * @author wing
 * @createTime 2017-4-7 
 */
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

public class App extends BaseRegionObserver {

    private HTablePool pool = null;

    private final static String SOURCE_TABLE = "test";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c,
            Get get, List<Cell> results) throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        String newRowkey = Bytes.toString(get.getRow());
        String pre = newRowkey.substring(0, 1);

        if (pre.equals("t")) {
            String[] splits = newRowkey.split("_");
            String prepre = splits[0].substring(1, 3);
            String timestamp = splits[0].substring(3);
            String uid = splits[1];
            String mid = "";
            for (int i = 2; i < splits.length; i++) {
                mid += splits[i];
                mid += "_";
            }
            mid = mid.substring(0, mid.length() - 1);
            String rowkey = prepre + uid + "_" + timestamp + "_" + mid;
            System.out.println(rowkey);
            Get realget = new Get(rowkey.getBytes());
            Result result = table.get(realget);

            List<Cell> cells = result.listCells();
            results.clear();
            for (Cell cell : cells) {
                results.add(cell);
            }

        }
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e,
            Put put, WALEdit edit, Durability durability) throws IOException {
        try {

            String rowkey = Bytes.toString(put.getRow());
            HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));

            String pre = rowkey.substring(0, 2);
            if (pre.equals("aa") || pre.equals("ab") || pre.equals("ac")
                    || pre.equals("ba") || pre.equals("bb") || pre.equals("bc")
                    || pre.equals("ca") || pre.equals("cb") || pre.equals("cc")) {
                String[] splits = rowkey.split("_");
                String uid = splits[0].substring(2);
                String timestamp = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String newRowkey = "t" + pre + timestamp + "_" + uid + "_"
                        + mid;
                System.out.println(newRowkey);
                Put indexput2 = new Put(newRowkey.getBytes());
                indexput2.addColumn("relation".getBytes(),
                        "column10".getBytes(), "45".getBytes());
                table.put(indexput2);

            }
            table.close();

        } catch (Exception ex) {

        }

    }

    @Override
    public boolean postScannerNext(
            ObserverContext<RegionCoprocessorEnvironment> e, InternalScanner s,
            List<Result> results, int limit, boolean hasMore)
            throws IOException {
        HTableInterface table = pool.getTable(Bytes.toBytes(SOURCE_TABLE));
        List<Result> newresults = new ArrayList<Result>();
        for (Result result : results) {
            String newRowkey = Bytes.toString(result.getRow());

            String pre = newRowkey.substring(0, 1);

            if (pre.equals("t")) {
                String[] splits = newRowkey.split("_");
                String prepre = splits[0].substring(1, 3);
                String timestamp = splits[0].substring(3);
                String uid = splits[1];
                String mid = "";
                for (int i = 2; i < splits.length; i++) {
                    mid += splits[i];
                    mid += "_";
                }
                mid = mid.substring(0, mid.length() - 1);
                String rowkey = prepre + uid + "_" + timestamp + "_" + mid;

                Get realget = new Get(rowkey.getBytes());
                Result newresult = table.get(realget);

                newresults.add(newresult);
            }

        }
         results.clear();
        for (Result result : newresults) {
            results.add(result);
        }

        return hasMore;

    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }
    
}

通过maven工程打包后上传到hdfs相应目录,再通过命令加载jar包。
即可完成二级索引。

  • 当用户put操作时,会将原rowkey,转换为新的rowkey,再存一份索引。
  • 当用户get操作时,会将rowkey映射为实际的rowkey,再根据实际的rowkey获取实际的结果。
  • 当用户执行scanner操作时,会将scanner的结果映射为实际rowkey的结果,返回给用户。

通过hbase的BaseRegionObserver 协处理器,可以封装处理很多hbase操作。

BaseRegionObserver的java接口(注意hbase版本)
https://hbase.apache.org/1.2/apidocs/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • HBase那些事 @(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]...
    分痴阅读 4,009评论 3 17
  • 最近在逐步跟进Hbase的相关工作,由于之前对Hbase并不怎么了解,因此系统地学习了下Hbase,为了加深对Hb...
    飞鸿无痕阅读 50,351评论 19 272
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 135,026评论 19 139
  • Hbase架构与原理 HBase是一个分布式的、面向列的开源数据库,该技术来源于 Fay Chang所撰写的Goo...
    全能程序猿阅读 86,330评论 2 37
  • HBase存储架构图 HBase Master 为Region server分配region 负责Region s...
    kimibob阅读 5,634评论 0 52