需求:快速解析一个文件夹下的所有xml(10年的数据,大概一千万个xml)
遇到的坑:xml里面有dtd,必须这个文件存在,不然会报错
处理思路:重写原有的方法
语言对比:Java处理速度慢,并且代码量大,这儿不做考虑,其次python,由于需要处理的xml太过复杂,python需要写多层嵌套循环,并且不能分布式处理,这儿重点讲一下Scala
Scala我么通过scala.xml.XML可以很方便的取出任意一个标签,并且可以分布式去处理。
Scala XML API提供了类似XPath的语法来解析XML。在NodeSeq这类父类里,定义了两个很重要的操作符(""和"\"),用来获得解析XML:
\ :\ 简单来说就是根据条件搜索第一次出现的节点
\\:而 \\ 则是根据条件搜索所有的子节点
我们先分享几个小方法:
1.递归遍历文件夹,找出所有以xml结尾的文件
/**
- 递归查找文件及子文件夹下面的XML文件
- @param file
- @return
*/
def getFile(file:File): Array[File] ={
val files = file.listFiles().filter(! .isDirectory)
.filter(t => t.toString.toLowerCase.endsWith(".xml")) //此处读取.txt and .md文件
files ++ file.listFiles().filter(.isDirectory).flatMap(getFile)
}
2.解决dtd校验文件问题
源码:
这儿我们会用到loadFile去加载xml文件,我们根据源码可以看到loadFile传进去了一个parser方法:
XML.XML.loadFile(filename)
/**
- Loads XML from the given InputSource, using the supplied parser.
- The methods available in scala.xml.XML use the XML parser in the JDK.
/
def loadXML(source: InputSource, parser: SAXParser): T = {
val newAdapter = adapter
newAdapter.scopeStack push TopScope
parser.parse(source, newAdapter)
newAdapter.scopeStack.pop()
newAdapter.rootElem.asInstanceOf[T]
}
/* Loads XML from the given file, file descriptor, or filename. */
def loadFile(file: File): T = loadXML(fromFile(file), parser)
def loadFile(fd: FileDescriptor): T = loadXML(fromFile(fd), parser)
继承XMLLoader,重写parser方法,设置参数忽略校验
object MyXML extends XMLLoader[Elem] {
override def parser: SAXParser = {
val f = javax.xml.parsers.SAXParserFactory.newInstance()
f.setNamespaceAware(false)
f.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
f.newSAXParser()
}
}
最终代码实现:
import java.io.File
import java.util
import org.apache.spark.sql.SparkSession
import scala.xml.{Elem, SAXParser, XML}
import scala.xml.factory.XMLLoaderobject ParseCNPatentFile {
def main(args: Array[String]): Unit = {
//构建sparksession
val spark = SparkSession.builder()
.appName("SparkSQLDemo")
.master("local[1]")
.getOrCreate()
val sc = spark.sparkContext//需要解析的文件路径
val path = new File("data/CNPatentData")
val parseddata = getFile(path).map(line => {
// println(line.toString)
val parseFile = line.toString
//解析xml的方法
parseCNPatent(parseFile)
})sc.parallelize(parseddata).saveAsTextFile("data/CNPatentData/result")
}
/**
- 递归查找文件及子文件夹下面的XML文件
- @param file
- @return
*/
def getFile(file:File): Array[File] ={
val files = file.listFiles().filter(! .isDirectory)
.filter(t => t.toString.toLowerCase.endsWith(".xml")) //此处读取.txt and .md文件
files ++ file.listFiles().filter(.isDirectory).flatMap(getFile)
}/**
- 解析xml文件
- @param filename
- @return
/
def parseCNPatent(filename:String): String ={
// val someXML = XML.loadFile(filename)
val someXML = MyXML.loadFile(filename)
/*
- 提取基础属性
*/
val ossfoder = filename
val file = (someXML \ "PatentDocumentAndRelated" "@file").text
.
.
.
}
object MyXML extends XMLLoader[Elem] {
override def parser: SAXParser = {
val f = javax.xml.parsers.SAXParserFactory.newInstance()
f.setNamespaceAware(false)
f.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false)
f.newSAXParser()
}
}
}