https://blog.csdn.net/wypersist/article/details/79830811
https://www.cnblogs.com/mthoutai/p/7323316.html
https://blog.csdn.net/lifuxiangcaohui/article/details/39991183/
HBase的一级索引就是rowkey,我们仅仅能通过rowkey进行检索。
假设我们想对hbase里面列族的列进行一些组合查询。就须要採用HBase的二级索引方案来进行多条件的查询。
二级索引的本质就是建立各列值与行键之间的映射关系
如上图,当要对F:C1这列建立索引时,只需要建立F:C1各列值到其对应行键的映射关系,如C11->RK1等,这样就完成了对F:C1列值的二级索引的构建,当要查询符合F:C1=C11对应的F:C2的列值时(即根据C1=C11来查询C2的值,图1青色部分)
MapReduce方案
IndexBuilder:利用MR的方式构建Index
长处:并发批量构建Index
缺点:不能实时构建Index
举例:
Demo
流程:
- 我们需要查询 某列 所在行的其他信息,就需要创建一个新的索引表
把 原表的这一列 作为新表的 rowkey,把 原表的 rowkey 作为新表的 列
首先在 Mapper 中创建了一个 HashMap,把原表的所有列作为 key,value我们先不关注
在 map() 中,会传入原表,依据原表 列族和从HashMap中获取的列 得到原表的rowkey
把 HashMap 中的key 作为新表的 rowkey,把原表的 rowkey作为新表的对应列下的值,具体看代码。
package IndexDouble;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.MultiTableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.GenericOptionsParser;
public class IndexBuilder {
private String rootDir;
private String zkServer;
private String port;
private Configuration conf;
private HConnection hConn = null;
private IndexBuilder(String rootDir,String zkServer,String port) throws IOException{
this.rootDir = rootDir;
this.zkServer = zkServer;
this.port = port;
conf = HBaseConfiguration.create();
conf.set("hbase.rootdir", rootDir);
conf.set("hbase.zookeeper.quorum", zkServer);
conf.set("hbase.zookeeper.property.clientPort", port);
hConn = HConnectionManager.createConnection(conf);
}
static class MyMapper extends TableMapper<ImmutableBytesWritable, Put>{
//记录了要进行索引的列
private Map<byte[], ImmutableBytesWritable> indexes = new
HashMap<byte[], ImmutableBytesWritable>();
private String familyName;
@Override
protected void map(ImmutableBytesWritable key, Result value,
Context context) throws IOException, InterruptedException {
//原始表列
Set<byte[]> keys = indexes.keySet();
//索引表的rowkey是原始表的列。索引表的列是原始表的rowkey
for (byte[] key : keys){
//获得新建索引表的表名
ImmutableBytesWritable indexTableName = indexes.get(k);
//Result存放的是原始表的数据
//依据列族 和 列 得到原始表的rowkey
byte[] rowkey = value.getValue(Bytes.toBytes(familyName), k);
if (rowkey != null) {
//索引表
Put put = new Put(rowkey);//索引表行键
//列族 列 原始表的行键(作为新表的 列:id 的值)
put.add(Bytes.toBytes("f1"),Bytes.toBytes("id"),key.get());
context.write(indexTableName, put);
}
}
}
//真正运行Map之前运行一些处理。
@Override
protected void setup(Context context) throws IOException,
InterruptedException {
//通过上下文得到配置
Configuration conf = context.getConfiguration();
//获得表名
String tableName = conf.get("tableName");
//String family = conf.get("familyName");
//获得列族
familyName = conf.get("columnFamily");
//获得列
String[] qualifiers = conf.getStrings("qualifiers");
for (String qualifier : qualifiers) {
//建立一个映射,为每个列创建一个表,表的名字tableName+"-"+qualifier
//原始表的列 索引表新建表名
indexes.put(Bytes.toBytes(qualifier),
new ImmutableBytesWritable(Bytes.toBytes(tableName+"-"+qualifier)));
}
}
}
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
String rootDir = "hdfs://hadoop1:8020/hbase";
String zkServer = "hadoop1";
String port = "2181";
IndexBuilder conn = new IndexBuilder(rootDir,zkServer,port);
String[] otherArgs = new GenericOptionsParser(conn.conf, args).getRemainingArgs();
//IndexBuilder: TableName,ColumnFamily,Qualifier
if(otherArgs.length<3){
System.exit(-1);
}
//表名
String tableName = otherArgs[0];
//列族
String columnFamily = otherArgs[1];
conn.conf.set("tableName", tableName);
conn.conf.set("columnFamily", columnFamily);
//列 可能存在多个列
String[] qualifiers = new String[otherArgs.length-2];
for (int i = 0; i < qualifiers.length; i++) {
qualifiers[i] = otherArgs[i+2];
}
//设置列
conn.conf.setStrings("qualifiers", qualifiers);
@SuppressWarnings("deprecation")
Job job = new Job(conn.conf,tableName);
job.setJarByClass(IndexBuilder.class);
job.setMapperClass(MyMapper.class);
job.setNumReduceTasks(0);//因为不须要运行reduce阶段
job.setInputFormatClass(TableInputFormat.class);
job.setOutputFormatClass(MultiTableOutputFormat.class);
Scan scan = new Scan();
TableMapReduceUtil.initTableMapperJob(tableName,scan,
MyMapper.class, ImmutableBytesWritable.class, Put.class, job);
job.waitForCompletion(true);
}
}