Spark: Fixing an Initialization Error When Calling a Custom Method Inside a map Operator
1. Background
I had a batch of JSON data that needed to be saved in Parquet format, but some of the records were dirty,
for example:
{'name':'zhangsan','age':14,'wo}k':'teacher'}
When such a record is present and I convert the JSON file to Parquet directly with the Spark code below, the job fails.
Code:
spark.read.json("data/data.json").write.mode(SaveMode.Overwrite).parquet("data/data.parquet")
Exception:
19/08/28 08:02:06 WARN TaskSetManager: Lost task 4.0 in stage 33.0 (TID 344, GDGZ-TEL-DATACENTER07, executor 1): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.sql.AnalysisException: Attribute name "wo}k" contains invalid character(s) among " ,;{}()\n\t=". Please use alias to rename it.;
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkConversionRequirement(ParquetSchemaConverter.scala:584)
at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter$.checkFieldName(ParquetSchemaConverter.scala:571)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:339)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:335)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter$$anonfun$convertField$1.apply(ParquetSchemaConverter.scala:551)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter$$anonfun$convertField$1.apply(ParquetSchemaConverter.scala:550)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:186)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:550)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:335)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:524)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convertField(ParquetSchemaConverter.scala:335)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter$$anonfun$convert$1.apply(ParquetSchemaConverter.scala:327)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter$$anonfun$convert$1.apply(ParquetSchemaConverter.scala:327)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at org.apache.spark.sql.types.StructType.foreach(StructType.scala:99)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at org.apache.spark.sql.types.StructType.map(StructType.scala:99)
at org.apache.spark.sql.execution.datasources.parquet.SparkToParquetSchemaConverter.convert(ParquetSchemaConverter.scala:327)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.init(ParquetWriteSupport.scala:95)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:341)
at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:302)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.newOutputWriter(FileFormatWriter.scala:367)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:378)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:269)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:267)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:272)
... 8 more
So I looked for another approach: when such dirty records exist, first read the raw file with textFile("data/data.json"),
then iterate over it with the map operator, clean out the dirty records, and only then write the Parquet file.
2. Process
Since writing Parquet directly threw the exception above, I tried the following code to filter out the dirty data:
def main(args: Array[String]): Unit = {
  try {
    // First try the direct path: read the JSON and write it straight out as Parquet.
    spark.read.json("data/data.json").write.mode(SaveMode.Overwrite).parquet("data/data.parquet")
  } catch {
    case e: Exception =>
      // Fall back to reading the raw text and cleaning dirty records line by line.
      val dataRdd = spark.sparkContext.textFile("data/data.json")
      val newDataJsonRdd = dataRdd.mapPartitions(datas => {
        datas.map(data => findError(data))
      })
      // Drop the records flagged as dirty, then write Parquet.
      spark.read.json(newDataJsonRdd.filter(data => !data.contains("log_error")))
        .write.mode(SaveMode.Overwrite).parquet("data/data.parquet")
  } finally {
    spark.stop()
  }
}
// Requires: import com.alibaba.fastjson.JSON and import scala.util.matching.Regex
def findError(data: String): String = {
  var newData = data
  val keyNames = JSON.parseObject(data).keySet().iterator()
  val pattern = new Regex("[^a-zA-Z_]")
  while (keyNames.hasNext) {
    val keyName = keyNames.next()
    // Flag the record as dirty if any key contains a character other than letters or '_'.
    if (pattern.findFirstIn(keyName).isDefined) {
      newData = "{'log_error':'log_error'}"
    }
  }
  newData
}
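As a quick sanity check, a hypothetical standalone snippet (not part of the original job, and assuming the fastjson parser used above accepts the single-quoted records shown in the background section) would behave like this:

// Hypothetical usage check for findError
val dirty = "{'name':'zhangsan','age':14,'wo}k':'teacher'}"
val clean = "{'name':'zhangsan','age':14,'work':'teacher'}"
println(findError(dirty)) // expected: {'log_error':'log_error'}, because the key "wo}k" contains '}'
println(findError(clean)) // expected: the record unchanged, since every key is plain letters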
This looked fine, and a local test ran without problems in local mode, but as soon as the job was submitted to the cluster in yarn-client mode, a new exception appeared:
Caused by: java.lang.NoClassDefFoundError: Could not initialize class com.meda.myClass
I puzzled over this for a long time without figuring out exactly where the problem lay (if anyone knows precisely why this error is thrown, I'd be glad to discuss it). Why a class-loading error at all, when findError and main live in the same class? Judging from where the error occurs, and from the fact that it only appears in yarn-client mode and not in local mode, my working conclusion was that the method (that is, the class containing it) was never shipped to the nodes where the executors run.
3. Solution
1. Wrap the method in a separate class and broadcast an instance of that class as a broadcast variable. I moved findError into a new class named JsonUtil; the main method then looks like this:
def main(args: Array[String]): Unit = {
  try {
    spark.read.json("data/data.json").write.mode(SaveMode.Overwrite).parquet("data/data.parquet")
  } catch {
    case e: Exception =>
      val dataRdd = spark.sparkContext.textFile("data/data.json")
      // Broadcast a JsonUtil instance so each executor gets its own copy of the helper.
      val jsonUtil = new JsonUtil()
      val brdcst = spark.sparkContext.broadcast(jsonUtil)
      val newDataJsonRdd = dataRdd.mapPartitions(datas => {
        datas.map(data => brdcst.value.findError(data))
      })
      spark.read.json(newDataJsonRdd.filter(data => !data.contains("log_error")))
        .write.mode(SaveMode.Overwrite).parquet("data/data.parquet")
  } finally {
    spark.stop()
  }
}
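JsonUtil itself is not shown above; a minimal sketch of what it might look like is given below (an assumption, not the original implementation): it simply wraps findError and extends Serializable so the instance can be serialized into the broadcast variable and deserialized on the executors.

import com.alibaba.fastjson.JSON
import scala.util.matching.Regex

// Hypothetical sketch of the JsonUtil class referenced above.
class JsonUtil extends Serializable {
  def findError(data: String): String = {
    var newData = data
    val keyNames = JSON.parseObject(data).keySet().iterator()
    val pattern = new Regex("[^a-zA-Z_]")
    while (keyNames.hasNext) {
      val keyName = keyNames.next()
      // Any key containing a character other than letters or '_' marks the record as dirty.
      if (pattern.findFirstIn(keyName).isDefined) {
        newData = "{'log_error':'log_error'}"
      }
    }
    newData
  }
}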
2. Add the --jars option to the spark-submit command and point it at the application jar again, so that the jar containing the class is explicitly distributed to the executors and placed on their classpath. The command looks like this:
spark-submit --master yarn-client --jars jars/myTest.jar --class main jars/myTest.jar