package wmstat.trip
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.filter._
import org.apache.hadoop.hbase.protobuf.ProtobufUtil
import org.apache.hadoop.hbase.util.{Base64, Bytes}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import wmutils.WmTimeUtil._
import scala.collection.mutable.ArrayBuffer
object HBaseSpark {
def main(args:Array[String]): Unit ={
// 本地模式运行,便于测试
val sparkConf = new SparkConf().setMaster("local").setAppName("HBaseTest")
// 创建 HBase 扫描器
val scan = new Scan()
// val filter=new RowFilter(CompareFilter.CompareOp.GREATER_OR_EQUAL, new RegexStringComparator("^[a-zA-Z0-9]+_20180903[0-9]{6}$")) //使用正则表达式过滤近一个月的
// scan.setFilter(filter)
// 过去 7 天
val arrayWeek: ArrayBuffer[String] = lastestNdays("", 7)
// filterList
val filterList = new FilterList(FilterList.Operator.MUST_PASS_ONE);
for (d <- arrayWeek) {
filterList.addFilter(new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator( "_" + d)))
}
scan.setFilter(filterList)
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("vin"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripStatus"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripStartTime"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripEndTime"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("tripDistance"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("startSoc"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("endSoc"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("maxSpeed"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("startMileage"))
scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("coordinate"))
var proto = ProtobufUtil.toScan(scan)
var scanToString = Base64.encodeBytes(proto.toByteArray());
// 创建hbase configuration
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set(TableInputFormat.INPUT_TABLE,"trip_signal")
hBaseConf.set(TableInputFormat.SCAN, scanToString)
// 创建 spark context
val sc = new SparkContext(sparkConf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 从数据源获取数据
val hbaseRDD = sc.newAPIHadoopRDD(hBaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])
// 将数据映射为表 也就是将 RDD转化为 dataframe schema
val df = hbaseRDD.map(r=>(
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("vin"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripStatus"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripStartTime"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripEndTime"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("tripDistance"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("startSoc"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("endSoc"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("maxSpeed"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("startMileage"))),
Bytes.toString(r._2.getValue(Bytes.toBytes("info"),Bytes.toBytes("coordinate")))
)).toDF("vin","tripStatus", "tripStartTime", "tripEndTime", "tripDistance", "startSoc", "endSoc", "maxSpeed", "startMileage", "coordinate")
df.show(500)
sc.stop()
}
}
使用 HBase 的 FilterList 过滤器
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
相关阅读更多精彩内容
- 1:作用于 随机读,对于执行了qualifier的scan有一定的优化,对于顺序读没有优化 1、任何类型的get(...
- 一个星期前新版更新,从5.07到了5.1.2.9。所以我的网盘中的镜像也做了相应的更新。 原来说的几个缺点,已经修...