Spark:DataFrame写HFile (Hbase)一个列族、一个列扩展一个列族、多个列

扩展:Spark:DataFrame生成HFile 批量导入Hbase

在上一篇博文中遗留了一个问题,就是只能处理DataFrame 的一行一列,虽然给出一个折中的办法处理多个列,但是对于字段多的DataFrame却略显臃肿,经过我的研究,实现了从一个列族、一个列到一个列族、多个列扩展。
此文章再此记录实现方法

实现思路:

保存为HFile的关键是下面这个方法

saveAsNewAPIHadoopFile(save_path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

要使用这个方法就要保证最后的结果数据需要是RDD[(ImmutableBytesWritable, KeyValue)]类型的,所以这就是我们努力前进的方向。在这个过程中有几个问题需要解决
1. 如何一次处理DataFrame 的众多字段

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes //列族
        rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        for (i <- 1 to (columnsName.length - 1)) {
          cn = columnsName(i).getBytes() //列的名称
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) //列的值
          //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value
          //
          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })
  1. 上述代码中通过map取出每一行row,用一个for循环通过所有字段的名称(去除掉“key”这个字段)对每个字段进行封装处理,每处理完一个字段加入kvlist。
  2. 在此处有个地方需要注意的是,我们要保证 kvlist 里面的数据整体有序(升序),这里的有序由字段名称排序和加入 kvlist 的位置来保证,kvlist 通过 :+ 将后一个数据放在List的后面,至于字段名称排序在后面说明。
  3. 至于此处为什么要去除掉key,这是因为我默认DataFrame第一个字段就是key,因为需要对所有字段名称进行排序,如果不把key拿出来后续不知道key在哪里了,如果按照正常走下去,key值也会被当成value被保存一次,这显然不符合我们的要求,当然有兴趣的同学可以自己实现更全面的方法。

2. 如何对DataFrame 的所有字段名排序

 var columnsName: Array[String] = resultDataFrame.columns //获取列名 第一个为key
    columnsName = columnsName.drop(1).sorted //把key去掉  因为要排序
  1. 通过resultDataFrame.columns获取所有列名,通过drop(1)删掉“key”,(序号从1开始)
  2. 通过sorted 对列名进行排序,默认就是升序的

通过上面方法处理后数据类型是
RDD[(ImmutableBytesWritable, Seq[KeyValue])]
这显然不是我们需要的,但是距离
RDD[(ImmutableBytesWritable, KeyValue)]
已然不远矣

3. 如何将value的Seq[KeyValue] 穿换成 KeyValue

 val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })
  1. 这点其实很简单,但是脑子当时短路还纠结很久,直接用flatMapValues这个方法即可,最后处理出来的就是我们的目标RDD[(ImmutableBytesWritable, KeyValue)]

4. 目标路径已经存在怎么办

  /**
    * 删除hdfs下的文件
    *
    * @param url 需要删除的路径
    */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(path, true)
    }
  }
  1. 存在就删除呗,新建个方法delete_hdfspath将路径删除即可

