HBase之数据写入再认识

前言:本篇文章主要介绍自己工作中对HBase数据写入的一些浅浅的认识和理解。最近刚接到一个需求是将Hive的数据加工处理成用户标签数据存于HBase中,那么问题来了,对于HBase小白来说自己怎么去实现呢?Hive -> HBase的解决方法有多种,只能依靠搜索引擎来帮助自己了,下面这幅图或许能够体现写这篇文章的用意。


《怎么更高效的往hbase写数据?》

步入正题:

上篇文章已经简单介绍了有关HBase原理的一些东西,其中有一部分讲解了HBase写流程的情况。对于每天千万的数据怎么能够高效的写入HBase呢?首先想到的就是python提供的通过Thrift访问HBase的库Happybase。通过Python API来去操作HBase简单便捷。下面是自己的二两测试案例。

方案1:hive sql + python + hbase

  • 第一步

将Hive中的数据通过Hive SQL加工处理成自己想要的结果。落地成文件(格式:yyyy-MM-dd.data)

##query.sql 查询sql语句
use ods_hfjydb;
SELECT
s.student_intention_id,
date(s.create_time) create_time,
d.value
FROM view_student s
left join ddic d
ON s.know_origin=d.code
WHERE d.type='TP016
group by s.student_intention_id,date(s.create_time),d.value 
limit 5000000 ;
  • 第二步

通过Python API操作HBase,将数据写入相应表中。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,sys
import happybase

#建立连接
conn = happybase.Connection("hostname")
#获取table
tab_app_data = conn.table("table-name")
#批处理操作
b_app_data = tab_app_data.batch()

#名单进线
for line in sys.stdin:
        parts = line.strip('\n').split("\t")
        row_key = parts[0]

        kv = {}
        k = "i:2#" + parts[1] + "#student_source"
        kv[k] = parts[2]
        tab_app_data.put(row_key , kv)

b_app_data.send()
conn.close()

方案2:hive +presto + python + hbase

方案1,代码量少而且容易理解,在当前数据量少的情况下,使用上面解决思路还是可取的,经测试500万数据从查询到入库大概25min左右。如果只使用Python + SQL 直接coding,还有没有更好的方式呢?既然日常使用presto查询hive数据,为何不直接使用presto +python将数据入库呢!至少查询速度方面,presto相比hive查询速度还是有很大优势的,下面贴上自己测试的code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import datetime
import happybase
from pyhive import presto

#presto获得连接
p_connection = presto.connect(host='hostname',port='8334')
cursor = p_connection.cursor()

#hbase获得连接
h_connection = happybase.Connection("hostname")
tab_app_data = h_connection.table("table_name")
#b_app_data = tab_app_data.batch(batch_size=1000)
b_app_data = tab_app_data.batch()

def execute_batch_to_hbase(dt):
    #需要执行的sql语句
    sql = "SELECT s.student_intention_id,substr(s.create_time,1,10) create_time,d.value " + \
    "FROM dwd_db_hfjydb.view_student s " + \
    "left join dwd_db_hfjydb.ddic d " + \
    "ON s.know_origin=d.code " + \
    "WHERE d.type='TP016' " + \
    "group by s.student_intention_id,substr(s.create_time,1,10),d.value  limit 5000000" 
    
    #获取查询的数据
    cursor.execute(sql)
    rows = cursor.fetchall()

    for item in rows:
        student_intention_id = str(item[0])
        dt = item[1]
        source_name = item[2]

        #获取的数据  
        print str(student_intention_id) + " # " + dt + " # " + source_name
    
        #hbase数据准备
        row_key = student_intention_id
        kv = {}
        k = "i:2#" + dt + "#student_source"
        kv[k] = source_name
        tab_app_data.put(row_key , kv)

    b_app_data.send()

    h_connection.close()

#校验参数格式是否正确
def validate(date_text):
    try:
        datetime.datetime.strptime(date_text, '%Y-%m-%d')
        return date_text
    except ValueError:
        raise ValueError("Incorrect data format, should be YYYY-MM-DD")

if __name__ == '__main__':
    if (len(sys.argv) < 2):
        raise Exception,u"arguments needed !"
        
    dt = validate(sys.argv[1])
    
    execute_batch_to_hbase(dt)

结论1:方案1和方案2都可以帮助自己达成目标,但是耗时大概都在min左右。但是要考虑的问题有:执行时间效率以及频繁的对hbase写操作和对集群网络的影响。
上面的两种方案其写入数据的过程在下图。 上篇文章已经详细的介绍了,为了和接下来的要写的内容作对比,先画一个简单的草图。

脚步不能听 go on! >>>>>>>

下面或许是大家最常使用的方式了,直接使用spark操作hive将数据入库到HBase,对于在开发领域征战多年的各位,应该都实现过,至于基于Hadoop新版API(rdd.saveAsNewAPIHadoopDataset(job.getConfiguration))和基于hadoop旧版API实现(rdd.saveAsHadoopDataset(jobConf))这两种方式,不做讨论,有兴趣的同学可以自行测试。下面是批量导数据到HBase的code(以HFile的方式写入HBase):

