内容:
利用大数据工具Spark,综合采用课程中介绍的方法进行车辆轨迹的初步分析,包括车辆停留点分析、加减速监测等
任务:
- 车辆速率计算
从原始轨迹数据中获取每个轨迹点的经纬度坐标,通过该轨迹点与前一个点的距离和两个轨迹点的时间差,计算该点的即时速率。根据实际意义,规定起点和终点的即时速率为零。 - 车辆停留点分析
停留点识别的算法描述如下:
- 遍历轨迹点,检查速率值,若小于阈值,则将上一个点作为停留开始点,记录其序号。
- 继续遍历轨迹点,只要轨迹点的速率值仍小于阈值,则将该点作为停留点。
- 车辆加减速分析
加速和减速都是持续性的过程,而突发的速率变化则可能是数据采集或预处理阶段的误差引起的正常波动,因此加/减速检测方法可以采用寻找至少连续N(N ≥ 2)个点速率单调变化的片段。
代码:
头文件
my_window 用来访问Dataframe的前一条或后一条数据
import findspark
findspark.init()
from math import pi
from pyspark.sql import functions as F
from pyspark.sql.functions import lag, when, monotonically_increasing_id
from pyspark import SparkContext, SparkConf, Row
from pyspark.sql import SparkSession, Window
from pyspark.sql.types import *
from haversine import haversine
import pandas as pd
import numpy as np
spark = SparkSession.builder.config(conf = SparkConf()).getOrCreate()
my_window = Window.partitionBy().orderBy("Day_from1899")
判断是否加速
def ifUp(v):
if v>0:
return 1
elif v<0:
return -1
else:
return 0
计算速度
def speedCalc(lng1, lat1, lng2, lat2, timeback, timenow):
#角度转弧度(WGS84坐标系下)
lng1 = lng1 * pi / 180
lat1 = lat1 * pi / 180
lng2 = lng2 * pi / 180
lat2 = lat2 * pi / 180
dlon = lng2 - lng1
dlat = lat2 - lat1
a = F.sin(dlat / 2) ** 2 + F.cos(lat1) * F.cos(lat2) * F.sin(dlon / 2) ** 2
distance = 2 * F.asin(F.sqrt(a)) * 6371 * 1000 # 地球平均半径,6371km
distance = F.round(distance, 5)# 五位小数,单位米
speed = distance / ((timenow - timeback) * 24*3600)#距离除以时间差(s)得到速度
# if speed==None:
# speed=0
return speed
计算加速度
def accCalc(v1,v2,t1,t2):
dv=v2-v1
dt=(t2-t1)* 24*3600
acc=dv/dt
return acc
读入文件并生成速度、加速度等列用以分析
def readPlt(path):
df=spark.read.csv(path,header=True, inferSchema=True,sep=',')
#df.show()
df=df.withColumn("speed", \
speedCalc(lag("Longitude", 1).over(my_window), lag("Latitude", 1).over(my_window), \
F.col("Longitude"), F.col("Latitude"), \
lag("Day_from1899", 1).over(my_window), F.col("Day_from1899")))
df = df.fillna(0)
df=df.withColumn("acceleration", \
accCalc(lag("speed",1).over(my_window),F.col("speed"), \
lag("Day_from1899", 1).over(my_window), F.col("Day_from1899")))
df = df.fillna(0)
df = df.withColumn("id", monotonically_increasing_id())
df=df.withColumn("Speed_up",when(F.col('acceleration') > 0, 1).\
when(F.col('acceleration')==0,0).otherwise(-1))
df=df.drop('null')
df.show()
return df
停留点判断
def getStopPoint(df):
#输入一个dataframe,输出一个含有停留点信息的dataframe
df_s=df.withColumn('Stop', df.speed < 0.5)
#这里阈值取0.5m/s,之后df就多了一列为布尔值的数据Stop,用于判断停留点
return df_s
停留点分析
def gatherStopPoint(res):
sdf = res.filter(res['Stop'] == 1)
# 把停止点表转为list
stop_points = sdf.collect()
# 把静止点的序号提取成一个数组
sp_index = []
for single_s_point in stop_points:
sp_index.append(single_s_point.id)
sp_array = np.array(sp_index)
sp_group = np.split(sp_array, np.where(np.diff(sp_array) != 1)[0] + 1)
return sp_group
速度区间划分与分析
# 单调区间分析函数
def analyseSpeed(points, num, up):
"""
分析单调区间,并保存成文件
:param points:输入spark DF的点列表
:param num:输入最少数目
:param up:布尔值,true加速false减速
:return:void
"""
s=0
if up==True:
s=1
else :
s=-1
# 把点表转为list
allPoints= points.collect()
pList=points.filter(df["Speed_up"]==s).collect()
#print(allPoints)
# 把点的序号提取成一个数组
pIndex = []
# 搞一个区间
periods = []
for aPoint in pList:
pIndex.append(aPoint.id)
pArray = np.array(pIndex)
pGroup = np.split(pArray, np.where(np.diff(pArray) != 1)[0] + 1)
# 创建一个空列表
newGroup = []
# 遍历 arr 中的每个元素
for element in pGroup:
# 如果元素长度大于num
if len(element) > num:
newGroup.append(element)
if len(newGroup) > 0:
# 首先要保证有值
# 分析区间
for index_in_real, single_period in enumerate(newGroup):
tmp_acc_period = []
print("第{0}个{4}区间包含{1}个数据点,这个区间开始的时间为{2}的{3}"\
.format(index_in_real + 1, len(single_period),\
allPoints[single_period[0] ].Date,\
allPoints[single_period[0] ].Time,\
"加速" if up else "减速"))
# 在点中循环
for single_single_period, id_in_single_points in enumerate(single_period):
tmp_acc_period.append(allPoints[single_period[single_single_period] - 1])
print("--->第{0}个{4}区间的第{1}个数据点的速度为{2},加速度为{3}".\
format(index_in_real + 1, single_single_period + 1,\
allPoints[single_period[single_single_period] ].speed,\
allPoints[single_period[ single_single_period] ].acceleration,\
"加速" if up else "减速"))
periods.append(tmp_acc_period)
print(
"第{0}个{3}区间结束的时间为{1}的{2}".\
format(index_in_real + 1,\
allPoints[single_period[len(single_period) - 1] ].Date,\
allPoints[single_period[len(single_period) - 1] ].Time,\
"加速" if up else "减速"))
print("===分隔线===")
for index_in_save, single_period in enumerate(periods):
tmpdf = pd.DataFrame(
columns=['Latitude', 'Longitude', 'Altitude', 'Day_from1899', 'Date', 'Time', 'speed', 'acceleration',
'id', 'Speed_up', 'Stop'], data=single_period)
tmpdf.to_csv(("加速" if up else "减速") + '区间' + str(index_in_save + 1) + '.csv',
index=False)