/**
 * A simple abstraction over the HBaseContext.foreachPartition method.
 *
 * It adds support for taking an RDD, generating Puts from it
 * and sending them to HBase.
 * The complexity of managing the HConnection is
 * removed from the developer.
 *
 * @param rdd       Original RDD with data to iterate over
 * @param tableName The name of the table to put into
 * @param f         Function to convert a value in the RDD to an HBase Put
 * @param autoFlush Whether autoFlush should be turned on
 */
def bulkPut[T](rdd: RDD[T], tableName: String, f: (T) => Put, autoFlush: Boolean) {
  rdd.foreachPartition(
    it => hbaseForeachPartition[T](
      broadcastedConf,
      it,
      (iterator, hConnection) => {
        val htable = hConnection.getTable(tableName)
        htable.setAutoFlush(autoFlush, true)
        iterator.foreach(T => htable.put(f(T)))
        htable.flushCommits()
        htable.close()
      }))
}
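bulkPut calls foreachPartition on the RDD, which hands each partition's iterator to the closure. The closure calls hbaseForeachPartition with three arguments: the broadcastedConf configuration, the partition iterator it, and a function whose two parameters are (iterator, hConnection). Note that this third argument is a function literal, not a tuple: it gets the table from the connection, sets autoFlush, puts f(value) for every element of the iterator, then flushes and closes the table. The wrapper it delegates to is hbaseForeachPartition, covered next.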
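Before moving on, the shape of that third argument can be seen in a stripped-down form. This is only a sketch with stand-in types: FakeConnection, withConnection and CallShapeSketch are hypothetical names that do not exist in HBaseContext, and no Spark or HBase classes are involved. It shows that the caller passes a two-parameter function, and the wrapper decides when to invoke it and with which connection.

```scala
import scala.collection.mutable.ArrayBuffer

object CallShapeSketch {
  // Hypothetical stand-in for HConnection: it only records what was "written".
  final class FakeConnection {
    val written: ArrayBuffer[String] = ArrayBuffer.empty[String]
  }

  // Mirrors the parameter shape of hbaseForeachPartition:
  // f is a Function2 (Iterator[T], FakeConnection) => Unit, not a Tuple2.
  def withConnection[T](it: Iterator[T],
                        f: (Iterator[T], FakeConnection) => Unit): FakeConnection = {
    val conn = new FakeConnection
    f(it, conn) // the wrapper supplies the connection, the caller supplies the work
    conn
  }

  // Caller side: a function literal with two parameters, as in bulkPut.
  def demo(): List[String] = {
    val conn = withConnection[Int](
      Iterator(1, 2, 3),
      (iterator, c) => iterator.foreach(v => c.written += s"row-$v"))
    conn.written.toList
  }
}
```

Calling CallShapeSketch.demo() returns the three recorded strings in iteration order, since the function literal runs once over the whole iterator with the connection the wrapper created.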
/**
 * Underlying wrapper for all foreach functions in HBaseContext.
 */
private def hbaseForeachPartition[T](
    configBroadcast: Broadcast[SerializableWritable[Configuration]],
    it: Iterator[T],
    f: (Iterator[T], HConnection) => Unit) = {
  val config = getConf(configBroadcast)
  applyCreds(configBroadcast)
  // specify that this is a proxy user
  val hConnection = HConnectionManager.createConnection(config)
  f(it, hConnection)
  hConnection.close()
}
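It first gets the configuration from configBroadcast and applies credentials with applyCreds, then creates the HBase connection hConnection from that configuration, runs the passed-in function f, and finally closes the connection. Quite simple.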
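One detail worth noting about the order of steps: as quoted, hConnection.close() runs only if f returns normally, so an exception thrown inside f would skip it. A common way to guard against that is to put the close in a finally block. The following is a minimal sketch of that variation, not the library's code; FakeConnection, withConnection and LoanPatternSketch are hypothetical stand-ins, and the connection is passed in from outside only so that its state can be inspected afterwards.

```scala
object LoanPatternSketch {
  // Hypothetical stand-in for HConnection: tracks whether close() was called.
  final class FakeConnection {
    var closed: Boolean = false
    def close(): Unit = { closed = true }
  }

  // Same steps as hbaseForeachPartition (run f, then close),
  // but close() sits in finally so it also runs when f throws.
  def withConnection[T](it: Iterator[T], conn: FakeConnection)(
      f: (Iterator[T], FakeConnection) => Unit): Unit =
    try f(it, conn)
    finally conn.close()

  // Runs a failing f and reports (didFail, wasClosed).
  def demo(): (Boolean, Boolean) = {
    val conn = new FakeConnection
    val result = scala.util.Try(
      withConnection(Iterator(1, 2), conn)((_, _) => throw new RuntimeException("boom")))
    (result.isFailure, conn.closed)
  }
}
```

In demo(), the exception still propagates to the caller (captured here by Try), but the stand-in connection is closed first. Whether the same change is appropriate for a real HConnection depends on how the surrounding job handles failures.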