Spark SQL 访问Hbase

@[toc]
参考文档 : https://hbase.apache.org/book.html#_sparksql_dataframes

简介

hbase-spark integration使用了Spark-1.2.0中引入的DataSource API (SPARK-3247), 它在简单的HBase KV存储和复杂的关系SQL查询之间架起桥梁,使用户能够使用Spark在HBase上执行复杂的数据分析工作。HBase数据帧是一个标准的Spark数据帧,能够与Hive、ORC、Parquet、JSON等任何其他数据源交互。HBase Spark集成应用了诸如分区修剪、列修剪、谓词下推和数据位置等关键技术。
要使用hbase-spark integration connector,用户需要为HBase和Spark表之间的模式映射定义Catalog,准备数据并填充HBase表,然后加载HBase数据帧。之后,用户可以使用SQL查询来集成查询和访问HBase表中的记录。

打包生成hbase-spark库

使用hbase-spark integration需要hbase-spark库
找了半天没有找到最新的那个包, 所以自己去github上面下载代码打包, 然后安装到本地仓库

git clone https://github.com/apache/hbase-connectors.git
cd hbase-connectors/spark/hbase-spark
mvn -Dspark.version=2.4.3 -Dscala.version=2.11.7 -Dscala.binary.version=2.11 clean install

然后在项目pom.xml中添加依赖

        <dependency>
            <groupId>org.apache.hbase.connectors.spark</groupId>
            <artifactId>hbase-spark</artifactId>
            <version>1.0.1</version>
        </dependency>
       <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.4</version>
        </dependency>

解决访问Hbase问题

执行代码时出现错误:

Exception in thread "main" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
    at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/hbase/fs/HFileSystem
    at cn.com.sjfx.sparkappdemo.Application.main(Application.java:27)
    ... 6 more
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.hbase.fs.HFileSystem
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 7 more

这是因为spark无法访问hbase中的库造成的, 需要在制作镜像的时候把hbase的库加入到spark中,
修改Dockerfile, 增加如下内容:

COPY /hbase-lib/* /spark/jars/

读写Hbase

public class Application {
    public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf().setAppName("demo");
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SparkSession sparkSession = SparkSession.builder()
                .sparkContext(jsc.sc())
                .getOrCreate();

        //设置要访问的hbase的zookeeper
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.zookeeper.quorum", "192.168.1.22:15301,192.168.1.22:15302,192.168.1.22:15303");
        //一定要创建这个hbaseContext, 因为后面写入时会用到它
        HBaseContext hBaseContext=new HBaseContext(jsc.sc(),configuration,null);

        //创建一个测试用的RDD
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 256; i++) {
            data.add(i);
        }
        JavaRDD<Integer> rdd = jsc.parallelize(data);
        JavaRDD<HBaseRecord> rdd1 = rdd.map(i -> new HBaseRecord(i, "extra"));
        rdd1.collect().forEach(System.out::println);
        //根据RDD创建数据帧
        Dataset<Row> df = sparkSession.createDataFrame(rdd1, HBaseRecord.class);

        //定义映射的catalog
        String catalog = "{" +
                "       \"table\":{\"namespace\":\"default\", \"name\":\"table1\"}," +
                "       \"rowkey\":\"key\"," +
                "       \"columns\":{" +
                "         \"col0\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"}," +
                "         \"col1\":{\"cf\":\"cf1\", \"col\":\"col1\", \"type\":\"boolean\"}," +
                "         \"col2\":{\"cf\":\"cf2\", \"col\":\"col2\", \"type\":\"double\"}," +
                "         \"col3\":{\"cf\":\"cf3\", \"col\":\"col3\", \"type\":\"float\"}," +
                "         \"col4\":{\"cf\":\"cf4\", \"col\":\"col4\", \"type\":\"int\"}," +
                "         \"col5\":{\"cf\":\"cf5\", \"col\":\"col5\", \"type\":\"bigint\"}," +
                "         \"col6\":{\"cf\":\"cf6\", \"col\":\"col6\", \"type\":\"smallint\"}," +
                "         \"col7\":{\"cf\":\"cf7\", \"col\":\"col7\", \"type\":\"string\"}," +
                "         \"col8\":{\"cf\":\"cf8\", \"col\":\"col8\", \"type\":\"tinyint\"}" +
                "       }" +
                "     }";
        //写入数据
        df.write()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .option(HBaseTableCatalog.newTable(), "5")  //写入到5个分区
                .mode(SaveMode.Overwrite)  // 覆盖模式
                .save();
        //读取数据
        Dataset<Row> df2 = sparkSession.read()
                .format("org.apache.hadoop.hbase.spark")
                .option(HBaseTableCatalog.tableCatalog(), catalog)
                .load();
        System.out.println("read result: ");
        df2.show();
    }

    //类需要可序列化
    public static class HBaseRecord implements Serializable {
        private static final long serialVersionUID = 4331526295356820188L;
        //属性一定要getter/setter, 即使是public
        public String col0;
        public Boolean col1;
        public Double col2;
        public Float col3;
        public Integer col4;
        public Long col5;
        public Short col6;
        public String col7;
        public Byte col8;

        public String getCol0() {
            return col0;
        }

        public void setCol0(String col0) {
            this.col0 = col0;
        }

        public Boolean getCol1() {
            return col1;
        }

        public void setCol1(Boolean col1) {
            this.col1 = col1;
        }

        public Double getCol2() {
            return col2;
        }

        public void setCol2(Double col2) {
            this.col2 = col2;
        }

        public Float getCol3() {
            return col3;
        }

        public void setCol3(Float col3) {
            this.col3 = col3;
        }

        public Integer getCol4() {
            return col4;
        }

        public void setCol4(Integer col4) {
            this.col4 = col4;
        }

        public Long getCol5() {
            return col5;
        }

        public void setCol5(Long col5) {
            this.col5 = col5;
        }

        public Short getCol6() {
            return col6;
        }

        public void setCol6(Short col6) {
            this.col6 = col6;
        }

        public String getCol7() {
            return col7;
        }

        public void setCol7(String col7) {
            this.col7 = col7;
        }

        public Byte getCol8() {
            return col8;
        }

        public void setCol8(Byte col8) {
            this.col8 = col8;
        }

        public HBaseRecord(Integer i, String s) {
            col0 = String.format("row%03d", i);
            col1 = i % 2 == 0;
            col2 = Double.valueOf(i);
            col3 = Float.valueOf(i);
            col4 = i;
            col5 = Long.valueOf(i);
            col6 = i.shortValue();
            col7 = "String:" + s;
            col8 = i.byteValue();
        }

        @Override
        public String toString() {
            return "HBaseRecord{" +
                    "col0='" + col0 + '\'' +
                    ", col1=" + col1 +
                    ", col2=" + col2 +
                    ", col3=" + col3 +
                    ", col4=" + col4 +
                    ", col5=" + col5 +
                    ", col6=" + col6 +
                    ", col7='" + col7 + '\'' +
                    ", col8=" + col8 +
                    '}';
        }
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容