RL实战练习记录1_Q-learning,Sarsa

RL实战练习记录1_Q-learning,Sarsa

[TOC]

经过对《Reinforcement Learning:An Introduction》前五章的自学,学习了 RL 的理论基础以及简单算法思路.有必要通过实战强化学习的结果,并指导接下来的学习,有的放矢.
调研后决定通过网络课程莫烦PYTHON来进行实战.
一是开源(git repo),二是讲解质量还不错,三是整个教程自成体系,很不错推荐感兴趣的同学尝试一下.缺点是数学基础可能需要自己去补.
本文对其 RL 教材的前3章进行笔记总结,包括 RL 的简介, Q-Learning算法实战, Sarsa算法实战.

RL 的简介

RL 算法大体分类

分类标准

  1. model-free 与 model-base
  • model-free:不尝试去理解环境.举个例子,现在环境就是我们的世界, 我们的机器人正在这个世界里玩耍, 他不理解这个世界是怎样构成的, 也不理解世界对于他的行为会怎么样反馈. 极端地他决定丢颗原子弹去真实的世界, 结果把自己给炸死了.例如 Q-learning,sarsa, DQN, policy gradients.
  • model-based:机器人会通过过往的经验, 先理解真实世界是怎样的, 并建立一个模型来模拟现实世界的反馈, 最后他不仅可以在现实世界中玩耍, 也能在模拟的世界中玩耍 , 这样就没必要去炸真实世界, 连自己也炸死了, 他可以像玩游戏一样炸炸游戏里的世界, 也保住了自己的小命.
  1. 基于概率 和 基于价值
  • 基于概率:对于选取连续的动作, 基于价值的方法是无能为力的. 我们却能用一个概率分布在连续动作中选取特定动作. 例如 policy gradients.
  • 基于价值:对离散动作进行进一步地评价.例如 Q-learning,sarsa.
  • 基于概率+价值: Actor-Critic,actor 会基于概率做出动作, 而 critic 会对做出的动作给出动作的价值, 这样就在原有的 policy gradients 上加速了学习过程.
  1. 回合更新 和 单步更新
    即教材中 episodic 与 continous 的概念.
  • 回合更新:Monte-carlo learning 和基础版的 policy gradients 等.
  • 单步更新:Q-learning, Sarsa, 升级版的 policy gradients 等.
  1. 在线学习 和 离线学习 (on policy / off policy)
  • 在线学习: Sarsa, Sarsa lambda 等.
  • 离线学习: Q-learning,DQN 等.

课程要求

  • Numpy, Pandas (必学), 用于学习的数据处理.
  • Matplotlib (可学), 偶尔会用来呈现误差曲线什么的.
  • Tkinter (可学), 你可以自己用它来编写模拟环境.
  • Tensorflow (可学), 后面实现神经网络与强化学习结合的时候用到.
  • OpenAI gym (可学), 提供了很多现成的模拟环境.

Q-learning

伪代码如下:

$$ \begin{array}{l} L1 \text { Initialize } Q(s, a) \text { arbitrarily } \\ L2 \text { Repeat (for each episode): } \\ L3 \quad \text { Initialize } s \\ L4 \quad \text { Repeat (for each step of episode): } \\ L5 \quad \quad \text { Choose } a \text { from } s \text { using policy derived from } Q \text { (e.g., } \varepsilon \text {-greedy) } \\ L6 \quad \quad \text { Take action } a, \text { observe } r, s^{\prime} \\ L7 \quad \quad Q(s, a) \leftarrow Q(s, a)+\alpha\left[r+\gamma \max _{a^{\prime}} Q\left(s^{\prime}, a^{\prime}\right)-Q(s, a)\right] \\ L8 \quad \quad s \leftarrow s^{\prime} ; \\ L9 \quad \text { until } s \text { is terminal } \end{array} $$

算法思路

  • 使用二维表格q_table管理所有$Q(s,a)$. 因为 Q-learning 是比较简单的算法,因此没有采用回合更新(双重循环迭代),而是单步更新只预测下一个状态的反馈.这样学习的次数要变多一些,才能把 q_table 表更新到最优.
  • L5: 采用 $\varepsilon \text {-greedy} $ 随机采取一个动作.
  • L6: 对随机采取的动作造成的状态$s’$以及伴随产生的奖励$r$.同时在$s’$中自动获得所有能采取的$a’$及其 value $Q(s’,a’)$,选取最大的那个$\max_{a’}Q(s’,a’)$
  • L7: 由于我们需要预测后面的奖励来调整目前的策略$(s,a)$,所以在当前 value $Q(s,a)$的基础上加上预测的奖励,同时注意越往后的 episode 的反馈的权重应该越低,因此对$\max_{a’}Q(s’,a’)$乘以一个$\gamma<1$. 最后在差值的加上一个系数 $\alpha$ 为学习速率.

