HBase那些事

HBase那些事

@(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]

[TOC]

HBase特性

HBase是什么

HBase(Hadoop Database),一个高可靠性、高性能、面向列、可伸缩、 实时读写的分布式数据库。

选择特性

  • 列式储存:方便存储结构化和半结构化数据,方便做数据压缩,对针对某一列查询有非常大的IO优势;
  • KV储存:可以通过key快速查询到其value,任意的value格式;
  • 海量数据:PB级;
  • 低延时:毫秒级别;
  • 高吞吐量:百万查询/每秒;
  • 主键索引:基于RowKey的索引,快速检索KV值;
  • Hadoop集成:文件存储于HDFS之上,高效集成Hive、Spark;
  • BigTable:HBase是Google的BigTable架构的一个开源实现;
  • CRUD: 支持增删查改操作;
  • 支持Row级别事务:HBase本身均是基于Row级别的操作,所以行锁即可以保证ACID;
  • 稀疏存储: 节省空间、提高查询效率、便于压缩;
  • 支持非(半)结构化:列式储存和KV结构;
  • 容错性:分布式架构,高效错误转移;
  • 硬件成本低廉:类似于 Hadoop 的分布式集群,硬件成本低廉;
  • 第三方SQL支持:phoenix、hive、sparkSql等等;
  • 开源:基于java开发的开源软件,社区活跃;

HBase环境搭建

CDH

CM安装HBase比较简单,参照安装步骤即可:

image.png

分布式环境

前提条件

  1. Hadoop集群:hadoop01,hadoop02, hadoop03
  2. 用户互信
  3. HBase安装包
  4. JDK
  5. Zookeeper

安装部署

  1. 解压安装包:sudo tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/local/hbase/
  2. 配置环境变量:vi /etc/profile ; export HBASE_HOME=/usr/local/hbase/
  3. 修改配置文件:$HBASE_HOME/conf/hbase-site.xml
<configuration>
  <property>
    <name>hbase.rootdir</name>
       <value>hdfs://nameservice1/hbase</value>
  </property>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
  <property>
    <name>hbase.zookeeper.quorum</name>
    <value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
  </property>
</configuration>
  1. 修改启动脚本:$HBASE_HOME/conf/hbase-env.sh; export JAVA_HOME=/usr/java/jdk/
  2. 修改HMaster配置:$HBASE_HOME/conf/regionservers
hadoop01
hadoop02
hadoop03
  1. 新增备用HMater: $HBASE_HOME/conf/backup-masters
hadoop02
  1. 启动集群:$HBASE_HOME/bin/start-hbase.sh
  2. 验证环境:
create 'hello', 'cf'
put 'hello', 'one', 'cf:a', 'b'
get 'hello', 'one'

HBase架构

看图说话

HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由一下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群,而在底层,它将数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode等,总体结构如下:

image.png
  • HMaster :
  1. 管理HRegionServer,实现其负载均衡;
  2. 管理和分配HRegion;实现DDL操作;
  3. 管理namespace和table的元数据;权限控制;
  • HRegionServer :
  1. 存放和管理本地HRegion;
  2. 读写HDFS,管理Table中的数据;
  3. Client直接通过HRegionServer读写数据;
  • ZooKeeper :
  1. 存放整个 HBase集群的元数据以及集群的状态信息;
  2. 实现HMaster主从节点的failover。
  • HRegion:
  1. HBase表数据按行切分成多个HRegion;
  2. HRegion按照列簇切分成多个Store;
  3. 每个Store由一个MemStore和多个StoreFile(HFile)组成;
  4. 每个HRegion还包含一个HLog文件,用于数据恢复;
  • Hadoop:
  1. RegionServer通过DFS Client读写HDFS数据;
  2. RS和DN尽量保证在统一节点,确保数据本地化,提升读写性能;
  3. MemStore满足一定条件下会Flush到HDFS上的HFile文件。

HBaseTable

