MongoDB数据增量同步到Hive(方案二通过MongoSpark)

一、背景

本文续接上文 MongoDB数据增量同步到Hive(方案一通过BSON文件映射)
考虑到上文的方案一,耗时又占用空间,想写程序实现数据直达,于是,有了以下方案。

二、方案二 通过MongoSpark程序拉取monggo数据

首先还是存量数据

工具类SparkHiveUtil


import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NumericType, ShortType, StringType, TimestampType}

object SparkHiveUtil {

  def createTable(spark:SparkSession,df:DataFrame,hive_db:String,tableName:String,ispartiton:Boolean): Unit ={
    val hive_table=hive_db+"."+tableName
    val createsql = getCreateSql(df,hive_table,ispartiton)
    println("SparkHiveUtil_createsql:"+createsql)
    println("SparkHiveUtil_drop table if exists "+hive_table)
    spark.sql("drop table if exists "+hive_table)
    println("SparkHiveUtil create table  "+hive_table)
    spark.sql(createsql)
  }

  def getCreateSql(df:DataFrame,hive_table:String,ispartiton:Boolean): String ={
    val structtype= df.schema
    val fields:scala.Array[org.apache.spark.sql.types.StructField] = structtype.fields
    var fieldType="STRING"
    var createsql = "CREATE  TABLE if not exists "+hive_table+"(\n"

    if(fields.length<1)return ""

    for (i <- 0 until fields.length) {
      var fieldName= fields(i).name
      if(fieldName.startsWith("_"))fieldName="`"+fieldName+"`"
      println(fieldName+":"+fields(i).dataType)
      fieldType=getFieldType(fields(i).dataType)
      if(i==fields.length-1){
        createsql=createsql+fieldName+" "+fieldType+"\n"
      }else{
        createsql=createsql+fieldName+" "+fieldType+",\n"
      }
    }
    createsql=createsql+")\ncomment '"+hive_table +"' \n"
    if(ispartiton) createsql=createsql+"PARTITIONED BY (`pyear` int, `pmonth` int,`pday` int)\n "
    createsql=createsql+"ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n " +
      "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'\n " +
      "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\n " +
      "TBLPROPERTIES ( 'orc.compress'='snappy')"
    createsql
  }

  def getFieldType(dataType:DataType): String ={
    dataType match {
      case a:ArrayType => {
        "ARRAY<"+getFieldType(a.elementType)+">"
      }
      case b: BinaryType => "BINARY"
      case bl: BooleanType => "BOOLEAN"
      case by: ByteType => "TINYINT"
      case da: DateType => "TIMESTAMP"
      case de: DecimalType => "DECIMAL"
      case dou: DoubleType => "DOUBLE"
      case f: FloatType => "FLOAT"
      case i: IntegerType => "INT"
      case l: LongType => "BIGINT"
      case n: NumericType => "BIGINT"
      case sh: ShortType => "SMALLINT"
      case st: StringType => "STRING"
      case t: TimestampType => "TIMESTAMP"
      case _:DataType => "STRING"
    }
  }
}

同步存量monggo数据的类SparkMon2hiveOriginNormal


