This exercise is adapted from Morvan (莫烦)'s Maze example, extended to 100 states, with several parts of the code modified. The exploration-exploitation ratio now decays dynamically, so the final agent is fairly conservative. At first I set the reward R for an ordinary move to -1, but the results were poor, and I briefly wondered whether I needed to add a V-table to evaluate states. It turned out to be my own lack of understanding: the Q-table update already involves the max Q value of the next state, so setting R = 0 for ordinary moves is enough. With R = -1, the more often a cell is visited, the lower its value drops.
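To see why, here is a minimal, self-contained sketch (not part of the maze program below; the numbers are illustrative, not from a real run) of the tabular Q-learning update Q(s,a) <- Q(s,a) + alpha * (r + gamma * maxQ(s') - Q(s,a)), applied repeatedly to a state whose successor has not yet learned anything about the treasure:

# Repeated Q-learning updates for a state far from the goal, before the
# treasure reward has propagated back (max Q of the next state is still 0).
alpha, gamma = 0.01, 0.9   # same learning rate / discount as the agent below
max_q_next = 0.0           # illustrative: the next state knows nothing yet

for r in (0, -1):          # ordinary-move reward of 0 vs. -1
    q_sa = 0.0
    for _ in range(100):   # visit this state-action pair 100 times
        q_sa += alpha * (r + gamma * max_q_next - q_sa)
    print('r =', r, '-> Q after 100 visits =', round(q_sa, 3))
# r = 0 leaves the value at 0.0, while r = -1 drags it steadily toward -1,
# which is exactly why the -1 step reward looked so bad during training.

The full environment and agent code follows.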
import numpy as np
import pandas as pd
import time
import sys
import os
from PIL import ImageTk,Image
if sys.version_info.major == 2:
    import Tkinter as tk
else:
    import tkinter as tk
UNIT = 60 # pixels
MAZE_H = 10 # grid height
MAZE_W = 10 # grid width
class Maze(tk.Tk, object):
    def __init__(self):
        super(Maze, self).__init__()
        self.action_space = ['u', 'd', 'r', 'l']  # index order matches step(): 0 up, 1 down, 2 right, 3 left
        self.n_actions = len(self.action_space)
        self.title('小徐寻宝')
        self.geometry('{0}x{1}'.format(MAZE_W * UNIT, MAZE_H * UNIT))  # width x height
        self.flags = 0  # episode counter, used to adjust the render speed
        self._build_maze()
    def _build_maze(self):
        self.canvas = tk.Canvas(self, bg='white',
                                height=MAZE_H * UNIT,
                                width=MAZE_W * UNIT)
        # sprite images: explorer avatar, treasure chest, and trap
        self.explorerpicture = ImageTk.PhotoImage(Image.open(os.getcwd() + "/touxiang.jpeg").resize((50, 50)))
        self.paradisepicture = ImageTk.PhotoImage(Image.open(os.getcwd() + "/baoxiang2.jpeg").resize((50, 50)))
        self.hellpicture = ImageTk.PhotoImage(Image.open(os.getcwd() + "/laoshujia.jpeg").resize((50, 50)))
        # create grids
        for c in range(0, MAZE_W * UNIT, UNIT):
            x0, y0, x1, y1 = c, 0, c, MAZE_H * UNIT
            self.canvas.create_line(x0, y0, x1, y1)
        for r in range(0, MAZE_H * UNIT, UNIT):
            x0, y0, x1, y1 = 0, r, MAZE_W * UNIT, r
            self.canvas.create_line(x0, y0, x1, y1)
        # create origin (center of the top-left cell)
        origin = np.array([30, 30])
        # hells (traps)
        hell1_center = origin + np.array([UNIT * 2, UNIT])
        self.hell1 = self.canvas.create_image(hell1_center[0], hell1_center[1], image=self.hellpicture)
        hell2_center = origin + np.array([UNIT, UNIT * 2])
        self.hell2 = self.canvas.create_image(hell2_center[0], hell2_center[1], image=self.hellpicture)
        hell3_center = origin + np.array([UNIT, UNIT * 3])
        self.hell3 = self.canvas.create_image(hell3_center[0], hell3_center[1], image=self.hellpicture)
        hell4_center = origin + np.array([UNIT * 5, UNIT * 5])
        self.hell4 = self.canvas.create_image(hell4_center[0], hell4_center[1], image=self.hellpicture)
        hell5_center = origin + np.array([UNIT * 5, UNIT * 6])
        self.hell5 = self.canvas.create_image(hell5_center[0], hell5_center[1], image=self.hellpicture)
        hell6_center = origin + np.array([UNIT * 5, UNIT * 7])
        self.hell6 = self.canvas.create_image(hell6_center[0], hell6_center[1], image=self.hellpicture)
        hell7_center = origin + np.array([UNIT * 5, UNIT * 2])
        self.hell7 = self.canvas.create_image(hell7_center[0], hell7_center[1], image=self.hellpicture)
        hell8_center = origin + np.array([UNIT * 6, UNIT * 2])
        self.hell8 = self.canvas.create_image(hell8_center[0], hell8_center[1], image=self.hellpicture)
        hell9_center = origin + np.array([UNIT * 7, UNIT * 2])
        self.hell9 = self.canvas.create_image(hell9_center[0], hell9_center[1], image=self.hellpicture)
        hell10_center = origin + np.array([UNIT * 7, UNIT * 3])
        self.hell10 = self.canvas.create_image(hell10_center[0], hell10_center[1], image=self.hellpicture)
        # create the treasure chest (goal)
        oval_center = origin + np.array([UNIT * 6, UNIT * 7])
        self.oval = self.canvas.create_image(oval_center[0], oval_center[1], image=self.paradisepicture)
        # create the explorer avatar (agent)
        self.rect = self.canvas.create_image(origin[0], origin[1], image=self.explorerpicture)
        # pack all
        self.canvas.pack()
    def reset(self):
        self.update()
        self.flags += 1
        print(self.flags)
        # render slowly for the first few and last few episodes, fast in between
        if self.flags < 4 or self.flags > 147:
            razytime = 0.08
        else:
            razytime = 0.0005
        time.sleep(razytime)
        self.canvas.delete(self.rect)
        origin = np.array([30, 30])
        self.rect = self.canvas.create_image(origin[0], origin[1], image=self.explorerpicture)
        # return observation
        return self.canvas.coords(self.rect)
    def step(self, action):
        s = self.canvas.coords(self.rect)
        base_action = np.array([0, 0])
        if action == 0:      # up
            if s[1] > UNIT:
                base_action[1] -= UNIT
        elif action == 1:    # down
            if s[1] < (MAZE_H - 1) * UNIT:
                base_action[1] += UNIT
        elif action == 2:    # right
            if s[0] < (MAZE_W - 1) * UNIT:
                base_action[0] += UNIT
        elif action == 3:    # left
            if s[0] > UNIT:
                base_action[0] -= UNIT
        self.canvas.move(self.rect, base_action[0], base_action[1])  # move agent
        s_ = self.canvas.coords(self.rect)  # next state
        # reward function
        if s_ == self.canvas.coords(self.oval):
            reward = 200
            done = True
            s_ = 'terminal'
        elif s_ in [self.canvas.coords(self.hell1), self.canvas.coords(self.hell2),
                    self.canvas.coords(self.hell3), self.canvas.coords(self.hell4),
                    self.canvas.coords(self.hell5), self.canvas.coords(self.hell6),
                    self.canvas.coords(self.hell7), self.canvas.coords(self.hell8),
                    self.canvas.coords(self.hell9), self.canvas.coords(self.hell10)]:
            reward = -1000
            done = True
            s_ = 'terminal'
        else:
            reward = 0  # ordinary move: no step penalty (see the note above)
            done = False
        return s_, reward, done
    def render(self):
        if self.flags < 4 or self.flags > 147:
            razytime = 0.08
        else:
            razytime = 0.0005
        time.sleep(razytime)
        self.update()
class qlearningtable:
    # initialization
    def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.5, e_greedy_min=0.01, e_greedy_decay=0.96):
        self.actions = actions  # a list of action indices
        self.lr = learning_rate  # learning rate
        self.gamma = reward_decay  # reward discount factor
        self.epsilon = e_greedy  # exploration rate (probability of acting randomly)
        self.epsilon_min = e_greedy_min
        self.epsilon_decay = e_greedy_decay
        self.q_table = pd.DataFrame(columns=self.actions, dtype=np.float64)  # initial Q-table

    # choose an action
    def choose_action(self, observation):
        self.check_state_exist(observation)  # make sure this state exists in the Q-table
        if np.random.rand() >= self.epsilon:  # exploit: pick the action with the highest Q value
            state_action = self.q_table.loc[observation, :]
            # several actions may share the same max Q value, so break ties randomly
            action = np.random.choice(state_action[state_action == np.max(state_action)].index)
        else:  # explore: pick a random action
            action = np.random.choice(self.actions)
        # decay epsilon so the agent gradually shifts from exploration to exploitation
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return action

    # learn from one transition and update the table
    def learn(self, s, a, r, s_):
        self.check_state_exist(s_)  # make sure s_ exists in the Q-table
        q_predict = self.q_table.loc[s, a]
        if s_ != 'terminal':
            q_target = r + self.gamma * self.q_table.loc[s_, :].max()  # next state is not terminal
        else:
            q_target = r  # next state is terminal
        self.q_table.loc[s, a] += self.lr * (q_target - q_predict)  # update the state-action value

    # make sure a state has a row in the Q-table
    def check_state_exist(self, state):
        if state not in self.q_table.index:
            # append a new all-zero row for this state
            # (DataFrame.append was removed in pandas 2.0, so use pd.concat instead)
            new_row = pd.Series([0.0] * len(self.actions), index=self.q_table.columns, name=state)
            self.q_table = pd.concat([self.q_table, new_row.to_frame().T])
def update():
    # train for 150 episodes
    for episode in range(150):
        # initial observation for this episode
        observation = env.reset()
        while True:
            # refresh the visualization
            env.render()
            # the RL brain picks an action based on the current observation
            action = RL.choose_action(str(observation))
            # the explorer takes the action and gets the next observation, reward, and done flag
            observation_, reward, done = env.step(action)
            print(observation_, reward, done)
            # RL learns from the transition (state, action, reward, next state)
            RL.learn(str(observation), action, reward, str(observation_))
            # carry the next state into the next loop iteration
            observation = observation_
            # the episode ends when the agent falls into a trap or finds the treasure
            if done:
                break
    # end the game and close the window
    print('game over')
    env.destroy()
if __name__ == '__main__':
    # create the environment and the RL agent
    env = Maze()
    RL = qlearningtable(actions=list(range(env.n_actions)))
    # start training once the window is up
    env.after(50, update)
    env.mainloop()
    print(RL.q_table)
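The printed q_table has one row per visited state (the stringified canvas coordinates of the avatar, plus 'terminal') and one column per action index. As a rough illustration of how one might read the greedy policy out of it, assuming the start cell's observation string is '[30.0, 30.0]' (which is what canvas.coords returns for the avatar image at the origin):

best_start_action = RL.q_table.loc['[30.0, 30.0]'].idxmax()  # 0 up, 1 down, 2 right, 3 left
print('greedy action at the start cell:', best_start_action)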