HBase表主要包含namespace(命名空间)、tableName(表名)、rowKey(主键)、columnFamily(列簇)、qualifier(列)、cell(值)、timeStamp(版本),用结构化的形式展现如下:

| NameSpace | TableName | RowKey | CF1:Name | CF2:Age
| -------- | -----: | :----: |
| Default | HelloWorld | 001 | Allen | 28
| Default | HelloWorld | 002 | Curry | 26
| Default | HelloWorld | 003 | Brant | 20
| Default | HelloWorld | 004 | Sean | 31

  • 存储:表HelloWorld数据量较小,暂时会保存在一个RegionServer的一个Region内;
  • Split:当Region达到最大Region限制(假定256M)后,则会Split成两个Region,这里假定001和002在RegionA,003和004在RegionB:
image.png

读流程

HBase表数据所在Region,Region所在RegionServer,均存放在HBase表hbase:meta,由于元数据较小,一个2G的HRegion大概可以支持4PB的数据,所以meta表是不可Split的。Meta表本身是一个HBase表,该表所在RS的地址存放在ZK,于是读数据的流程如下:

  1. 需求:从HellWorld表读取002这条记录;
  2. Client从ZK读取meta表所在RegionServer地址信息;
  3. Client和Meta表所在RS建立连接,获取HelloWorld表当中RowKey=002的记录所在的RS地址;并将该地址缓存在客户端;
  4. Client和002所在RS建立连接,申请查询002记录数据;
  5. 002所在RS根据RowKey获取Region信息,并初始化Scaner,Scaner优先扫描BlockCache,然后扫描MemStore,然后扫描StoreFile(HFile),直到找到002记录为止;
  6. RS返回结果数据给Client。

写流程

  1. 需求:写入005到HelloWorld表;
  2. Client从ZK去读Meta表所在RS地址;
  3. Client读取Meta表数据,确认005应该写入RS地址;
  4. Client将005数据写入RS02的HLog,用于灾备恢复,然后写入RegionB的memStore;此刻写操作已完成,反馈给client;
  5. 当memStore满足一定条件下会Flush到hdfs,hdfs再根据写策略复制副本。

Split和Compaction

HBase扩展和负载均衡的基本单位是Region。Region从本质上说是行的集合。当Region的大小达到一定的阈值,该Region会自动分裂(split),或者当该Region的HFile数太多,也会自动合并(compaction)。

对于一张表(HTable)而言,初始时只会有一个Region。表的数据量不断增加,系统会监控此表以确保数据量不会超过一个配置的阈值。如果系统发现表容量超过了限制,该Region会被一分为二。分裂主要看行键(row key),从Region正中的键开始分裂,并创建容量大致相等的两个Region。

根据上述写流程会发现HBase是一种Log-Structured Merge Tree架构模式,用户数据写入先写WAL,再写缓存,满足一定条件后缓存数据会执行flush操作真正落盘,形成一个数据文件HFile。随着数据写入不断增多,flush次数也会不断增多,进而HFile数据文件就会越来越多。然而,太多数据文件会导致数据查询IO次数增多,因此HBase尝试着不断对这些文件进行合并,这个合并过程称为Compaction。

BlockCache

上述读流程中RS会优先扫描BlockCache,BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。

HBase数据按照block块存储,默认是64K,HBase中Block分为四种类型:

  • Data Block用于存储实际数据,通常情况下每个Data Block可以存放多条KeyValue数据对;
  • Index Block通过存储索引数据加快数据查找;
  • Bloom Block通过一定算法可以过滤掉部分一定不存在待查KeyValue的数据文件,减少不必要的IO操作;
  • Meta Block主要存储整个HFile的元数据。

