简述
在HBase中,数据存储在表中,表分为行和列。与关系型数据库不同的是HBase有一个列族(Column Family)的概念,它将一列或者多列组织在一起,HBase的列必须属于某一个列族。
对于HBase中的表而言,有唯一的行键Rowkey,每一行对应一列或者多列,列中的值可以有多个版本,受版本。版本指的是,当同Rowkey的数据写入的时候,根据时间戳的不同,保留的副本数。一般而言,保留版本数的方式是在创建表的时候指定,当然亦可以后续修改。如:
hbase(main):004:0> disable 'tabledemo'
hbase(main):005:0> alter 'tabledemo', NAME => 'f2', VERSIONS => 10
Updating all regions with the new schema...
8/8 regions updated.
Done.
0 row(s) in 1.2530 seconds
hbase(main):010:0> enable 'tabledemo'
数据模型的重要概念
HBase被称为无模式数据库,因为HBase的表没有列定义,同时HBase也不支持表关联。以下对HBase几种关键的组织模型进行介绍:
Namespace(表命名空间):表命名空间不是强制的,当想把多个表分到一个组去统一管理的时候才会用到表命名空间。这个概念之前没提到,因为初学者一般用不到,当数据库中没有那么多表的时候也用不到这个概念,不过接下来会在一个专门的章节介绍一下这个概念。
Table(表):一个表由一个或者多个列族组成。数据属性,比如超时时间(TTL),压缩算法(COMPRESSION)等,都在列族的定义中定义。定义完列族后表是空的,只有添加了行,表才有数据。
Row(行):一个行包含了多个列,这些列通过列族来分类。行中的数据所属列族只能从该表所定义的列族中选取,不能定义这个表中不存在的列族,否则你会得到一个NoSuchColumnFamilyException。由于HBase是一个列式数据库,所以一个行中的数据可以分布在不同的服务器上。
Column Family(列族):列族是多个列的集合。其实列式数据库只需要列就可以了,为什么还需要有列族呢?因为HBase会尽量把同一个列族的列放到同一个服务器上,这样可以提高存取性能,并且可以批量管理有关联的一堆列。所有的数据属性都是定义在列族上。在HBase中,建表定义的不是列,而是列族,列族可以说是HBase中最重要的概念。
Column Qualifier(列):多个列组成一个行。列族和列经常用Column Family:Column Qualifier来一起表示。列是可以随意定义的,一个行中的列不限名字、不限数量,只限定列族。
Cell(单元格):一个列中可以存储多个版本的数据。而每个版本就称为一个单元格(Cell),所以在HBase中的单元格跟传统关系型数据库的单元格概念不一样。HBase中的数据细粒度比传统数据结构更细一级,同一个位置的数据还细分成多个版本。
Timestamp(时间戳/版本号):你既可以把它称为是时间戳,也可以称为是版本号,因为它是用来标定同一个列中多个单元格的版本号的。当你不指定版本号的时候,系统会自动采用当前的时间戳来作为版本号;而当你手动定义了一个数字来当作版本号的时候,这个Timestamp就真的是只有版本号的意义了。
数据模型的操作
HBase对表的操作
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
public class Example {
private static final String TABLE_NAME = "MY_TABLE_NAME_TOO";
private static final String CF_DEFAULT = "DEFAULT_COLUMN_FAMILY";
public static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
if (admin.tableExists(table.getTableName())) {
admin.disableTable(table.getTableName());
admin.deleteTable(table.getTableName());
}
admin.createTable(table);
}
public static void createSchemaTables(Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
HTableDescriptor table = new HTableDescriptor(TableName.valueOf(TABLE_NAME));
table.addFamily(new HColumnDescriptor(CF_DEFAULT).setCompressionType(Algorithm.NONE));
System.out.print("Creating table. ");
createOrOverwrite(admin, table);
System.out.println(" Done.");
}
}
public static void modifySchema (Configuration config) throws IOException {
try (Connection connection = ConnectionFactory.createConnection(config);
Admin admin = connection.getAdmin()) {
TableName tableName = TableName.valueOf(TABLE_NAME);
if (!admin.tableExists(tableName)) {
System.out.println("Table does not exist.");
System.exit(-1);
}
HTableDescriptor table = admin.getTableDescriptor(tableName);
// Update existing table
HColumnDescriptor newColumn = new HColumnDescriptor("NEWCF");
newColumn.setCompactionCompressionType(Algorithm.GZ);
newColumn.setMaxVersions(HConstants.ALL_VERSIONS);
admin.addColumn(tableName, newColumn);
// Update existing column family
HColumnDescriptor existingColumn = new HColumnDescriptor(CF_DEFAULT);
existingColumn.setCompactionCompressionType(Algorithm.GZ);
existingColumn.setMaxVersions(HConstants.ALL_VERSIONS);
table.modifyFamily(existingColumn);
admin.modifyTable(tableName, table);
// Disable an existing table
admin.disableTable(tableName);
// Delete an existing column family
admin.deleteColumn(tableName, CF_DEFAULT.getBytes("UTF-8"));
// Delete a table (Need to be disabled first)
admin.deleteTable(tableName);
}
}
public static void main(String... args) throws IOException {
Configuration config = HBaseConfiguration.create();
//Add any necessary configuration files (hbase-site.xml, core-site.xml)
config.addResource(new Path(System.getenv("HBASE_CONF_DIR"), "hbase-site.xml"));
config.addResource(new Path(System.getenv("HADOOP_CONF_DIR"), "core-site.xml"));
createSchemaTables(config);
modifySchema(config);
}
}
读数据
Get get = new Get(Bytes.toBytes("row1"));
Result r = htable.get(get);
byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value
含有版本的Get
Get get = new Get(Bytes.toBytes("row1"));
get.setMaxVersions(3); // will return last 3 versions of row
Result r = htable.get(get);
byte[] b = r.getValue(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns current version of value
List<KeyValue> kv = r.getColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr")); // returns all versions of this column
写数据
Put put = new Put(Bytes.toBytes(row));
put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), Bytes.toBytes( data));
htable.put(put);
指明版本
Put put = new Put( Bytes.toBytes(row));
long explicitTimeInMs = 555; // just an example
put.add(Bytes.toBytes("cf"), Bytes.toBytes("attr1"), explicitTimeInMs, Bytes.toBytes(data));
htable.put(put);
删除数据
// Instantiating Delete class
Delete delete = new Delete(Bytes.toBytes("row1"));
delete.deleteColumn(Bytes.toBytes("cf"), Bytes.toBytes("attr1"));
delete.deleteFamily(Bytes.toBytes("cf"));
// deleting the data
table.delete(delete);
scan
HTable htable = ... // instantiate HTable
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("cf"),Bytes.toBytes("attr"));
scan.setStartRow( Bytes.toBytes("row")); // start key is inclusive
scan.setStopRow( Bytes.toBytes("row" + (char)0)); // stop key is exclusive
ResultScanner rs = htable.getScanner(scan);
try {
for (Result r = rs.next(); r != null; r = rs.next()) {
// process result...
} finally {
rs.close(); // always close the ResultScanner!
}