重写JdbcRDD实现条件查询Mysql数据库

虽然Spark使用关系型数据库作为数据源的场景并不多,但是有时候我们还是希望能够能够从MySql等数据库中读取数据,并封装成RDD。Spark官方确实也提供了这么一个库给我们,org.apache.spark.rdd.JdbcRDD。但是这个库使用起来让人觉得很鸡肋,因为它不支持条件查询,只支持起止边界查询,这大大限定了它的使用场景。很多时候我们需要分析的数据不可能单独建一个表,它们往往被混杂在一个大的表中,我们会希望更加精确的找出某一类的数据做分析。
查看了一下这个JdbcRDD的源码,我们就能明白为什么他只提供起止边界了。

 val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force
      // streaming results, rather than pulling entire resultset into memory.
      // See the below URL
      // dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html

      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      stmt.setFetchSize(100)
    }

    logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")

    stmt.setLong(1, part.lower)
    stmt.setLong(2, part.upper)
    val rs = stmt.executeQuery()

它使用的是游标的方式,conn.prepareStatement(sql, type, concurrency),因此传入的参数只能是这个分区的起始编号part.lower和这个分区的终止编号part.upper。我查了半天资料,也不知道这种方式该如何将条件传给这个stmt ,有点难受。索性也不尝试了,也不考虑兼容其他类型的数据库,只考虑mysql数据库的话,把游标这种方式给去了,这样使用limit总能给它查出来吧。
以下是具体实现,
重写的JdbcRDD:

package JdbcRDD

import java.sql.{Connection, ResultSet}
import java.util.ArrayList
import scala.reflect.ClassTag
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
private class JdbcPartition(idx: Int, val lower: Long, val upper: Long ,val params:ArrayList[Any]) extends Partition {
  override def index: Int = idx 
}

class JdbcRDD[T: ClassTag](
                            sc: SparkContext,
                            getConnection: () => Connection,
                            sql: String,
                            lowerBound: Long,
                            upperBound: Long,
                            params: ArrayList[Any],
                            numPartitions: Int,
                            mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
  extends RDD[T](sc, Nil) with Logging {
  override def getPartitions: Array[Partition] = {
    // bounds are inclusive, hence the + 1 here and - 1 on end
    val length = BigInt(1) + upperBound - lowerBound
    (0 until numPartitions).map { i =>
      val start = lowerBound + ((i * length) / numPartitions)
      val end = lowerBound + (((i + 1) * length) / numPartitions) - 1
      new JdbcPartition(i, start.toLong, end.toLong,params)
    }.toArray
  }

  override def compute(thePart: Partition, context: TaskContext): Iterator[T] = new NextIterator[T]
  {
    context.addTaskCompletionListener{ context => closeIfNeeded() }
    val part = thePart.asInstanceOf[JdbcPartition]
    val conn = getConnection()
//直接采用我们常用的预处理方式
    val stmt = conn.prepareStatement(sql)
    val url = conn.getMetaData.getURL
    if (url.startsWith("jdbc:mysql:")) {
      stmt.setFetchSize(Integer.MIN_VALUE)
    } else {
      return null
    }
    logInfo(s"statement fetch size set to: ${stmt.getFetchSize}")
//传参
   val params = part.params
    val paramsSize = params.size()
    if(params!=null){
      for(i <- 1 to paramsSize){
        val param = params.get(i-1)
        param match {
          case param:String => stmt.setString(i,param)
          case param:Int => stmt.setInt(i,param)
          case param:Boolean => stmt.setBoolean(i,param)
          case param:Double => stmt.setDouble(i,param)
          case param:Float => stmt.setFloat(i,param)
          case _=> {
            println("type is fault")
          }
        }
      }
    }
//限定该分区查询起始偏移量和条数
    stmt.setLong(paramsSize+1, part.lower)
    stmt.setLong(paramsSize+2, part.upper-part.lower+1)
    val rs = stmt.executeQuery()
    override def getNext(): T = {
      if (rs.next()) {
        mapRow(rs)
      } else {
        finished = true
        null.asInstanceOf[T]
      }
    }
    override def close() {
      try {
        if (null != rs) {
          rs.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing resultset", e)
      }
      try {
        if (null != stmt) {
          stmt.close()
        }
      } catch {
        case e: Exception => logWarning("Exception closing statement", e)
      }
      try {
        if (null != conn) {
          conn.close()
        }
        logInfo("closed connection")
      } catch {
        case e: Exception => logWarning("Exception closing connection", e)
      }
    }
  }
}
object JdbcRDD {
  def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
    Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
  }

  trait ConnectionFactory extends Serializable {
    @throws[Exception]
    def getConnection: Connection
  }

  def create[T](
                 sc: JavaSparkContext,
                 connectionFactory: ConnectionFactory,
                 sql: String,
                 lowerBound: Long,
                 upperBound: Long,
                 params: ArrayList[Any],
                 numPartitions: Int,
                 mapRow: JFunction[ResultSet, T]): JavaRDD[T] = {

    val jdbcRDD = new JdbcRDD[T](
      sc.sc,
      () => connectionFactory.getConnection,
      sql,
      lowerBound,
      upperBound,
      params,
      numPartitions,
      (resultSet: ResultSet) => mapRow.call(resultSet))(fakeClassTag)
    new JavaRDD[T](jdbcRDD)( fakeClassTag)
  }

 
  def create(
              sc: JavaSparkContext,
              connectionFactory: ConnectionFactory,
              sql: String,
              lowerBound: Long,
              upperBound: Long,
              params: ArrayList[Any],
              numPartitions: Int): JavaRDD[Array[Object]] = {

    val mapRow = new JFunction[ResultSet, Array[Object]] {
      override def call(resultSet: ResultSet): Array[Object] = {
        resultSetToObjectArray(resultSet)
      }
    }
    create(sc, connectionFactory, sql, lowerBound, upperBound, params,numPartitions, mapRow)
  }
  private def fakeClassTag[T]: ClassTag[T] = ClassTag.AnyRef.asInstanceOf[ClassTag[T]]
}

接下来是测试代码:

package JdbcRDD

import java.sql.{DriverManager, ResultSet}
import java.util

import org.apache.spark.SparkContext

object JdbcRDDTest {
  def main(args: Array[String]) {
    //val conf = new SparkConf().setAppName("spark_mysql").setMaster("local")
    val sc = new SparkContext("local[2]","spark_mysql")

    def createConnection() = {
      Class.forName("com.mysql.jdbc.Driver").newInstance()
      DriverManager.getConnection("jdbc:mysql://localhost:3306/transportation", "root", "pass")
    }
    def extractValues(r: ResultSet) = {
      (r.getString(1), r.getString(2))
    }
    val params = new util.ArrayList[Any]
    params.add(100)//传参
    params.add(7)
    val data = new JdbcRDD(sc, createConnection, "SELECT * FROM login_log where  id<=? and user_id=? limit ?,?", lowerBound = 1, upperBound =20,params=params, numPartitions = 5, mapRow = extractValues)
    data.cache()
    println(data.collect.length)
    println(data.collect().toList)
    sc.stop()
  }
}

测试结果:


测试结果

可以看出,重写这个JdbcRDD后我们可以条件查询某一个表,也可以同时限定查询条数,这给我们用Spark分析Mysql中的数据提供了方便,我们不需要先将需要的数据滤出来再进行分析。当然,这个demo写的比较粗糙,只是提供这么一种方法的演示,后期还可以稍加修改。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352