将数据文件(csv,Tsv)导入Hbase的三种方法

将各种类型的数据库或者文件导入到HBase,常见有三种方法:

  (1)使用HBase的API中的Put方法

  (2)使用HBase 的bulk load工具

  (3)使用定制的MapReduce Job方式

(1)使用HBase的API中的Put是最直接的方法,但是它并非都是最高效的方式(2)Bulk load是通过一个MapReduce Job来实现的,通过Job直接生成一个HBase的内部HFile格式文件来形成一个特殊的HBase数据表,然后直接将数据文件加载到运行的集群中。使用bulk load功能最简单的方式就是使用importtsv 工具。importtsv 是从TSV文件直接加载内容至HBase的一个内置工具。它通过运行一个MapReduce Job,将数据从TSV文件中直接写入HBase的表或者写入一个HBase的自有格式数据文件。(3)可以使用MapReduce向HBase导入数据,但海量的数据集会使得MapReduce Job也变得很繁重。推荐使用sqoop,它的底层实现是mapreduce,数据并行导入的,这样无须自己开发代码,过滤条件通过query参数可以实现。

通过单客户端导入mySQL数据

从一个单独的客户端获取数据,然后通过HBase的API中Put方法将数据存入HBase中。这种方式适合处理数据不是太多的情况。

实施:

<1>在HBase中创建表

<2>写一个java程序,mySQL中的数据导入Hbase,并将其打包为JAR.

1.使用Java创建一个connectHBase() 方法来连接到指定的HBase表。

2.使用Java创建一个 connectDB() 方法来 MySQL。

3.通过脚本执行JAR文件

4.验证导入的数据

    在HBase中创建了目标表用于插入数据。目标表名称为hly_temp,且只有单个列族(column family) n。我们将列族名称设计为一个字母的原因,是因为列族名称会存储在HBase的每个键值对中。使用短名能够让数据的存储和缓存更有效率。我们只需要保留一个版本的数据,所以为列族指定VERSION属性。

    首先创建一个配置(Configuration )对象,使用该对象创建一个HTable实例。

    然后,使用JDBC中MySQL中获取数据之后,我们循环读取结果集,将MySQL中的一行映射为HBase表中的一行。

    创建了Put对象,利用row key添加一行数据。每小时的数据的添加需要调用Put.add()方法,传入参数包括列族

    最后,所有打开的资源都需要手动关闭。我们在代码中的final块中结束了MySQL和HBase的连接,这样确保即时导入动作中抛出异常仍然会被调用到。

HIVE和HBASE区别

    Hive是一个构建在Hadoop基础设施之上的数据仓库。通过Hive可以使用HQL语言查询存放在HDFS上的数据。HQL是一种类SQL语言,这种语言最终被转化为Map/Reduce. 虽然Hive提供了SQL查询功能,但是Hive不能够进行交互查询--因为它只能够在Haoop上批量的执行Hadoop。

    HBase是一种Key/Value系统,它运行在HDFS之上。和Hive不一样,Hbase的能够在它的数据库上实时运行,而不是运行MapReduce任务。

两者的特点:

    Hive帮助熟悉SQL的人运行MapReduce任务。因为它是JDBC兼容的,同时,它也能够和现存的SQL工具整合在一起。运行Hive查询会花费很长时间,因为它会默认遍历表中所有的数据。但其可以设置分区查询。

    HBase通过存储key/value来工作。它支持四种主要的操作:增加或者更新行,查看一个范围内的cell,获取指定的行,删除指定的行、列或者是列的版本。

问题:

    Hive目前不支持更新操作,是在hadoop上的批量操作,需要花费很长时间。HBase查询是通过特定的语言来编写的,这种语言需要重新学习。类SQL的功能可以通过Apache Phonenix实现。运行hbase,zookeeper是必须的。

实时查询的比较:

    Hive适合用来对一段时间内的数据进行分析查询,例如,用来计算趋势或者网站的日志。Hive不应该用来进行实时的查询。因为它需要很长时间才可以返回结果。

    Hbase非常适合用来进行大数据的实时查询。Facebook用Hbase进行消息和实时的分析。它也可以用来统计Facebook的连接数。

    注意:IP部分改为本机IP地址或localhost。同时,HBase只支持十六进制存储中文。

