Hive和Java API操作HBase实践

本博客采用创作共用版权协议, 要求署名、非商业用途和保持一致. 转载本博客文章必须也遵循署名-非商业用途-保持一致的创作共用协议.

由于五一假期, 成文较为简略, 一些细节部分并没有详细介绍, 如有需求, 可以参考之前几篇相当MapRuduce主题的博文.

HBase实践

  • 修改MapReduce阶段倒排索引的信息通过文件输出, 而每个词极其对应的平均出现次数信息写入到Hbase的表Wuxia中(具体的要求可以查看之前的博文MapReduce实战之倒排索引)
  • 编写Java程序, 遍历上一步保存在HBase中的表, 并把表格的内容保存到本地文件中.
  • Hive使用Hive Shell命令行创建表(表名: Wuxia, (word string, count double)), 导入平均出现次数的数据
    • 查询出现次数大于300的词语
    • 查询前100个出现次数最多的数
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.util.Bytes;



public class InvertedIndexHbase {
    //创建表并进行简单配置
    public static void createHBaseTable(Configuration conf, String tablename) throws IOException {
//      HBaseConfiguration configuration = new HBaseConfiguration();
        HBaseAdmin admin = new HBaseAdmin(conf);
        if (admin.tableExists(tablename)) {  //如果表已经存在
            System.out.println("table exits, Trying recreate table!");
            admin.disableTable(tablename);
            admin.deleteTable(tablename);
        }
        HTableDescriptor htd = new HTableDescriptor(tablename); //row
        HColumnDescriptor col = new HColumnDescriptor("content"); //列族
        htd.addFamily(col); //创建列族
        System.out.println("Create new table: " + tablename);
        admin.createTable(htd); //创建表
    }
    //map函数不变
    public static class Map 
    extends Mapper<Object, Text, Text, Text> {
        private Text keyWord = new Text();
        private Text valueDocCount = new Text();

        public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
            //获取文档
            FileSplit fileSplit = (FileSplit)context.getInputSplit();
            String fileName = fileSplit.getPath().getName();
            StringTokenizer itr = new StringTokenizer(value.toString());
            while(itr.hasMoreTokens()) {
                keyWord.set(itr.nextToken() + ":" + fileName);  // key为key#doc
                valueDocCount.set("1"); // value为词频
                context.write(keyWord, valueDocCount);
            }
        }
    }
    //combine函数不变
    public static class InvertedIndexCombiner
        extends Reducer<Text, Text, Text, Text> {
        private Text wordCount = new Text();
        private Text wordDoc = new Text();
        //将key-value转换为word-doc:词频
        public void reduce(Text key, Iterable<Text> values, 
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (Text value : values) {
                sum += Integer.parseInt(value.toString());
            }
            int splitIndex = key.toString().indexOf(":");  // 找到:的位置
            wordDoc.set(key.toString().substring(0, splitIndex));  //key变为单词
            wordCount.set(sum + "");  //value变为doc:词频
            context.write(wordDoc, wordCount);
        }
    }
    //reduce将数据存入HBase
    public static class Reduce
        extends TableReducer<Text, Text, NullWritable> {
        private Text temp = new Text();

        public void reduce(Text key, Iterable<Text> values, 
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            int count = 0;
            Iterator<Text> it = values.iterator();
            //形成最终value
            for(;it.hasNext();) { 
                count++;
                temp.set(it.next());
                sum += Integer.parseInt(temp.toString());
            }
            float averageCount = (float)sum / (float)count;
            FloatWritable average = new FloatWritable(averageCount);
            //加入row为key.toString()
            Put put = new Put(Bytes.toBytes(key.toString()));  //Put实例, 每一词存一行
            //列族为content, 列修饰符为average表示平均出现次数, 列值为平均出现次数
            put.add(Bytes.toBytes("content"), Bytes.toBytes("average"), Bytes.toBytes(average.toString()));
            context.write(NullWritable.get(), put); 
        }
    }

    public static void main(String[] args) throws Exception {
        String tablename = "Wuxia";
        Configuration conf = HBaseConfiguration.create();
        conf.set(TableOutputFormat.OUTPUT_TABLE, tablename);
        createHBaseTable(conf, tablename);
        Job job = Job.getInstance(conf, "Wuxia");  //配置作业名
        //配置作业的各个类
        job.setJarByClass(InvertedIndexHbase.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(InvertedIndexCombiner.class);
        job.setReducerClass(Reduce.class);
//        TableMapReduceUtil.initTableReducerJob(tablename, Reduce.class, job);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TableOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
}

然后在Hadoop执行操作.

$ hdfs dfs -mkdir /user
$ hdfs dfs -mkdir /user/input
$ hdfs dfs -put /Users/andrew_liu/Java/Hadoop/wuxia_novels/*  /user/input
$ hadoop jar WorkSpace/InvertedIndexHbase.jar InvertedIndexHbase  /user/input output1

执行成功结束后, 打开HBase Shell的操作

$ hbase shell
> scan 'Wuxia'

HBase中数据写入本地文件

import java.io.FileWriter;
import java.io.IOException;
import java.io.FileWriter;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.util.Bytes;


public class Hbase2Local {
    static Configuration conf = HBaseConfiguration.create();
    public static void getResultScan(String tableName, String filePath) throws IOException {
        Scan scan = new Scan();
        ResultScanner rs = null;
        HTable table =  new HTable(conf, Bytes.toBytes(tableName));
        try {
            rs = table.getScanner(scan);
            FileWriter fos = new FileWriter(filePath, true);
            for (Result r : rs) {
//              System.out.println("获得rowkey: " + new String(r.getRow()));
                for (KeyValue kv : r.raw()) {
//                  System.out.println("列: " + new String(kv.getFamily()) + "  值: " + new String(kv.getValue()));
                    String s = new String(r.getRow() + "\t" + kv.getValue() + "\n");
                    fos.write(s);
                }
            }
            fos.close();
        } catch (IOException e) {
            // TODO: handle exception
            e.printStackTrace();
        }
        rs.close();
    }
    public static void main(String[] args) throws Exception {
        String tableName = "Wuxia";
        String filePath = "/Users/andrew_liu/Java/WorkSpace/Hbaes2Local/bin/Wuxia";
        getResultScan(tableName, filePath);
    }
}

Hive实践

将本地数据导入Hive

hive> create table Wuxia(word string, count double) row format delimited fields terminated by '\t' stored as textfile;
Time taken: 0.049 seconds
hive> load data local inpath '/Users/andrew_liu/Downloads/Wuxia.txt' into table Wuxia;
Loading data to table default.wuxia
Table default.wuxia stats: [numFiles=1, totalSize=2065188]
OK
Time taken: 0.217 seconds

输出出现次数大于300的词语

select * from Wuxia order by count desc limit 100;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容