5. 如何生成 HFile 和 load 数据到Hbase
执行方法saveAsNewAPIHadoopFile()生成HFile
注意:此处要对key进行排序(升序)

    //保存数据
    result
      .sortBy(x => x._1, true) //要保持 整体有序
      .saveAsNewAPIHadoopFile(save_path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

load 数据到Hbase

hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://ns1/user/hive/warehouse/iptv.db/zzzHFile iptv:spark_test

过程中出现的问题

  1. DataFrame 字段名称没有排序处理
18/10/15 14:19:32 WARN scheduler.TaskSetManager: Lost task 0.1 in stage 2.0 (TID 3, iptve2e03): java.io.IOException: Added a key not lexically larger than previous. 
Current cell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:area_code/1539584366048/Put/vlen=5/seqid=0, 
    lastCell = 200_\xE5\xB9\xBF\xE5\xB7\x9E_GD_GZ/cf_info:dict_id/1539584366048/Put/vlen=2/seqid=0

上面的意思是当前列名cf_info:area_code比前一个列名cf_info:dict_id小,这就是为什么需要对列名排序的原因,同时还要把key删除掉,因为不删除会出现cf_info:key这个列

完整代码

依赖:sbt

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.spark" % "spark-hive_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.0-cdh5.7.2"
libraryDependencies += "org.apache.hbase" % "hbase-protocol" % "1.2.0-cdh5.7.2"
libraryDependencies += "mysql" % "mysql-connector-java" % "5.1.38"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0-cdh5.7.2"
libraryDependencies += "com.yammer.metrics" % "metrics-core" % "2.2.0"

关键代码

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

import com.iptv.domain.DatePattern
import com.iptv.job.JobBase
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.{FsAction, FsPermission}
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
  /**
    * 将DataFrame 保存为 HFile
    *
    * @param resultDataFrame 需要保存为HFile的 DataFrame,DataFrame的第一个字段必须为"key"
    * @param clounmFamily 列族名称(必须在Hbase中存在,否则在load数据的时候会失败)
    * @param save_path HFile的保存路径
    */
  def saveASHfFile(resultDataFrame: DataFrame, clounmFamily: String, save_path: String): Unit = {
    val conf: Configuration = HBaseConfiguration.create()
    lazy val job = Job.getInstance(conf)
    job.setMapOutputKeyClass(classOf[ImmutableBytesWritable]) //设置MapOutput Key Value 的数据类型
    job.setMapOutputValueClass(classOf[KeyValue])

    var columnsName: Array[String] = resultDataFrame.columns //获取列名 第一个为key
    columnsName = columnsName.drop(1).sorted //把key去掉  因为要排序

    val result1: RDD[(ImmutableBytesWritable, Seq[KeyValue])] = resultDataFrame
      .map(row => {
        var kvlist: Seq[KeyValue] = List()
        var rowkey: Array[Byte] = null
        var cn: Array[Byte] = null
        var v: Array[Byte] = null
        var kv: KeyValue = null
        val cf: Array[Byte] = clounmFamily.getBytes //列族
        rowkey = Bytes.toBytes(row.getAs[String]("key")) //key
        for (i <- 1 to (columnsName.length - 1)) {
          cn = columnsName(i).getBytes() //列的名称
          v = Bytes.toBytes(row.getAs[String](columnsName(i))) //列的值
          //将rdd转换成HFile需要的格式,我们上面定义了Hfile的key是ImmutableBytesWritable,那么我们定义的RDD也是要以ImmutableBytesWritable的实例为key
          kv = new KeyValue(rowkey, cf, cn, v) //封装一下 rowkey, cf, clounmVale, value
          //
          kvlist = kvlist :+ kv //将新的kv加在kvlist后面(不能反 需要整体有序)
        }
        (new ImmutableBytesWritable(rowkey), kvlist)
      })

    delete_hdfspath(save_path) //删除save_path 原来的数据

    //RDD[(ImmutableBytesWritable, Seq[KeyValue])] 转换成 RDD[(ImmutableBytesWritable, KeyValue)]
    val result: RDD[(ImmutableBytesWritable, KeyValue)] = result1.flatMapValues(s => {
      s.iterator
    })

    //保存数据
    result
      .sortBy(x => x._1, true) //要保持 整体有序
      .saveAsNewAPIHadoopFile(save_path,
      classOf[ImmutableBytesWritable],
      classOf[KeyValue],
      classOf[HFileOutputFormat2],
      job.getConfiguration)

  }

  /**
    * 删除hdfs下的文件
    * @param url 需要删除的路径
    */
  def delete_hdfspath(url: String) {
    val hdfs: FileSystem = FileSystem.get(new Configuration)
    val path: Path = new Path(url)
    if (hdfs.exists(path)) {
      val filePermission = new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.READ)
      hdfs.delete(path, true)
    }
  }

使用示例

package com.iptv.job.basedata

import com.iptv.job.JobBase
import org.apache.spark.sql.functions.{concat, lit}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * @author 利伊奥克儿-lillcol
  *         2018/10/14-11:08
  *
  */
object TestHFile extends JobBase {
  var hdfsPath: String = ""
  var proPath: String = ""
  var DATE: String = ""

  val sparkConf: SparkConf = new SparkConf().setAppName(getClass.getSimpleName)
  val sc: SparkContext = new SparkContext(sparkConf)
  val sqlContext: SQLContext = getSQLContext(sc)

  import sqlContext.implicits._

  def main(args: Array[String]): Unit = {
    hdfsPath = args(0)
    proPath = args(1)

    //HFile保存路径
    val save_path: String = hdfsPath + "zzzHFile"
    //获取测试DataFrame
    val dim_sys_city_dict: DataFrame = readMysqlTable(sqlContext, "DIM_SYS_CITY_DICT", proPath)

    val resultDataFrame: DataFrame = dim_sys_city_dict
      .select(concat($"city_id", lit("_"), $"city_name", lit("_"), $"city_code").as("key"), $"*")
    //注:resultDataFrame 里面的 key 要放在第一位,因为后面需要对字段名排序
    saveASHfFile(resultDataFrame, "cf_info", save_path)
  }
}

上述读取mysql数据为DataFrame的放大可以参考
Spark:读取mysql数据作为DataFrame

此为个人工作过程中的总结,转载请标出处!!!!!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,377评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,390评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,967评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,344评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,441评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,492评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,497评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,274评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,732评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,008评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,184评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,837评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,520评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,156评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,407评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,056评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,074评论 2 352

推荐阅读更多精彩内容