Yesterday I reorganized the project code, and afterwards the job failed on startup with this error:
ERROR StreamingContext: Error starting the context, marking it as stopped
java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.gac.xs6.core.impl.EnAdapterImpl$
java.io.NotSerializableException: Object of org.apache.spark.streaming.kafka010.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
19/06/05 10:03:05 org.apache.spark.internal.Logging$class.logError(Logging.scala:91) ERROR Utils: Exception encountered
java.io.NotSerializableException: Object of org.apache.spark.streaming.dstream.DStreamCheckpointData is being serialized possibly as a part of closure of an RDD operation. This is because the DStream object is being referred to from within the closure. Please rewrite the RDD operation inside this DStream to avoid this. This has been enforced to avoid bloating of Spark tasks with unnecessary objects.
The cause might be a serialization problem with the MySQL database configuration class:
@Singleton
class MysqlConfiguration extends Serializable {
  private val config: Config = ConfigFactory.load()
  lazy val mysqlConf: Config = config.getConfig("mysql")
  lazy val mysqlJdbcUrl: String = mysqlConf.getString("jdbcUrl")
  lazy val mysqlJdbcDriver: String = mysqlConf.getString("jdbcDriver")
  lazy val dataSourceUser: String = mysqlConf.getString("dataSource.user")
  lazy val dataSourcePassword: String = mysqlConf.getString("dataSource.password")
  lazy val dataSourceSize: Int = mysqlConf.getInt("dataSource.size")
}
But I had already made it serializable, as shown above.
@Singleton
class NationApplication @Inject()(sparkConf: SparkConfiguration,
                                  kafkaConf: KafkaConfiguration,
                                  hbaseConf: HbaseConfiguration,
                                  sparkContext: NaSparkContext[SparkContext],
                                  mysqlConf: MysqlConfiguration,
                                  source: NaDStreamSource[Option[(String, String)]],
                                  adapter: SourceDecode,
                                  sanitizer: DataSanitizer,
                                  eventsExact: EventsExact,
                                  eventsCheck: EventsCheck,
                                  anomalyExact: AnomalyExact,
                                  anomalyDetection: AnomalyDetection
                                 ) extends Serializable with Logging {
I had added the MySQL configuration to the class as a constructor parameter, and then referenced mysqlConf inside eventStream.foreachRDD:
def saveAnomaly(eventStream: DStream[(String, EventUpdate)]): Unit = {
  eventStream.foreachRDD((x: RDD[(String, EventUpdate)]) => {
    if (!x.isEmpty()) {
      x.foreachPartition { res: Iterator[(String, EventUpdate)] => {
        // val mysqlConf = new MysqlConfiguration
        val c = new GacJdbcUtil(mysqlConf) // get the MySQL database configuration
        ...
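Conclusion:
Inside foreachRDD, create the configuration with new (the commented-out line in the snippet) and build the connection from that, instead of using the configuration passed into the class. The likely reason is that mysqlConf is a field of NationApplication, so referencing it inside the closure captures the whole NationApplication instance, including the members that hold the DStream, and that is what Spark refuses to serialize once checkpointing is enabled.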
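The capture behavior behind this conclusion can be reproduced without Spark. The following is only a minimal sketch under stated assumptions: Conf, NotSer, App and ClosureDemo are hypothetical stand-ins (not classes from the project), and plain Java serialization is used in place of Spark's closure and checkpoint serialization. It also shows an alternative that is not in the original post: copying the field into a local val before the closure.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical stand-in for MysqlConfiguration: a small serializable config.
class Conf extends Serializable {
  val jdbcUrl: String = "jdbc:mysql://localhost/test"
}

// Hypothetical stand-in for a member that cannot be serialized (e.g. something holding a DStream).
class NotSer

// Hypothetical stand-in for NationApplication: serializable itself, but holds a NotSer field.
class App(conf: Conf, val stream: NotSer) extends Serializable {
  // `conf` here means `this.conf`, so the closure captures the whole App instance.
  def capturing: () => String = () => conf.jdbcUrl

  // The config is created inside the closure, so nothing from `this` is captured.
  def local: () => String = () => new Conf().jdbcUrl

  // Alternative (assumption, not from the post): copy the field to a local val first,
  // so the closure captures only that value.
  def copied: () => String = {
    val c = conf
    () => c.jdbcUrl
  }
}

object ClosureDemo {
  // Returns true if `f` can be written with Java serialization.
  def serializable(f: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val app = new App(new Conf, new NotSer)
    println(s"capturing this: ${serializable(app.capturing)}")
    println(s"new inside closure: ${serializable(app.local)}")
    println(s"local copy: ${serializable(app.copied)}")
  }
}
```

In the post's code, the first variant corresponds to new GacJdbcUtil(mysqlConf) using the class field, and the second to uncommenting val mysqlConf = new MysqlConfiguration inside foreachPartition.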