HBase中提供两种BlockCache的实现:默认on-heap LruBlockCache和BucketCache(通常是off-heap)。通常BucketCache的性能要差于LruBlockCache,然而由于GC的影响,LruBlockCache的延迟会变的不稳定,而BucketCache由于是自己管理BlockCache,而不需要GC,因而它的延迟通常比较稳定,这也是有些时候需要选用BucketCache的原因。

  • LruBlockCache :HBase默认的BlockCache实现方案。Block数据块都存储在 JVM heap内,由JVM进行垃圾回收管理。它将内存从逻辑上分为了三块:single-access区、mutil-access区、in-memory区,分别占到整个BlockCache大小的25%、50%、25%。一次随机读中,一个Block块从HDFS中加载出来之后首先放入signle区,后续如果有多次请求访问到这块数据的话,就会将这块数据移到mutil-access区。而in-memory区表示数据可以常驻内存,一般用来存放访问频繁、数据量小的数据,比如元数据,用户也可以在建表的时候通过设置列族属性IN-MEMORY= true将此列族放入in-memory区。很显然,这种设计策略类似于JVM中young区、old区以及perm区。无论哪个区,系统都会采用严格的Least-Recently-Used算法,当BlockCache总量达到一定阈值之后就会启动淘汰机制,最少使用的Block会被置换出来,为新加载的Block预留空间。
image.png
  • BucketCache :很明显LruBlockCache的缺陷是GC,当吞吐量徒增,GC不及时则会造成RS因为OOM停止工作;于是BucketCache引入了堆外内存来缓存Block数据,当然BucketCache通过配置可以工作在三种模式下:heap,offheap和file。无论工作在那种模式下,BucketCache都会申请许多带有固定大小标签的Bucket,一种Bucket存储一种指定BlockSize的数据块,BucketCache会在初始化的时候申请14个不同大小的Bucket,而且即使在某一种Bucket空间不足的情况下,系统也会从其他Bucket空间借用内存使用,不会出现内存使用率低的情况。接下来再来看看不同工作模式,heap模式表示这些Bucket是从JVM Heap中申请,offheap模式使用DirectByteBuffer技术实现堆外内存存储管理,而file模式使用类似SSD的高速缓存文件存储数据块。
image.png
  • CombinedBlockCache :实际实现中,HBase将BucketCache和LRUBlockCache搭配使用,称为CombinedBlockCache。和DoubleBlockCache不同,系统在LRUBlockCache中主要存储Index Block和Bloom Block,而将Data Block存储在BucketCache中。因此一次随机读需要首先在LRUBlockCache中查到对应的Index Block,然后再到BucketCache查找对应数据块。BucketCache通过更加合理的设计极大降低了JVM GC对业务请求的实际影响。
image.png

MemStore

HBase写入数据会先写WAL,再写缓存,WAL是用于RS故障后的数据恢复,而缓存MemStore则是为了提高写入数据的性能。但MemStore是基于内存,一方面空间有限,一方面数据容易丢失,所以RegionServer会在满足一定条件下讲MemStore数据Flush到HDFS,条件如下:

  1. Memstore级别限制:当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发Memstore刷新。

  2. Region级别限制:当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 2* 128M = 256M),会触发memstore刷新。

  3. Region Server级别限制:当一个Region Server中所有Memstore的大小总和达到了上限(hbase.regionserver.global.memstore.upperLimit * hbase_heapsize,默认 40%的JVM内存使用量),会触发部分Memstore刷新。Flush顺序是按照Memstore由大到小执行,先Flush Memstore最大的Region,再执行次大的,直至总体Memstore内存使用量低于阈值(hbase.regionserver.global.memstore.lowerLimit * hbase_heapsize,默认 38%的JVM内存使用量);HBase1.0后使用新参数hbase.regionserver.global.memstore.size。

  4. 当一个Region Server中HLog数量达到上限(可通过参数hbase.regionserver.maxlogs配置)时,系统会选取最早的一个 HLog对应的一个或多个Region进行flush

  5. HBase定期刷新Memstore:默认周期为1小时,确保Memstore不会长时间没有持久化。为避免所有的MemStore在同一时间都进行flush导致的问题,定期的flush操作有20000左右的随机延时。

  6. 手动执行flush:用户可以通过shell命令 flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。

