将fruit表中的一部分数据,通过MR迁入到fruit_mr表中
1、Code -> GitHub
https://github.com/liufengji/hbase_mapredece_one.git
2、构建ReadFruitMapper类,用于读取fruit表中的数据
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
/**
 * Mapper that copies the {@code info:name} and {@code info:color} cells of each
 * scanned row into a {@link Put} keyed by the same row key, and emits that Put
 * as the map output.
 *
 * <p>Rows that contain neither of the two columns are skipped: a Put without any
 * cells is rejected by the HBase client when it is written to the target table,
 * which would otherwise fail the whole job.
 */
public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {

    /** Column family whose cells are copied. */
    private static final byte[] FAMILY_INFO = Bytes.toBytes("info");
    /** Qualifier of the fruit name column. */
    private static final byte[] QUALIFIER_NAME = Bytes.toBytes("name");
    /** Qualifier of the fruit color column. */
    private static final byte[] QUALIFIER_COLOR = Bytes.toBytes("color");

    /**
     * Builds a Put for the current row from its {@code info:name} and
     * {@code info:color} cells and writes it to the context.
     *
     * @param key     row key of the scanned row
     * @param value   scan result holding the cells of the row
     * @param context MapReduce context that receives the (row key, Put) pair
     * @throws IOException          if a cell cannot be added to the Put or the write fails
     * @throws InterruptedException if the write to the context is interrupted
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        Put put = new Put(key.get());

        for (Cell cell : value.rawCells()) {
            // Compare the raw bytes in place instead of cloning and decoding
            // the family/qualifier into a String for every cell.
            boolean wanted = CellUtil.matchingFamily(cell, FAMILY_INFO)
                    && (CellUtil.matchingQualifier(cell, QUALIFIER_NAME)
                        || CellUtil.matchingQualifier(cell, QUALIFIER_COLOR));
            if (wanted) {
                put.add(cell);
            }
        }

        // Only emit rows that actually carry one of the requested columns;
        // an empty Put cannot be applied to the target table.
        if (!put.isEmpty()) {
            context.write(key, put);
        }
    }
}
3、构建WriteFruitMRReducer类,用于将读取到的fruit表中的数据写入到fruit_mr表中
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;
/**
 * Reducer that forwards every {@link Put} produced by the mapper to the job's
 * output, using a {@link NullWritable} as the output key.
 */
public class WriteFruitMRReducer
        extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {

    /**
     * Writes each Put received for the given row key to the context unchanged.
     *
     * @param key     row key the Puts were grouped under
     * @param values  Puts emitted by the mapper for this row key
     * @param context MapReduce context that receives the Puts
     * @throws IOException          if writing a Put fails
     * @throws InterruptedException if writing a Put is interrupted
     */
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // The output key carries no information; only the Put itself is written.
        NullWritable outputKey = NullWritable.get();
        for (Put rowPut : values) {
            context.write(outputKey, rowPut);
        }
    }
}
4、构建Fruit2FruitMRRunner用于组装运行Job任务
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * Assembles and runs the MapReduce job that scans the {@code fruit} table with
 * {@link ReadFruitMapper} and writes the resulting Puts to the {@code fruit_mr}
 * table through {@link WriteFruitMRReducer}.
 */
public class Fruit2FruitMRRunner extends Configured implements Tool {

    /**
     * Configures the job, submits it and waits for it to finish.
     *
     * @param arg0 command-line arguments (not used by this job)
     * @return {@code 0} when the job completes successfully
     * @throws IOException if the job finishes unsuccessfully
     * @throws Exception   if the job cannot be configured or submitted
     */
    @Override
    public int run(String[] arg0) throws Exception {
        // Configuration injected by ToolRunner.
        Configuration conf = this.getConf();

        // Create the job, named after this class.
        Job job = Job.getInstance(conf, this.getClass().getSimpleName());
        job.setJarByClass(Fruit2FruitMRRunner.class);

        // Scan used to read the source table.
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);

        // Mapper side. Note: TableMapReduceUtil must come from the "mapreduce"
        // package, not the older "mapred" package.
        TableMapReduceUtil.initTableMapperJob(
                "fruit",                      // source table name
                scan,                         // scan controlling what is read
                ReadFruitMapper.class,        // mapper class
                ImmutableBytesWritable.class, // mapper output key type
                Put.class,                    // mapper output value type
                job);                         // job being configured

        // Reducer side: write into the target table.
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);

        // Run a single reduce task.
        job.setNumReduceTasks(1);

        if (!job.waitForCompletion(true)) {
            throw new IOException("Job running with error");
        }
        // Failure is reported through the exception above, so reaching this
        // point always means success.
        return 0;
    }
}
5、在Fruit2FruitMRRunner类的主函数中调用运行该Job任务
/**
 * Entry point: runs {@link Fruit2FruitMRRunner} through {@link ToolRunner} with
 * an HBase configuration and exits the JVM with the status it returns.
 *
 * @param args command-line arguments passed on to the tool
 * @throws Exception if the tool fails
 */
public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(HBaseConfiguration.create(), new Fruit2FruitMRRunner(), args);
    System.exit(exitCode);
}
6、打包运行任务
[victor@node1 hbase-1.3.1]$ /opt/module/hadoop-2.7.2/bin/yarn jar \
hbase-0.0.1-SNAPSHOT.jar com.victor.hbase.mr1.Fruit2FruitMRRunner
尖叫提示:运行任务前,如果待导入数据的目标表(fruit_mr)不存在,则需要提前创建。
尖叫提示:maven打包命令:mvn -P local clean package 或 mvn -P dev clean package install(如需将第三方jar包一同打包,需要使用插件:maven-shade-plugin)