Hbase - 表导出CSV数据

> 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直接上代码吧。

## MR

考查了Hbase的各种MR,没有发现哪一个是能实现的,如果有请通知我,我给他发红包。

所以我们只能自己来写一个MR了,编写一个Hbase的MR,官方文档上也有相应的例子。

我们用来加以化妆就得到我们想要的了。

导出的CSV格式为

```

admin,22,北京

admin,23,天津

```

依赖 [hbase-mapreduce](https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce)

## 撸scala代码了

定义Map转换类

```

class MyMapper extends TableMapper[Text, Text] {

  val keyText = new Text()

  val valueText = new Text()

  override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {

    val maps = result2Map(value)

    keyText.set(maps.get("userId"))

    valueText.set(s"${maps.get("regTime")}")

    context.write(keyText, valueText)

  }

  //将Result转换为Map

  def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {

    val map = new util.HashMap[lang.String, lang.String]()

    result.rawCells().foreach {

      cell =>

        val column: Array[Byte] = CellUtil.cloneQualifier(cell)

        val value: Array[Byte] = CellUtil.cloneValue(cell)

        val qualifierByte = cell.getQualifierArray

        if (qualifierByte != null && qualifierByte.nonEmpty) {

          if (value == null || value.length == 0) {

            map.put(Bytes.toString(column), "")

          } else {

            map.put(Bytes.toString(column), Bytes.toString(value))

          }

        }

    }

    map

  }

}

```

定义Reducer类

```

class MyReducer extends Reducer[Text, Text, Text, Text] {

  override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {

    val iter = values.iterator()

    while (iter.hasNext) {

    //这样可以只保留下Key字段,也就只有一行数据了

      val tmpText = iter.next()

      val mergeKey = new Text()

      mergeKey.set(key.toString + "," + tmpText.toString)

      val v = new Text()

      v.set("")

      context.write(mergeKey, v)

    }

  }

}

```

ExportCsv核心

```

class ExportCsv extends Configured with Tool {

  override def run(args: Array[String]): Int = {

    val conf = HBaseConfiguration.create()

    conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml")))

    conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/hbasecsv")

    conf.set("mapreduce.job.running.map.limit", "8") //最多有多少个Task同时跑

    val job = Job.getInstance(conf, "HbaseExportCsv")

    job.setJarByClass(classOf[ExportCsv])

    val scan = new Scan()

    //过滤我们想要的数据

    scan.addFamily(Bytes.toBytes("ext"))

    scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId"))

    scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime"))

    scan.setBatch(1000)

    scan.setCacheBlocks(false)

    TableMapReduceUtil.initTableMapperJob(

      "USER_TABLE",

      scan,

      classOf[MyMapper],

      classOf[Text],

      classOf[Text],

      job

    )

    job.setReducerClass(classOf[MyReducer])

    val jobConf = new JobConf(job.getConfiguration)

    FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/hbasecsv"))

    val isDone = job.waitForCompletion(true)

    if (isDone) 0 else 1

  }

}

```

要跑了任务了

```

hadoop jar ExportCsv.jar

```

---

![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki2阅读 140评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 163评论 0 3
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 73评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    大猪大猪阅读 176评论 0 1
  • 一、环境准备 请参考:ELK环境准备 二、安装kibana 2.1 下载安装 下载地址 2.2 安装配置 vim ...
    57山本无忧阅读 326评论 0 0