HBase客户端

HBase提供了许多客户端,例如HBase Shell、JAVA API、REST、Thrift、Avro等等,另外MapReduce、Hive、Spark等分布式计算引擎均有接口读写HBase数据,详细细节参考《HBase In Action》或者《HBase权威指南》。

HBase优化

了解了HBase的基本概念之后,在使用过程中针对每个环节均有优化策略,最常见的优化策略如下。

调整GC策略

HBase内存使用情况参考上文,当吞吐量徒增或者运行一定时长后,大部分内存被BlockCache和MemStore占据,一旦FGC则会“stop the world”,影响RS的正常工作,如果GC时间超过ZK的心跳TimeOut时间,该服务会被ZK标记为BAD状态。所以合理高效的GC策略非常重要。

GC基本知识请参考《深入理解Java虚拟机》,针对HBase这里给出参考JVM参数:

  • -Xms16g、-Xmx16g:内存大小,根据集群情况设置,建议不要超过18G。
  • -Xmn1g:年轻代大小。
  • -XX:+UseParNewGC:设置年轻代为并发回收
  • -XX:+UseConcMarkSweepGC:启用CMS算法
  • -XX:CMSInitiatingOccupancyFraction=70:年老代使用了70%时回收内存
  • -Djava.net.preferIPv4Stack=true:禁用IPv6
  • -XX:-CMSConcurrentMTEnabled(-号代表否认),当该标志被启用时,并发的CMS阶段将以多线程执行(因此,多个GC线程会与所有的应用程序线程并行工作)。该标志已经默认开启,如果顺序执行更好,这取决于所使用的硬件,多线程执行可以通过-XX:-CMSConcurremntMTEnabled禁用。
  • -XX:+CMSIncrementalMode. 在增量模式下,CMS 收集器在并发阶段,不会独占整个周期,而会周期性的暂停,唤醒应用线程。收集器把并发阶段工作,划分为片段,安排在次级(minor) 回收之间运行。
-Xms10g -Xmx10g -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxGCPauseMillis=60000 -XX:CMSInitiatingOccupancyFraction=70 -XX:PermSize=64m -XX:MaxPermSize=256m  -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=3072m -XX:+CMSIncrementalMode

BucketCache Off-Heap

如果存在批量读取HBase数据的请求,BlockCache可能会被迅速占满,GC不及时RS又要悲剧,所以采用BucketCache+Off-Heap是一个不错的优化点,或者不顾性能问题,关闭BlockCache。

  • BucketCache Off-Heap配置方法:
<property>     
   <name>hbase.bucketcache.size</name>    
   <value>3072</value> 
</property>
<property>
    <name>hbase.bucketcache.ioengine</name>
    <value>offheap</value> 
</property>
CombinedBlockCache策略下还需要设置L1缓存大小:
<property>
    <name>hbase.bucketcache.combinedcache.enabled</name>
    <value>true</value> 
</property>
<property>
 <name>hfile.block.cache.size</name>
 <value>0.2</value>
</property>
  • 内存简易观察方案:
jmap -heap pid  -- 内存使用情况
jmap -histo pid -- 内存粗略统计
jmap -F -dump:format=b,file=dump.file pid  -- dump文件
jhat dump.file -- 通过http://dshbase01:7000查看dump文件,但端口白名单导致无法访问

线程并发数

HBase是一个高并发的分布式数据库,牵扯到ZK、RegionServer、DataNode等组件,所以性能提升跟这些组件的并发线程数的控制离不开关系。

  • RS线程数:
hbase.regionserver.handler.count=300
hbase.regionserver.metahandler.count=300
该配置定义了每个Region Server上的RPC Handler的数量。Region Server通过RPC Handler接收外部请求并加以处理。所以提升RPC Handler的数量可以一定程度上提高HBase接收请求的能力。
  • ZK线程数:注意线程数增加带来的内存消耗,相应提升ZK Heap大小;
