HBase那些事
@(大数据工程学院)[HBase, Hadoop, 优化, HadoopChen, hbase]
[TOC]
HBase特性
HBase是什么
HBase(Hadoop Database),一个高可靠性、高性能、面向列、可伸缩、 实时读写的分布式数据库。
选择特性
- 列式储存:方便存储结构化和半结构化数据,方便做数据压缩,对针对某一列查询有非常大的IO优势;
- KV储存:可以通过key快速查询到其value,任意的value格式;
- 海量数据:PB级;
- 低延时:毫秒级别;
- 高吞吐量:百万查询/每秒;
- 主键索引:基于RowKey的索引,快速检索KV值;
- Hadoop集成:文件存储于HDFS之上,高效集成Hive、Spark;
- BigTable:HBase是Google的BigTable架构的一个开源实现;
- CRUD: 支持增删查改操作;
- 支持Row级别事务:HBase本身均是基于Row级别的操作,所以行锁即可以保证ACID;
- 稀疏存储: 节省空间、提高查询效率、便于压缩;
- 支持非(半)结构化:列式储存和KV结构;
- 容错性:分布式架构,高效错误转移;
- 硬件成本低廉:类似于 Hadoop 的分布式集群,硬件成本低廉;
- 第三方SQL支持:phoenix、hive、sparkSql等等;
- 开源:基于java开发的开源软件,社区活跃;
HBase环境搭建
CDH
CM安装HBase比较简单,参照安装步骤即可:
分布式环境
前提条件
- Hadoop集群:hadoop01,hadoop02, hadoop03
- 用户互信
- HBase安装包
- JDK
- Zookeeper
安装部署
- 解压安装包:sudo tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/local/hbase/
- 配置环境变量:vi /etc/profile ; export HBASE_HOME=/usr/local/hbase/
- 修改配置文件:$HBASE_HOME/conf/hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://nameservice1/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>hadoop01:2181,hadoop02:2181,hadoop03:2181</value>
</property>
</configuration>
- 修改启动脚本:$HBASE_HOME/conf/hbase-env.sh; export JAVA_HOME=/usr/java/jdk/
- 修改HMaster配置:$HBASE_HOME/conf/regionservers
hadoop01
hadoop02
hadoop03
- 新增备用HMater: $HBASE_HOME/conf/backup-masters
hadoop02
- 启动集群:$HBASE_HOME/bin/start-hbase.sh
- 验证环境:
create 'hello', 'cf'
put 'hello', 'one', 'cf:a', 'b'
get 'hello', 'one'
HBase架构
看图说话
HBase采用Master/Slave架构搭建集群,它隶属于Hadoop生态系统,由一下类型节点组成:HMaster节点、HRegionServer节点、ZooKeeper集群,而在底层,它将数据存储于HDFS中,因而涉及到HDFS的NameNode、DataNode等,总体结构如下:
- HMaster :
- 管理HRegionServer,实现其负载均衡;
- 管理和分配HRegion;实现DDL操作;
- 管理namespace和table的元数据;权限控制;
- HRegionServer :
- 存放和管理本地HRegion;
- 读写HDFS,管理Table中的数据;
- Client直接通过HRegionServer读写数据;
- ZooKeeper :
- 存放整个 HBase集群的元数据以及集群的状态信息;
- 实现HMaster主从节点的failover。
- HRegion:
- HBase表数据按行切分成多个HRegion;
- HRegion按照列簇切分成多个Store;
- 每个Store由一个MemStore和多个StoreFile(HFile)组成;
- 每个HRegion还包含一个HLog文件,用于数据恢复;
- Hadoop:
- RegionServer通过DFS Client读写HDFS数据;
- RS和DN尽量保证在统一节点,确保数据本地化,提升读写性能;
- MemStore满足一定条件下会Flush到HDFS上的HFile文件。
HBaseTable
HBase表主要包含namespace(命名空间)、tableName(表名)、rowKey(主键)、columnFamily(列簇)、qualifier(列)、cell(值)、timeStamp(版本),用结构化的形式展现如下:
| NameSpace | TableName | RowKey | CF1:Name | CF2:Age
| -------- | -----: | :----: |
| Default | HelloWorld | 001 | Allen | 28
| Default | HelloWorld | 002 | Curry | 26
| Default | HelloWorld | 003 | Brant | 20
| Default | HelloWorld | 004 | Sean | 31
- 存储:表HelloWorld数据量较小,暂时会保存在一个RegionServer的一个Region内;
- Split:当Region达到最大Region限制(假定256M)后,则会Split成两个Region,这里假定001和002在RegionA,003和004在RegionB:
读流程
HBase表数据所在Region,Region所在RegionServer,均存放在HBase表hbase:meta,由于元数据较小,一个2G的HRegion大概可以支持4PB的数据,所以meta表是不可Split的。Meta表本身是一个HBase表,该表所在RS的地址存放在ZK,于是读数据的流程如下:
- 需求:从HellWorld表读取002这条记录;
- Client从ZK读取meta表所在RegionServer地址信息;
- Client和Meta表所在RS建立连接,获取HelloWorld表当中RowKey=002的记录所在的RS地址;并将该地址缓存在客户端;
- Client和002所在RS建立连接,申请查询002记录数据;
- 002所在RS根据RowKey获取Region信息,并初始化Scaner,Scaner优先扫描BlockCache,然后扫描MemStore,然后扫描StoreFile(HFile),直到找到002记录为止;
- RS返回结果数据给Client。
写流程
- 需求:写入005到HelloWorld表;
- Client从ZK去读Meta表所在RS地址;
- Client读取Meta表数据,确认005应该写入RS地址;
- Client将005数据写入RS02的HLog,用于灾备恢复,然后写入RegionB的memStore;此刻写操作已完成,反馈给client;
- 当memStore满足一定条件下会Flush到hdfs,hdfs再根据写策略复制副本。
Split和Compaction
HBase扩展和负载均衡的基本单位是Region。Region从本质上说是行的集合。当Region的大小达到一定的阈值,该Region会自动分裂(split),或者当该Region的HFile数太多,也会自动合并(compaction)。
对于一张表(HTable)而言,初始时只会有一个Region。表的数据量不断增加,系统会监控此表以确保数据量不会超过一个配置的阈值。如果系统发现表容量超过了限制,该Region会被一分为二。分裂主要看行键(row key),从Region正中的键开始分裂,并创建容量大致相等的两个Region。
根据上述写流程会发现HBase是一种Log-Structured Merge Tree架构模式,用户数据写入先写WAL,再写缓存,满足一定条件后缓存数据会执行flush操作真正落盘,形成一个数据文件HFile。随着数据写入不断增多,flush次数也会不断增多,进而HFile数据文件就会越来越多。然而,太多数据文件会导致数据查询IO次数增多,因此HBase尝试着不断对这些文件进行合并,这个合并过程称为Compaction。
BlockCache
上述读流程中RS会优先扫描BlockCache,BlockCache是一个读缓存,即“引用局部性”原理(也应用于CPU,分空间局部性和时间局部性,空间局部性是指CPU在某一时刻需要某个数据,那么有很大的概率在一下时刻它需要的数据在其附近;时间局部性是指某个数据在被访问过一次后,它有很大的概率在不久的将来会被再次的访问),将数据预读取到内存中,以提升读的性能。
HBase数据按照block块存储,默认是64K,HBase中Block分为四种类型:
- Data Block用于存储实际数据,通常情况下每个Data Block可以存放多条KeyValue数据对;
- Index Block通过存储索引数据加快数据查找;
- Bloom Block通过一定算法可以过滤掉部分一定不存在待查KeyValue的数据文件,减少不必要的IO操作;
- Meta Block主要存储整个HFile的元数据。
HBase中提供两种BlockCache的实现:默认on-heap LruBlockCache和BucketCache(通常是off-heap)。通常BucketCache的性能要差于LruBlockCache,然而由于GC的影响,LruBlockCache的延迟会变的不稳定,而BucketCache由于是自己管理BlockCache,而不需要GC,因而它的延迟通常比较稳定,这也是有些时候需要选用BucketCache的原因。
- LruBlockCache :HBase默认的BlockCache实现方案。Block数据块都存储在 JVM heap内,由JVM进行垃圾回收管理。它将内存从逻辑上分为了三块:single-access区、mutil-access区、in-memory区,分别占到整个BlockCache大小的25%、50%、25%。一次随机读中,一个Block块从HDFS中加载出来之后首先放入signle区,后续如果有多次请求访问到这块数据的话,就会将这块数据移到mutil-access区。而in-memory区表示数据可以常驻内存,一般用来存放访问频繁、数据量小的数据,比如元数据,用户也可以在建表的时候通过设置列族属性IN-MEMORY= true将此列族放入in-memory区。很显然,这种设计策略类似于JVM中young区、old区以及perm区。无论哪个区,系统都会采用严格的Least-Recently-Used算法,当BlockCache总量达到一定阈值之后就会启动淘汰机制,最少使用的Block会被置换出来,为新加载的Block预留空间。
- BucketCache :很明显LruBlockCache的缺陷是GC,当吞吐量徒增,GC不及时则会造成RS因为OOM停止工作;于是BucketCache引入了堆外内存来缓存Block数据,当然BucketCache通过配置可以工作在三种模式下:heap,offheap和file。无论工作在那种模式下,BucketCache都会申请许多带有固定大小标签的Bucket,一种Bucket存储一种指定BlockSize的数据块,BucketCache会在初始化的时候申请14个不同大小的Bucket,而且即使在某一种Bucket空间不足的情况下,系统也会从其他Bucket空间借用内存使用,不会出现内存使用率低的情况。接下来再来看看不同工作模式,heap模式表示这些Bucket是从JVM Heap中申请,offheap模式使用DirectByteBuffer技术实现堆外内存存储管理,而file模式使用类似SSD的高速缓存文件存储数据块。
- CombinedBlockCache :实际实现中,HBase将BucketCache和LRUBlockCache搭配使用,称为CombinedBlockCache。和DoubleBlockCache不同,系统在LRUBlockCache中主要存储Index Block和Bloom Block,而将Data Block存储在BucketCache中。因此一次随机读需要首先在LRUBlockCache中查到对应的Index Block,然后再到BucketCache查找对应数据块。BucketCache通过更加合理的设计极大降低了JVM GC对业务请求的实际影响。
MemStore
HBase写入数据会先写WAL,再写缓存,WAL是用于RS故障后的数据恢复,而缓存MemStore则是为了提高写入数据的性能。但MemStore是基于内存,一方面空间有限,一方面数据容易丢失,所以RegionServer会在满足一定条件下讲MemStore数据Flush到HDFS,条件如下:
Memstore级别限制:当Region中任意一个MemStore的大小达到了上限(hbase.hregion.memstore.flush.size,默认128MB),会触发Memstore刷新。
Region级别限制:当Region中所有Memstore的大小总和达到了上限(hbase.hregion.memstore.block.multiplier * hbase.hregion.memstore.flush.size,默认 2* 128M = 256M),会触发memstore刷新。
Region Server级别限制:当一个Region Server中所有Memstore的大小总和达到了上限(hbase.regionserver.global.memstore.upperLimit * hbase_heapsize,默认 40%的JVM内存使用量),会触发部分Memstore刷新。Flush顺序是按照Memstore由大到小执行,先Flush Memstore最大的Region,再执行次大的,直至总体Memstore内存使用量低于阈值(hbase.regionserver.global.memstore.lowerLimit * hbase_heapsize,默认 38%的JVM内存使用量);HBase1.0后使用新参数hbase.regionserver.global.memstore.size。
当一个Region Server中HLog数量达到上限(可通过参数hbase.regionserver.maxlogs配置)时,系统会选取最早的一个 HLog对应的一个或多个Region进行flush
HBase定期刷新Memstore:默认周期为1小时,确保Memstore不会长时间没有持久化。为避免所有的MemStore在同一时间都进行flush导致的问题,定期的flush操作有20000左右的随机延时。
手动执行flush:用户可以通过shell命令 flush ‘tablename’或者flush ‘region name’分别对一个表或者一个Region进行flush。
HBase客户端
HBase提供了许多客户端,例如HBase Shell、JAVA API、REST、Thrift、Avro等等,另外MapReduce、Hive、Spark等分布式计算引擎均有接口读写HBase数据,详细细节参考《HBase In Action》或者《HBase权威指南》。
HBase优化
了解了HBase的基本概念之后,在使用过程中针对每个环节均有优化策略,最常见的优化策略如下。
调整GC策略
HBase内存使用情况参考上文,当吞吐量徒增或者运行一定时长后,大部分内存被BlockCache和MemStore占据,一旦FGC则会“stop the world”,影响RS的正常工作,如果GC时间超过ZK的心跳TimeOut时间,该服务会被ZK标记为BAD状态。所以合理高效的GC策略非常重要。
GC基本知识请参考《深入理解Java虚拟机》,针对HBase这里给出参考JVM参数:
- -Xms16g、-Xmx16g:内存大小,根据集群情况设置,建议不要超过18G。
- -Xmn1g:年轻代大小。
- -XX:+UseParNewGC:设置年轻代为并发回收
- -XX:+UseConcMarkSweepGC:启用CMS算法
- -XX:CMSInitiatingOccupancyFraction=70:年老代使用了70%时回收内存
- -Djava.net.preferIPv4Stack=true:禁用IPv6
- -XX:-CMSConcurrentMTEnabled(-号代表否认),当该标志被启用时,并发的CMS阶段将以多线程执行(因此,多个GC线程会与所有的应用程序线程并行工作)。该标志已经默认开启,如果顺序执行更好,这取决于所使用的硬件,多线程执行可以通过-XX:-CMSConcurremntMTEnabled禁用。
- -XX:+CMSIncrementalMode. 在增量模式下,CMS 收集器在并发阶段,不会独占整个周期,而会周期性的暂停,唤醒应用线程。收集器把并发阶段工作,划分为片段,安排在次级(minor) 回收之间运行。
-Xms10g -Xmx10g -Xmn1g -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:MaxGCPauseMillis=60000 -XX:CMSInitiatingOccupancyFraction=70 -XX:PermSize=64m -XX:MaxPermSize=256m -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=3072m -XX:+CMSIncrementalMode
BucketCache Off-Heap
如果存在批量读取HBase数据的请求,BlockCache可能会被迅速占满,GC不及时RS又要悲剧,所以采用BucketCache+Off-Heap是一个不错的优化点,或者不顾性能问题,关闭BlockCache。
- BucketCache Off-Heap配置方法:
<property>
<name>hbase.bucketcache.size</name>
<value>3072</value>
</property>
<property>
<name>hbase.bucketcache.ioengine</name>
<value>offheap</value>
</property>
CombinedBlockCache策略下还需要设置L1缓存大小:
<property>
<name>hbase.bucketcache.combinedcache.enabled</name>
<value>true</value>
</property>
<property>
<name>hfile.block.cache.size</name>
<value>0.2</value>
</property>
- 内存简易观察方案:
jmap -heap pid -- 内存使用情况
jmap -histo pid -- 内存粗略统计
jmap -F -dump:format=b,file=dump.file pid -- dump文件
jhat dump.file -- 通过http://dshbase01:7000查看dump文件,但端口白名单导致无法访问
线程并发数
HBase是一个高并发的分布式数据库,牵扯到ZK、RegionServer、DataNode等组件,所以性能提升跟这些组件的并发线程数的控制离不开关系。
- RS线程数:
hbase.regionserver.handler.count=300
hbase.regionserver.metahandler.count=300
该配置定义了每个Region Server上的RPC Handler的数量。Region Server通过RPC Handler接收外部请求并加以处理。所以提升RPC Handler的数量可以一定程度上提高HBase接收请求的能力。
- ZK线程数:注意线程数增加带来的内存消耗,相应提升ZK Heap大小;
maxClientCnxns=60
- DataNode:注意线程数增加带来的文件描述符的问题,调整最大文件描述符最大限制;
dfs.datanode.max.transfer.threads=8192
dfs.datanode.handler.count=100
Balancer
HBase是一种支持自动负载均衡的分布式KV数据库,在开启balance的开关(balance_switch)后,HBase的HMaster进程会自动根据 指定策略 挑选出一些Region,并将这些Region分配给负载比较低的RegionServer上。官方目前支持两种挑选Region的策略,一种叫做DefaultLoadBalancer,另一种叫做StochasticLoadBalancer。由于HBase的所有数据(包括HLog/Meta/HStoreFile等)都是写入到HDFS文件系统中的, 因此HBase的Region移动其实非常轻量级。在做Region移动的时候,保持这个Region对应的HDFS文件位置不变,只需要将Region的Meta数据分配到相关的RegionServer即可,整个Region移动的过程取决于RegionClose以及RegionOpen的耗时,这个时间一般都很短。
Region切分
Region Split的依据是最大Region Size,调大该参数可以减少Region数量,同时也减少了MemStore数量和总空间。
hbase.hregion.max.filesize=2G
非本地化部署
HBase读取数据时会从HDFS上的HFile加载数据到BlockCache,如果是本地数据则非常高效,如果不同机器或不同机架,则会带来网络消耗,同样写流程当中的Flush过程一样会本地化的问题,所以建议DataNode和RegionServer部署在同一台机器,因为RS调用hdfs client去读写数据,hdfs client优先会读写本地磁盘。另外也可以通过HBase Web UI观察数据本地化情况。
MemStore Flush
MemStore的频繁Flush会消耗大量服务器资源,根据Flush条件合理控制Flush次数是一个经常要观察的优化点。
- 调大全局MemStore大小:
<property>
<name>hbase.regionserver.global.memstore.size</name>
<value>0.6</value>
</property>
- 调大HLog个数限制:每个Region均包含一个HLog,所以需要根据Region个数来动态调整;
hbase.regionserver.maxlogs=500
压缩
压缩通常会带来较好的性能,因为CPU压缩和解压消耗的时间比从磁盘中读取和写入数据消耗的时间更短,HBase支持多种压缩方式,例如snappy、lzo、gzip等等,不同压缩算法压缩比和效率有一定差异。开启表压缩的方式如下:
disable 'MILESTONE'
alter 'MILESTONE',{NAME=>'PUSH',COMPRESSION=>'snappy'}
alter 'MILESTONE',{NAME=>'BEHAVIOR',COMPRESSION=>'snappy'}
enable 'MILESTONE'
MSLAB
HBase内存回收策略CMS一定程度上降低了Stop时间,但却避免不了内存碎片的问题,HBase提供了以下几个参数,防止堆中产生过多碎片,大概思路也就是参考TLAB,叫做MSLAB,MemStore-Local Allocation Buffer。
一个regionserver的内存里各个region的数据混合在一起,当某个region被flush到磁盘时,就会形成很多堆碎片。其实这跟java中gc模型的假设是冲突的,同一时间创建的对象,会在同一时间消亡。这个问题可以通过Arena Allocation来解决,即每次分配内存时都是在一个更大的叫做arena的内存区域分配。一个典型的实现是TLAB,即每个线程维护自己的arena,每个线程用到的对象都在自己的arena区域分配。其实,jvm早已经实现了TLAB,但是这个对于hbase不适用,因为hbase的regionserver使用一个单独的线程来处理所有region的请求,就算这个线程用arena方式分配还是会把所有region的数据混在一起。因此hbase自己实现了MSLAB,即每个region的memStore自己实现了arena,使各个region的数据分开,就不会形成太细的碎片。Arena里存放的是KeyValue对象,如果这些KeyValue对象是一样大的,不会导致严重碎片,相反这些KeyValue对象引用的字节数组才是引起碎片的主因,因此要做的就是把这些字节数组分配在一起。
hbase.hregion.memstore.mslab.enabled=true // 开启MSALB
hbase.hregion.memstore.mslab.chunksize=2m // chunk的大小,越大内存连续性越好,但内存平均利用率会降低
hbase.hregion.memstore.mslab.max.allocation=256K // 通过MSLAB分配的对象不能超过256K,否则直接在Heap上分配,256K够大了
RowKey设计
HBase是根据Rowkey来进行检索的,所以合理的Rowkey设计可以提高查询效率。
- Rowkey长度原则:Rowkey是一个二进制码流,Rowkey的长度被很多开发者建议说设计在10~100个字节,不过建议是越短越好,不要超过16个字节。
数据的持久化文件HFile中是按照KeyValue存储的,如果Rowkey过长比如100个字节,1000万列数据光Rowkey就要占用100*1000万=10亿个字节,将近1G数据,这会极大影响HFile的存储效率;
MemStore将缓存部分数据到内存,如果Rowkey字段过长内存的有效利用率会降低,系统将无法缓存更多的数据,这会降低检索效率。因此Rowkey的字节长度越短越好。
目前操作系统是都是64位系统,内存8字节对齐。控制在16个字节,8字节的整数倍利用操作系统的最佳特性。
Rowkey散列原则:如果Rowkey是按时间戳的方式递增,不要将时间放在二进制码的前面,建议将Rowkey的高位作为散列字段,由程序循环生成,低位放时间字段,这样将提高数据均衡分布在每个Regionserver实现负载均衡的几率。如果没有散列字段,首字段直接是时间信息将产生所有新数据都在一个 RegionServer上堆积的热点现象,这样在做数据检索的时候负载将会集中在个别RegionServer,降低查询效率。
Rowkey唯一原则:必须在设计上保证其唯一性,这句是废话。
不同应用场景RowKey的设计也需要定制化考虑。
客户端调优
- Retry:hbase.client.retries.number=20
- AutoFlush:将HTable的setAutoFlush设为false,可以支持客户端批量更新。即当Put填满客户端flush缓存时,才发送到服务端。默认是true。
- Scan Caching:scanner一次缓存多少数据来scan(从服务端一次抓多少数据回来scan);默认值是 1,一次只取一条。
- Scan Attribute Selection:scan时建议指定需要的Column Family,减少通信量,否则scan操作默认会返回整个row的所有数据(所有Coulmn Family)。
- Close ResultScanners:通过scan取完数据后,记得要关闭ResultScanner,否则RegionServer可能会出现问题(对应的Server资源无法释放)。
- Optimal Loading of Row Keys:当你scan一张表的时候,返回结果只需要row key(不需要CF, qualifier,values,timestaps)时,你可以在scan实例中添加一个filterList,并设置 MUST_PASS_ALL操作,filterList中add FirstKeyOnlyFilter或KeyOnlyFilter。这样可以减少网络通信量。
- 启用Bloom Filter:Bloom Filter通过空间换时间,提高读操作性能。
- 批量Get : HBase提供了get(List)批量获取数据的接口,减少RPC交互,提高性能,但注意需要在客户端关闭blockCache,否则容易OOM,也会冲掉热点数据,这样blockCache则失去了意义。
Hive On HBase
NoSQL种类繁多,各有优势,HBase其中一个优势就是和HDFS的集成,想象下如果实时事务数据需要结合历史统计数据,则需要将Hive离线跑批的数据T+1日导入HBase,这个数据搬迁的过程,针对Redis\MongoDB\cassandra,则需要定制开发ETL工具。对于HBase,Hive提供了HBaseStorageHandle解决方案:
-- KV映射
CREATE TABLE helloWorld(key int, value string)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:val")
TBLPROPERTIES ("hbase.table.name" = "helloWorld");
-- 列簇映射
CREATE TABLE helloWorld(key int, cf1 map<string,string>, cf2 map<string, string>)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,cf1:")
TBLPROPERTIES ("hbase.table.name" = "helloWorld");
然后基于HQL插入数据即可。
Spark On HBase
某些业务场景下面,需要结合多个数据源的数据,比如redis和hbase的数据进行join。第一个念头则是Spark DataFrame或DataSet,DF支持多种数据源,同时还提供了SparkSQL语法用于统计分析。
- 针对Redis,参考https://github.com/RedisLabs/spark-redis 。
// 配置文件
val conf = new SparkConf()
.setAppName(jobName)
.setMaster("local[*]")
.set("redis.host", redisHost)
.set("redis.port", redisPort)
.set("redis.db", redisDB)
.set("redis.timeout", redisTimeOut)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// RDD读取
var redisRDD = sc.fromRedisSet(groupID)
// 注册DF
val redisSchemaString = "USER_ID"
val redisSchema = StructType(redisSchemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = redisRDD.map(p => Row(p.toString.trim))
var redisDF = sqlContext.createDataFrame(rowRDD, redisSchema)
redisDF.registerTempTable(redisDFTableName)
- 针对HBase,常见的方案有newAPIHadoopRDD、nerdammer、hortonworks-spark/shc、unicredit/hbase-rdd。
- newAPIHadoopRDD:Spark官方提供的HDFS文件读写接口;
// 配置文件
val hconf = HBaseConfiguration.create()
hconf.set("hbase.zookeeper.property.clientPort", zkPort)
hconf.set("hbase.zookeeper.quorum", zkQuorum)
// RDD读取
hconf.set(TableInputFormat.INPUT_TABLE, tagTableName)
hconf.set(TableInputFormat.SCAN_COLUMNS, hbaseColumns)
val hbaseRDD = sc.newAPIHadoopRDD(hconf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])...
// 注册DF:注册之前需要将HBaseRDD转换成RowRDD,此处省略
val hbaseDF = sqlContext.createDataFrame(hbaseRDD, hbaseStruct)
hbaseDF.registerTempTable(hbaseDFTableName)
- SHC :hortonworks提供的Spark读写HBase的方案,具体Demo见Git,SHC只支持spark2.0+版本;
// Schema定义
def catalog =
s"""{
|"table":{"namespace":"default", "name":"tag"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"behavior", "col":"activate", "type":"string"}}
}""".stripMargin
// 读取HBase数据
def withCatalog(cat: String): DataFrame = {
sqlContext
.read
.options(Map(HBaseTableCatalog.tableCatalog -> cat))
.format("org.apache.spark.sql.execution.datasources.hbase")
.load()
}
// 注册DF
val df = withCatalog(catalog)
df.registerTempTable("userBehaviorTime")
- nerdammer : 写入操作必须是tunpleRDD,但Scala的tunple最大长度22,如果需要查询的HBase列数超过22,则无法使用该方案;
// nerdammer方案读取HBase数据:结果是tuple,长度有限制22.
val hbaseRDD = sc.hbaseTable[(Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String], Option[String], Option[String], Option[String], Option[String], Option[String], Option[String],
Option[String])]("tag").select(columnStringArray: _*).inColumnFamily("behavior")
// DF只接收RowRDD
val hbaseRowRDD = hbaseRDD.map(
i => {
var record = new ArrayBuffer[String]()
i.productIterator.foreach { col => {
record += col.asInstanceOf[Option[String]].get
}
}
Row(record.toArray: _*)
})
// RowRDD转成DF
val hbaseDF = sqlContext.createDataFrame(hbaseRowRDD, hbaseSchema.struct)
hbaseDF.registerTempTable("userBehaviorTime")
unicredit太小众,坑太多,不建议使用;
Phoenix : 两大优势Sql On HBase,HBase二级索引实现。HBase神器,但无法关联Redis数据和HBase数据。
以上解决方案均是采用传统的JOIN方式,这样会带来一个问题就是HBase全表扫描,性能低下。解决思路是先将RedisRDD进行repartition操作,针对每个partition形成HBase的List[Get]对象,进行批量读取。但同样批量读取需要考虑内存激增的情况,所以需要结合上面的优化点使用。
Phoenix On HBase
Phoenix是构建在HBase上的一个SQL层,能让我们用标准的JDBC API而不是HBase客户端API来增删查改HBase数据。相对原生HBase接口,Phoenix提供了以下特性:
- JDBC API
- 性能优化
- 二级索引实现
- SQL引擎
- 事务
- UDF
- 统计信息收集
- 分页查询
- 散步表
- 跳跃扫描
- 视图
- 多租户
- 动态列
- 大量CSV数据加载
- 查询服务器
- 追踪
- 指标
有兴趣者查阅Phoenix官网。