原文
Authors: Trend Micro Hadoop Group: Mingjie Lai, Eugene Koontz, Andrew Purtell
HBase对MapReduce集成非常友好,可用于对其表中存储的数据进行分布式计算,但是在许多情况下,例如简单的加法或聚合操作(如求和,计数等),会将计算推向服务器,服务器可以在其中进行操作。与HBase已经良好的扫描性能相比,直接的数据传输无需通信开销,可以显着提高性能。
同样,在0.92之前,除了扩展基类,不可能用自定义功能扩展HBase。 由于Java缺乏多重继承,因此需要将扩展名和基本代码重构为一个类,以提供完整的实现,而在考虑多个扩展名时,它们很快就会变得脆弱。 谁继承谁? 协处理器允许使用更加灵活的mixin扩展模型。
在本文中,我将介绍HBase的新协处理器功能,它是灵活和通用扩展的框架,并且直接在HBase服务器进程中进行分布式计算。 我将讨论它是什么,它如何工作以及如何开发协处理器扩展。
HBase协处理器的思想是受到Google BigTable协处理器启发的。 杰夫·迪恩(Jeff Dean)在LADIS '09上发表了演讲(http://www.scribd.com/doc/21631448/Dean-Keynote-Ladis2009,第66-67页),并提到了协处理器的一些基本概念,谷歌开发了协处理器以带动计算 与BigTable并行。 它们具有以下特征:
任意代码可以在表服务器中的每个平板电脑上运行
-
客户端的高级呼叫界面
- 调用寻址到行或行范围,并且协处理器客户端库将其解析到实际位置;
- 跨多行的调用会自动拆分为多个并行化的RPC
为构建分布式服务提供了非常灵活的模型
自动扩展,负载平衡,应用程序请求路由
回到HBase,我们当然也希望支持Hadoop MapReduce所不能提供的高效计算并行性。 此外,可以在其之上构建令人兴奋的新功能,例如二级索引,复杂过滤(下推谓词)和访问控制。 HBase协处理器受BigTable协处理器的启发,但在实现细节上有所不同。 我们建立的框架提供了一个库和运行时环境,用于在HBase区域服务器和主进程中执行用户代码。 相比之下,Google协处理器与平板电脑服务器托管在一起,但位于其地址空间之外。 (HBase还考虑了在服务器进程外部部署协处理器代码的选项。请参见https://issues.apache.org/jira/browse/HBASE-4047。)
协处理器可以全局加载到区域服务器托管的所有表和区域上,这些系统被称为系统协处理器。 或者管理员可以为每个表指定应在表的所有区域上加载哪些协处理器,这些被称为表协处理器。
为了为潜在的协处理器行为提供足够的灵活性,框架提供了两个不同的扩展方面。 一个是观察者,类似于常规数据库中的触发器,另一个是端点,类似于存储过程的动态RPC端点。
观察者
观察者背后的想法是,我们可以通过覆盖协处理器框架提供的上行调用方法来插入用户代码。 发生某些事件时,将从核心HBase代码执行回调函数。 协处理器框架处理各种基础HBase活动期间调用回调的所有详细信息。 协处理器只需插入所需的附加或替代功能。
当前在HBase 0.92中,我们提供了三个观察器接口:
- RegionObserver:为数据操作事件,获取,放置,删除,扫描等提供挂钩。 每个表区域都有一个RegionObserver协处理器实例,它们可以进行的观察范围仅限于该区域。
- WALObserver:提供用于预写日志(WAL)相关操作的挂钩。 这是观察或拦截WAL写入和重建事件的方式。 WALObserver在WAL处理的上下文中运行。 每个区域服务器都有一个这样的上下文。
- MasterObserver:提供DDL类型操作的钩子,即创建,删除,修改表等。MasterObserver在HBase主服务器的上下文中运行。
可以在一个地方(区域,主机或WAL)加载多个观察者,以扩展功能。 它们被链接以按分配的优先级顺序顺序执行。 没有什么可以阻止协处理器实现者在其安装的观察者的上下文之间进行内部通信,从而全面介绍HBase函数。
优先级较高的协处理器可以通过抛出IOException(或其子类)来抢占优先级较低的处理器。 在下面的访问控制协处理器示例中,我们将使用这种抢占式功能。
如上所述,各种事件导致在加载的观察程序上调用观察程序方法。 从HBase版本0.92开始,事件和方法签名的集合在HBase API中提供。 请注意,由于HBase客户端API更改或其他原因,将来可能会更改API。 我们已尝试在0.92版之前稳定该API,但不能保证。)
RegionObserver接口为以下项提供回调:
- preOpen,postOpen:在向主服务器报告该区域在线之前和之后调用。
- preFlush,postFlush:在将内存存储区刷新到新的存储文件之前和之后调用。
- preGet,postGet:在客户端发出Get请求之前和之后调用。
- preExists,postExists:在客户端使用Get测试存在性之前和之后调用。
- prePut和postPut:在客户端存储值之前和之后调用。
- preDelete和postDelete:在客户端删除值之前和之后调用。
- 等等
请参考HBase 0.92 javadoc以获取已声明方法的完整列表。
我们提供了一个方便的抽象类BaseRegionObserver,该类以默认行为实现了所有RegionObserver方法,因此您可以专注于感兴趣的事件,而不必担心所有事件的进程调用。
这是一个序列图,显示RegionObservers如何与其他HBase组件一起工作:
下面是使用RegionObserver接口挂接到HBase函数的扩展示例。 这是对简单访问控制的补充说明。 该协处理器通过在某些RegionObserver preXXX钩上注入代码来检查给定客户端请求的用户信息。 如果不允许用户访问资源,则将引发AccessDeniedException。 如上所述,高优先级协处理器(例如访问控制协处理器)的异常可用于抢先采取进一步的措施。 在这种情况下,AccessDeniedException意味着将不处理客户端的请求,并且客户端将收到异常信息以指示发生了什么。
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
// Sample access-control coprocessor. It utilizes RegionObserver
// and intercept preXXX() method to check user privilege for the given table
// and column family.
public class AccessControlCoprocessor extends BaseRegionObserver {
@Override
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get, final List<KeyValue> result) throws IOException
throws IOException {
// check permissions..
if (!permissionGranted()) {
throw new AccessDeniedException("User is not allowed to access.");
}
}
// override prePut(), preDelete(), etc.
}
MasterObserver接口为以下项提供上行调用:
- preCreateTable / postCreateTable:在向主服务器报告该区域在线之前和之后调用。
- preDeleteTable / postDeleteTable
- 等等
WALObserver提供以下方面的上行呼叫:
- preWALWrite / postWALWrite:在写入WAL的WALEdit之前和之后调用。
请参阅HBase Javadoc以获取观察者接口声明的完整列表。
端点
如前所述,可以将观察者视为数据库触发器。 另一方面,端点更强大,类似于存储过程。 可以随时从客户端调用端点。 然后,将在目标区域或多个区域远程执行端点实现,并将这些执行的结果返回给客户端。
端点是动态RPC扩展的接口。 端点实现安装在服务器端,然后可以用HBase RPC调用。 客户端库提供了调用此类动态接口的便捷方法。
同样如上所述,没有什么可以阻止端点的实现与任何观察者实现进行通信。 结合使用这些扩展曲面,您可以向HBase添加全新功能,而无需修改或重新编译HBase本身。 这可能非常强大。
为了构建和使用自己的端点,您需要:
- 具有扩展了CoprocessorProtocol的新协议接口。
- 实现端点接口。 该实现将被加载到区域上下文中并从中执行。
- 扩展抽象类BaseEndpointCoprocessor。 此便利类隐藏了实现者无需担心的一些内部细节,例如协处理器框架类的加载。
- 在客户端,可以通过两个新的HBase客户端API调用端点:
- 针对单个区域执行:
HTableInterface.coprocessorProxy(Class<T> protocol, byte[] row)
- 在一系列区域执行
HTableInterface.coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T,R> callable)
这是显示端点如何工作的示例。
在此示例中,终结点计算机将扫描区域中的给定列并聚合期望被序列化的Long值的值,然后将结果返回给客户端。 客户端收集从各个区域的远程端点调用返回的部分聚合,并对结果求和以得出整个表的最终答案。 注意,HBase客户端负责将并行端点调用分派到目标区域,并负责收集返回的结果以呈现给应用程序代码。 这就像一个轻量级的MapReduce作业:“ map”是在每个目标区域上的区域服务器中执行的端点执行,而“ reduce”是客户端上的最终聚合。 同时,服务器端和客户端库中的协处理器框架就像MapReduce框架一样,将繁琐的分布式系统编程细节移到了干净的API之后,因此程序员可以专注于应用程序。
还请注意,HBase和所有Hadoop当前都需要Java 6,该Java对匿名类具有冗长的语法。 随着HBase(和Hadoop)随着Java 7语言功能的引入而发展,我们期望客户端端点代码的冗长程度可以大大降低。
// A sample protocol for performing aggregation at regions.
public static interface ColumnAggregationProtocol
extends CoprocessorProtocol {
// Perform aggregation for a given column at the region. The aggregation
// will include all the rows inside the region. It can be extended to
// allow passing start and end rows for a fine-grained aggregation.
public long sum(byte[] family, byte[] qualifier) throwsIOException;
}
// Aggregation implementation at a region.
public static class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
implements ColumnAggregationProtocol {
@Override
public long sum(byte[] family, byte[] qualifier)
throws IOException {
// aggregate at each region
Scan scan = new Scan();
scan.addColumn(family, qualifier);
long sumResult = 0;
InternalScanner scanner = getEnvironment().getRegion().getScanner(scan);
try {
List<KeyValue> curVals = new ArrayList<KeyValue>();
boolean hasMore = false;
do {
curVals.clear();
hasMore = scanner.next(curVals);
KeyValue kv = curVals.get(0);
sumResult += Bytes.toLong(kv.getValue());
} while (hasMore);
} finally {
scanner.close();
}
return sumResult;
}
}
客户端调用通过HTableInterface上的新方法执行:
public <T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, Row row);
public <T extends CoprocessorProtocol, R> void coprocessorExec(
Class<T> protocol, List<? extends Row> rows,
BatchCall<T,R> callable, BatchCallback<R> callback);
public <T extends CoprocessorProtocol, R> voidcoprocessorExec(
Class<T> protocol, RowRange range,
BatchCall<T,R> callable, BatchCallback<R> callback);
这是调用ColumnAggregationEndpoint的客户端示例:
HTableInterface table = new HTable(util.getConfiguration(), TEST_TABLE);
Scan scan;
Map<byte[], Long> results;
// scan: for all regions
scan = new Scan();
results = table.coprocessorExec(ColumnAggregationProtocol.class, scan,
new BatchCall<ColumnAggregationProtocol,Long>() {
public Integer call(ColumnAggregationProtocol instance)throws IOException{
return instance.sum(TEST_FAMILY, TEST_QUALIFIER);
}
});
long sumResult = 0;
long expectedResult = 0;
for (Map.Entry<byte[], Long> e : results.entrySet()) {
sumResult += e.getValue();
}
上面的示例实际上是HBASE-1512的简化版本。 您可以参考JIRA或org.apache.hadoop.hbase.coprocessor.AggregateImplementation的HBase源代码以获取更多详细信息。
以下是此示例的动态RPC调用的可视化。 应用程序代码客户端执行批处理调用。 这将在每个目标表区域上启动已注册动态协议的并行RPC调用。 这些调用的结果将在可用时返回。 客户端库代表应用程序管理此并行通信,混乱的详细信息(例如重试和错误),直到返回所有结果(或在发生不可恢复的错误时)。 然后,客户端库将响应汇总到Map中并将其移交给应用程序。 如果发生不可恢复的错误,则将引发异常,然后以供应用程序代码捕获并采取措施。
Coprocessor 管理
了解协处理器在HBase中的工作原理后,您可以开始构建自己的实验协处理器,将其部署到HBase集群中,并观察新行为。
构建自己的处理器
现在,我们假设您已经准备好协处理器代码,将其编译并打包为jar文件。 在以下各节中,您将看到如何配置协处理器框架来加载协处理器。
(我们应该有一个模板协处理器,可以帮助用户快速开始开发。目前有一些内置协处理器可以作为示例和实现新协处理器的起点。但是,它们分散在代码库中。如前所述 在HBASE-5273中,在HBase源代码的src / example / coprocessor下将提供一些协处理器样本。
协处理器部署
当前,我们提供了两个用于部署协处理器扩展的选项:配置加载(在主服务器或区域服务器启动时发生); 或从表格属性加载,在(重新)打开表格时动态加载。 由于大多数用户将通过HBase Shell的“ alter”命令设置表格属性,因此我们从外壳调用此加载。
从配置加载
当打开区域时,框架尝试读取作为配置条目提供的协处理器类名称:
- hbase.coprocessor.region.classes:用于RegionObservers和端点
- hbase.coprocessor.master.classes:用于MasterObservers
- hbase.coprocessor.wal.classes:用于WALObservers
Hers是hbase-site.xml的示例,其中为所有HBase表配置了一个RegionObserver:
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
</property>
如果指定了多个要加载的类,则这些类名必须用逗号分隔。 然后,框架将尝试使用默认的类加载器来加载所有配置的类。 这意味着jar文件必须驻留在服务器端HBase类路径上。
如果以这种方式加载,则协处理器将在所有表的所有区域上处于活动状态。 这是前面介绍的系统协处理器。 列出的第一个协处理器将被分配优先级Coprocessor.Priority.SYSTEM。 列表中的每个后续协处理器将其优先级值加一(这将降低其优先级,优先级具有整数的自然排序顺序)。
我们还没有真正讨论过优先级,但是应该合理清楚地给予协处理器优先级如何影响它与其他协处理器的集成。 当调用注册的观察者时,框架将按照其优先级的排序顺序执行其回调方法。 领带被任意打破。
从shell加载
协处理器也可以配置为通过shell命令“ alter” +“ table_att”按表加载。
hbase(main):005:0> alter 't1', METHOD => 'table_att',
'coprocessor'=>'hdfs:///foo.jar|com.foo.FooRegionObserver|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.0730 seconds
hbase(main):006:0> describe 't1'
DESCRIPTION ENABLED
{NAME => 't1', coprocessor$1 => 'hdfs:///foo.jar|com.foo.FooRegio false
nObserver|1001|arg1=1,arg2=2', FAMILIES => [{NAME => 'c1', DATA_B
LOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE
=> '0', VERSIONS => '3', COMPRESSION => 'NONE', MIN_VERSIONS =>
'0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZ
E => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLO
CKCACHE => 'true'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE',
BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3'
, COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647'
, KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY
=> 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0190 seconds
协处理器框架将尝试从协处理器表属性值中读取类信息。 该值包含四条信息,并用“ |”隔开:
- 文件路径:包含协处理器实现的jar文件必须位于所有区域服务器均可读取的位置。 可以将文件复制到所有区域服务器的本地磁盘上的某个位置,但是我们建议将文件存储到HDFS中。 如果没有给出文件路径,则框架将尝试使用默认的类加载器从服务器类路径中加载类。
- 类名:协处理器的完整类名。
- 优先级:整数。 框架将使用优先级确定在同一钩子上注册的所有已配置观察者的执行顺序。 该字段可以留为空白。 在这种情况下,框架将分配默认优先级值。
- 参数:此字段传递给协处理器实现。
您还可以通过alter''+
table_att_unset''命令在shell上删除已加载的协处理器:
hbase(main):007:0> alter 't1', METHOD => 'table_att_unset',
hbase(main):008:0* NAME => 'coprocessor$1'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.1130 seconds
hbase(main):009:0> describe 't1'
DESCRIPTION ENABLED
{NAME => 't1', FAMILIES => [{NAME => 'c1', DATA_BLOCK_ENCODING => false
'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSION
S => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '214
7483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN
_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true
'}, {NAME => 'f1', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER =>
'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION =>
'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_C
ELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCO
DE_ON_DISK => 'true', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0180 seconds
(注意:这是0.92中的当前行为。已经进行了一些讨论,其中讨论了如何使用Shell的“ alter”命令重构协处理器和其他功能的表属性。有关更多详细信息,请参见HBASE-4879。)
在本节中,我们讨论了如何告诉HBase期望加载哪些协处理器,但是不能保证框架会成功加载它们。 例如,shell命令既不能保证jar文件存在于特定位置,也不能验证给定的类是否实际包含在jar文件中。
HBase shell协处理器的状态
配置协处理器后,还需要使用外壳或主服务器和区域服务器Web UI检查协处理器状态,以确定协处理器是否已成功加载。
hbase(main):018:0> alter 't1', METHOD => 'table_att',
'coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation|1001|arg1=1,arg2=2'
Updating all regions with the new schema...
1/1 regions updated.
Done.
0 row(s) in 1.1060 seconds
hbase(main):019:0> enable 't1'
0 row(s) in 2.0620 seconds
hbase(main):020:0> status 'detailed'
version 0.92-tm-6
0 regionsInTransition
master coprocessors: []
1 live servers
localhost:52761 1328082515520
requestsPerSecond=3, numberOfOnlineRegions=3, usedHeapMB=32, maxHeapMB=995
-ROOT-,,0
numberOfStores=1, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
storefileIndexSizeMB=0, readRequestsCount=54, writeRequestsCount=1, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
.META.,,1
numberOfStores=1, numberOfStorefiles=0, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
storefileIndexSizeMB=0, readRequestsCount=97, writeRequestsCount=4, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN, coprocessors=[]
t1,,1328082575190.c0491168a27620ffe653ec6c04c9b4d1.
numberOfStores=2, numberOfStorefiles=1, storefileUncompressedSizeMB=0, storefileSizeMB=0, memstoreSizeMB=0,
storefileIndexSizeMB=0, readRequestsCount=0, writeRequestsCount=0, rootIndexSizeKB=0, totalStaticIndexSizeKB=0,
totalStaticBloomSizeKB=0, totalCompactingKVs=0, currentCompactedKVs=0, compactionProgressPct=NaN,
coprocessors=[AggregateImplementation]
0 dead servers
如果找不到加载的协处理器,则需要检查服务器日志文件以发现加载失败的原因。
目前的状态
有几个JIRA已开放用于协处理器开发。 HBASE-2000充当协处理器开发的保护伞。 HBASE-2001涵盖了协处理器框架的开发。 HBASE-2002涵盖了端点的RPC扩展。 解决这些问题所产生的代码已在2010年提交给HBase干线,并且从0.92.0版本开始可用。
在协处理器之上开发了一些新功能:
- HBase访问控制(HBASE-3025,HBASE-3045):这是针对HBase的实验性基本访问控制,在0.92.0中可用,但不太可能在0.92.1之前完全起作用。
- 列聚合(HBASE-1512):为列上的类似SQL的sum(),avg(),max(),min()等函数提供实验性支持。 在0.92.0中可用。
- 约束(HBASE-4605):一种对数据属性的域进行简单限制的机制。 有关更多信息,请阅读Constraints软件包摘要:http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/constraint/package-summary.html。 它在后备箱中,将在0.94版本中可用。
未来的工作
协处理器是从0.92.0版本开始提供的全新功能。 当他们打开许多门时,还有很大的改进空间。 随着时间的推移,可能会添加更多的钩子和接口以支持更多的用例,例如:
并行计算框架
通过端点,我们有一种新的动态方式将用户代码注入到各个表区域中的动作处理中,并且在相应的客户端支持下,我们可以并行查询所有对象并将结果灵活地返回给客户端。 这对于在HBase上构建批处理数据和聚合非常有用。 但是,您需要了解一些内部HBase详细信息才能开发此类应用程序。
已经开放了各种JIRA,这些JIRA考虑公开用于并行计算的其他框架,这些框架可以提供方便但强大的更高级别的抽象。 正在考虑的选项包括与Hadoop提供的API类似的MapReduce API。 支持scriptlet,即Ruby脚本片段发送到服务器端以执行工作; 或类似于或直接支持服务器上的Cascading框架(http://cascading.org)的内容,以便更抽象地处理数据流。
但是,据我所知,目前没有一个在建设中。
外部协处理器主机(HBASE-4047)
HBase协处理器与Google BigTable协处理器的设计大不相同的地方是我们将其重新构想为内部扩展的框架。 相反,BigTable协处理器作为与平板电脑服务器并置的独立进程运行。 必须在性能,灵活性和可能性之间进行权衡。 以及控制和强制使用资源的能力。
我们正在考虑开发一个协处理器,该协处理器是另一个协处理器的通用主机。 主机将进程内安装到主服务器或区域服务器中,但是用户代码将被加载到分支的子进程中。 父级和子级之间的双向管道上的事件模型和脐带协议将为加载到子级中的用户代码提供与内部加载到父级中相同的语义。 但是,我们立即将用户代码与HBase内部隔离开来,并最终期望使用平台上可用的资源管理功能,以进一步限制系统管理员或应用程序设计人员所希望的子资源消耗。
代码的编程(HBASE-2058)
目前,协处理器可以采取什么行动没有任何限制。 我们不能防止协处理器意外引入的恶意行为或错误。 作为外部协处理器主机的替代方法,我们可以在协处理器加载时引入代码编织和代码转换。 我们将编织一组可配置的策略,以约束协处理器可以采取的动作。 例如:
- 通过各种静态检查改善故障隔离和系统完整性保护
- 包装堆分配以实施限制
- 通过注入到方法和循环头中的工具监视CPU时间
- 拒绝对不安全的API的静态或动态方法调用
更多...
协处理器框架提供了扩展HBase的可能性。 还有更多可以在协处理器之上构建的已识别应用程序:
- HBase隔离和分配(HBase-4120)
- 二级索引:http://wiki.apache.org/hadoop/Hbase/SecondaryIndexing
- 在HBase中搜索(HBASE-3529)
- HBase表,区域访问统计信息。
- 和更多 ...