maxClientCnxns=60
  • DataNode:注意线程数增加带来的文件描述符的问题,调整最大文件描述符最大限制;
dfs.datanode.max.transfer.threads=8192
dfs.datanode.handler.count=100

Balancer

HBase是一种支持自动负载均衡的分布式KV数据库,在开启balance的开关(balance_switch)后,HBase的HMaster进程会自动根据 指定策略 挑选出一些Region,并将这些Region分配给负载比较低的RegionServer上。官方目前支持两种挑选Region的策略,一种叫做DefaultLoadBalancer,另一种叫做StochasticLoadBalancer。由于HBase的所有数据(包括HLog/Meta/HStoreFile等)都是写入到HDFS文件系统中的, 因此HBase的Region移动其实非常轻量级。在做Region移动的时候,保持这个Region对应的HDFS文件位置不变,只需要将Region的Meta数据分配到相关的RegionServer即可,整个Region移动的过程取决于RegionClose以及RegionOpen的耗时,这个时间一般都很短。

Region切分

Region Split的依据是最大Region Size,调大该参数可以减少Region数量,同时也减少了MemStore数量和总空间。

hbase.hregion.max.filesize=2G

非本地化部署

HBase读取数据时会从HDFS上的HFile加载数据到BlockCache,如果是本地数据则非常高效,如果不同机器或不同机架,则会带来网络消耗,同样写流程当中的Flush过程一样会本地化的问题,所以建议DataNode和RegionServer部署在同一台机器,因为RS调用hdfs client去读写数据,hdfs client优先会读写本地磁盘。另外也可以通过HBase Web UI观察数据本地化情况。

MemStore Flush

MemStore的频繁Flush会消耗大量服务器资源,根据Flush条件合理控制Flush次数是一个经常要观察的优化点。

  • 调大全局MemStore大小:
<property>
 <name>hbase.regionserver.global.memstore.size</name>
 <value>0.6</value>
</property>
  • 调大HLog个数限制:每个Region均包含一个HLog,所以需要根据Region个数来动态调整;
hbase.regionserver.maxlogs=500

压缩

压缩通常会带来较好的性能,因为CPU压缩和解压消耗的时间比从磁盘中读取和写入数据消耗的时间更短,HBase支持多种压缩方式,例如snappy、lzo、gzip等等,不同压缩算法压缩比和效率有一定差异。开启表压缩的方式如下:

disable 'MILESTONE'
alter 'MILESTONE',{NAME=>'PUSH',COMPRESSION=>'snappy'}
alter 'MILESTONE',{NAME=>'BEHAVIOR',COMPRESSION=>'snappy'}
enable 'MILESTONE'

MSLAB

HBase内存回收策略CMS一定程度上降低了Stop时间,但却避免不了内存碎片的问题,HBase提供了以下几个参数,防止堆中产生过多碎片,大概思路也就是参考TLAB,叫做MSLAB,MemStore-Local Allocation Buffer。

一个regionserver的内存里各个region的数据混合在一起,当某个region被flush到磁盘时,就会形成很多堆碎片。其实这跟java中gc模型的假设是冲突的,同一时间创建的对象,会在同一时间消亡。这个问题可以通过Arena Allocation来解决,即每次分配内存时都是在一个更大的叫做arena的内存区域分配。一个典型的实现是TLAB,即每个线程维护自己的arena,每个线程用到的对象都在自己的arena区域分配。其实,jvm早已经实现了TLAB,但是这个对于hbase不适用,因为hbase的regionserver使用一个单独的线程来处理所有region的请求,就算这个线程用arena方式分配还是会把所有region的数据混在一起。因此hbase自己实现了MSLAB,即每个region的memStore自己实现了arena,使各个region的数据分开,就不会形成太细的碎片。Arena里存放的是KeyValue对象,如果这些KeyValue对象是一样大的,不会导致严重碎片,相反这些KeyValue对象引用的字节数组才是引起碎片的主因,因此要做的就是把这些字节数组分配在一起。