import com.dpl.dws.mongo.utils.{MongoUtils, SparkHiveUtil, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkMon2hiveOriginNormal {
  val logger = Logger.getLogger("SparkMon2hiveOriginNormal")
  val hive_mongo_db=MongoUtils.hive_mongo_db


//不分区的表
  def syn_origin_table_normal(mongo_table:String,hive_table:String): Unit ={
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    import spark.implicits._
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()


    //建表
    SparkHiveUtil.createTable(spark,df,hive_mongo_db,hive_table,false)

    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注册临时表

    logger.info("开始同步 "+mongo_table+" 表数据到Hive "+hive_mongo_db+"."+hive_table)
      val t1 = System.currentTimeMillis
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864b")
   // spark.sql("truncate table "+hive_table)
    //将DF写入到Hive中
    spark.sql("INSERT OVERWRITE TABLE   "+hive_mongo_db+"."+hive_table+"  select * from "+temp_table+" distribute by  rand() ")
    val t2 = System.currentTimeMillis
    logger.info("共耗时:" + (t2 - t1) / 60000 + "分钟")
    spark.stop()
  }

// 分区表
  def syn_origin_table_partition(mongo_table:String,hive_table:String,partitionBy:String="createTime"): Unit ={
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()
    //建表
    SparkHiveUtil.createTable(spark,df,hive_mongo_db,hive_table,true)

    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注册临时表
    //    val res: DataFrame = spark.sql("SELECT * from "+temp_table+" limit 10")
    //    res.show(10)
    println("开始同步 "+mongo_table+" 表数据到Hive "+hive_mongo_db+"."+hive_table)
    val t1 = System.currentTimeMillis

    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nostrick")
    //将DF写入到Hive中
    //选择Hive数据库
    spark.sql("INSERT OVERWRITE TABLE " +hive_mongo_db+"."+hive_table+
      " PARTITION (pyear,pmonth,pday) " +
      "SELECT t.*,year(t."+partitionBy+") pyear,month(t."+partitionBy+") pmonth,day(t."+partitionBy+") pday " +
      "from "+temp_table+" t ")
    val t2 = System.currentTimeMillis
    println("共耗时:" + (t2 - t1) / 60000 + "分钟")
    spark.stop()
  }

  def initSparkSession(mongo_table:String): SparkSession ={
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveOriginNormal_"+mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }

}

其次是增量数据加合并

AppendCommon代码如下:

package com.dpl.dws.mongo.mon2hive.increment.append

import java.text.SimpleDateFormat

import com.dpl.dws.common.DateTimeUtil
import com.dpl.dws.mongo.utils.{MongoUtils, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.rdd.MongoRDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.Document

object AppendCommon {
  val hive_mongo_db=MongoUtils.hive_mongo_db

  def increment_partition(mongo_table:String,inc_date:String,incrementBy:String): Unit ={
    println(mongo_table,inc_date)
    val year=inc_date.substring(0,4)
    val month=inc_date.substring(4,6).toInt
    val day=inc_date.substring(6).toInt
    println(year,month,day)
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val begin_time=inc_date+" 00:00:00.000"
    val end_time=inc_date+" 23:59:59.999"

    val dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS")
    val begin_date_time=  dateFormat.parse(begin_time)
    val end_date_time=  dateFormat.parse(end_time)
    println("begin_date_time:"+begin_date_time)
    println("end_date_time:"+end_date_time)
    val begin_date_time_UTC=DateTimeUtil.getIUTCTimestamp(begin_date_time)
    val end_date_time_UTC=DateTimeUtil.getIUTCTimestamp(end_date_time)

    val query = "{ '$match': {'"+incrementBy+"':{'$gte':ISODate('"+begin_date_time_UTC+"')," +
      "'$lte':ISODate('"+end_date_time_UTC+"')}} }"
   // val query = "{ '$match': {'createTime': { '$gte' : ISODate('"+time+"')} } }"
    println("query:"+query)

    val rdd:MongoRDD[Document] = MongoSpark.load(spark.sparkContext)
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse(query)))
    val count = aggregatedRdd.count
    println("count___"+count)
    //  println(aggregatedRdd.first.toJson)

    if(count==0){
      spark.stop()
      return
    }

    //将DF写入到Hive中
    val df1 = aggregatedRdd.toDF()
    df1.printSchema()
    val df: DataFrame = df1.drop("_class")
    df.printSchema()
    //df.show(10)

    //将DF写入到Hive中
    spark.sql("set spark.executor.memory=7889934592b")
    spark.sql("use "+hive_mongo_db)
    val temp_table=mongo_table+"_tmp"
    df.createOrReplaceTempView(temp_table)//    //注册临时表


    println("开始同步 "+mongo_table+" 表数据到Hive "+hive_mongo_db+"."+mongo_table)
    val t1 = System.currentTimeMillis

    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nostrick")


    spark.sql("INSERT OVERWRITE TABLE " +hive_mongo_db+"."+mongo_table+
      " PARTITION (pyear="+year+",pmonth="+month+",pday="+day+") " +
      "SELECT t.* " +
      "from "+temp_table+" t ")

    val t2 = System.currentTimeMillis
    println("共耗时:" + (t2 - t1) / 60000 + "分钟")

    spark.stop()
  }

  def initSparkSession(mongo_table:String): SparkSession ={
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveAppendCommon_"+mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }

}

三、存在的问题

此方案相对于方案一来说,节省了不少空间和步骤,速度也快;
但是对于数据量比较大的表,存在丢失数据的情况,hive中记录条数总是略少于monggo库表的记录条数,目前还没找到原因;
对于更新的数据涉及到的分区范围很广的数据,做增量更新还不如全量,所以还是有待改进,考虑尝试使用Hudi做增量同步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351