前言:本篇文章主要介绍自己工作中对HBase数据写入的一些浅浅的认识和理解。最近刚接到一个需求是将Hive的数据加工处理成用户标签数据存于HBase中,那么问题来了,对于HBase小白来说自己怎么去实现呢?Hive -> HBase的解决方法有多种,只能依靠搜索引擎来帮助自己了,下面这幅图或许能够体现写这篇文章的用意。
步入正题:
上篇文章已经简单介绍了有关HBase原理的一些东西,其中有一部分讲解了HBase写流程的情况。对于每天千万的数据怎么能够高效的写入HBase呢?首先想到的就是python提供的通过Thrift访问HBase的库Happybase。通过Python API来去操作HBase简单便捷。下面是自己的二两测试案例。
方案1:hive sql + python + hbase
-
第一步
将Hive中的数据通过Hive SQL加工处理成自己想要的结果。落地成文件(格式:yyyy-MM-dd.data)
##query.sql 查询sql语句
use ods_hfjydb;
SELECT
s.student_intention_id,
date(s.create_time) create_time,
d.value
FROM view_student s
left join ddic d
ON s.know_origin=d.code
WHERE d.type='TP016
group by s.student_intention_id,date(s.create_time),d.value
limit 5000000 ;
-
第二步
通过Python API操作HBase,将数据写入相应表中。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import happybase
#建立连接
conn = happybase.Connection("hostname")
#获取table
tab_app_data = conn.table("table-name")
#批处理操作
b_app_data = tab_app_data.batch()
#名单进线
for line in sys.stdin:
parts = line.strip('\n').split("\t")
row_key = parts[0]
kv = {}
k = "i:2#" + parts[1] + "#student_source"
kv[k] = parts[2]
tab_app_data.put(row_key , kv)
b_app_data.send()
conn.close()
方案2:hive +presto + python + hbase
方案1,代码量少而且容易理解,在当前数据量少的情况下,使用上面解决思路还是可取的,经测试500万数据从查询到入库大概25min左右。如果只使用Python + SQL 直接coding,还有没有更好的方式呢?既然日常使用presto查询hive数据,为何不直接使用presto +python将数据入库呢!至少查询速度方面,presto相比hive查询速度还是有很大优势的,下面贴上自己测试的code:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,sys
import datetime
import happybase
from pyhive import presto
#presto获得连接
p_connection = presto.connect(host='hostname',port='8334')
cursor = p_connection.cursor()
#hbase获得连接
h_connection = happybase.Connection("hostname")
tab_app_data = h_connection.table("table_name")
#b_app_data = tab_app_data.batch(batch_size=1000)
b_app_data = tab_app_data.batch()
def execute_batch_to_hbase(dt):
#需要执行的sql语句
sql = "SELECT s.student_intention_id,substr(s.create_time,1,10) create_time,d.value " + \
"FROM dwd_db_hfjydb.view_student s " + \
"left join dwd_db_hfjydb.ddic d " + \
"ON s.know_origin=d.code " + \
"WHERE d.type='TP016' " + \
"group by s.student_intention_id,substr(s.create_time,1,10),d.value limit 5000000"
#获取查询的数据
cursor.execute(sql)
rows = cursor.fetchall()
for item in rows:
student_intention_id = str(item[0])
dt = item[1]
source_name = item[2]
#获取的数据
print str(student_intention_id) + " # " + dt + " # " + source_name
#hbase数据准备
row_key = student_intention_id
kv = {}
k = "i:2#" + dt + "#student_source"
kv[k] = source_name
tab_app_data.put(row_key , kv)
b_app_data.send()
h_connection.close()
#校验参数格式是否正确
def validate(date_text):
try:
datetime.datetime.strptime(date_text, '%Y-%m-%d')
return date_text
except ValueError:
raise ValueError("Incorrect data format, should be YYYY-MM-DD")
if __name__ == '__main__':
if (len(sys.argv) < 2):
raise Exception,u"arguments needed !"
dt = validate(sys.argv[1])
execute_batch_to_hbase(dt)
结论1:方案1和方案2都可以帮助自己达成目标,但是耗时大概都在min左右。但是要考虑的问题有:执行时间效率以及频繁的对hbase写操作和对集群网络的影响。
上面的两种方案其写入数据的过程在下图。 上篇文章已经详细的介绍了,为了和接下来的要写的内容作对比,先画一个简单的草图。
脚步不能听 go on! >>>>>>>
下面或许是大家最常使用的方式了,直接使用spark操作hive将数据入库到HBase,对于在开发领域征战多年的各位,应该都实现过,至于基于Hadoop新版API(rdd.saveAsNewAPIHadoopDataset(job.getConfiguration))和基于hadoop旧版API实现(rdd.saveAsHadoopDataset(jobConf))这两种方式,不做讨论,有兴趣的同学可以自行测试。下面是批量导数据到HBase的code(以HFile的方式写入HBase):
import com.hfjy.bigdata.caculate.utils.{ConfigUtil, DingDingUtils, HBaseUtils, HdfsUtil}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.KeyValue
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import scala.collection.mutable
val sparkSession = SparkSession.builder().appName("HBaseHFileBulkLoadThree").enableHiveSupport()
.getOrCreate()
//HBase配置
val conf = HBaseUtils.getConfiguration
val tableName = HBaseUtils.getHBaseTableName
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoad(job, tableName)
//Hive SQL查询出500万数据
val query_sql = s"SELECT s.student_intention_id, " +
"substr(s.create_time,1,10) create_time, " +
"CAST(d.value AS string ) source_value " +
"FROM dwd_db_hfjydb.view_student s " +
"left join dwd_db_hfjydb.ddic d " +
"ON s.know_origin=d.code " +
"WHERE d.type='TP016' " +
"group by s.student_intention_id,substr(s.create_time,1,10),d.value limit 5000000 "
val resultDF: DataFrame = sparkSession.sql(query_sql)
val commonConf = ConfigUtil.getAppCommonConfig()
val hdfs_path = commonConf.getProperty("hbase.tmp.save.path") + "/" + label_id
//取出DataFrame的所有字段的列名进行排序
val columnsNames = resultDF.columns.drop(1).sorted
//改过程重点是保证RowKey有序已经Column Name有序。
val resultRDD: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDF.rdd.repartition(30).sortBy(row => row.getString(0)).map(f = row => {
var list: Seq[KeyValue] = List()
//RowKey
val row_key = row.getString(0)
//列簇
val columnFamily = "i"
//列名
var columnName: String = null
//列值
var columnValue: String = null
var kv: KeyValue = null
val sb = new StringBuffer()
for (i <- 0 to (columnsNames.length - 1)) {
if (i <= columnsNames.length - 2) {
sb.append( "2#" + row.getAs[String](columnsNames(i)) + "#student_source")
}
if (i == columnsNames.length - 1) {
columnValue = row.getAs[String](columnsNames(i))
}
}
columnName = sb.toString
kv = new KeyValue(Bytes.toBytes(row_key), columnFamily.getBytes, columnName.getBytes(), columnValue.getBytes())
//将新的kv加在list后面(不能反 需要整体有序)
list = list :+ kv
(new ImmutableBytesWritable(Bytes.toBytes(row_key)), list)
})
//要使用saveAsNewAPIHadoopFile 该方法,必须保证处理的结果数据格式为: RDD[(ImmutableBytesWritable, KeyValue)]
//对当前的数据做整体有序处理,包括RowKey的有序已经Column Name的有序。
val finalResultRDD: RDD[(ImmutableBytesWritable, KeyValue)] = resultRDD.flatMapValues(s => s.iterator)
//判断HDFS上是否有该目录,如果有则删除
val fileSystem = HdfsUtil.getFileSystem
fileSystem.delete(new Path(hdfs_path), true)
//保存HFiles到HDFS上
finalResultRDD.saveAsNewAPIHadoopFile(hdfs_path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], job.getConfiguration)
//Bulk load HFiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(conf)
bulkLoader.doBulkLoad(new Path(hdfs_path), table)
上面的实现过程是将数据先写到HFile里面,然后再导入到表中,打包放到集群中跑,配置合适的[executor-memory,executor-cores,num-executors]处理完500万的数据大概耗时2min左右。下图是其写入数据的过程。
结论:
- 方案1和方案2的数据写入过程是要经过MemStore,其数据在MemStore中做的最重要的任务就是对数据进行排序。所以在开发中用户可以不用关心顺序的问题。然而最后一种方案,数据是直接写到HFile中的不经过MemStore,在写入之前就是开发者重点注意的就是对RowKey以及Column Name的排序。如果不排序,数据是无法写入HFile中的。会报下面的错误。
java.io.IOException: Added a key not lexically larger than previous. Current cell = xxxxxxxxxxxxxxxx, lastCell =xxxxxxxxxxxxx
- 效率问题是文章开头第一幅图片所表达的也是也是本文写作的目的。方案1和方案2的方式写入数据的效率比最后一种方案效率要低很多。具体在开发中,各位可以结合公司的技术架构,数据量的大小,对网络影响的可控性以及对HBase写操作的时候对HBase对外提供服务的影响相结合来去选择从而达到自己的目标。
- 通过以上的测试,也让自己对HBase 写数据的原理有了更进一步的认识和理解。