例子-一维世界寻宝

代码地址.

1
2
|-o---T
# T 就是宝藏的位置, o 是探索者的位置,最左侧为墙壁.

状态$\boldsymbol{S}$:$o$ 的位置坐标.
动作$\boldsymbol{A}$:左移/右移.
termination:左侧撞墙,右侧寻到宝藏.

  • 预设参数值
1
2
3
4
5
6
7
N_STATES = 6   # 1维世界的宽度
ACTIONS = ['left', 'right'] # 探索者的可用动作
EPSILON = 0.9 # 贪婪度 greedy
ALPHA = 0.1 # 学习率
GAMMA = 0.9 # 奖励递减值
MAX_EPISODES = 13 # 最大回合数
FRESH_TIME = 0.3 # 移动间隔时间
  • 建立q_table
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def build_q_table(n_states, actions):
table = pd.DataFrame(
np.zeros((n_states, len(actions))), # q_table 全 0 初始
columns=actions, # columns 对应的是行为名称
)
return table

# q_table:
"""
left right
0 0.0 0.0
1 0.0 0.0
2 0.0 0.0
3 0.0 0.0
4 0.0 0.0
5 0.0 0.0
"""
  • $\epsilon-greedy$ 动作选取.随机值大于预设的 $\epsilon$ 就探索非最优的动作.
1
2
3
4
5
6
7
8
# 在某个 state 地点, 选择行为
def choose_action(state, q_table):
state_actions = q_table.iloc[state, :] # 选出这个 state 的所有 action 值
if (np.random.uniform() > EPSILON) or (state_actions.all() == 0): # 非贪婪 or 或者这个 state 还没有探索过
action_name = np.random.choice(ACTIONS)
else:
action_name = state_actions.argmax() # 贪婪模式
return action_name
  • 环境反馈
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def get_env_feedback(S, A):
# This is how agent will interact with the environment
if A == 'right': # move right
if S == N_STATES - 2: # terminate
S_ = 'terminal'
R = 1
else:
S_ = S + 1
R = 0
else: # move left
R = 0
if S == 0:
S_ = S # reach the wall
else:
S_ = S - 1
return S_, R
  • 环境更新(可视化)
1
2
3
4
5
6
7
8
9
10
11
12
13
def update_env(S, episode, step_counter):
# This is how environment be updated
env_list = ['-']*(N_STATES-1) + ['T'] # '---------T' our environment
if S == 'terminal':
interaction = 'Episode %s: total_steps = %s' % (episode+1, step_counter)
print('\r{}'.format(interaction), end='')
time.sleep(2)
print('\r ', end='')
else:
env_list[S] = 'o'
interaction = ''.join(env_list)
print('\r{}'.format(interaction), end='')
time.sleep(FRESH_TIME)
  • 算法主体
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def rl():
q_table = build_q_table(N_STATES, ACTIONS) # 初始 q table
for episode in range(MAX_EPISODES): # 回合
step_counter = 0
S = 0 # 回合初始位置
is_terminated = False # 是否回合结束
update_env(S, episode, step_counter) # 环境更新
while not is_terminated:

A = choose_action(S, q_table) # 选行为
S_, R = get_env_feedback(S, A) # 实施行为并得到环境的反馈
q_predict = q_table.loc[S, A] # 估算的(状态-行为)值
if S_ != 'terminal':
q_target = R + GAMMA * q_table.iloc[S_, :].max() # 实际的(状态-行为)值 (回合没结束)
else:
q_target = R # 实际的(状态-行为)值 (回合结束)
is_terminated = True # terminate this episode

q_table.loc[S, A] += ALPHA * (q_target - q_predict) # q_table 更新
S = S_ # 探索者移动到下一个 state

update_env(S, episode, step_counter+1) # 环境更新

step_counter += 1
return q_table

最终效果

result
参见上图,可以看到经过有限次的学习后,寻宝者会直奔宝藏,步数为最优的5步.

另一个例子 Maze

