Hbase - 表导出CSV数据

> 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直接上代码吧。

## MR

考查了Hbase的各种MR,没有发现哪一个是能实现的,如果有请通知我,我给他发红包。

所以我们只能自己来写一个MR了,编写一个Hbase的MR,官方文档上也有相应的例子。

我们用来加以化妆就得到我们想要的了。

导出的CSV格式为

```

admin,22,北京

admin,23,天津

```

依赖 [hbase-mapreduce](https://mvnrepository.com/artifact/org.apache.hbase/hbase-mapreduce)

## 撸scala代码了

定义Map转换类

```

class MyMapper extends TableMapper[Text, Text] {

val keyText = new Text()

val valueText = new Text()

override def map(key: ImmutableBytesWritable, value: Result, context: Mapper[ImmutableBytesWritable, Result, Text, Text]#Context): Unit = {

val maps = result2Map(value)

keyText.set(maps.get("userId"))

valueText.set(s"${maps.get("regTime")}")

context.write(keyText, valueText)

}

//将Result转换为Map

def result2Map(result: Result): util.HashMap[lang.String, lang.String] = {

val map = new util.HashMap[lang.String, lang.String]()

result.rawCells().foreach {

cell =>

val column: Array[Byte] = CellUtil.cloneQualifier(cell)

val value: Array[Byte] = CellUtil.cloneValue(cell)

val qualifierByte = cell.getQualifierArray

if (qualifierByte != null && qualifierByte.nonEmpty) {

if (value == null || value.length == 0) {

map.put(Bytes.toString(column), "")

} else {

map.put(Bytes.toString(column), Bytes.toString(value))

}

}

}

map

}

}

```

定义Reducer类

```

class MyReducer extends Reducer[Text, Text, Text, Text] {

override def reduce(key: Text, values: lang.Iterable[Text], context: Reducer[Text, Text, Text, Text]#Context): Unit = {

val iter = values.iterator()

while (iter.hasNext) {

//这样可以只保留下Key字段,也就只有一行数据了

val tmpText = iter.next()

val mergeKey = new Text()

mergeKey.set(key.toString + "," + tmpText.toString)

val v = new Text()

v.set("")

context.write(mergeKey, v)

}

}

}

```

ExportCsv核心

```

class ExportCsv extends Configured with Tool {

override def run(args: Array[String]): Int = {

val conf = HBaseConfiguration.create()

conf.addResource(new FileInputStream(new File("/etc/hbase/conf/hbase-site.xml")))

conf.set(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.OUTDIR, "/tmp/hbasecsv")

conf.set("mapreduce.job.running.map.limit", "8") //最多有多少个Task同时跑

val job = Job.getInstance(conf, "HbaseExportCsv")

job.setJarByClass(classOf[ExportCsv])

val scan = new Scan()

//过滤我们想要的数据

scan.addFamily(Bytes.toBytes("ext"))

scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("userId"))

scan.addColumn(Bytes.toBytes("ext"), Bytes.toBytes("regTime"))

scan.setBatch(1000)

scan.setCacheBlocks(false)

TableMapReduceUtil.initTableMapperJob(

"USER_TABLE",

scan,

classOf[MyMapper],

classOf[Text],

classOf[Text],

job

)

job.setReducerClass(classOf[MyReducer])

val jobConf = new JobConf(job.getConfiguration)

FileOutputFormat.setOutputPath(jobConf, new Path("/tmp/hbasecsv"))

val isDone = job.waitForCompletion(true)

if (isDone) 0 else 1

}

}

```

要跑了任务了

```

hadoop jar ExportCsv.jar

```

---

![](https://upload-images.jianshu.io/upload_images/9028759-07315bb8dadcd082.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    大猪大猪阅读 157评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 160评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 93评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 92评论 0 2
  • 新鲜文章,昨天刚经过线上验证过的,使用它导出了3亿的用户数据出来,花了半个小时,性能还是稳稳的,好了不吹牛皮了,直...
    kikiki1阅读 88评论 0 1