hbase.hregion.memstore.mslab.enabled=true // 开启MSALB
hbase.hregion.memstore.mslab.chunksize=2m // chunk的大小,越大内存连续性越好,但内存平均利用率会降低
hbase.hregion.memstore.mslab.max.allocation=256K // 通过MSLAB分配的对象不能超过256K,否则直接在Heap上分配,256K够大了

RowKey设计

HBase是根据Rowkey来进行检索的,所以合理的Rowkey设计可以提高查询效率。

  • Rowkey长度原则:Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。
  1. 数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;

  2. MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。

  3. 目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。

  • Rowkey散列原则:如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。

  • Rowkey唯一原则:必须在设计上保证其唯一性,这句是废话。

  • 不同应用场景RowKey的设计也需要定制化考虑。

客户端调优

  • Retry:hbase.client.retries.number=20
  • AutoFlush:将HTable的setAutoFlush设为false,可以支持客户端批量更新。即当Put填满客户端flush缓存时,才发送到服务端。默认是true。
  • Scan Caching:scanner一次缓存多少数据来scan(从服务端一次抓多少数据回来scan);默认值是 1,一次只取一条。
  • Scan Attribute Selection:scan时建议指定需要的Column Family,减少通信量,否则scan操作默认会返回整个row的所有数据(所有Coulmn Family)。
  • Close ResultScanners:通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。
  • Optimal Loading of Row Keys:当你scan一张表的时候,返回结果只需要row key(不需要CF, qualifier,values,timestaps)时,你可以在scan实例中添加一个filterList,并设置 MUST_PASS_ALL操作,filterList中add FirstKeyOnlyFilter或KeyOnlyFilter。这样可以减少网络通信量。
  • 启用Bloom Filter:Bloom Filter通过空间换时间,提高读操作性能。
  • 批量Get : HBase提供了get(List)批量获取数据的接口,减少RPC交互,提高性能,但注意需要在客户端关闭blockCache,否则容易OOM,也会冲掉热点数据,这样blockCache则失去了意义。

Hive On HBase

NoSQL种类繁多,各有优势,HBase其中一个优势就是和HDFS的集成,想象下如果实时事务数据需要结合历史统计数据,则需要将Hive离线跑批的数据T+1日导入HBase,这个数据搬迁的过程,针对Redis\MongoDB\cassandra,则需要定制开发ETL工具。对于HBase,Hive提供了HBaseStorageHandle解决方案:

-- KV映射
CREATE TABLE helloWorld(key int, value string)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
  TBLPROPERTIES ("hbase.table.name" = "helloWorld");
-- 列簇映射
CREATE TABLE helloWorld(key int, cf1 map<string,string>, cf2 map<string, string>)
  STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
  WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:")
  TBLPROPERTIES ("hbase.table.name" = "helloWorld");

然后基于HQL插入数据即可。

Spark On HBase

某些业务场景下面,需要结合多个数据源的数据,比如redis和hbase的数据进行join。第一个念头则是Spark DataFrame或DataSet,DF支持多种数据源,同时还提供了SparkSQL语法用于统计分析。

// 配置文件
val conf = new SparkConf()
      .setAppName(jobName)
      .setMaster("local[*]")
      .set("redis.host", redisHost)
      .set("redis.port", redisPort)
      .set("redis.db", redisDB)
      .set("redis.timeout", redisTimeOut)
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

