Spark Streaming提高写数据库的效率

1. 前言

这是一篇挂羊头卖狗肉的文章,事实上,本文要描述的内容,和Spark Streaming没有什么关系。

在上一篇文章http://www.jianshu.com/p/a73c0c95d2fe 我们写了如何通过Spark Streaming向数据库中插入数据。可能你已经发现了,数据是逐条插入数据库的,效率底下。那么如何提高插入数据库的效率呢?

数据库写是个IO任务,并行不一定能够加速写入数据库的速度。我们主要说下批量提交和Bulk Copy Insert的方式。

2.批量提交

批量提交,就是JDBC Statment的executeBatch,直接看代码吧。

/**
  * 从Kafka中读取数据,并把数据成批写入数据库
  */
object KafkaToDB {

  val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // 参数校验
    if (args.length < 2) {
      System.err.println(
        s"""
           |Usage: KafkaToDB <brokers> <topics>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |""".stripMargin)
      System.exit(1)
    }

    // 处理参数
    val Array(brokers, topics) = args
    // topic以“,”分割
    val topicSet: Set[String] = topics.split(",").toSet
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 创建上下文,以每1秒间隔的数据作为一批
    val sparkConf = new SparkConf().setAppName("KafkaToDB")
    val streamingContext = new StreamingContext(sparkConf, Seconds(2))

    // 1.创建输入流,获取数据。流操作基于DStream,InputDStream继承于DStream
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )

    // 2. DStream上的转换操作
    // 取消息中的value数据,以英文逗号分割,并转成Tuple3
    val values = stream.map(_.value.split(","))
      .filter(x => x.length == 3)
      .map(x => new Tuple3[String, String, String](x(0), x(1), x(2)))


    // 输入前10条到控制台,方便调试
    values.print()

    // 3.同foreachRDD保存到数据库
    val sql = "insert into kafka_message(timeseq,timeseq2, thread, message) values (?,?,?,?)"
    values.foreachRDD(rdd => {
      val count = rdd.count()
      println("-----------------count:" + count)
      if (count > 0) {
        rdd.foreachPartition(partitionOfRecords => {
          val conn = ConnectionPool.getConnection.orNull
          if (conn != null) {
            val ps = conn.prepareStatement(sql)
            try{
              // 关闭自动执提交
              conn.setAutoCommit(false)
              partitionOfRecords.foreach(data => {
                ps.setString(1, data._1)
                ps.setString(2,System.currentTimeMillis().toString)
                ps.setString(3, data._2)
                ps.setString(4, data._3)
                ps.addBatch()
              })
              ps.executeBatch()
              conn.commit()
            } catch {
              case e: Exception =>
                logger.error("Error in execution of insert. " + e.getMessage)
            }finally {
              ps.close()
              ConnectionPool.closeConnection(conn)
            }
          }
        })
      }
    })

    streamingContext.start() // 启动计算
    streamingContext.awaitTermination() // 等待中断结束计算

  }
}

3. Bulk Copy Insert

我们使用的是PostgreSQL,其数据库JDBC驱动程序提供了Copy Insert的API,其主要过程是:

  • 1.获取数据库连接
  • 2.创建CopyManager
  • 3.把Spark Streaming中的流数据封装成InputStream
  • 4.执行CopyInsert
import java.sql.Connection

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies._
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.postgresql.copy.CopyManager
import org.postgresql.core.BaseConnection
import org.slf4j.LoggerFactory

object CopyInsert {

  val logger = LoggerFactory.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    // 参数校验
    if (args.length < 4) {
      System.err.println(
        s"""
           |Usage: CopyInsert <brokers> <topics> <duration> <batchsize>
           |  <brokers> is a list of one or more Kafka brokers
           |  <topics> is a list of one or more kafka topics to consume from
           |""".stripMargin)
      System.exit(1)
    }

    // 处理参数
    val Array(brokers, topics,duration,batchsize) = args
    // topic以“,”分割
    val topicSet: Set[String] = topics.split(",").toSet
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // 创建上下文,以每1秒间隔的数据作为一批
    val sparkConf = new SparkConf().setAppName("CopyInsertIntoPostgreSQL")
    val streamingContext = new StreamingContext(sparkConf, Seconds(duration.toInt))

    // 1.创建输入流,获取数据。流操作基于DStream,InputDStream继承于DStream
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topicSet, kafkaParams)
    )

    // 2. DStream上的转换操作
    // 取消息中的value数据,以英文逗号分割,并转成Tuple3
    val values = stream.map(_.value.split(","))
      .filter(x => x.length == 3)
      .map(x => new Tuple3[String, String, String](x(0), x(1), x(2)))


    // 输入前10条到控制台,方便调试
    values.print()

    // 3.同foreachRDD保存到数据库
    // http://rostislav-matl.blogspot.jp/2011/08/fast-inserts-to-postgresql-with-jdbc.html
    values.foreachRDD(rdd => {
      val count = rdd.count()
      println("-----------------count:" + count)
      if (count > 0) {
        rdd.foreachPartition(partitionOfRecords => {
          val start = System.currentTimeMillis()
          val conn: Connection = ConnectionPool.getConnection.orNull
          if (conn != null) {
            val batch = batchsize.toInt
            var counter: Int = 0
            val sb: StringBuilder = new StringBuilder()
            // 获取数据库连接
            val baseConnection = conn.getMetaData.getConnection.asInstanceOf[BaseConnection]
            // 创建CopyManager
            val cpManager: CopyManager = new CopyManager(baseConnection)
            partitionOfRecords.foreach(record => {
              counter += 1
              sb.append(record._1).append(",")
                .append(System.currentTimeMillis()).append(",")
                .append(record._2).append(",")
                .append(record._3).append("\n")
              if (counter == batch) {
                // 构建输入流
                val in: InputStream = new ByteArrayInputStream(sb.toString().getBytes())
                // 执行copyin
                cpManager.copyIn("COPY kafka_message FROM STDIN WITH CSV", in)
                println("-----------------batch---------------: " + batch)
                counter = 0
                sb.delete(0, sb.length)
                closeInputStream(in)
              }
            })
            val lastIn: InputStream = new ByteArrayInputStream(sb.toString().getBytes())
            cpManager.copyIn("COPY kafka_message2 FROM STDIN WITH CSV", lastIn)
            sb.delete(0, sb.length)
            counter = 0
            closeInputStream(lastIn)
            val end = System.currentTimeMillis()
            println("-----------------duration---------------ms :" + (end - start))
          }
        })

      }
    })

    streamingContext.start() // 启动计算
    streamingContext.awaitTermination() // 等待中断结束计算
 }

 def closeInputStream(in: InputStream): Unit ={
   try{
       in.close()
    }catch{
     case e: IOException =>
       logger.error("Error on close InputStream. " + e.getMessage)
      }
  }
    
}

其它数据库应该也有bulk load的方式,例如MySQL,com.mysql.jdbc.Statment中有setLocalInfileInputStream方法,功能应该和上述的Copy Insert类似,但我还没有写例子验证。文档里有如下的描述,供参考。原文地址

Sets an InputStream instance that will be used to send data to the MySQL server for a "LOAD DATA LOCAL INFILE" statement rather than a FileInputStream or URLInputStream that represents the path given as an argument to the statement.

(完)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容