import com.hfjy.bigdata.caculate.utils.{ConfigUtil, DingDingUtils, HBaseUtils, HdfsUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

import scala.collection.mutable

val sparkSession = SparkSession.builder().appName("HBaseHFileBulkLoadThree").enableHiveSupport()
      .getOrCreate()

//HBase配置     
val conf = HBaseUtils.getConfiguration
val tableName = HBaseUtils.getHBaseTableName

val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setMapOutputValueClass(classOf[KeyValue])

HFileOutputFormat2.configureIncrementalLoad(job, tableName)

//Hive SQL查询出500万数据
val query_sql = s"SELECT s.student_intention_id, " +
      "substr(s.create_time,1,10) create_time, " +
      "CAST(d.value  AS string ) source_value " +
      "FROM dwd_db_hfjydb.view_student s  " +
      "left join dwd_db_hfjydb.ddic d  " +
      "ON s.know_origin=d.code  " +
      "WHERE d.type='TP016'  " +
      "group by s.student_intention_id,substr(s.create_time,1,10),d.value  limit 5000000 "

val resultDF: DataFrame = sparkSession.sql(query_sql)

val commonConf = ConfigUtil.getAppCommonConfig()
val hdfs_path = commonConf.getProperty("hbase.tmp.save.path") + "/" + label_id

//取出DataFrame的所有字段的列名进行排序
val columnsNames = resultDF.columns.drop(1).sorted

//改过程重点是保证RowKey有序已经Column Name有序。
val resultRDD: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDF.rdd.repartition(30).sortBy(row => row.getString(0)).map(f = row => {

      var list: Seq[KeyValue] = List()
      //RowKey
      val row_key = row.getString(0)
      //列簇
      val columnFamily = "i"
      //列名
      var columnName: String = null
      //列值
      var columnValue: String = null

      var kv: KeyValue = null

      val sb = new StringBuffer()

      for (i <- 0 to (columnsNames.length - 1)) {
        if (i <= columnsNames.length - 2) {
          sb.append( "2#" + row.getAs[String](columnsNames(i)) + "#student_source")
        }
        if (i == columnsNames.length - 1) {
          columnValue = row.getAs[String](columnsNames(i))
        }
      }
      columnName = sb.toString

      kv = new KeyValue(Bytes.toBytes(row_key), columnFamily.getBytes, columnName.getBytes(), columnValue.getBytes())

      //将新的kv加在list后面(不能反 需要整体有序)
      list = list :+ kv

      (new ImmutableBytesWritable(Bytes.toBytes(row_key)), list)
    })

 //要使用saveAsNewAPIHadoopFile 该方法,必须保证处理的结果数据格式为: RDD[(ImmutableBytesWritable, KeyValue)]
 //对当前的数据做整体有序处理,包括RowKey的有序已经Column Name的有序。
 val finalResultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = resultRDD.flatMapValues(s => s.iterator)

 //判断HDFS上是否有该目录,如果有则删除
 val fileSystem = HdfsUtil.getFileSystem
 fileSystem.delete(new Path(hdfs_path), true)

 //保存HFiles到HDFS上
 finalResultRDD.saveAsNewAPIHadoopFile(hdfs_path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)

 //Bulk load HFiles to Hbase
 val bulkLoader = new LoadIncrementalHFiles(conf)
 bulkLoader.doBulkLoad(new Path(hdfs_path), table)

上面的实现过程是将数据先写到HFile里面,然后再导入到表中,打包放到集群中跑,配置合适的[executor-memory,executor-cores,num-executors]处理完500万的数据大概耗时2min左右。下图是其写入数据的过程。


结论:

  • 方案1和方案2的数据写入过程是要经过MemStore,其数据在MemStore中做的最重要的任务就是对数据进行排序。所以在开发中用户可以不用关心顺序的问题。然而最后一种方案,数据是直接写到HFile中的不经过MemStore,在写入之前就是开发者重点注意的就是对RowKey以及Column Name的排序。如果不排序,数据是无法写入HFile中的。会报下面的错误。
java.io.IOException: Added a key not lexically larger than previous. Current cell = xxxxxxxxxxxxxxxx, lastCell =xxxxxxxxxxxxx
  • 效率问题是文章开头第一幅图片所表达的也是也是本文写作的目的。方案1和方案2的方式写入数据的效率比最后一种方案效率要低很多。具体在开发中,各位可以结合公司的技术架构,数据量的大小,对网络影响的可控性以及对HBase写操作的时候对HBase对外提供服务的影响相结合来去选择从而达到自己的目标。
  • 通过以上的测试,也让自己对HBase 写数据的原理有了更进一步的认识和理解。

>>本篇文章仅仅记录自己的学习过程,文章中如有错误或不妥之处,请留言,谢谢!<<
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,260评论 0 34
  • 【什么是大数据、大数据技术】 大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法在合理时间内通过传统的应...
    kimibob阅读 2,738评论 0 51
  • 才发现我高一时正是他大一时,同样是新环境,同样是对未来充满着无限的期待,同样是在一年末收获不同的爱情,但高一和大一...
  • 2018年1月8日,五位学员继续到华兴中学跟岗学习。上午,体卫处孙主任与李干事介绍了体卫艺工作;观摩了学校监控室。...
    龙正中学张春城阅读 400评论 0 0
  • 下载和安装grafana,此处例子为使用homebrew方式安装: 第二部安装grafana最新版本 当出现如下信...
    时彬斌阅读 6,568评论 0 2