sparkstreaming将offset存到hbase

之前一直将offset存到zookeeper,由于streaming程序会对zk有大量的读取操作,故将offset存到zk不太好,现已将offset都改到hbase中

  • kafka版本:0.10.2.0

  • spark版本:2.0

  • hbase表结构:groupid名字作为表名,topic名字作为rowkey,列族为info,分区号作为列名

kafka_offset:groupid info:0 info:1 info:2
rowkey(topicName) 10000 10000 10000
  1. 改完后streaming程序中的代码调用
//先初始化hbase连接对象
HbaseUtil.setConf("zk address", "zk port")
//hbase中存offset的命名空间和表名
val offsetTbName = "kafka_offset:groupId"
HbaseUtil.createTable(offsetTbName, "info")//hbase中不存这个表就创建
//去hbase中获取topic partition范围,hbase中不存在也没关系(第一次用这个groupid的时候)
val fromOffsets: Map[TopicPartition, Long] = OffsetUtil.getFromOffsets
/**
* param offsets :
* offsets to begin at on initial startup.  If no offset is given for a
* TopicPartition, the committed offset (if applicable) or kafka param
* auto.offset.reset will be used.
* 引用源码的注释,意思大概就是如果第一次获取不到topicPartition就用auto.offset.reset这个配置来决定是从earliest还是latest开始读取kafka数据,也就是说不用担心fromOffset第一次取为空的情况
*/
//创建kafkaStream
val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
    ssc,//StreamingContext
    PreferConsistent,
    Subscribe[String, String](topicsSet, kafkaParams, fromOffsets)
)

kafkaStream.foreachRDD(rdd => {
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    //忽略逻辑代码
    pass...
    //提交offset到hbase
    OffsetUtil.saveOffsetToHbase(offsetRanges, "groupId")
})

HbaseUtil.close()
  • 附上上面用到的HbaseUtil.scala和OffsetUtil.scala
  1. HbaseUtil.scala
object HbaseUtil {
  var conf: Configuration = _
  //线程池
  lazy val connection: Connection = ConnectionFactory.createConnection(conf)
  lazy val admin: Admin = connection.getAdmin

  /**
    * hbase conf
    *
    * @param quorum hbase的zk地址
    * @param port   zk端口2181
    * @return
    */
  def setConf(quorum: String, port: String): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", quorum)
    conf.set("hbase.zookeeper.property.clientPort", port)
    this.conf = conf
  }

  /**
    * 如果不存在就创建表
    * @param tableName 命名空间:表名
    * @param columnFamily 列族
    */
  def createTable(tableName: String, columnFamily: String): Unit = {
    val tbName = TableName.valueOf(tableName)
    if (!admin.tableExists(tbName)) {
      val htableDescriptor = new HTableDescriptor(tbName)
      val hcolumnDescriptor = new HColumnDescriptor(columnFamily)
      htableDescriptor.addFamily(hcolumnDescriptor)
      admin.createTable(htableDescriptor)
    }
  }

  /**
    * 获取hbase单元格内容
    * @param tableName 命名空间:表名
    * @param rowKey rowkey
    * @return 返回单元格组成的List
    */
  def getCell(tableName: String, rowKey: String): mutable.Buffer[Cell] = {
    val get = new Get(Bytes.toBytes(rowKey))
    val table = connection.getTable(TableName.valueOf(tableName))
    val result: Result = table.get(get)
    import scala.collection.JavaConverters._
    result.listCells().asScala
  }

  /**
    * 单条插入
    * @param tableName 命名空间:表名
    * @param rowKey rowkey
    * @param family 列族
    * @param qualifier column列
    * @param value 列值
    */
  def singlePut(tableName: String, rowKey: String, family: String, qualifier: String, value: String): Unit = {
    //单个插入
    val put: Put = new Put(Bytes.toBytes(rowKey)) //参数是行健
    put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value))

    //获得表对象
    val table: Table = connection.getTable(TableName.valueOf(tableName))
    table.put(put)
    table.close()
  }

  def close(): Unit = {
    admin.close()
    connection.close()
  }
  1. OffsetUtil.scala
object OffsetUtil {
  //从hbase中获取offset
  def getFromOffsets: Map[TopicPartition, Long] ={
    var fromOffsets: Map[TopicPartition, Long] = Map()
    AnalysisParam.topicSet.foreach(topic => {
      val get = new Get(Bytes.toBytes(topic))
      val table: Table = HbaseUtil.connection.getTable(TableName.valueOf(s"kafka_offset:groupId"))
      if (table.exists(get)) {
        val cells = HbaseUtil.getCell(s"kafka_offset:groupId", topic)
        cells.foreach(cell => {
          val partition = Bytes.toString(CellUtil.cloneQualifier(cell))
          val offset = Bytes.toString(CellUtil.cloneValue(cell))
          val tp = new TopicPartition(topic, partition.toInt)
          fromOffsets += (tp -> offset.toLong)
        })
      }
    })
    fromOffsets
  }

  //将offset存到hbase
  def saveOffsetToHbase(offsetRanges:Array[OffsetRange],groupId:String): Unit ={
    offsetRanges.foreach(o => {
      val topic = o.topic
      val partition = o.partition
      val offset = o.fromOffset
      HbaseUtil.singlePut(s"kafka_offset:$groupId", topic, "info", partition.toString, offset.toString)
    })
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,837评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,551评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,417评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,448评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,524评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,554评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,569评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,316评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,766评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,077评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,240评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,912评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,560评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,176评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,425评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,114评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,114评论 2 352

推荐阅读更多精彩内容

  • Zookeeper用于集群主备切换。 YARN让集群具备更好的扩展性。 Spark没有存储能力。 Spark的Ma...
    Yobhel阅读 7,263评论 0 34
  • HBase那些事 @(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]...
    分痴阅读 3,937评论 3 17
  • 参考:https://www.jianshu.com/p/569106a3008f 最近在逐步跟进Hbase的相关...
    博弈史密斯阅读 852评论 1 1
  • 还是一样的雨,默默走在雨里,伞静静的挂在包上,也许路人见了,会说我有病吧,有伞不用!我回之以淡笑,这雨可不可以来的...
    望而生灰阅读 148评论 0 0
  • BitmapShader描述:Shader used to draw a bitmap as a texture....
    大大大寒阅读 1,388评论 1 1