最近在做spark和hbase的相关项目。暂且将其分为两部分:一是利用spark streaming消费前台推到kafka中的消息,进行简单处理后写入到hbase;然后就是利用spark读取hbase,将结果组装成json,再利用spark SQL进行计算。
介绍一下环境:
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<org.scala.version>2.11.2</org.scala.version>
<org.spark.version>2.0.2</org.spark.version>
<org.kafka.version>0.10.2.1</org.kafka.version>
<org.apache.hbase.version>1.2.2</org.apache.hbase.version>
</properties>
这里先来讨论后半部分,spark读取hbase。
首先,了解过hbase的都应该知道,它是一No SQL的非关系型数据。与我们平时常见的MySQL和Oracle不同,No SQL最大的特点就是不支持事务,对于关系型数据库轻松加随意的join啊、groupby啊什么的,都不擅长。不过hbase既然这么火,肯定有其道理。我这里之所以采用它,最重要的就是因为:一是数据量大,项目还没上线,不过预测日增量有上百g,二来呢hbase提供了java api,以前搞过,get和scan的效率还是很给力的。再加上我们记录的用户行为信息,根本不需要更新操作,我只要能写进去,拿出来就行啦。
废话不多数,下面上代码:
先要组装hbase client
先要引入配置文件
private val config = ConfigFactory.load()
private val conn = getConnection
具体的application.conf如下
spark{
master="local[*]"
appName="KafkaConsumer"
}
kafka {
topics = "topic007"
brokers = "192.168.1.97:9092,192.168.1.98:9092,192.168.1.99:9092,192.168.1.106:9092,192.168.1.107:9092,192.168.1.108:9092"
group = "groupid"
}
hbase{
port = "2181"
quorum = "master1.hadoop,slave2.hadoop,slave3.hadoop,slave4.hadoop,slave5.hadoop,slave6.hadoop"
tableName = "test"
}
大家根据自己的设置自行修改啊。
然后就是hbase相关的api调用啦,具体如下:
/**
* 扫描HBase并返回结果
* @param tableName 表名
* @param filter 过滤条件
* @param startRow 起始行键
* @param stopRow 结束行键
* @return 扫描结果
*/
def scan(tableName: String, filter: Filter, startRow: String, stopRow: String): List[Map[String, String]] = {
val s = buildScan(filter, startRow, stopRow)
val t = conn.getTable(TableName.valueOf(tableName))
scan(t, s)
}
/**
* 执行扫描
* @param table 表
* @param scan scan
*/
private def scan(table: Table, scan: Scan): List[Map[String, String]] = {
val scanner = table.getScanner(scan)
val ite = scanner.iterator()
val result = new ListBuffer[Map[String, String]]
while (ite.hasNext){
val map = new mutable.ListMap[String, String]
ite.next().listCells().foreach(c => map += readCell(c))
result += map.toMap
}
result.toList
}
/**
* 读取单元格
* @param cell 单元格
*/
private def readCell(cell: Cell) = {
val qualifier = Bytes.toString(CellUtil.cloneQualifier(cell))
val value = Bytes.toString(CellUtil.cloneValue(cell))
(qualifier, value)
}
/**
* 构建Scan实例
* @param filter 过滤条件
* @param startRow 起始行键
* @param stopRow 结束行键
*/
private def buildScan(filter: Filter, startRow: String, stopRow: String): Scan ={
val scan = new Scan()
scan.setMaxVersions()
scan.setCaching(2000)
scan.setCacheBlocks(false)
if(filter != null)
scan.setFilter(filter)
if(startRow != null)
scan.setStartRow(Bytes.toBytes(startRow))
if(stopRow != null)
scan.setStopRow(Bytes.toBytes(stopRow))
scan
}
/**
* 获取链接
*/
private def getConnection: Connection = {
val conf = HBaseConfiguration.create()
conf.set(HConstants.ZOOKEEPER_QUORUM, config.getString("hbase.quorum"))
conf.set(HConstants.ZOOKEEPER_CLIENT_PORT, config.getString("hbase.port"))
ConnectionFactory.createConnection(conf)
}
以上就是调用hbase 的scan api做的事,具体的条件设置大家可以自行查看官方文档,按需配置。
scan. 这个啊, scan. 那个啊,,,自己看着弄就行http://hbase.apache.org/apidocs/index.html
然后是我的驱动类:
这里就要介绍一下业务了。以我《Hbase权威指南》两个星期的阅读理解结合我们的实际需求来讲,我把rowkey设计为 "token_querytime" 的形式。例如:p4064d445c9f4ff4d536dfeae965aa95_1503364335426
token是什么呢,据我们的PHP前端工程师+技术总监来说,就是用户的访问行为,具体咋产生我也不知道。。。总之,我的目标就是,前端传过来用户要查看的某段时间内的某页面上的各种访问行为,也就是token和querytime的各种组合,我从hbase中给他拿出来计算好就行,所以我这样设计了rowkey。来看看我是怎么拿的:
def getDF(spark: SparkSession, filter: String, startRow: String, stopRow: String): DataFrame = {
val filter1 = new PrefixFilter(Bytes.toBytes(filter))
val results = HBaseClient.scan("test", filter1, startRow, stopRow)
val jsonString = results.map(e => JSONObject(e).toString())
val jsonRDD = spark.sparkContext.parallelize(jsonString)
val df = spark.read.json(jsonRDD)
df
}
里边的“test” 是我的表名,这里我写死了。startRow和stopRow传入的就是开始和结束的rowkey,filter可以为null。
之后就可以对着结果各种蹂躏啦,只要是DataFrame支持的,什么姿势都行。嘿嘿😁
(注:吐槽简书一句,感觉对代码的支持很不好啊,从idea粘贴过来各种不行)