Hive、MySQL、HBase

hive的操作命令和SQL基本一致

HBase的操作和SQL有些区别,使用hbase shell进入Hbase,

1.创建表  create 'student','Sname','Ssex','Sage','Sdept','course',因为HBase的表中会有一个系统默认的属性作为行键,无需自行创建,默认为put命令操作中表名后第一个数据。创建完“student”表后,可通过describe命令查看“student”表的基本信息。

2.一次只能为一个表的一行数据的一个列,也就是一个单元格添加一个数据,put ‘student’,’95001’,’Sname’,’LiYing’

3.delete 'student','95001','Ssex',只删除了性别,  deleteall 'student','95001'删除了全部元素

4.get 'student','95001'  查看学号为95001的一行数据。

5.查看表中所有数据scan 'student'

6.删除表disable 'student'  ,  drop 'student'  第一步先让该表不可用,第二步删除表

7.查询历史数据, create 'teacher',{NAME=>'username',VERSIONS=>5} 创建保存版本号,插入和更新put会更新版本号。

Java对Hbase进行增删改查:

(1)在工程中导入外部jar包:这里只需要导入hbase安装目录中的lib文件中的所有jar包,以及hadoop的jar包。

(2)java代码

```

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.hbase.*;

import org.apache.hadoop.hbase.client.*;

import java.io.IOException;

public class ExampleForHbase{

    public static Configuration configuration;

    public static Connection connection;

    public static Admin admin;

    //主函数中的语句请逐句执行,只需删除其前的//即可,如:执行insertRow时请将其他语句注释

    public static void main(String[] args)throws IOException{

        //创建一个表,表名为Score,列族为sname,course

        createTable("Score",new String[]{"sname","course"});

        //在Score表中插入一条数据,其行键为95001,sname为Mary(因为sname列族下没有子列所以第四个参数为空)

        //等价命令:put 'Score','95001','sname','Mary'

        //insertRow("Score", "95001", "sname", "", "Mary");

        //在Score表中插入一条数据,其行键为95001,course:Math为88(course为列族,Math为course下的子列)

        //等价命令:put 'Score','95001','score:Math','88'

        //insertRow("Score", "95001", "course", "Math", "88");

        //在Score表中插入一条数据,其行键为95001,course:English为85(course为列族,English为course下的子列)

        //等价命令:put 'Score','95001','score:English','85'

        //insertRow("Score", "95001", "course", "English", "85");

        //1、删除Score表中指定列数据,其行键为95001,列族为course,列为Math

        //执行这句代码前请deleteRow方法的定义中,将删除指定列数据的代码取消注释注释,将删除制定列族的代码注释

        //等价命令:delete 'Score','95001','score:Math'

        //deleteRow("Score", "95001", "course", "Math");

        //2、删除Score表中指定列族数据,其行键为95001,列族为course(95001的Math和English的值都会被删除)

        //执行这句代码前请deleteRow方法的定义中,将删除指定列数据的代码注释,将删除制定列族的代码取消注释

        //等价命令:delete 'Score','95001','score'

        //deleteRow("Score", "95001", "course", "");

        //3、删除Score表中指定行数据,其行键为95001

        //执行这句代码前请deleteRow方法的定义中,将删除指定列数据的代码注释,以及将删除制定列族的代码注释

        //等价命令:deleteall 'Score','95001'

        //deleteRow("Score", "95001", "", "");

        //查询Score表中,行键为95001,列族为course,列为Math的值

        //getData("Score", "95001", "course", "Math");

        //查询Score表中,行键为95001,列族为sname的值(因为sname列族下没有子列所以第四个参数为空)

        //getData("Score", "95001", "sname", "");

        //删除Score表

        //deleteTable("Score");

    }

    //建立连接

    public static void init(){

        configuration  = HBaseConfiguration.create();

        configuration.set("hbase.rootdir","hdfs://localhost:9000/hbase");

        try{

            connection = ConnectionFactory.createConnection(configuration);

            admin = connection.getAdmin();

        }catch (IOException e){

            e.printStackTrace();

        }

    }

    //关闭连接

    public static void close(){

        try{

            if(admin != null){

                admin.close();

            }

            if(null != connection){

                connection.close();

            }

        }catch (IOException e){

            e.printStackTrace();

        }

    }

    /**

    * 建表。HBase的表中会有一个系统默认的属性作为主键,主键无需自行创建,默认为put命令操作中表名后第一个数据,因此此处无需创建id列

    * @param myTableName 表名

    * @param colFamily 列族名

    * @throws IOException

    */

    public static void createTable(String myTableName,String[] colFamily) throws IOException {

        init();

        TableName tableName = TableName.valueOf(myTableName);

        if(admin.tableExists(tableName)){

            System.out.println("talbe is exists!");

        }else {

            HTableDescriptor hTableDescriptor = new HTableDescriptor(tableName);

            for(String str:colFamily){

                HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(str);

                hTableDescriptor.addFamily(hColumnDescriptor);

            }

            admin.createTable(hTableDescriptor);

            System.out.println("create table success");

        }

        close();

    }

    /**

    * 删除指定表

    * @param tableName 表名

    * @throws IOException

    */

    public static void deleteTable(String tableName) throws IOException {

        init();

        TableName tn = TableName.valueOf(tableName);

        if (admin.tableExists(tn)) {

            admin.disableTable(tn);

            admin.deleteTable(tn);

        }

        close();

    }

    /**

    * 查看已有表

    * @throws IOException

    */

    public static void listTables() throws IOException {

        init();

        HTableDescriptor hTableDescriptors[] = admin.listTables();

        for(HTableDescriptor hTableDescriptor :hTableDescriptors){

            System.out.println(hTableDescriptor.getNameAsString());

        }

        close();

    }

    /**

    * 向某一行的某一列插入数据

    * @param tableName 表名

    * @param rowKey 行键

    * @param colFamily 列族名

    * @param col 列名(如果其列族下没有子列,此参数可为空)

    * @param val 值

    * @throws IOException

    */

    public static void insertRow(String tableName,String rowKey,String colFamily,String col,String val) throws IOException {

        init();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Put put = new Put(rowKey.getBytes());

        put.addColumn(colFamily.getBytes(), col.getBytes(), val.getBytes());

        table.put(put);

        table.close();

        close();

    }

    /**

    * 删除数据

    * @param tableName 表名

    * @param rowKey 行键

    * @param colFamily 列族名

    * @param col 列名

    * @throws IOException

    */

    public static void deleteRow(String tableName,String rowKey,String colFamily,String col) throws IOException {

        init();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Delete delete = new Delete(rowKey.getBytes());

        //删除指定列族的所有数据

        //delete.addFamily(colFamily.getBytes());

        //删除指定列的数据

        //delete.addColumn(colFamily.getBytes(), col.getBytes());

        table.delete(delete);

        table.close();

        close();

    }

    /**

    * 根据行键rowkey查找数据

    * @param tableName 表名

    * @param rowKey 行键

    * @param colFamily 列族名

    * @param col 列名

    * @throws IOException

    */

    public static void getData(String tableName,String rowKey,String colFamily,String col)throws  IOException{

        init();

        Table table = connection.getTable(TableName.valueOf(tableName));

        Get get = new Get(rowKey.getBytes());

        get.addColumn(colFamily.getBytes(),col.getBytes());

        Result result = table.get(get);

        showCell(result);

        table.close();

        close();

    }

    /**

    * 格式化输出

    * @param result

    */

    public static void showCell(Result result){

        Cell[] cells = result.rawCells();

        for(Cell cell:cells){

            System.out.println("RowName:"+new String(CellUtil.cloneRow(cell))+" ");

            System.out.println("Timetamp:"+cell.getTimestamp()+" ");

            System.out.println("column Family:"+new String(CellUtil.cloneFamily(cell))+" ");

            System.out.println("row Name:"+new String(CellUtil.cloneQualifier(cell))+" ");

            System.out.println("value:"+new String(CellUtil.cloneValue(cell))+" ");

        }

    }

}

```

