如果spark在读取hbase的时候感觉速度达不到需求,可以直接读取hfile进行操作,看代码
package com.yoyi.data.user_profile
import com.yoyi.data.common.Application
import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.client.{Result, Scan}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableSnapshotInputFormat}
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.Base64
import org.apache.hadoop.mapreduce.Job
object Test extends Application {

  /**
   * Reads an HBase table snapshot directly from its HFiles via
   * `TableSnapshotInputFormat`, bypassing the region servers. Useful when
   * scanning through the HBase client API is too slow for bulk reads.
   *
   * Prerequisite: a snapshot of the table (here "demo_label_test_snapshot")
   * must already exist, so the underlying HFiles cannot change mid-read.
   */
  def main(args: Array[String]): Unit = {
    val hconf = CommonLabelFunc.createHbaseConf()
    hconf.set("hbase.rootdir", "hdfs://host:8020/hbase")
    val sc = createSparkContext("", Seq())

    // Serialize the Scan into the job configuration; TableSnapshotInputFormat
    // picks it up from the TableInputFormat.SCAN property.
    val scan = new Scan()
    val proto = ProtobufUtil.toScan(scan)
    hconf.set(TableInputFormat.SCAN, Base64.encodeBytes(proto.toByteArray()))

    val job = Job.getInstance(hconf)
    val snapName = "demo_label_test_snapshot"
    // Restore directory for the snapshot's HFile links; must be an HDFS path
    // the job can write to.
    val path = new Path("hdfs://host:8020/tmp/snapshot")
    TableSnapshotInputFormat.setInput(job, snapName, path)

    val rdd = sc.newAPIHadoopRDD(
      job.getConfiguration,
      classOf[TableSnapshotInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])

    // NOTE: println inside foreach runs on the executors, so output appears in
    // executor logs, not on the driver console.
    rdd.foreach { case (_, result) =>
      val rowkey = new String(result.getRow)
      println(rowkey)
      // BUG FIX: the original referenced an undefined `tp._3`; the cells must
      // come from `result` (the Result bound from the tuple's second element).
      for (cell <- result.rawCells()) {
        val key = new String(cell.getQualifierArray, cell.getQualifierOffset, cell.getQualifierLength)
        val value = new String(cell.getValueArray, cell.getValueOffset, cell.getValueLength)
        println("key: " + key)
        println("value: " + value)
      }
    }

    sc.stop()
  }
}
- 为了保证读取的hfile在处理期间不会变化,需要将待处理的表进行快照处理
- spark直接通过newAPIHadoopRDD的api读取快照后的表,通过mr的方式读取并解析hfile
欢迎对技术感兴趣的小伙伴一起交流学习^^