// RDD读取   
var redisRDD = sc.fromRedisSet(groupID)
// 注册DF
val redisSchemaString = "USER_ID"
val redisSchema = StructType(redisSchemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = redisRDD.map(p => Row(p.toString.trim))
var redisDF = sqlContext.createDataFrame(rowRDD, redisSchema)
redisDF.registerTempTable(redisDFTableName)
  • 针对HBase,常见的方案有newAPIHadoopRDD、nerdammer、hortonworks-spark/shc、unicredit/hbase-rdd。
  1. newAPIHadoopRDD:Spark官方提供的HDFS文件读写接口;
// 配置文件
    val hconf = HBaseConfiguration.create()
    hconf.set("hbase.zookeeper.property.clientPort", zkPort)
    hconf.set("hbase.zookeeper.quorum", zkQuorum)
    
// RDD读取
    hconf.set(TableInputFormat.INPUT_TABLE, tagTableName)
    hconf.set(TableInputFormat.SCAN_COLUMNS, hbaseColumns)
    val hbaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])...

// 注册DF:注册之前需要将HBaseRDD转换成RowRDD,此处省略
    val hbaseDF = sqlContext.createDataFrame(hbaseRDD, hbaseStruct)
    hbaseDF.registerTempTable(hbaseDFTableName)
  1. SHC :hortonworks提供的Spark读写HBase的方案,具体Demo见Git,SHC只支持spark2.0+版本;
// Schema定义
    def catalog =
      s"""{
         |"table":{"namespace":"default", "name":"tag"},
         |"rowkey":"key",
         |"columns":{
         |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
         |"col1":{"cf":"behavior", "col":"activate", "type":"string"}}
        }""".stripMargin

// 读取HBase数据
    def withCatalog(cat: String): DataFrame = {
          sqlContext
            .read
            .options(Map(HBaseTableCatalog.tableCatalog -> cat))
            .format("org.apache.spark.sql.execution.datasources.hbase")
            .load()
        }

// 注册DF
    val df = withCatalog(catalog)
    df.registerTempTable("userBehaviorTime")
  1. nerdammer : 写入操作必须是tunpleRDD,但Scala的tunple最大长度22,如果需要查询的HBase列数超过22,则无法使用该方案;
// nerdammer方案读取HBase数据:结果是tuple,长度有限制22.
val hbaseRDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String])]("tag").select(columnStringArray: _*).inColumnFamily("behavior")

// DF只接收RowRDD
    val hbaseRowRDD = hbaseRDD.map(
      i => {
        var record = new ArrayBuffer[String]()
        i.productIterator.foreach { col => {
          record += col.asInstanceOf[Option[String]].get
        }
        }
        Row(record.toArray: _*)
      })

// RowRDD转成DF
    val hbaseDF = sqlContext.createDataFrame(hbaseRowRDD, hbaseSchema.struct)
    hbaseDF.registerTempTable("userBehaviorTime")
  1. unicredit太小众,坑太多,不建议使用;

  2. Phoenix : 两大优势Sql On HBase,HBase二级索引实现。HBase神器,但无法关联Redis数据和HBase数据。

以上解决方案均是采用传统的JOIN方式,这样会带来一个问题就是HBase全表扫描,性能低下。解决思路是先将RedisRDD进行repartition操作,针对每个partition形成HBase的List[Get]对象,进行批量读取。但同样批量读取需要考虑内存激增的情况,所以需要结合上面的优化点使用。

Phoenix On HBase

Phoenix是构建在HBase上的一个SQL层,能让我们用标准的JDBC API而不是HBase客户端API来增删查改HBase数据。相对原生HBase接口,Phoenix提供了以下特性:

  • JDBC API
  • 性能优化
  • 二级索引实现
  • SQL引擎
  • 事务
  • UDF
  • 统计信息收集
  • 分页查询
  • 散步表
  • 跳跃扫描
  • 视图
  • 多租户
  • 动态列
  • 大量CSV数据加载
  • 查询服务器
  • 追踪
  • 指标

有兴趣者查阅Phoenix官网。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,039评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,223评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,916评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,009评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,030评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,011评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,934评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,754评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,202评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,433评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,590评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,321评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,917评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,568评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,738评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,583评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,482评论 2 352

推荐阅读更多精彩内容