如何高效的使用foreachRDD

对于foreachRDD的正确理解,请参考对DStream.foreachRDD的理解
在spark streaming的官方文档中也有对foreachRDD的说明,请参见Design Patterns for using foreachRDD

基于数据的连接

在实际的应用中经常会使用foreachRDD将数据存储到外部数据源,那么就会涉及到创建和外部数据源的连接问题,最常见的错误写法就是为每条数据都建立连接

dstream.foreachRDD { rdd =>
  val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")  // executed at the driver
  rdd.foreach { record =>
    connection.send(record) // executed at the worker
  }
}

缺点:要为每行数据进行创建连接操作,非常的低效。

基于partition的连接

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

缺点:这种方式虽然可以一定程度的缓解外部数据源的压力,但是如果partition数量过多,也会导致连接数过多。

基于静态连接

在上面案例的基础上,可以通过静态对象的方式,创建一个静态单例,每个JVM中只有一个连接对象

object Client {
  val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
  def apply(): Connection = conn
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = Client()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

缺点:这样写的问题在于无论是否有数据执行了查询都会创建连接

基于lazy的静态连接

可以对上面的稍加改动就可以实现只有在真正使用的时候才创建连接

object Client {
  lazy val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/tutorials", "root", "root")
  def apply(): Connection = conn
}

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = Client()
    partitionOfRecords.foreach(record => connection.send(record))
  }
}

缺点:这种方式一个executor上的task都依赖于同一个连接对象,有可能会造成性能的瓶颈,所以需要一个终极的解决方案。

基于lazy的静态连接池

在官方的样例中也提到创建连接的时候需要ConnectionPool is a static, lazily initialized pool of connections

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

ConnectionPool可以借助org.apache.commons.pool2框架实现,请参考使用commons.pool2实现mysql连接池
下面是简单的一种实现方法

object ConnectionPool {
  private val pool = new GenericObjectPool[Connection](new MysqlConnectionFactory("jdbc:mysql://103.235.245.156:3306/tutorials", "root", "root", "com.mysql.jdbc.Driver"))
  def getConnection(): Connection ={
    pool.borrowObject()
  }

  def returnConnection(conn: Connection): Unit ={
    pool.returnObject(conn)
  }
}

class MysqlConnectionFactory(url: String, userName: String, password: String, className: String) extends BasePooledObjectFactory[Connection]{
  override def create(): Connection = {
    Class.forName(className)
    DriverManager.getConnection(url, userName, password)
  }

  override def wrap(conn: Connection): PooledObject[Connection] = new DefaultPooledObject[Connection](conn)

  override def validateObject(pObj: PooledObject[Connection]) = !pObj.getObject.isClosed

  override def destroyObject(pObj: PooledObject[Connection]) =  pObj.getObject.close()
}

这样官方的样例就可以改造为

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    lazy val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,933评论 18 139
  • Spring Boot 参考指南 介绍 转载自:https://www.gitbook.com/book/qbgb...
    毛宇鹏阅读 46,954评论 6 342
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,765评论 18 399
  • 狗在楼下吠 蚊子在叫嚣 思绪像个小河流 往全身各处流 挠一挠头 却把思绪搞丢 想从头捋过 却不知情从何处而起 竟勾...
    Rachel曹阅读 138评论 0 0
  • 这事得从那根电线杆子说起。 那根电线杆子,由哪年代的哪几个电工竖起,己无从考证。反正我们见到它的时候,它就灰不溜秋...
    衣袂阅读 359评论 3 0