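When a Spark job reaches the point where it builds its execution plan, you may run into an error like the one below. Judging from the error message alone, it can be a little puzzling.
Symptom: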
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:547)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:701)
at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:265)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:546)
Caused by: java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
Serialization stack:
- object not serializable (class: org.apache.spark.streaming.StreamingContext, value: org.apache.spark.streaming.StreamingContext@2d28fb02)
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 14 more
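Root cause:
The problem is actually simple: it is a closure serialization issue. Before Spark ships a function such as the one passed to map to the executors, it serializes the closure together with every object the closure references. Inside a class, a bare field reference such as jobCode is really this.jobCode, so the closure captures this, and Spark has to serialize the whole enclosing instance and everything reachable from it. If any of those objects is not serializable (in the trace above, a DStream and the StreamingContext), the job fails with "Task not serializable". In other words, the error appears when the closure never mentions an object directly but still reaches it indirectly through this.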
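The capture behaviour can be reproduced without Spark. The following is a minimal sketch, assuming a Scala 2.12 or later compiler. Job and isSerializable are hypothetical names used only for this illustration, and plain Java serialization stands in for the check that ClosureCleaner.ensureSerializable performs in the trace above.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureCaptureDemo {
  // Hypothetical stand-in for SparkStreamJob: deliberately NOT Serializable.
  class Job {
    var jobCode: String = "test"

    // Reads the field directly, so the function holds a reference to `this`.
    def capturesThis: String => String = x => s"$x:$jobCode"

    // Copies the field into a local val first, so only the String is captured.
    def capturesLocal: String => String = {
      val code = this.jobCode
      x => s"$x:$code"
    }
  }

  // Returns true if Java serialization accepts the object, false if it
  // fails with NotSerializableException.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val job = new Job
    println(s"capturesThis serializable:  ${isSerializable(job.capturesThis)}")
    println(s"capturesLocal serializable: ${isSerializable(job.capturesLocal)}")
  }
}
```

Under these assumptions the first check should report false, because serializing the function also tries to serialize the non-serializable Job instance, while the second should report true, because the function only carries a String.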
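Incorrect example:
import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkStreamingTest {
  /**
   * Program entry point.
   *
   * @param args command-line arguments
   */
  def main(args: Array[String]): Unit = {
    val sparkJob = new SparkStreamJob()
    sparkJob.setTransform()
    sparkJob.execute()
  }
}

// getSparkConf, topics and execute() are assumed to be defined elsewhere;
// they are not shown in this excerpt.
class SparkStreamJob {
  var groupId: String = "test"
  var jobCode: String = "test"

  private def setMap(ds: DStream[ObjectNode]): DStream[ObjectNode] = {
    ds.map(x => {
      // PROBLEM: jobCode is really this.jobCode, so the closure captures `this`.
      x.put("jobCode", jobCode)
    })
  }

  def setTransform(): Unit = {
    val kafkaParams = scala.collection.mutable.Map[String, Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "test",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    val streamingContext = new StreamingContext(getSparkConf, Seconds(30))
    val kafkaStreams: DStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](streamingContext, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    // PROBLEM: groupId is really this.groupId, so this closure captures `this` as well.
    // Each record value is wrapped in an ObjectNode so the types match setMap;
    // the field name "message" is illustrative.
    val nodes = kafkaStreams.map(x =>
      JsonNodeFactory.instance.objectNode().put("message", s"groupId[$groupId]:${x.value()}"))
    setMap(nodes).print()
  }
}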
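This looks fine, but it fails at runtime. The fix is simple: copy every field the closure needs into a local val before the closure, so that the closure captures only that value instead of this.
Correct example: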
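import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

// getSparkConf and topics are assumed to be defined elsewhere;
// they are not shown in this excerpt.
class SparkStreamJob {
  var groupId: String = "test"
  var jobCode: String = "test"

  private def setMap(ds: DStream[ObjectNode]): DStream[ObjectNode] = {
    // Copy the field into a local val so the closure captures only the String, not `this`.
    val jobCode = this.jobCode
    ds.map(x => {
      x.put("jobCode", jobCode)
    })
  }

  def setTransform(): Unit = {
    val kafkaParams = scala.collection.mutable.Map[String, Object](
      "bootstrap.servers" -> "127.0.0.1:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "test",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
    val streamingContext = new StreamingContext(getSparkConf, Seconds(30))
    val kafkaStreams: DStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream[String, String](streamingContext, LocationStrategies.PreferConsistent,
        ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))
    // Same fix for groupId: without this local copy, $groupId below would capture `this`.
    val groupId = this.groupId
    // Each record value is wrapped in an ObjectNode so the types match setMap;
    // the field name "message" is illustrative.
    val nodes = kafkaStreams.map(x =>
      JsonNodeFactory.instance.objectNode().put("message", s"groupId[$groupId]:${x.value()}"))
    setMap(nodes).print()
  }
}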