版本信息
Spark-sql:2.2.0.cloudera2
Spark-core:2.2.0.cloudera2
JDK:1.8
Scala:2.11.11
问题描述
在通过SparkSql API读取Snappy Parquet文件时,Spark Job task 执行报错如下:
19/12/10 14:40:42 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, hadoop-data-08.cloud, executor 1): java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.uncompressedLength(Ljava/nio/ByteBuffer;II)I
at org.xerial.snappy.SnappyNative.uncompressedLength(Native Method)
at org.xerial.snappy.Snappy.uncompressedLength(Snappy.java:561)
at parquet.hadoop.codec.SnappyDecompressor.decompress(SnappyDecompressor.java:62)
at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:51)
at java.io.DataInputStream.readFully(DataInputStream.java:195)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
at parquet.column.Encoding$1.initDictionary(Encoding.java:90)
at parquet.column.Encoding$4.initDictionary(Encoding.java:149)
at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:343)
at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:82)
at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:77)
at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:138)
at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:104)
at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:104)
at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:138)
at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:212)
at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:227)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:105)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
解决方式
将[snappy-java-1.1.4.jar](https://repository.cloudera.com/cloudera/list/repo1/org/xerial/snappy/snappy-java/1.1.4/snappy-java-1.1.4.jar)添加到Yarn集群(根据SparkJob提交模式对应到相应的运行集群)所有NodeManger节点的/opt/cloudera/parcels/SPARK2/lib/spark2/jars(此为CDH目录结构,如未使用CHD则对应到相应目录)目录下。
解决思路
Google出一个[cloudera-spark2文档](链接: https://pan.baidu.com/s/1qc9x3-cwjr4hsaao787Ytg 提取码: tu59),在 UnsatisfiedLinkError observed when using Snappy compression in the spark2-shell 节看到如下内容:
如图 ,snappy-java-1.0.4.1.jar需要访问最高级Class Loader,而如果在提交命令时将设置参数 spark.driver.userClassPathFirst ,spark.executor.userClassPathFirst 为true,查阅SparkSubmit源码可以看到此时loader为ChildFirstURLClassLoader,非最高级Class Loader,所以与snappy-java-1.0.4.1.jar的特性相冲突。
但是同时文档中也提到Spark2-submit不会受到次Bug影响,而我用的正是Spark2-submit来提交任务,事实上也受此bug影响。
最终按照文档所给方法(详见解决方式),此问题得到解决。