- 要求:
- 采用面向对象编程
- 寻求从结点一到结点十二的最短路径
- 可视化处理。
1. 信息提取——获取地图
从给出的两个csv文件中获取地图信息。
edges.csv说明了相连的结点及其真实耗散值;nodes.csv给出了每个结点的坐标及其到目标节点的启发函数值。
edges.jpg
nodes.jpg
定义的数据结构
- heuristic_cost_to_go = [ ]——每个结点的启发函数值
- neighbour_table = {}——每个结点的相邻节点
- cost = numpy.zeros(nodes_number,nodes_number)——相邻节点的耗散值(12*12矩阵)
以上数据结构在构造函数中定义
2. 搜索路径
采用深度有限搜索。因为结点数量有限,故搜索深度是有限的。小于等于n。将初始节点放入OPEN表,弹出当前结点,放入CLOSED表。
数据结构
- OPEN = [ ],CLOSED = [ ],两个表用于记录结点搜索状态
- est_total_cost = [ ],评价函数值。初始化为est_total_cost = [inf, inf, inf, ...]
- past_cost = [ ],结点的总耗散值。初始化为past_cost = [0, inf, inf, ...]
- path = [ ] ,最终路径。
- parent = { },字典记录父结点。注意,每次扩展已经在OPEN表中结点时,需要判断二者评价值并选择保留路径。
3. 回溯
A* 图搜索,评价函数值满足一致性。则第一个搜索出来的路径就是最佳路径。故一旦OPEN表弹出目标结点,则立即回溯路径并返回。
算法实现
使用面向对象式编程
class AStarPathPlanner( Object):
def __init__(self, nodes_path, edges_path):
self.heuristic_cost_to_go = [ ] # 启发函数值
with open( nodes_path, "rt") as f_obj:
contents = csv.reader( f_obj)
for row in contents:
if( row[0][0] != "#"):
self.heuristic_cost_to_go.append(row[1])
self.nodes_num = len(self.heuristic_cost_to_go)
self.neighbour_table = { }
self.cost = np.zeros((self.nodes_num,self.nodes_num))
with open(edges_path, "rt") as f_obj:
contents = csv.reader( f_obj)
for row in contents:
if(row[0][0] != "#"):
node1 = int(row[0])
node2 = int(row[1])
if node1 not in self.neighbour_table:
self.neighbour_table[node1] = [ ] # 如果node1不在neighbour_table的键中,则创立键
self.neighbour[node1].append(node2)
if node1 not in self.neighbour_table:
self.neighbour_table[node1] = [ ] # 同node1
self.neighbour[node2].append(node1)
cost = float (row[2])
self.cost [node1-1][node2-2] = cost
self.cost [node2-1][node1-1] = cost
def cmd(x):
return self.est_total_cost[x-1]
def search_for_path(self):
self.OPEN = [1]
self.CLOSED = [ ]
self.est_total_cost = [float("inf")] * (self.nodes_num) # 因为优先选择评价函数值小的结点搜索,所以将所有结点的评价值初始化为inf
self.past_cost = [float("inf")] * (self.nodes_num)
self.past_cost[0] = 0 # 因为评价函数值是真实耗散值和启发函数值的加和,所以也要初始化为inf
self.parent = { }
while self.OPEN:
current = OPEN.pop(0) # 深度优先
self.CLOSED.append(current)
if current == self.nodes_num:
# 如果当前结点为目标结点,则返回路径
while True:
self.path.append(current)
if current == 1:
break
current = self.parent[current]
return True, self.path
for nbr in self.neighbour_table[current]:
if nbr not in self.CLOSED:
#完成扩展结点current,并根据评价函数值重排OPEN表
if self.est_total_cost[nbr -1] == float("inf"):
# 判断nbr是否已经具有评价函数值,等价于 if nbr not in self.OPEN:
self.past_cost [ nbr - 1 ] = self.past_cost [current - 1] + self.cost [ current-1][ nbr -1]
self.est_total_cost[ nbr -1 ] = self.past_cost[nbr-1] + self.heuristic_cost_to_go [ nbr - 1 ]
self.parent[nbr] = current
self.OPEN.append(nbr)
else: # 如果已经在OPEN表中
temp_p = self.past_cost [current - 1] + self.cost [ current-1][ nbr -1]
temp_e = self.past_cost[nbr-1] + self.heuristic_cost_to_go [ nbr - 1 ]
if temp_e < self.est_total_cost [ nbr -1]:
self.past_cost[nbr-1] = temp_p
self.est_total_cost[nbr -1]= temp_e
self.parent[nbr] = current
self.OPEN.sort(key = cmd)
return False, [ ]
if name == '__main__':
planner = AStarPathPlanner(nodes_path, edges_path)
success, path = planner.search_for_path()
if success:
print(path[::-1])
else:
print("None")
重排OPEN表
使用sort函数,cmd规则
def cmd(x):
return self.est_total_cost [x-1]
self.OPEN.sort(key = cmd)