将状态与动作的维度升高,如下图中红色 agent 移动到黄色目标区域, 黑色是陷阱.
同时莫烦也将算法分成了算法库+执行主程序+可视化框架的模块. 同时使用 Python Tkinter GUI 框架实现了可视化的部分.
代码地址.

结果如下图,可以看到 Sarsa 明显非常谨慎一旦进入到陷阱很多次都只会在起点附近学习,不敢迈出去:

Sarsa

算法思路

伪代码如下:

$$ \begin{array}{l} L1 \text { Initialize } Q(s, a) \text { arbitrarily } \\ L2 \text { Repeat (for each episode): } \\ L3 \quad \text { Initialize } s \\ L4 \quad \text { Choose } a \text { from } s \text { using policy derived from } Q \text { (e.g., } \varepsilon \text {-greedy) } \\ L5 \quad \text { Repeat (for each step of episode): } \\ L6 \quad \quad \text { Take action } a, \text { observe } r, s^{\prime} \\ L7 \quad \quad \quad \text { Choose } a^{\prime} \text { from } s^{\prime} \text { using policy derived from } Q(\text { e.g., } \varepsilon \text {-greedy) } \\ L8 \quad \quad \quad Q(s, a) \leftarrow Q(s, a)+\alpha\left[r+\gamma Q\left(s^{\prime}, a^{\prime}\right)-Q(s, a)\right] \\ L9 \quad \quad \quad s \leftarrow s^{\prime} ; a \leftarrow a^{\prime} ; \\ L10 \quad \text { until } s \text { is terminal } \end{array} $$

与 Q-learning 算法相比可以看到差异在 L7-L8 行. 也就是说 Q-learning 是贪婪的(因为要求$max$),而 Sarsa 比较谨慎. 同时 Q-learning 每次并没有想好下一次的 action(只是计算更新了 value), 但是 Sarsa 每次在当前的 state 就已经决定好了下一次的 state 以及 action.因此 Sarsa 是 on-policy, Q-learning 是 off-policy.

代码执行 main 部分

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def update():
for episode in range(100):
# 初始化环境
observation = env.reset()

# Sarsa 根据 state 观测选择行为
action = RL.choose_action(str(observation))

while True:
# 刷新环境
env.render()

# 在环境中采取行为, 获得下一个 state_ (obervation_), reward, 和是否终止
observation_, reward, done = env.step(action)

# 根据下一个 state (obervation_) 选取下一个 action_
action_ = RL.choose_action(str(observation_))

# 从 (s, a, r, s, a) 中学习, 更新 Q_tabel 的参数 ==> Sarsa
RL.learn(str(observation), action, reward, str(observation_), action_)

# 将下一个当成下一步的 state (observation) and action
observation = observation_
action = action_

# 终止时跳出循环
if done:
break

# 大循环完毕
print('game over')
env.destroy()

if __name__ == "__main__":
env = Maze()
RL = SarsaTable(actions=list(range(env.n_actions)))

env.after(100, update)
env.mainloop()

算法库

接口类

