在使用spark streaming 处理数据后,将数据存入redis中,但是出现了为序列化问题,如下图所示:
org.apache.spark.SparkException: Task not serializable
原代码如下:
val sc = new SparkContext(conf)
//.....
//.....
//.....
//建立一个Redis连接
val jedis: Jedis = JodisClient.getPool.getResource
//将record的数据插入数据库
rowRdd.map(
jedis.set("","")
})
Spark架构原理图如下:
原因剖析:
- 上面连接Jedis的代码是在Driver中运行的,也就是建立SparkContext的地方
- 对RDD的操作是分布式运行在Executor的,不在Driver中运行
- 因此在map中的引用jedis,spark会做两件事情:序列化,和分发到各台机器上。
所以问题就在这里,redis的TCP连接已经绑定在Driver Program机上了,是无法分发到各个节点执行的,因此出现问题的根源就在这里。
正确的做法是在map函数中进行数据库的连接,但是我们要用mappartition来代替map,因为map会对每一个record进行连接数据库,而mappartition仅仅是对每一个partition建立一个连接,然后用iterator进行迭代复用。代码如下:
val sc = new SparkContext(conf)
//.....
//.....
//.....
//将record的数据插入数据库
rowRdd.mapPartitions(iter=>{
while(iter.hasNext){
//建立一个Redis连接
val jedis: Jedis = JodisClient.getPool.getResource
jedis.set("","")
}
})
这样的话就不会出现未序列化的问题了!