使用Sqoop将数据从MySQL导入HBase

(1)在hbase中创建表

(2)运行shell

```

./bin/sqoop  import  --connect  jdbc:mysql://localhost:3306/dblab

--username  root

--password  hadoop

--table user_action

--hbase-table user_action #HBase中表名称

--column-family f1 #列簇名称

--hbase-row-key id #HBase 行键

--hbase-create-table #是否在不存在情况下创建表

```

-m 1 #启动 Map 数量

查看:只查询前10条

scan 'user_action',{LIMIT=>10}  #只查询前面10行

一种MySQL到HBase的迁移策略的研究与实现

三类迁移方法的比较:

(1)现有的迁移工具如Hadoop的官方工具Sqoop只支持单表的增量加载,无法完成数据库系统中众多表模式的迁移;

(2)HBase的Importtsv 工具只支持TSV等指定文件的迁移;

(3)Put方法虽然简单直接但也只是完成数据的迁移且迁移效率不佳。

      关系形数据库是一种建立在关系模型基础上的数据库。用一张二维表代表现实世界中的实体,用表中的字段代表实体的属性,用外键等联合操作代表实体之间的关系。关系型数据库MySQL中默认安装INFORMATION_SCHEMA数据库,这些存储在INFORMATION_SCHEMA中的数据就叫做数据库系统的元数据。因此,在数据迁移的过程中,可以利用对关系型数据库中元数据表的查询快速获取关系型数据库中各个表的模式和各表之间的关系,然后进行迁移。

      HBase的表是一个稀疏矩阵。HBase与传统关系型数据库表所不同的是:它可以存储半结构化数据,即HBase中的表在设计上没有严格的限制[8],数据记录可能包含不一致的列、不确定大小等。此外,与关系型数据库不同,HBase在存储上基于列而非行,因此对同列中的数据具有较好的查询性能。

      HBase的数据存储在HDFS中,能够很好地利用HDFS的分布式处理模式,并从Hadoop的MapReduce程序模型中获益。 HBase逻辑上的表在行的方向上分割成多个HRegion,HRegion按大小分割,每张表开始只有一个Region,随着记录数的不断增加,Region不断增大,当增大到一定程度时,HRegion会被等分成两个新的HRegion。HRegion是HBase中分布式存储和负载均衡的最小单元,但却不是存储的最小单元。HRegion由一个或者多个Store组成,每个Store保存了表中的一个列族。每个Store又由一个 Memstore和0至多个StoreFile(HFile)组成,StoreFile用来存储数据并以HFile的形式保存在HDFS上。

