Geospark源码解析(一)
本节我们以查询为例,看下GeoSpark如何利用分布式来实现高效查询。首先,对于Spark来说,想要利用Spark,必须要将自己的类型转为RDD,我们就先看下Geospark是如何读取GeoJson,并将Geometry
转为RDD的。
public class SpatialRDD<T extends Geometry>
implements Serializable
{
/**
* The raw spatial RDD.
*/
public JavaRDD<T> rawSpatialRDD;
...
}}
Geospark自定义了一个RDD,SpatialRDD
,他是一个泛型类,并且泛型要求是Geometry
的子类,对于Geometry
来说,他的子类有Point
、Line
、Polygon
等,这个大家可以去看JTS库http://www.tsusiatsoftware.net/jts/main.html。然后我这里列举了SpatialRDD
一个重要的成员,对于rawSpatialRDD
来说,他里面存储的就是我们的需要分析的Geometry
。
GeoSpark提供了PointRDD
,PolygonRDD
等,他们都继承自SpatialRDD
,我们以PointRDD
为例,分析一下GeoSpark是如何将geojson转为RDD的。
public PointRDD(JavaSparkContext sparkContext, String InputLocation, Integer Offset, FileDataSplitter splitter,
boolean carryInputData, Integer partitions, StorageLevel newLevel, String sourceEpsgCRSCode, String targetEpsgCode)
{
JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);
if (Offset != null) {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));}
else {this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(splitter, carryInputData)));}
if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}
if (newLevel != null) { this.analyze(newLevel);}
if (splitter.equals(FileDataSplitter.GEOJSON)) { this.fieldNames = FormatMapper.readGeoJsonPropertyNames(rawTextRDD.take(1).get(0).toString()); }
}
这是PointRDD常用的一个构造函数,其中第4行JavaRDD rawTextRDD = partitions != null ? sparkContext.textFile(InputLocation, partitions) : sparkContext.textFile(InputLocation);
则是利用Spark的原生方法将geojson首先转为一个RDD,他的类型可以理解为是String,第7行if (sourceEpsgCRSCode != null && targetEpsgCode != null) { this.CRSTransform(sourceEpsgCRSCode, targetEpsgCode);}
则是做了一个坐标转换,关键是第5行this.setRawSpatialRDD(rawTextRDD.mapPartitions(new PointFormatMapper(Offset, splitter, carryInputData)));
,
在第5行中,Geospark首先调用了mapPartitions
方法来将rawTextRDD中的每一行转为Geometry,其中pointFormatMapper
中有一个方法
public Iterator<T> call(Iterator<String> stringIterator)
throws Exception
{
List<T> result = new ArrayList<>();
while (stringIterator.hasNext()) {
String line = stringIterator.next();
addGeometry(readGeometry(line), result);
}
return result.iterator();
}
他是一个重载,函数参数stringIterator
是每个分区的所有string,Geospark遍历这个集合,在每一行调用了一个addGeometry
方法,将String转为Geometry,这个方法就不细讲,主要是解析GeoJson,感兴趣的可以去看GeoSpark源码。
这样构造完成后,就将GeoJson转为了一个RDD,此时我们还没有构建空间索引,但是对于大数据量的空间数据我们已经可以利用Spark的RDD进行并行计算了。
public static <U extends Geometry, T extends Geometry> JavaRDD<T> SpatialRangeQuery(SpatialRDD<T> spatialRDD, U originalQueryGeometry, boolean considerBoundaryIntersection, boolean useIndex)
throws Exception
{
U queryGeometry = originalQueryGeometry;
if (spatialRDD.getCRStransformation()) {
queryGeometry = CRSTransformation.Transform(spatialRDD.getSourceEpsgCode(), spatialRDD.getTargetEpgsgCode(), originalQueryGeometry);
}
if (useIndex == true) {
if (spatialRDD.indexedRawRDD == null) {
throw new Exception("[RangeQuery][SpatialRangeQuery] Index doesn't exist. Please build index on rawSpatialRDD.");
}
return spatialRDD.indexedRawRDD.mapPartitions(new RangeFilterUsingIndex(queryGeometry, considerBoundaryIntersection, true));
}
else {
return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
}
}
这里我们看第16行return spatialRDD.getRawSpatialRDD().filter(new RangeFilter(queryGeometry, considerBoundaryIntersection, true));
在第9行if (useIndex == true)
判断不用索引时,就会跳到第16行,本质上还是用了RDD来利用自定义函数进行判断,如果是真,就过滤出来,我们看RangeFilter
这个类。
public class RangeFilter<U extends Geometry, T extends Geometry>
extends JudgementBase
implements Function<T, Boolean>
{
public RangeFilter(U queryWindow, boolean considerBoundaryIntersection, boolean leftCoveredByRight)
{
super(queryWindow, considerBoundaryIntersection, leftCoveredByRight);
}
public Boolean call(T geometry)
throws Exception
{
if (leftCoveredByRight) {
return match(geometry, queryGeometry);
}
else {
return match(queryGeometry, queryGeometry);
}
}
}
注意到call
这个方法,里面又调用了match
方法,它在父类JudgementBase
定义有:
public boolean match(Geometry spatialObject, Geometry queryWindow)
{
if (considerBoundaryIntersection) {
if (queryWindow.intersects(spatialObject)) { return true; }
}
else {
if (queryWindow.covers(spatialObject)) { return true; }
}
return false;
}
这里面,我们可以看到第4行和第7行均是利用了JTS来判断的,到这里,就一目了然了,实际上还是我们提供了match这个方法,利用Spark来计算。
本文中,我们并没有涉及到索引,GeoSpark也将JTS的索引进行了封装,原理和上面讲的是一样的,我们下一篇文章中在进行分析。