1. 需要的jar包依赖
<properties>
<spark.version>2.3.0</spark.version>
<hbase.version>1.2.6</hbase.version>
<scala.main.version>2.11</scala.main.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.main.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>${hbase.version}</version>
<type>pom</type>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>${hbase.version}</version>
</dependency>
<!-- 本文处理数据用到的解析json字符串的jar包,非必需 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
</dependencies>
2. 写数据到HBase
使用saveAsHadoopDataset()
import com.alibaba.fastjson.JSON
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author:
* Description: spark 通过内置算子写数据到 HBase:使用saveAsHadoopDataset()
* Create: 2018/7/24 11:24
*/
object WriteHBaseWithOldHadoopAPI {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val sc = new SparkContext(sparkConf)
val input = sc.textFile("file:///D:/data/news_profile_data.txt")
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, "172.16.13.185:2181")
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val admin = hbaseConn.getAdmin
val jobConf = new JobConf(hbaseConf, this.getClass)
// 设置表名
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "news")
jobConf.setOutputFormat(classOf[TableOutputFormat])
// 如果表不存在则创建表
if (!admin.tableExists(TableName.valueOf("news"))) {
val desc = new HTableDescriptor(TableName.valueOf("news"))
val hcd = new HColumnDescriptor("cf1")
desc.addFamily(hcd)
admin.createTable(desc)
}
val data = input.map(jsonStr => {
// 处理数据的逻辑
val jsonObject = JSON.parseObject(jsonStr)
val newsId = jsonObject.get("id").toString.trim
val title = jsonObject.get("title").toString.trim
val put = new Put(Bytes.toBytes(newsId))
put.addColumn(Bytes.toBytes("cf1"), Bytes.toBytes("title"), Bytes.toBytes(title))
(new ImmutableBytesWritable, put)
})
data.saveAsHadoopDataset(jobConf)
sc.stop()
}
}
以上两个算子分别是基于Hadoop新版API和hadoop旧版API实现的,新版API代码有误以后测试正确了在放
,大部分代码都一样,需要注意的是新版API使用中Job类,旧版API使用JobConf类,另外导包的时候新版的相关jar包在org.apache.hadoop.mapreduce下,而旧版的相关jar包在org.apache.hadoop.mapred下
3. 从HBase读数据
以下代码使用newAPIHadoopRDD()算子
import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
/**
* Author:
* Description: spark 通过内置算子读取 HBase
* Create: 2018/7/23 15:22
*/
object testHbase {
def readHbase(str: String,str1:String): Unit = {
//System.setProperty("HADOOP_USER_NAME", "hdfs")
val sparkConf = new SparkConf().setMaster("local").setAppName("testHbase")
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
val sc = new SparkContext(sparkConf)
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", str)//str是zookeeper的ip
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set ("zookeeper.znode.parent", "/hbase-unsecure") //必须有否则连不上,我也不知道为啥
hbaseConf.set(TableInputFormat.INPUT_TABLE, str1)//str1是hbase的表名
val hBaseRDD = sc.newAPIHadoopRDD(
hbaseConf,
classOf[TableInputFormat],
classOf[ImmutableBytesWritable],
classOf[Result])
hBaseRDD.take(10).foreach(tuple => {
val result = tuple._2
val cells=result.listCells()
for(cell<-cells){
val str =
s"rowkey: ${Bytes.toString(CellUtil.cloneRow(cell))}, 列族:${Bytes.toString(CellUtil.cloneFamily(cell))}, " +
s"列名:${Bytes.toString(CellUtil.cloneQualifier(cell))}, 值:${Bytes.toString(CellUtil.cloneValue(cell))}, " +
s"时间戳:${cell.getTimestamp}"
println(str)
}
})
}
}
需要注意的是,代码中对ImmutableBytesWritable这个类进行了序列化:
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sparkConf.registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
否则程序就会报错:
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
4. 写数据的优化:Bulk Load
以上写数据的过程将数据一条条插入到Hbase中,这种方式运行慢且在导入的过程的占用Region资源导致效率低下,所以很不适合一次性导入大量数据,解决办法就是使用 Bulk Load 方式批量导入数据。
Bulk Load 方式由于利用了 HBase 的数据信息是按照特定格式存储在 HDFS 里的这一特性,直接在 HDFS 中生成持久化的 HFile 数据格式文件,然后完成巨量数据快速入库的操作,配合 MapReduce 完成这样的操作,不占用 Region 资源,不会产生巨量的写入 I/O,所以需要较少的 CPU 和网络资源。
Bulk Load 的实现原理是通过一个 MapReduce Job 来实现的,通过 Job 直接生成一个 HBase 的内部 HFile 格式文件,用来形成一个特殊的 HBase 数据表,然后直接将数据文件加载到运行的集群中。与使用HBase API相比,使用Bulkload导入数据占用更少的CPU和网络资源。
接下来介绍在spark中如何使用 Bulk Load 方式批量导入数据到 HBase 中。
import com.alibaba.fastjson.JSON
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat2, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.{SparkConf, SparkContext}
/**
* Author: YangYunhe
* Description:
* Create: 2018/7/24 13:14
*/
object BulkLoad {
val zookeeperQuorum = "172.16.13.185:2181"
val dataSourcePath = "file:///D:/data/news_profile_data.txt"
val hdfsRootPath = "hdfs://beh/"
val hFilePath = "hdfs://beh/test/yyh/hbase/bulkload/hfile/"
val tableName = "news"
val familyName = "cf1"
val qualifierName = "title"
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local")
val sc = new SparkContext(sparkConf)
val hadoopConf = new Configuration()
hadoopConf.set("fs.defaultFS", hdfsRootPath)
val fileSystem = FileSystem.get(hadoopConf)
val hbaseConf = HBaseConfiguration.create(hadoopConf)
hbaseConf.set(HConstants.ZOOKEEPER_QUORUM, zookeeperQuorum)
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
val hbaseConn = ConnectionFactory.createConnection(hbaseConf)
val admin = hbaseConn.getAdmin
// 0. 准备程序运行的环境
// 如果 HBase 表不存在,就创建一个新表
if (!admin.tableExists(TableName.valueOf(tableName))) {
val desc = new HTableDescriptor(TableName.valueOf(tableName))
val hcd = new HColumnDescriptor(familyName)
desc.addFamily(hcd)
admin.createTable(desc)
}
// 如果存放 HFile文件的路径已经存在,就删除掉
if(fileSystem.exists(new Path(hFilePath))) {
fileSystem.delete(new Path(hFilePath), true)
}
// 1. 清洗需要存放到 HFile 中的数据,rowKey 一定要排序,否则会报错:
// java.io.IOException: Added a key not lexically larger than previous.
val data = sc.textFile(dataSourcePath)
.map(jsonStr => {
// 处理数据的逻辑
val jsonObject = JSON.parseObject(jsonStr)
val rowkey = jsonObject.get("id").toString.trim
val title = jsonObject.get("title").toString.trim
(rowkey, title)
})
.sortByKey()
.map(tuple => {
val kv = new KeyValue(Bytes.toBytes(tuple._1), Bytes.toBytes(familyName), Bytes.toBytes(qualifierName), Bytes.toBytes(tuple._2))
(new ImmutableBytesWritable(Bytes.toBytes(tuple._1)), kv)
})
// 2. Save Hfiles on HDFS
val table = hbaseConn.getTable(TableName.valueOf(tableName))
val job = Job.getInstance(hbaseConf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoadMap(job, table)
data.saveAsNewAPIHadoopFile(
hFilePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
hbaseConf
)
// 3. Bulk load Hfiles to Hbase
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
val regionLocator = hbaseConn.getRegionLocator(TableName.valueOf(tableName))
bulkLoader.doBulkLoad(new Path(hFilePath), admin, table, regionLocator)
hbaseConn.close()
fileSystem.close()
sc.stop()
}
}
说明:
如果无法load,hdfs上新生成的hfile文件,是因为集群上是hbase用户去调用这个hfile文件,hbase用户权限不够
先查看一下hdfs都有哪些组,然后把hbase也加到这些组里即可
[root@slave02 ~]# groups hdfs
hdfs : hadoop hdfs
[root@slave02 ~]# usermod -a -G hdfs hbase
[root@slave02 ~]# groups hbase
hbase : hadoop hdfs
rowkey一定要进行排序
上面的代码使用了saveAsNewAPIHadoopFile(),也可以使用saveAsNewAPIHadoopDataset(),把以下代码:
data.saveAsNewAPIHadoopFile(
hFilePath,
classOf[ImmutableBytesWritable],
classOf[KeyValue],
classOf[HFileOutputFormat2],
hbaseConf
)
替换为:
job.getConfiguration.set("mapred.output.dir", hFilePath)
data.saveAsNewAPIHadoopDataset(job.getConfiguration)
即可。