Spark's PairRDDFunctions provides two API methods, saveAsHadoopDataset and saveAsNewAPIHadoopDataset, for writing an RDD out to any Hadoop-supported storage system.
Method descriptions
def saveAsHadoopDataset(conf: JobConf): Unit
Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for that storage system. The JobConf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
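For reference, the older saveAsHadoopDataset path goes through a JobConf and the mapred (not mapreduce) flavour of TableOutputFormat. A minimal sketch, assuming the same "wetag" table and reusing the sc, rawData and convert definitions from the full example further down:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat // old mapred API
import org.apache.hadoop.mapred.JobConf

// Build a JobConf on top of the HBase configuration and point it at the target table
val jobConf = new JobConf(HBaseConfiguration.create())
jobConf.setOutputFormat(classOf[TableOutputFormat])
jobConf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")

// Write the same (ImmutableBytesWritable, Put) pairs as in the new-API example
sc.parallelize(rawData).map(convert).saveAsHadoopDataset(jobConf)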
def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop Configuration object for that storage system. The Conf should set an OutputFormat and any output paths required (e.g. a table name to write to) in the same way as it would be configured for a Hadoop MapReduce job.
Example
package com.welab.wetag.mine

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{ Put, Durability }
import org.apache.hadoop.hbase.util.Bytes

object SaveHbase {
  // Convert one record into an (ImmutableBytesWritable, Put) pair for HBase
  def convert(triple: (Int, String, Int)) = {
    val (key, name, age) = triple
    val p = new Put(Bytes.toBytes(key))
    p.setDurability(Durability.SKIP_WAL)
    p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
    p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
    (new ImmutableBytesWritable, p)
  }

  def main(args: Array[String]) {
    val appName: String = this.getClass.getSimpleName.split("\\$").last
    val sc = new SparkContext(new SparkConf().setAppName(appName))

    // HBase configuration; the table "wetag" must already exist
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf.set("hbase.zookeeper.quorum", "sparkmaster1")
    conf.set(TableOutputFormat.OUTPUT_TABLE, "wetag")

    val job = Job.getInstance(conf)
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val rawData = Array((1, "Jazz", 14), (2, "Andy", 18), (3, "Vincent", 38))

    /* The conversion logic could also be inlined in the map:
    val xdata = sc.parallelize(rawData).map {
      case (key, name, age) => {
        val p = new Put(Bytes.toBytes(key))
        p.setDurability(Durability.SKIP_WAL)
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("name"), Bytes.toBytes(name))
        p.addColumn(Bytes.toBytes("attr"), Bytes.toBytes("age"), Bytes.toBytes(age))
        (new ImmutableBytesWritable, p)
      }
    }
    */
    val xdata = sc.parallelize(rawData).map(convert)
    xdata.saveAsNewAPIHadoopDataset(job.getConfiguration)

    sc.stop()
  }
}
Compile the code above into a jar and submit it for execution with spark-submit.
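A submission might look like the following sketch; the jar name and master URL are placeholders, and the HBase client jars must be available on the driver and executor classpath (for example via --jars):

spark-submit --class com.welab.wetag.mine.SaveHbase --master yarn savehbase.jar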