最近有一个需求是这样的:原来的数据是存储在MySQL,然后通过Sqoop将MySQL的数据抽取到了HDFS集群上,抽取到HDFS上的数据都是纯数据,字段值之间以\t分隔,现在需要将这部分数据还原为json格式的,因为这样做的原因:一来是更清楚具体字段的含义;二来是后期的数据通过kafka直接消费存储到HDFS,存的就是json数据,所以为了所有存储数据格式一致,需要将历史数据进行转换。所以只能通过MR或者Spark进行一次数据清洗转换了。因为需要根据每条数据中的一个时间字段将数据存储到不同的文件中。比如一条纯数据如下:
1 2019-04-26 00:32:09.0 null true 1025890 10004515
那么需要根据第二个字段信息来将数据分别存储到不同的文件夹,分为4个时段,格式为:
/2019/04/26/00-06.txt,/2019/04/26/06-12.txt,/2019/04/26/12-18.txt,/2019/04/26/18-00.txt,
直接上spark代码:
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.spark.sql.SparkSession
/**
* spark版本将数据输出到不同文件
* create date:2019-07-16
* author:ly
*/
object OutputToMultiFileApp {
def main(args: Array[String]): Unit = {
val inputPath = args(0)
val outputPath = args(1)
//val inputPath = "D:\\bigdata_workspace\\gey\\3\\in.txt"
//val outputPath = "D:\\bigdata_workspace\\gey\\3\\out"
val spark = SparkSession.builder().appName("OutputToMultiFileApp").master("local[*]").getOrCreate()
val data =spark.sparkContext.textFile(inputPath).map(item => {
val splits = item.toString.split("\t")
val str = "{\"id\":\"" + splits(4) + "\",\"uid\":\"" + splits(5) + "\",\"createTime\":\"" + splits(1) + "\",\"epochs\":\"1\"}"
//将时间字段作为key,包装后的json作为value
(splits(1),str)
})
/**按Key保存到不同文件*/
data.saveAsHadoopFile(outputPath,
classOf[String],
classOf[String],
classOf[MyMultipleTextOutputFormat]
)
spark.stop()
}
}
class MyMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
//1)文件名:根据key生成我们自己的路径
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
//2019-04-26 16:32:09.0
val splits: Array[String] = key.toString.split(" ")
//2019-04-26
val ymd: String = splits(0)
//16:32:09.0
val hms: String = splits(1)
//[2019,04,26]
val arr1: Array[String] = ymd.split("-")
//[16,32,09]
val arr2: Array[String] = hms.split(":")
var temp: String = ""
val h6: Int = 6
val h12: Int = 12
val h18: Int = 18
val h24: Int = 24
val h: Int = arr2(0).toInt
if(h >= 0 && h <=6) temp = "00-06"
if(h > h6 && h <= h12) temp = "06-12"
if(h > h12 && h <= h18) temp = "12-18"
if(h > h18 && h < h24) temp = "18-00"
val paths = arr1(0) + "/" + arr1(1) + "/" + arr1(2) + "/" + temp + ".txt"
paths
}
//2)文件内容:默认同时输出key和value。这里指定不输出key。
override def generateActualKey(key: Any, value: Any): String = {
null
}
}
上述代码直接在IDEA上运行,笔者是在win10上搞了一个比较小的文件测试,测试结果如下:
年份:
月份:
日期:
最终数据:
妥妥的成功了。。直接打包放到集群上运行。但是数据量大一些的话,好像会丢失数据,目前还不知道为啥。。。
欢迎大家留言讨论
内容将同步到微信公众号,欢迎关注微信公众号:LearnBigData