1. Background
This article follows up on the previous post, "Incremental sync of MongoDB data to Hive (Plan 1: mapping BSON files)".
Since Plan 1 is time-consuming and takes up a lot of space, I wanted a program that moves the data directly, which led to the plan below.
2. Plan 2: pulling MongoDB data with a MongoSpark program
First, the full (stock) data.
The utility class SparkHiveUtil:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, NumericType, ShortType, StringType, TimestampType}

object SparkHiveUtil {

  // Drop and recreate the Hive table based on the DataFrame schema
  def createTable(spark: SparkSession, df: DataFrame, hive_db: String, tableName: String, isPartition: Boolean): Unit = {
    val hive_table = hive_db + "." + tableName
    val createsql = getCreateSql(df, hive_table, isPartition)
    println("SparkHiveUtil_createsql:" + createsql)
    println("SparkHiveUtil_drop table if exists " + hive_table)
    spark.sql("drop table if exists " + hive_table)
    println("SparkHiveUtil create table " + hive_table)
    spark.sql(createsql)
  }

  // Build the CREATE TABLE statement from the DataFrame schema
  def getCreateSql(df: DataFrame, hive_table: String, isPartition: Boolean): String = {
    val structtype = df.schema
    val fields: scala.Array[org.apache.spark.sql.types.StructField] = structtype.fields
    var fieldType = "STRING"
    var createsql = "CREATE TABLE if not exists " + hive_table + "(\n"
    if (fields.length < 1) return ""
    for (i <- 0 until fields.length) {
      var fieldName = fields(i).name
      // Column names starting with "_" (e.g. _id) have to be back-quoted in Hive
      if (fieldName.startsWith("_")) fieldName = "`" + fieldName + "`"
      println(fieldName + ":" + fields(i).dataType)
      fieldType = getFieldType(fields(i).dataType)
      if (i == fields.length - 1) {
        createsql = createsql + fieldName + " " + fieldType + "\n"
      } else {
        createsql = createsql + fieldName + " " + fieldType + ",\n"
      }
    }
    createsql = createsql + ")\ncomment '" + hive_table + "' \n"
    if (isPartition) createsql = createsql + "PARTITIONED BY (`pyear` int, `pmonth` int,`pday` int)\n "
    createsql = createsql + "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n " +
      "STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'\n " +
      "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\n " +
      "TBLPROPERTIES ( 'parquet.compression'='SNAPPY')"
    createsql
  }

  // Map a Spark SQL data type to the corresponding Hive column type
  def getFieldType(dataType: DataType): String = {
    dataType match {
      case a: ArrayType => "ARRAY<" + getFieldType(a.elementType) + ">"
      case b: BinaryType => "BINARY"
      case bl: BooleanType => "BOOLEAN"
      case by: ByteType => "TINYINT"
      case da: DateType => "TIMESTAMP"
      case de: DecimalType => "DECIMAL(" + de.precision + "," + de.scale + ")"
      case dou: DoubleType => "DOUBLE"
      case f: FloatType => "FLOAT"
      case i: IntegerType => "INT"
      case l: LongType => "BIGINT"
      case n: NumericType => "BIGINT"
      case sh: ShortType => "SMALLINT"
      case st: StringType => "STRING"
      case t: TimestampType => "TIMESTAMP"
      case _: DataType => "STRING"
    }
  }
}
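To illustrate what the utility produces, here is a minimal sketch that prints the generated DDL for a toy schema; it is not part of the project, and the database and table names (mongo_ods.demo_table) are placeholders:

import org.apache.spark.sql.SparkSession

object SparkHiveUtilDemo {
  def main(args: Array[String]): Unit = {
    // A local session is enough to inspect the generated DDL; no Hive connection is needed
    val spark = SparkSession.builder.appName("SparkHiveUtilDemo").master("local[*]").getOrCreate()
    import spark.implicits._
    // Hypothetical sample data standing in for a MongoDB collection
    val df = Seq(("a1", 1L, "2021-01-01 10:00:00")).toDF("_id", "amount", "createTime")
    // Print the CREATE TABLE statement that createTable would execute (partitioned variant)
    println(SparkHiveUtil.getCreateSql(df, "mongo_ods.demo_table", true))
    spark.stop()
  }
}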
The class SparkMon2hiveOriginNormal, which syncs the full MongoDB data:
import com.dpl.dws.mongo.utils.{MongoUtils, SparkHiveUtil, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import org.apache.log4j.Logger
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkMon2hiveOriginNormal {
  val logger = Logger.getLogger("SparkMon2hiveOriginNormal")
  val hive_mongo_db = MongoUtils.hive_mongo_db

  // Non-partitioned tables
  def syn_origin_table_normal(mongo_table: String, hive_table: String): Unit = {
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    import spark.implicits._
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()
    // create the Hive table
    SparkHiveUtil.createTable(spark, df, hive_mongo_db, hive_table, false)
    val temp_table = mongo_table + "_tmp"
    df.createOrReplaceTempView(temp_table) // register a temp view
    logger.info("Start syncing " + mongo_table + " into Hive table " + hive_mongo_db + "." + hive_table)
    val t1 = System.currentTimeMillis
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=67108864b")
    // spark.sql("truncate table "+hive_table)
    // write the DataFrame into Hive
    spark.sql("INSERT OVERWRITE TABLE " + hive_mongo_db + "." + hive_table + " select * from " + temp_table + " distribute by rand() ")
    val t2 = System.currentTimeMillis
    logger.info("Total time: " + (t2 - t1) / 60000 + " minutes")
    spark.stop()
  }

  // Partitioned tables
  def syn_origin_table_partition(mongo_table: String, hive_table: String, partitionBy: String = "createTime"): Unit = {
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val df: DataFrame = MongoSpark.load(spark)
    df.printSchema()
    // create the Hive table
    SparkHiveUtil.createTable(spark, df, hive_mongo_db, hive_table, true)
    val temp_table = mongo_table + "_tmp"
    df.createOrReplaceTempView(temp_table) // register a temp view
    // val res: DataFrame = spark.sql("SELECT * from "+temp_table+" limit 10")
    // res.show(10)
    println("Start syncing " + mongo_table + " into Hive table " + hive_mongo_db + "." + hive_table)
    val t1 = System.currentTimeMillis
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
    // write the DataFrame into Hive with dynamic partitions derived from the partitionBy column
    spark.sql("INSERT OVERWRITE TABLE " + hive_mongo_db + "." + hive_table +
      " PARTITION (pyear,pmonth,pday) " +
      "SELECT t.*,year(t." + partitionBy + ") pyear,month(t." + partitionBy + ") pmonth,day(t." + partitionBy + ") pday " +
      "from " + temp_table + " t ")
    val t2 = System.currentTimeMillis
    println("Total time: " + (t2 - t1) / 60000 + " minutes")
    spark.stop()
  }

  def initSparkSession(mongo_table: String): SparkSession = {
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveOriginNormal_" + mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }
}
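The helper classes MongoUtils and SparkMongoUtils are referenced above but not shown in this post. A minimal sketch of what they might look like, assuming the MongoDB Spark connector (2.x) reads its settings from the spark.mongodb.input.* properties; the URI, database name and read preference below are placeholders, not the project's real values:

import org.apache.spark.SparkConf

object MongoUtils {
  // Hypothetical name of the Hive database that holds the synced collections
  val hive_mongo_db = "mongo_ods"
}

object SparkMongoUtils {
  // Placeholder connection string; in the real project this would come from configuration
  val mongo_uri = "mongodb://user:pwd@host1:27017,host2:27017/source_db"

  // Build a SparkConf that points the MongoDB Spark connector at the collection to read
  def initInputConf(mongo_table: String): SparkConf = {
    new SparkConf()
      .set("spark.mongodb.input.uri", mongo_uri)
      .set("spark.mongodb.input.collection", mongo_table)
      // read from secondaries to keep the load off the primary node
      .set("spark.mongodb.input.readPreference.name", "secondaryPreferred")
  }
}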
Next comes the incremental data and its merge into the Hive table.
The AppendCommon code is as follows:
package com.dpl.dws.mongo.mon2hive.increment.append

import java.text.SimpleDateFormat

import com.dpl.dws.common.DateTimeUtil
import com.dpl.dws.mongo.utils.{MongoUtils, SparkMongoUtils}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.rdd.MongoRDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.bson.Document

object AppendCommon {
  val hive_mongo_db = MongoUtils.hive_mongo_db

  // Pull one day's increment (inc_date in yyyyMMdd) from MongoDB and overwrite that day's partition in Hive
  def increment_partition(mongo_table: String, inc_date: String, incrementBy: String): Unit = {
    println(mongo_table, inc_date)
    val year = inc_date.substring(0, 4)
    val month = inc_date.substring(4, 6).toInt
    val day = inc_date.substring(6).toInt
    println(year, month, day)
    val spark = initSparkSession(mongo_table)
    spark.sparkContext.setLogLevel("INFO")
    val begin_time = inc_date + " 00:00:00.000"
    val end_time = inc_date + " 23:59:59.999"
    val dateFormat = new SimpleDateFormat("yyyyMMdd HH:mm:ss.SSS")
    val begin_date_time = dateFormat.parse(begin_time)
    val end_date_time = dateFormat.parse(end_time)
    println("begin_date_time:" + begin_date_time)
    println("end_date_time:" + end_date_time)
    val begin_date_time_UTC = DateTimeUtil.getIUTCTimestamp(begin_date_time)
    val end_date_time_UTC = DateTimeUtil.getIUTCTimestamp(end_date_time)
    // aggregation pipeline that filters the collection down to the day's increment
    val query = "{ '$match': {'" + incrementBy + "':{'$gte':ISODate('" + begin_date_time_UTC + "')," +
      "'$lte':ISODate('" + end_date_time_UTC + "')}} }"
    // val query = "{ '$match': {'createTime': { '$gte' : ISODate('"+time+"')} } }"
    println("query:" + query)
    val rdd: MongoRDD[Document] = MongoSpark.load(spark.sparkContext)
    val aggregatedRdd = rdd.withPipeline(Seq(Document.parse(query)))
    val count = aggregatedRdd.count
    println("count___" + count)
    // println(aggregatedRdd.first.toJson)
    if (count == 0) {
      spark.stop()
      return
    }
    // convert the filtered RDD to a DataFrame and drop the _class column, which is not needed in Hive
    val df1 = aggregatedRdd.toDF()
    df1.printSchema()
    val df: DataFrame = df1.drop("_class")
    df.printSchema()
    // df.show(10)
    // write the DataFrame into Hive
    spark.sql("set spark.executor.memory=7889934592b")
    spark.sql("use " + hive_mongo_db)
    val temp_table = mongo_table + "_tmp"
    df.createOrReplaceTempView(temp_table) // register a temp view
    println("Start syncing " + mongo_table + " into Hive table " + hive_mongo_db + "." + mongo_table)
    val t1 = System.currentTimeMillis
    spark.sql("set spark.sql.adaptive.enabled=true")
    spark.sql("set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=127108864b")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition=true")
    spark.sql("set spark.hadoop.hive.exec.dynamic.partition.mode=nonstrict")
    spark.sql("INSERT OVERWRITE TABLE " + hive_mongo_db + "." + mongo_table +
      " PARTITION (pyear=" + year + ",pmonth=" + month + ",pday=" + day + ") " +
      "SELECT t.* " +
      "from " + temp_table + " t ")
    val t2 = System.currentTimeMillis
    println("Total time: " + (t2 - t1) / 60000 + " minutes")
    spark.stop()
  }

  def initSparkSession(mongo_table: String): SparkSession = {
    val conf = SparkMongoUtils.initInputConf(mongo_table).setAppName("SparkMon2hiveAppendCommon_" + mongo_table)
    SparkSession.builder.config(conf).enableHiveSupport().getOrCreate()
  }
}
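DateTimeUtil.getIUTCTimestamp is also not shown here; the pipeline above only needs it to turn a local java.util.Date into a UTC ISO-8601 string that MongoDB's ISODate(...) literal accepts. A minimal sketch under that assumption, not the project's actual implementation:

import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}

object DateTimeUtil {
  // Format a local Date as a UTC ISO-8601 string (e.g. 2021-01-01T16:00:00.000Z)
  // so it can be embedded in a MongoDB ISODate(...) literal
  def getIUTCTimestamp(date: Date): String = {
    val fmt = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
    fmt.setTimeZone(TimeZone.getTimeZone("UTC"))
    fmt.format(date)
  }
}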
3. Remaining problems
Compared with Plan 1, this plan saves a lot of space and steps, and it is also faster.
However, for tables with a large volume of data some records get lost: the row count in Hive always ends up slightly lower than the record count of the MongoDB collection, and I have not found the cause yet.
And when the updated records span a wide range of partitions, an incremental update is hardly better than a full reload, so the approach still needs improvement; I am considering trying Hudi for incremental sync.
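To narrow down where the rows go missing, one simple check is to compare the count right after the MongoSpark load with the count of the Hive table written in the same run; a rough sketch, where the collection/table name demo_table and database mongo_ods are placeholders:

import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.SparkSession

object CountCheck {
  def main(args: Array[String]): Unit = {
    // Reuse the same conf helper as above; "demo_table" is a placeholder collection/table name
    val spark = SparkSession.builder
      .config(SparkMongoUtils.initInputConf("demo_table").setAppName("CountCheck_demo_table"))
      .enableHiveSupport()
      .getOrCreate()
    // Rows seen by the connector vs. rows that actually landed in Hive
    val mongoCount = MongoSpark.load(spark).count()
    val hiveCount = spark.sql("SELECT count(*) FROM mongo_ods.demo_table").first().getLong(0)
    println(s"mongo=$mongoCount, hive=$hiveCount, diff=${mongoCount - hiveCount}")
    spark.stop()
  }
}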