使用MapReduce操作HBase

1 环境准备

1)成功搭建Hadoop-2.2.0开发环境
2)成功启动HBase,通过HBase Shell进行测试
3)使用MyEclipse作为开发工具
4)使用Maven构建项目

2 创建项目

这里不再演示如何创建项目,细节可参考《HBase Java API 练习》中的操作,本文沿用的也是《HBase Java API 练习》中的项目。

3 创建上传数据至HBase的类

3.1 WordCountUpLoadToHBase.class

package com.szh.hbase;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.util.Tool;

/**
 * Word-count job that reads a text file from HDFS and stores every word's count in HBase.
 *
 * <p>The mapper emits {@code (word, 1)} pairs; the reducer sums them and writes one
 * {@link Put} per word into the {@code content:count} column of the {@code wordcount} table.
 * The target table is dropped and recreated on every run.
 */
public class WordCountUpLoadToHBase extends Configured {

    /** Tokenizes each input line and emits every token, keyed by its bytes, with a count of one. */
    public static class WCHBaseMapper extends Mapper<Object, Text, ImmutableBytesWritable, IntWritable> {
        private final static IntWritable one = new IntWritable(1);

        /**
         * Emits {@code (token, 1)} for every token of the line.
         *
         * @param key     input key, unused
         * @param value   one line of the input file
         * @param context sink for the emitted pairs
         */
        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // StringTokenizer with no explicit delimiters splits on whitespace.
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                context.write(new ImmutableBytesWritable(Bytes.toBytes(tokens.nextToken())), one);
            }
        }

    }

    /** Sums the counts of one word and writes the total to the {@code content:count} column. */
    public static class WCHBaseReducer extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable> {

        /**
         * Writes one {@link Put} whose row key is the word and whose value is the summed count.
         *
         * @param key     the word, as bytes; also used as the HBase row key
         * @param values  the partial counts emitted for this word
         * @param context sink for the resulting {@link Put}
         */
        public void reduce(ImmutableBytesWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            Put put = new Put(key.get());
            // The count is stored as its decimal string, not as a binary int.
            put.add(Bytes.toBytes("content"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));
            context.write(key, put);
        }
    }

    /**
     * Drops the table if it already exists and creates it again with a single
     * {@code content} column family. The admin handle is always closed, even when
     * one of the table operations throws.
     */
    @SuppressWarnings("all")
    private static void recreateTable(Configuration conf, String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // A table must be disabled before it can be deleted.
            if (admin.tableExists(tableName)) {
                admin.disableTable(tableName);
                admin.deleteTable(tableName);
            }
            HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
            tableDescriptor.addFamily(new HColumnDescriptor("content"));
            admin.createTable(tableDescriptor);
        } finally {
            admin.close();
        }
    }

    /**
     * Recreates the {@code wordcount} table, then runs the upload job and exits with
     * status 0 on success or 1 on failure.
     *
     * @param args command-line arguments, unused
     */
    @SuppressWarnings("all")
    public static void main(String[] args) throws MasterNotRunningException, ZooKeeperConnectionException, IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum","szh");
        conf.set("hbase.zookeeper.property.clientPort","2181");

        recreateTable(conf, tableName);

        Job job = new Job(conf,"upload to hbase");
        job.setJarByClass(WordCountUpLoadToHBase.class);
        job.setMapperClass(WCHBaseMapper.class);
        TableMapReduceUtil.initTableReducerJob(tableName, WCHBaseReducer.class, job,null,null,null,null,false);

        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Put.class);
        FileInputFormat.addInputPaths(job, "hdfs://szh:9000/myhbase/hbase-test.txt");
        System.exit(job.waitForCompletion(true)?0:1);

    }

}

使用HBase Shell查看执行结果:

使用MapReduce上传数据至HBase.png

4 从HBase获取数据上传至HDFS

4.1 创建MRReadFromHbase.class

package com.szh.hbase;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Job that scans the {@code wordcount} HBase table and writes each row to HDFS as
 * a {@code (row key, content-family values)} text pair.
 */
public class MRReadFromHbase extends Configured {

    /** Charset for every byte/String conversion, so results do not depend on the JVM default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.Charset.forName("UTF-8");

    /** Emits one record per scanned row: the row key and the concatenated {@code content} values. */
    public static class WCHBaseMapper extends TableMapper<Text, Text>{

        /**
         * Concatenates all values of the row's {@code content} family and emits them once.
         * Rows without any {@code content} column produce no output.
         *
         * @param key     the row key
         * @param values  the scanned row
         * @param context sink for the emitted pair
         */
        @Override
        public void map(ImmutableBytesWritable key, Result values, Context context) throws IOException, InterruptedException {
            Map<byte[], byte[]> columns = values.getFamilyMap("content".getBytes(UTF_8));
            // Defensive: skip rows that yield no map or no columns for this family.
            if (columns == null || columns.isEmpty()) {
                return;
            }
            StringBuilder joined = new StringBuilder();
            for (byte[] cell : columns.values()) {
                joined.append(new String(cell, UTF_8));
            }
            // Write once per row, after all columns are appended, so no partial
            // concatenations are emitted for rows with several columns.
            context.write(new Text(key.get()), new Text(joined.toString()));
        }
    }

    /** Identity reducer: forwards every value unchanged under its key. */
    public static class WCHBaseReducer extends Reducer<Text, Text, Text, Text>{
        private Text result =new Text();

        /**
         * Writes each incoming value back out under the same key.
         *
         * @param key     the row key emitted by the mapper
         * @param values  the values emitted for that key
         * @param context sink for the output pairs
         */
        public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{
            for(Text val:values){
                result.set(val);
                context.write(key,result);
            }
        }
    }

    /**
     * Configures and runs the export job, exiting with status 0 on success or 1 on failure.
     *
     * @param args command-line arguments, unused
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String tableName = "wordcount";
        Configuration conf =HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "szh");
        conf.set("hbase.zookeeper.property.clientPort", "2181");

        Job job =new Job(conf,"read from hbase to hdfs");
        job.setJarByClass(MRReadFromHbase.class);
        job.setReducerClass(WCHBaseReducer.class);
        TableMapReduceUtil.initTableMapperJob(tableName, new Scan(), WCHBaseMapper.class, Text.class, Text.class, job);
        FileOutputFormat.setOutputPath(job, new Path("hdfs://szh:9000/myhbase/out"));
        System.exit(job.waitForCompletion(true)?0:1);
    }

}

用hdfs命令查看结果:

从HBase读取数据至HDFS
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 136,665评论 19 139
  • 入门指南 1. 简介 Quickstart会让你启动和运行一个单节点单机HBase。 2. 快速启动 – 单点HB...
    和心数据阅读 5,005评论 1 41
  • Android 自定义View的各种姿势1 Activity的显示之ViewRootImpl详解 Activity...
    passiontim阅读 179,267评论 25 708
  • 一、任意取3个标题,你认为有10万+阅读潜质的标题。 《如果中韩或中日发生战争,你身边会有多少”卖国贼“》 《我们...
    抢领外卖券阅读 376评论 0 0
  • 从毕业实习开始算已经从医三年多了。从不知如何问诊,从不敢独立开处方,从一个人学习写病例。到现在已经每天一个人单...
    杏林淳阅读 231评论 0 0

友情链接更多精彩内容