- 测试数据
10.5,32.11,30.50,60.21,33.50,60.21,10.5,32.11,china1
9.51,30.11,32.50,62.21,34.50,62.21,9.51,30.11,china2
11.5,32.11,31.50,64.21,33.50,64.21,11.5,32.11,china3
10.5,31.16,32.51,63.21,35.51,63.21,10.5,31.16,china4
11.5,32.11,30.50,59.21,33.50,59.21,11.5,32.11,china5
- 测试代码
package txt_demo
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.sql.SparkSession
import org.datasyslab.geospark.enums.FileDataSplitter
import org.datasyslab.geospark.serde.GeoSparkKryoRegistrator
import org.datasyslab.geospark.spatialRDD.PolygonRDD
import org.datasyslab.geosparksql.utils.{Adapter, GeoSparkSQLRegistrator}
import org.datasyslab.geosparkviz.core.Serde.GeoSparkVizKryoRegistrator
object area2_demo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().
setAppName("SpatialRangeQueryApp").setMaster("local[*]").
set("spark.serializer", classOf[KryoSerializer].getName).
set("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
implicit val sc = new SparkContext(conf)
var sparkSession = SparkSession.builder()
.master("local[*]") // Delete this if run in cluster mode
.appName("readTestScala") // Change this to a proper name
// Enable GeoSpark custom Kryo serializer
.config("spark.serializer", classOf[KryoSerializer].getName)
.config("spark.kryo.registrator", classOf[GeoSparkKryoRegistrator].getName)
.config("spark.serializer", classOf[KryoSerializer].getName)
.config("spark.kryo.registrator", classOf[GeoSparkVizKryoRegistrator].getName)
.getOrCreate()
GeoSparkSQLRegistrator.registerAll(sparkSession)
val polygonRDD = createPolygonRDD
val polygonDf = Adapter.toDf(polygonRDD,sparkSession)
polygonDf.printSchema()
polygonDf.createOrReplaceTempView("p_view")
val df = sparkSession.sql(
"""
|select p_view._c1 ,ST_Area(ST_GeomFromText(p_view.geometry)) from p_view
""".stripMargin)
df.show(truncate = false)
}
def createPolygonRDD(implicit sc:SparkContext):PolygonRDD={
val polygonRDDInputLocation = "D:\\idea\\demo_spark\\es_demo\\src\\data\\area.csv"
val polygonRDDStartOffset = 0
val polygonRDDEndOffset = 7
val polygonRDDSplitter = FileDataSplitter.CSV // or use FileDataSplitter.TSV
val carryOtherAttributes = true
val objectRDD = new PolygonRDD(sc, polygonRDDInputLocation, polygonRDDStartOffset, polygonRDDEndOffset, polygonRDDSplitter, carryOtherAttributes)
objectRDD
}
}
- 测试结果
+------+----------------------------------+
|_c1 |st_area(st_geomfromtext(geometry))|
+------+----------------------------------+
|china1|42.150000000000034 |
|china2|32.10000000000002 |
|china3|32.099999999999966 |
|china4|48.07499999999999 |
|china5|40.650000000000034 |
+------+----------------------------------+