1
2
3
4
5
6
7
8
9
10
11
12
class RL(object):
def __init__(self, action_space, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
... # 和 QLearningTable 中的代码一样

def check_state_exist(self, state):
... # 和 QLearningTable 中的代码一样

def choose_action(self, observation):
... # 和 QLearningTable 中的代码一样

def learn(self, *args):
pass # 每种的都有点不同, 所以用 pass

Q-learning 实现类

1
2
3
4
5
6
7
8
9
10
11
12
class QLearningTable(RL):   # 继承了父类 RL
def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
super(QLearningTable, self).__init__(actions, learning_rate, reward_decay, e_greedy) # 表示继承关系

def learn(self, s, a, r, s_): # learn 的方法在每种类型中有不一样, 需重新定义
self.check_state_exist(s_)
q_predict = self.q_table.loc[s, a]
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.loc[s_, :].max()
else:
q_target = r
self.q_table.loc[s, a] += self.lr * (q_target - q_predict)

Sarsa 实现类

1
2
3
4
5
6
7
8
9
10
11
12
13
class SarsaTable(RL):   # 继承 RL class

def __init__(self, actions, learning_rate=0.01, reward_decay=0.9, e_greedy=0.9):
super(SarsaTable, self).__init__(actions, learning_rate, reward_decay, e_greedy) # 表示继承关系

def learn(self, s, a, r, s_, a_):
self.check_state_exist(s_)
q_predict = self.q_table.loc[s, a]
if s_ != 'terminal':
q_target = r + self.gamma * self.q_table.loc[s_, a_] # q_target 基于选好的 a_ 而不是 Q(s_) 的最大值
else:
q_target = r # 如果 s_ 是终止符
self.q_table.loc[s, a] += self.lr * (q_target - q_predict) # 更新 q_table

Sarsa lambda

效果明显比 Sarsa 学习速度提升很多,回合更新要比单步更新学习快很多.

伪代码

$$ \begin{array}{l} L1 \text { Initialize } Q(s, a) \text { arbitrarily, for all } s \in \mathcal{S}, a \in \mathcal{A}(s) \\ L2 \text { Repeat (for each episode): } \\ L3 \quad E(s, a)=0, \text { for all } s \in \mathcal{S}, a \in \mathcal{A}(s) \\ L4 \quad \text { Initialize } S, A \\ L5 \quad \text { Repeat (for each step of episode): } \\ L6 \qquad \text { Take action } A, \text { observe } R, S^{\prime} \\ L7 \qquad \text { Choose } A^{\prime} \text { from } S^{\prime} \text { using policy derived from } Q \text { (e.g., } \varepsilon \text {-greedy) } \\ L8 \qquad \delta \leftarrow R+\gamma Q\left(S^{\prime}, A^{\prime}\right)-Q(S, A) \\ L9 \quad \quad \text { E(S, A)} \leftarrow E(S, A)+1 \\ L10 \quad \quad \text {For all } s \in \mathcal{S}, a \in \mathcal{A}(s): \\ L11 \quad \quad \quad \text {Q(s, a)} \leftarrow Q(s, a)+\alpha \delta E(s, a) \\ L12 \quad \quad \quad \text {E(s, a)} \leftarrow \gamma \lambda E(s, a) \\ L13 \quad \quad \text {S} \leftarrow S^{\prime} ; A \leftarrow A^{\prime} \\ L14 \quad \text {until } S \text { is terminal } \end{array} $$

可以看到与 Sarsa 相比,变动在 L9-L12, 多了一个 $E(s,a)$ 的系数.
这个系数决定了 Sarsa lambda 是单步更新还是回合更新.

Lambda的含义

首先考虑 Sarsa(n) 的概念, $n=1$为单步更新即为上面的普通 Sarsa 的算法. $n$ 越大意味着每次更新一个 $(s,a)$ 的往后预测的更深.

然后可以考虑到当存在一个明确的终止 episode (target)下的一个 dilemma :选择每次更新 Q 值(学习)的时候一直往后探索直到终止 episode, 然后把每一个 episode 的 reward 反馈到起始 episode 的 Q 值中(backup operation), 但是考虑到越往后的未来越不确定, 因此越往后面的 episode 的反馈贡献越低, 这就是 $\gamma$ 的意义. 但是如果我们能找到终止 episode , 那么其实越靠近 target 的后端的 episode 的对 target 的贡献度大的可能性更高. 再考虑到前期如果一直在打转其实是在做无用功. 因此对前期的 episode 的贡献加上权重即为 $\lambda$. 具体的实现为从前期往后推的过程中(一次 episode 到 终止 target 的迭代中),每经历过一次 $(s,a)$就对其 $E$ 值乘以 $\lambda$(初值为1),这样从初始 episode 到 target 的系数依次为 $\lambda ^n,\lambda ^{n-1}, … \lambda ^1, \lambda ^0$.
因此当 $\lambda = 0$, Sarsa lambda 退化为普通 Sarsa.
而对于 $\gamma$ 的序列则为: $\gamma ^0,\gamma ^1,…,\gamma ^{n} $.这里出现了 dilemma, 一个是 $\gamma$减小后面的 episode 的反馈,一个是 $\lambda$增大后面 episode 的反馈. 解决的办法是共乘, 也就是 L12 的 $\lambda \gamma E(s,a)$, 得到 eligibility trace 值.

eligibility trace 更新的方法

Sarsa_lambda_eligibility_trace.png
上图横轴为时刻,纵轴为 eligibility trace 值.对一个(s,a)而言, 第二个是每次遇到它就对其加1,然后乘以 $\lambda ^i$,第二个是基值永远是1,然后乘以 $\lambda ^i$.

RL实战练习记录1_Q-learning,Sarsa

https://www.chuxin911.com/RL_practice_1_20211230/

作者

cx

发布于

2021-12-30

更新于

2022-07-16

许可协议