提炼

为统一实现java的封装,采用 bulk load工具来导入数据

(1)首先将数据库的文件导出为CSV文件,也可以在保存的时候保存为CSV文件,产生CSV文件

(2)准备工作:从数据源中提取数据,开启MapReduce守护进程,客户端服务器上添加hac用户用于运行job,在HDFS中为hac用户建立主文件夹:

```

hadoop$ $HADOOP_HOME/bin/start-mapred.sh

root@client1# usermod -a -G hadoop hac

root@client1# chmod -R 775 /usr/local/hadoop/var

hadoop@client1$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hac

hadoop@client1$ $HADOOP_HOME/bin/hadoop fs -chown hac /user/hac

hadoop@client1$ $HADOOP_HOME/bin/hadoop fs -chmod -R 775 /usr/local/hadoop/var/mapred

```

(3)将在HDFS中建立文件夹,并且将TSV文件从本地文件系统拷贝至HDFS中

hac@client1$ $HADOOP_HOME/bin/hadoop fs -mkdir /user/hac/input/2-1

hac@client1$ $HADOOP_HOME/bin/hadoop fs -copyFromLocal hly-temp-10pctl.tsv /user/hac/input/2-1

(4)在HBase中添加目标表。连接到HBase,添加hly_temp表

hac@client1$ $HBASE_HOME/bin/hbase shell

hbase> create 'hly_temp', {NAME => 't', VERSIONS => 1}

  (5)将hbase-site.xml文件放置在Hadoop的配置目录中就能够加入Hadoop的环境变量了

hac@client1$ ln -s $HBASE_HOME/conf/hbase-site.xml $HADOOP_HOME/conf/hbase-site.xml

