Spark 之 Redis on spark java.io.NotSerializableException

在使用spark streaming 处理数据后,将数据存入redis中,但是出现了为序列化问题,如下图所示:
org.apache.spark.SparkException: Task not serializable

原代码如下:

val sc = new SparkContext(conf)
//.....
//.....
//.....
//建立一个Redis连接
val jedis: Jedis = JodisClient.getPool.getResource
//将record的数据插入数据库
rowRdd.map(
   jedis.set("","")
})

Spark架构原理图如下:

Spark 架构图

原因剖析:

  1. 上面连接Jedis的代码是在Driver中运行的,也就是建立SparkContext的地方
  2. 对RDD的操作是分布式运行在Executor的,不在Driver中运行
  3. 因此在map中的引用jedis,spark会做两件事情:序列化,和分发到各台机器上。

所以问题就在这里,redis的TCP连接已经绑定在Driver Program机上了,是无法分发到各个节点执行的,因此出现问题的根源就在这里。

正确的做法是在map函数中进行数据库的连接,但是我们要用mappartition来代替map,因为map会对每一个record进行连接数据库,而mappartition仅仅是对每一个partition建立一个连接,然后用iterator进行迭代复用。代码如下:

val sc = new SparkContext(conf)
//.....
//.....
//.....
//将record的数据插入数据库
rowRdd.mapPartitions(iter=>{
      while(iter.hasNext){
      //建立一个Redis连接
        val jedis: Jedis = JodisClient.getPool.getResource
        jedis.set("","")
      }
    })

这样的话就不会出现未序列化的问题了!

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容