Quick note:
1. Scenario & exception:
Spark throws an error when reading from HBase. The code is as follows:
val conf = new SparkConf().setAppName("SparkHistoryTags").setMaster("local")
val sc = new SparkContext(conf)
// Build the HBase RDD of (rowkey, Result) pairs
val hbaseRDD = sc.newAPIHadoopRDD(getHbaseConf(), classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]).repartition(partition)
// Extract the Result from each pair and parse it into the row RDD
val rowRdd = hbaseRDD.map(_._2).map(getRes)
println(rowRdd.count())
rowRdd.foreach(println)
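The snippet relies on two helpers that are not shown, getHbaseConf() and getRes. A minimal sketch of what they might look like, assuming the table uses the column family f and qualifier tag that appear in the stack trace below (the ZooKeeper quorum and table name here are placeholders, not from the original code):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.hbase.util.Bytes

// Build the HBase configuration handed to newAPIHadoopRDD.
def getHbaseConf(): Configuration = {
  val hconf = HBaseConfiguration.create()
  hconf.set("hbase.zookeeper.quorum", "localhost")         // placeholder quorum
  hconf.set(TableInputFormat.INPUT_TABLE, "history_tags")  // placeholder table name
  hconf
}

// Parse one HBase Result into a plain String: rowkey plus the f:tag column.
def getRes(result: Result): String = {
  val rowkey = Bytes.toString(result.getRow)
  val tag = Bytes.toString(result.getValue(Bytes.toBytes("f"), Bytes.toBytes("tag")))
  s"$rowkey,$tag"
}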
The exception is as follows:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 62 62 73 5f 30 32 38 37 31 36 32 33)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (62 62 73 5f 30 32 38 37 31 36 32 33,keyvalues={bbs_02871623/f:related_brand/1519893110531/Put/vlen=0/seqid=0, bbs_02871623/f:related_car/1519893110531/Put/vlen=0/seqid=0, bbs_02871623/f:related_mfrs/1519893110531/Put/vlen=0/seqid=0, bbs_02871623/f:tag/1519893110531/Put/vlen=0/seqid=0}))
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1843)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1856)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1869)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1940)
at org.apache.spark.rdd.RDD.count(RDD.scala:1157)
at SparkHistoryTags$.main(SparkHistoryTags.scala:31)
at SparkHistoryTags.main(SparkHistoryTags.scala)
2. Solution:
This is a serialization problem: repartition triggers a shuffle, and the (ImmutableBytesWritable, Result) pairs it moves are serialized with Spark's default Java serializer, which neither HBase class supports. Switching to the Kryo serializer fixes it; add the following line before the SparkConf is created:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
Alternatively, set it directly on the SparkConf:
val conf = new SparkConf().setAppName("SparkHistoryTags").setMaster("local")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
The modified code is as follows:
System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val conf = new SparkConf().setAppName("SparkHistoryTags").setMaster("local")
// Equivalent alternative: set the serializer on the SparkConf instead
// val conf = new SparkConf().setAppName("SparkHistoryTags").setMaster("local")
//   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
// Build the HBase RDD of (rowkey, Result) pairs
val hbaseRDD = sc.newAPIHadoopRDD(getHbaseConf(), classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result]).repartition(partition)
// Extract the Result from each pair and parse it into the row RDD
val rowRdd = hbaseRDD.map(_._2).map(getRes)
println(rowRdd.count())
rowRdd.foreach(println)
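An alternative that works without changing the serializer is to turn each (ImmutableBytesWritable, Result) pair into plain serializable values before the repartition, so the shuffle never has to move the HBase classes at all. A sketch, assuming only the parsed string from getRes is needed downstream:
val rowRdd = sc.newAPIHadoopRDD(getHbaseConf(), classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result])
  .map { case (_, result) => getRes(result) } // drop the writable key, keep a plain String
  .repartition(partition)                     // the shuffle now only moves Strings
println(rowRdd.count())
rowRdd.foreach(println)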