编辑客户端服务器的$HADOOP_HOME/conf 下的hadoop-env.sh文件,添加HBase的依赖库到Hadoop的环境变量中

hadoop@client1$ vi $HADOOP_HOME/conf/hadoop-env.sh

export HADOOP_CLASSPATH=/usr/local/zookeeper/current/zookeeper-3.4.3.jar:/usr/local/hbase/current/lib/guava-r09.jar

  (6)使用hac用户运行importtsv工具,执行如下脚本

hac@client1$ $HADOOP_HOME/bin/hadoop jar $HBASE_HOME/hbase-

0.92.1.jar importtsv \ -Dimporttsv.columns=HBASE_ROW_KEY,t:v01,t:v02,t:v03,t:v04,t:v05,t:v06,t:v07,t:v08,t:v09,t:v10,t:v11,t:v12,t:v13,t:v14,t:v15,t:v1

6,t:v17,t:v18,t:v19,t:v20,t:v21,t:v22,t:v23,t:v24 \

hly_temp \

/user/hac/input/2-1

  (7)检查

HBase 写优化之 BulkLoad 实现数据快速入库

bulk load 的入库方式,它是利用hbase的数据信息按照特定格式存储在hdfs内这一原理,直接在HDFS中生成持久化的HFile数据格式文件,然后上传至合适位置,即完成巨量数据快速入库的办法。配合mapreduce完成,高效便捷,而且不占用region资源,增添负载,在大数据量写入时能极大的提高写入效率,并降低对HBase节点的写入压力。

通过使用先生成HFile,然后再BulkLoad到Hbase的方式来替代之前直接调用HTableOutputFormat的方法有如下的好处:

(1)消除了对HBase集群的插入压力

(2)提高了Job的运行速度,降低了Job的执行时间

目前此种方式仅仅适用于只有一个列族的情况,在新版 HBase 中,单列族的限制会消除。

三步走:

(1)第一个Job还是运行原来业务处理逻辑,处理的结果不直接调用HTableOutputFormat写入到HBase,而是先写入到HDFS上的一个中间目录下(如 middata) 

(2)第二个Job以第一个Job的输出(middata)做为输入,然后将其格式化HBase的底层存储文件HFile 

(3)调用BulkLoad将第二个Job生成的HFile导入到对应的HBase表中

代码的实现:

(1)继承configured类和实现tool接口,

tool接口需要实现run方法,用于执行带有指定参数的命令。例如:hadoop jar myMR.jar MyTool -input inputDir -output outputDir

继承configured,可直接使用setConf()和getConf()方法

(2)创建配值参数并连接,使用HBaseConfigured类来获得Configured对象。

通过toolrun运行配值,并取得连接后的状态码

(3)实现Maper类的编写实现Maper类

将rowKey保存到外面,这样会创建一个RowKey保证查询不用,换文件夹,查询快。

重写maper方法,首先获得数据,value.toString()通过表的格式进行分割,

封装put对象,传入列对象,设置rowkey

  (4)在run方法中创建job,配值job类,同时设置输入\输出,设置maper类

(5)设置自带的reducer程序

(6)提交job

  (7)将HFile类导入到Hbase中

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351

推荐阅读更多精彩内容

  • 简介 HBase是高可靠性,高性能,面向列,可伸缩的分布式存储系统,利用HBase技术可在廉价PC Serve...
    九世的猫阅读 2,184评论 1 6
  • 一、简介 Hbase:全名Hadoop DataBase,是一种开源的,可伸缩的,严格一致性(并非最终一致性)的分...
    菜鸟小玄阅读 2,375评论 0 12
  • import org.apache.hadoop.conf.Configuration; import org.a...
    苏大鸿阅读 617评论 0 1
  • 走在回家的路上 那是多么遥远的梦 空气里只有沉默 我可以听到那些声音 没有终点 只是越来越近 是谁在呐喊 你说你爱...
    布老头和他的家人们阅读 277评论 2 1
  • 山涯瀑布喜层叠,水帘铺垂美景携。 坐赏洁白瑕玉带,阁楼望外挂珠结。 葱茏绿意峰峦驻,朵朵飞花落地接。 醉卧山中魂魄...
    六月天气阅读 343评论 40 44