本文是19年基于spark的空间大数据管理论文,主要讲geospark的设计和实现。
编者的总结:
- 本文是一个稳定的、高效的、可用性很强、功能较为完善的一个Spark扩展包,专门针对空间数据,整体设计尽可能和Spark进行融合,对开源软件维护更新很有帮助;
- 自制的序列化器、KDB动态分区、自定义算子算法、Quad Local Index + GS join、Optimizer是本文的突出创新点。
编者的思考:
- 由于是基于Spark的,所以局限性仍然存在,就是固定数据集离线分析,对于实时性还无法达到;
- 对于各种空间数据源类型的read input map特定优化,可否考虑深度谓词下推,到数据文件这一级别。
Abstract
GeoSpark扩展了Spark Core,以支持空间数据类型、索引、几何操作,空间上的操作包括空间范围、KNN、空间join等操作。证实了本地空间索引加速效果明显。整个系统比原生Spark快一个量级。
1 Introduction
从RDBMS做空间数据库有scalability的问题,基于Hadoop的时延又太高。
本文还做了一个optimizer,专门针对空间query。
除了上述贡献,本文还研究了将空间数据基于距离的邻近性进行重新分区、负载均衡相关问题。
3 System overview
4 Spatial RDD (SRDD) layer
首先是大规模空间数据难以放到普通rdd里面,原因有以下几点:
- 多种类型的数据源:空间数据格式多样,统一读取效率低下;
- 数据对象复杂:凸/凹多边形,子多边形,集合,普通rdd很难有效分区、序列化、处理,更没法优化;
- 空间分区:普通rdd基于hash的分区没有用到空间数据的距离邻近性,这会使得处理很低效,不能够削减分区;
- 空间索引支持:普通rdd不支持空间索引。
4.1 SRDD spatial objects support
各种空间数据都能读到RDD来,空间对象一共有7种类型, Point, Multi-Point, Polygon, Multi-Polygon, LineString, Multi-LineString, GeometryCollection, and Circle.一个RDD可能包含多种类型的对象。
具体的数据读取和序列化,由后台执行,用户只要指明源数据什么格式就行。
4.2 SRDD built-in geometrical library
SRDD有配套的内置的几何计算库,都是以rdd或SQL类型来编程的,内部实现算法是分布式的,举两个例子:
- DatasetBoundary: 计算多边形的MBR,类似一个reduce函数,首先每个分区内部的多边形两两merge计算局部MBR,然后多个分区的结果在统一merge下。
- Reference system transform: 改变空间引用系统(编者注:猜测是空间坐标系?),类似一个map函数,对每个图形对象进行一次map转换。
4.3 SRDD partitioning
SRDD的partition的核心挑战在于把空间距离邻近的空间对象放到一个partition里,可以大幅度减少数据shuffle,也使得分区剪裁成为可能。但是从整个SRDD获得数据分布再partition代价太大,因此设计了如下算法。
-
Step1:采样(1%),然后在空间上划分网格,使得这些采样点均匀分布。
- Step2:map,给每个空间对象赋予一个grid id。有一个问题是几何对象可能横跨多个grid,多个grid之间也有可能overlap,因此一个对象可能有多个grid id,这可能导致重复,5.3节解决该问题。
- Step3:根据grid id重新分区。
4.4 SRDD indexing
某些SRDD可能会被频繁重复使用(一些空间数据挖掘算法决定的),因此构建一些索引(R-Tree/Quad-Tree)可以显著加速。
GeoSpark在某个SRDD的每个分区上都构建一个local index,而且这些索引都是聚集索引,直接存储空间对象的。
索引构建组织都是基于MBR进行的,因此查询索引也是由Filter-and-Refine模型构成的,首先找到和MBR相交的,然后再进一步check。
索引要反复使用就需要持久化,可以放内存cache,也可以放外存persist,无论哪一种都是对每个partition进行序列化(定制的序列化),然后进行存储。
4.5 SRDD customized serializer
SRDD将一个扁平空间对象按如下方式进行序列化:
- Byte1:空间对象类型;
- Byte2:子对象数量;
- Byte3:第一个子对象的类型;
- 接下来就是每个子对象有多少个坐标,每个坐标具体数值。
很多空间对象是物化在索引里的,对于整个索引的序列化,采用了经典的N叉树序列化算法,具体如下:
- 序列化:前序序列化树上的每个节点,首先是每个节点的Boundary,然后是节点内部的空间对象,也记录空间对象数量和子节点。
5 Spatial query processing layer
搞定了RDD的加载、分区、索引,接下来研究更为复杂的空间操作。
5.1 Processing spatial range and distance queries in spark
range和distance query语义很明确,就如下面两个SQL这样:
执行起来也比较简单,把query window广播,然后在每个分区做filter再汇总就可以了。
如果本地分区有索引,就利用window的MBR和索引节点上的MBR做谓词判断,Filter-and-Refine。
整个过程都是narrow dependency,没有data shuffle。
但是要注意这样的算法过程是不需要新型partition的,得不偿失。
5.2 Spatial K nearest neighbors (KNN) query
找给定query位置的KNN。算法的整体思路就是local KNN => Global KNN。在最后sort时需要一次data shuffle。同样KNN也不需要新型partition,不会有增益。
5.3 Processing spatial join queries in spark
join query可以关联两个表,返回满足条件的pair,语义如下。
5.3.1 GSJoin
以两个表A和B作为输入,包含以下步骤:
- grid merge:首先将各个grid(partition)内部的空间对象(来自A和B)merge起来,形成一个新的RDD。不同grid的不会产生交集;
- grid join:在每个grid内部执行谓词连接,没有索引的话就用nested loop join,有索引的话就用index loop join(R-Tree索引,仍然是Filter-and-Refine过程)。
-
de-duplicate:对于同一个几何对象跨越多个grid的情况(Quad-Tree, KDB-Tree),可以采用reference point方法。即对于两个几何对象,首先计算它们的交集,如果交集图形的右上点落在了当前grid,才emit join pair,否则不认为在此grid相交。对于多个grid互相overlap产生的重复(R-Tree),只能结果产生之后,再逐个排序去重。
5.3.2 broadcast join
如果有一个输入表是比较小的,就可以广播这个小表,在大表的每个partition进行join,再汇总结果。思路很简单,而且不需要任何特殊的partition,基于hash的即可,在小表情况下非常快速。
5.3.3 Distance join
对于基于距离的join,可以提前扩展grid Boundary,然后再执行Contain的join算法。
5.4 Spatial SQL query optimization
5.4.2 Heuristic rules for logical plans
-
谓词下推:join之类的重操作留到最后,range/distance query优先。
-
谓词融合:对于多谓词AND/OR,对于同一表的,先把query window做AND/OR,然后一次查询。
-
重写query:首先判断是否有交集,然后再计算交集,以减少运算量。
5.4.3 Cost-based strategies for physical plans
GeoSpark自己维护了一组统计信息,用于两种选择(index scan selection/ spatial join algorithm selection)。
这组统计信息包括一列的global MBR以及行计数,需要手动调用ANALYSE才会进行统计。
-
Index Scan Selection:对于Range Query,会考虑选择是否使用索引,如果选择度低(<1%),才会考虑使用索引(由于refine过程)。选择度如下定义。
- SPATIAL JOIN ALGORITHM SELECTION:根据表大小决定广播join还是GS join。
7 Experiments
4个对比竞争者:GeoSpark, Simba, Magellan, SpatialHadoop
Workload: Range query,KNN query,Range join query
实验环境:12核,3.0GHz,100GB内存,4TB HDD
7.1 Performance of range query
7.1.1 Impact of GEOSPARK local indexing
- TAXI数据是点类型,扫描计算负载低,因此无索引更好,索引树反而会因为MBR导致出现多边形,增加计算负载。
- 后两个是多边形和line strings类型,索引树剪枝才体现效果。
-
从左到右筛选度增加,索引方法执行时间相应延长。
7.1.2 Comparing different systems
- 索引树的剪枝和高效的序列化,使得GeoSpark获得了最佳的查询性能;
- GeoSpark的序列化器引入了一些额外的字节存储元信息,因此内存使用略高一部分,尤其是带索引树的。
-
Hadoop-based和Spark-based的内存使用和时间效率都不在一个量级上。
7.2 Performance of K nearest neighbors (KNN) query
Hadoop-based的方法在KNN中内存使用激增,主要由于排序导致的过度data shuffle,产生内存占用。
7.3 Performance of range join query
7.3.1 Impact of spatial partitioning
KDB>Quad>R
(more load-balanced)
7.3.2 Comparing different systems
- Simba的缺陷:
- R-Tree分区不均衡
- Spark默认kyro序列化工具产生了较多无关数据
- Join算法shuffle太多数据(13GB vs 70GB)
- Magellan的缺陷:
- Z-curve分区导致了许多重叠分区;邻近性的信息也少了很多;
- 不支持树形索引;
- 序列化器。
- GS join内存需求很低。