Sutton end-of-chapter exercises

C5_blackjack

import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt
import gym
from tqdm import tqdm
import seaborn as sns

# dimensions: (player sum, dealer showing card, usable ace, action)
policy = np.zeros((22, 11, 2, 2))
policy[20:, :, :, 0] = 1  # sum >= 20: stick
policy[:20, :, :, 1] = 1  # sum < 20: hit

env = gym.make("Blackjack-v0")
env.seed(0)
print('observation space = {}'.format(env.observation_space))
print('action space = {}'.format(env.action_space))
print('action number = {}'.format(env.action_space.n))

def plot(data):
    fig, ax = plt.subplots(1, 2, figsize=(9, 4))
    titles = ['without A', 'with A']
    have_A = [0, 1]
    print(data.shape)
    v = data[12:, 1:, :]
    plt.subplots_adjust(wspace=0.1, hspace=0.2)
    ax = ax.flatten()
    v_without_A = v[:, :, 0]
    v_with_A = v[:, :, 1]
    v = [v_without_A, v_with_A]

    for image, title, axis in zip(v, titles, ax):
        fig = sns.heatmap(np.flipud(image), cmap="YlGnBu", ax=axis,
                          xticklabels=range(1, 11),
                          yticklabels=list(reversed(range(12, 22))))
        fig.set_ylabel('player sum', fontsize=20)
        fig.set_title(title, fontsize=20)
    plt.show()
    plt.close()

# def plot(data):
#     fig, ax = plt.subplots(1, 2, figsize=(9, 4))
#     titles = ['without A', 'with A']
#     have_A = [0, 1]
#     extent = [12, 22, 1, 11]
#     for title, have_A, axis in zip(titles, have_A, ax):
#         dat = data[extent[0]:extent[1], extent[2]:extent[3], have_A].T
#         axis.imshow(dat, extent=extent, origin='lower')
#         axis.set_xlabel('player sum')
#         axis.set_ylabel('dealer show')
#         axis.set_title(title)


def ob_state(observation):
    return observation[0], observation[1], int(observation[2])

def evaluate_action_monte_carlo(env, policy, episodes=500000):
    v = np.zeros_like(policy)
    count = np.zeros_like(policy)
    for _ in tqdm(range(episodes)):
        s_a_trajectory = []
        observation = env.reset()

        while observation[0] < 12:
            observation, _, _, _ = env.step(1)  # hit

        while True:
            state = ob_state(observation)
            action = np.random.choice(env.action_space.n, p=policy[state])
            s_a_trajectory.append((state, action))
            observation, reward, done, _ = env.step(action)
            if done:
                break
        for state, action in s_a_trajectory:
            count[state] += 1
            v[state] += (reward - v[state]) / count[state]
    return v

v = evaluate_action_monte_carlo(env, policy)
# v = (q * policy).sum(axis=-1)
print(v.shape)
v = v[:, :, :, 0].copy()  # either estimate v directly, or estimate q first and take the expectation over actions
print(v.shape)
plot(v)

def monte_carlo_ES(env, episodes=500000):
    policy = np.zeros((22, 11, 2, 2))
    policy[:, :, :, 1] = 1.  # initialize to always hit
    q = np.zeros_like(policy)
    count = np.zeros_like(policy)
    for _ in tqdm(range(episodes)):
        # exploring starts: pick a random state and action
        state = (np.random.randint(12, 22),
                 np.random.randint(1, 11),
                 np.random.randint(2))
        action = np.random.randint(2)
        # start an episode
        env.reset()
        # write the randomly generated state into the environment
        if state[2]:
            env.player = [1, state[0] - 11]
        else:
            if state[0] == 21:
                env.player = [10, 5, 6]  # any legal hand that sums to 21
            else:
                env.player = [10, state[0] - 10]  # valid because the player sum is in 12-21
        env.dealer[0] = state[1]
        s_a_trajectory = []
        while True:
            s_a_trajectory.append((state, action))
            observation, reward, done, _ = env.step(action)
            if done:
                break
            state = ob_state(observation)
            action = np.random.choice(env.action_space.n, p=policy[state])  # follow the current policy

        for state, action in s_a_trajectory:
            count[state][action] += 1
            q[state][action] += (reward - q[state][action]) / count[state][action]
            a_max = q[state].argmax()
            policy[state] = 0
            policy[state][a_max] = 1
    return policy, q

policy, q = monte_carlo_ES(env)
v = q.max(axis=-1)
plot(policy.argmin(-1))
plot(v)

def monte_carlo_importance_resample(env, episodes=500000):
    policy = np.zeros((22, 11, 2, 2))
    policy[:, :, :, 0] = 1  # initialize the target policy
    behavior_policy = np.ones_like(policy) * 0.5  # soft behavior policy
    q = np.zeros_like(policy)
    count = np.zeros_like(policy)
    for _ in tqdm(range(episodes)):
        trajectory = []
        observation = env.reset()
        while True:
            state = ob_state(observation)
            action = np.random.choice(env.action_space.n, p=behavior_policy[state])
            trajectory.append((state, action))
            observation, reward, done, _ = env.step(action)
            if done:
                break

        rho = 1.  # importance sampling ratio
        for state, action in reversed(trajectory):  # traverse backwards so rho is easy to accumulate; see p.109, off-policy MC control
            count[state][action] += rho
            q[state][action] += (rho * (reward - q[state][action]) / count[state][action])
            # improvement
            a_max = q[state].argmax()
            policy[state] = 0
            policy[state][a_max] = 1.
            # once the behavior action disagrees with the (greedy) target policy, the rest of the episode
            # is discarded (rho would become 0), so learning is slow when non-greedy actions are common
            if a_max != action:
                break
            rho /= behavior_policy[state][action]
    return policy, q

policy, q = monte_carlo_importance_resample(env)
v = q.max(axis=-1)
plot(policy.argmin(-1))
plot(v)
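
For reference, the incremental weighted importance-sampling update that `monte_carlo_importance_resample` implements (off-policy MC control in Sutton & Barto, around p.109) is

$$C(S_t,A_t) \leftarrow C(S_t,A_t) + W,\qquad Q(S_t,A_t) \leftarrow Q(S_t,A_t) + \frac{W}{C(S_t,A_t)}\bigl[G - Q(S_t,A_t)\bigr],\qquad W \leftarrow W \cdot \frac{1}{b(A_t\mid S_t)},$$

where $W$ is the code's `rho`, $b(A_t\mid S_t)=0.5$ for the uniform behavior policy, and the last step uses $\pi(A_t\mid S_t)=1$ for the greedy target policy; the loop exits as soon as $A_t$ is not the greedy action, since then $\pi(A_t\mid S_t)=0$.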

C6_TD

import gym
import numpy as np
np.random.seed(0)
import pandas as pd
import matplotlib.pyplot as plt

env = gym.make('Taxi-v3')
env.seed(0)

# usage example:
# state = env.reset()
# taxirow, taxicol, passloc, destidx = env.unwrapped.decode(state)
# print(taxirow, taxicol, passloc, destidx)
# print('taxi position = {}'.format((taxirow, taxicol)))
# print('passenger position = {}'.format(env.unwrapped.locs[passloc]))
# print('destination = {}'.format(env.unwrapped.locs[destidx]))
# env.render()

class SarsaAgent:
    def __init__(self, env, gamma=0.9, learning_rate=0.2, epsilon=0.01):
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.epsilon = epsilon  # epsilon-greedy (soft) parameter
        self.action_n = env.action_space.n
        self.q = np.zeros((env.observation_space.n, env.action_space.n))  # q(s, a), 500 * 6

    def choice_action(self, state):
        if np.random.uniform() > self.epsilon:
            action = self.q[state].argmax()
        else:
            action = np.random.randint(self.action_n)
        return action

    # def choice_action(self, state):
    #     if np.random.binomial(1, self.epsilon) == 0:
    #         values_ = self.q[state]
    #         action = np.random.choice([action_ for action_, value_ in enumerate(values_) if value_ == np.max(values_)])
    #     else:
    #         action = np.random.randint(self.action_n)
    #     return action

    def learn(self, state, action, reward, next_state, done, next_action):
        u = reward + self.gamma * self.q[next_state, next_action] * (1. - done)
        td_error = u - self.q[state, action]
        self.q[state, action] += self.learning_rate * td_error


def sarsa(env, agent, train=False, render=False):
    G = 0
    observation = env.reset()
    action = agent.choice_action(observation)
    while True:
        if render:
            env.render()
        next_observation, reward, done, _ = env.step(action)
        G += reward
        next_action = agent.choice_action(next_observation)
        if train:
            agent.learn(observation, action, reward, next_observation, done, next_action)
        if done:
            break
        observation, action = next_observation, next_action
    return G

agent = SarsaAgent(env)  # instantiate a SarsaAgent
episodes = 3000
G = []
for episode in range(episodes):
    reward = sarsa(env, agent, train=True)
    G.append(reward)
plt.plot(G)

class ExpectedSarsaAgent:
    def __init__(self, env, gamma=0.9, learning_rate=0.1, epsilon=0.01):
        self.gamma = gamma
        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.q = np.zeros((env.observation_space.n, env.action_space.n))
        self.action_n = env.action_space.n

    def choice_action(self, state):
        if np.random.uniform() > self.epsilon:
            action = self.q[state].argmax()
        else:
            action = np.random.randint(self.action_n)
        return action

    def learn(self, state, action, reward, next_state, done):
        # expectation under the epsilon-greedy policy: uniform part plus greedy part
        Eq = (self.q[next_state].mean() * self.epsilon + self.q[next_state].max() * (1 - self.epsilon))
        u = reward + self.gamma * Eq * (1. - done)
        td_error = u - self.q[state, action]
        self.q[state, action] += self.learning_rate * td_error

def q_learning(env, agent, train=False, render=False):
    G = 0
    observation = env.reset()
    while True:
        if render:
            env.render()
        action = agent.choice_action(observation)
        next_observation, reward, done, _ = env.step(action)
        G += reward
        if train:
            agent.learn(observation, action, reward, next_observation, done)
        if done:
            break
        observation = next_observation
    return G

agent = ExpectedSarsaAgent(env)

episodes = 5000
Gs = []
for episode in range(episodes):
    G = q_learning(env, agent, train=True)
    Gs.append(G)
plt.plot(Gs)


### Q-learning
class QLearningAgent:
    def __init__(self, env, gamma=0.9, learning_rate=0.1, epsilon=0.01):
        self.gamma = gamma
        self.epsilon = epsilon
        self.learning_rate = learning_rate
        self.action_n = env.action_space.n
        self.q = np.zeros((env.observation_space.n, env.action_space.n))

    def choice_action(self, state):
        if np.random.uniform() > self.epsilon:
            action = self.q[state].argmax()
        else:
            action = np.random.randint(self.action_n)
        return action

    def learn(self, state, action, reward, next_state, done):
        u = reward + self.gamma * self.q[next_state].max() * (1. - done)
        td_error = u - self.q[state, action]
        self.q[state, action] += self.learning_rate * td_error

agent = QLearningAgent(env)

episodes = 4000
Gs = []
for _ in range(episodes):
    G = q_learning(env, agent, train=True)
    Gs.append(G)
plt.plot(Gs)
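
As a quick sanity check (my addition, not part of the original listing), the learned policy can be evaluated by reusing `q_learning` with `train=False`; setting `epsilon` to 0 makes the agent purely greedy during evaluation.

agent.epsilon = 0.  # act greedily during evaluation
eval_returns = [q_learning(env, agent, train=False) for _ in range(100)]
print('mean evaluation return = {:.2f}'.format(np.mean(eval_returns)))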

C9_semiGTD

import gym
import numpy as np
np.random.seed(0)
import matplotlib.pyplot as plt

# Assumed setup for this excerpt: the environment is taken to be MountainCar-v0
# (a 2-dimensional continuous observation space, as required by the tile coder below).
env = gym.make('MountainCar-v0')
env.seed(0)

# Plot the position and velocity traces; `positions` and `velocities` are assumed to have been
# recorded during an earlier rollout that is not included in this excerpt.
fig, ax = plt.subplots()
ax.plot(positions, label='position')
ax.plot(velocities, label='velocity')
ax.legend()

class TileCoder:
    def __init__(self, layers, features):
        self.layers = layers
        self.features = features
        self.codebook = {}

    def get_feature(self, codeword):
        if codeword in self.codebook:
            return self.codebook[codeword]
        count = len(self.codebook)
        if count >= self.features:  # more codewords than features: resolve the collision by hashing
            return hash(codeword) % self.features
        self.codebook[codeword] = count
        return count

    # __call__ lets an instance be used like a function, so encoding is simply coder(floats, ints)
    def __call__(self, floats=(), ints=()):
        dim = len(floats)  # 2
        states = tuple(f * self.layers * self.layers for f in floats)  # scale the 2 float state components
        print('floats {}'.format(states))
        features = []
        for layer in range(self.layers):
            codeword = (layer,) + tuple(int((s_value + (1 + dim * index) * layer) / self.layers)
                                        for index, s_value in enumerate(states)) + ints
            print(codeword)
            feature = self.get_feature(codeword)
            features.append(feature)
        print('features {}'.format(features))
        return features  # a list of `layers` (here 8) active feature indices
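
A tiny usage sketch of the coder above (the input values are my own illustrative numbers): calling the instance with a tuple of normalized floats plus a tuple of discrete ints returns one active feature index per layer.

coder = TileCoder(layers=8, features=1893)
active = coder((0.25, 0.6), (1,))  # a normalized (position, velocity) pair plus an action index
print(len(active))                 # 8 active features, one per layer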

class SARSAAgent:
    def __init__(self, env, layers=8, features=1893, gamma=1.,
                 learning_rate=0.03, epsilon=0.001):
        self.action_n = env.action_space.n  # number of actions
        self.obs_low = env.observation_space.low  # 2-element array
        self.obs_scale = env.observation_space.high - \
                env.observation_space.low  # range of the observation space
        self.encoder = TileCoder(layers, features)  # tile coder
        self.w = np.zeros(features)  # weights
        self.gamma = gamma  # discount factor
        self.learning_rate = learning_rate  # learning rate
        self.epsilon = epsilon  # exploration parameter

    def encode(self, observation, action):  # encode (observation, action) into features
        states = tuple((observation - self.obs_low) / self.obs_scale)  # normalized state
        actions = (action,)  # tuple
        return self.encoder(states, actions)

    def get_q(self, observation, action):  # action value
        features = self.encode(observation, action)
        return self.w[features].sum()

    def decide(self, observation):  # epsilon-greedy decision
        if np.random.rand() < self.epsilon:
            return np.random.randint(self.action_n)
        else:
            qs = [self.get_q(observation, action) for action in
                  range(self.action_n)]
            return np.argmax(qs)

    def learn(self, observation, action, reward,
              next_observation, done, next_action):  # semi-gradient SARSA update
        u = reward + (1. - done) * self.gamma * \
                self.get_q(next_observation, next_action)
        td_error = u - self.get_q(observation, action)
        features = self.encode(observation, action)
        self.w[features] += (self.learning_rate * td_error)

def play_sarsa(env, agent, train=False, render=False):
    episode_reward = 0
    observation = env.reset()
    action = agent.decide(observation)
    while True:
        if render:
            env.render()
        next_observation, reward, done, _ = env.step(action)
        episode_reward += reward
        next_action = agent.decide(next_observation)  # meaningless at a terminal state, but harmless
        if train:
            agent.learn(observation, action, reward, next_observation,
                        done, next_action)
        if done:
            break
        observation, action = next_observation, next_action
    return episode_reward

agent = SARSAAgent(env)

# training
episodes = 500
episode_rewards = []
for episode in range(episodes):
    episode_reward = play_sarsa(env, agent, train=True)
    episode_rewards.append(episode_reward)
plt.plot(episode_rewards)

C9_tile_coding

#!/usr/bin/env python
import numpy as np

class TileCoder:
    def __init__(self, tiles_per_dim, value_limits, tiles, offset=lambda n: 2 * np.arange(n) + 1):
        '''
        Example: an 8 * 8 grid over the state space, covered by 8 tilings.
        The state space is divided into an 8 * 8 grid; each tiling is given a different offset so
        that more distinct features are obtained. The offsets are kept within one tile, so a 9 * 9
        grid per tiling is needed to guarantee coverage.
        value_limits gives the range of each state dimension; dividing the number of tiles per
        dimension by the corresponding range gives the normalization factors (_norm_dims).
        _tile_base_ind holds the starting index of each tiling: 8 tilings, each of size 9 * 9.
        _hash_vec maps a 2-D coordinate to a single index; here it is [1, 9], so a point (x, y)
        inside a tiling maps to 9 * y + x.
        The encoding procedure is therefore:
        1. Give each tiling a different offset; here the first dimension is shifted by 1/8 per
           tiling and the second by 3/8, wrapping around past 1.
        2. Take the state (x, y), subtract each dimension's lower limit, and multiply by the
           normalization factor to get grid coordinates; add each tiling's offset to get the
           coordinates within that tiling.
        3. For each tiling, the feature index of (x, y) is the tiling's base index plus the dot
           product of the offset coordinates with the hash vector.
        4. The result is an 8-dimensional feature vector (one active tile per tiling).
        '''
        tiling_dims = np.array(np.ceil(tiles_per_dim), dtype=int) + 1  # +1 to guarantee coverage
        self._offsets = offset(len(tiles_per_dim)) * \
            np.repeat([np.arange(tiles)], len(tiles_per_dim), 0).T / float(tiles) % 1
        self._limits = np.array(value_limits)  # 2 * 2
        self._norm_dims = np.array(tiles_per_dim) / (self._limits[:, 1] - self._limits[:, 0])
        self._tile_base_ind = np.prod(tiling_dims) * np.arange(tiles)  # 9 * 9 * [0, 1, ..., 7]
        self._hash_vec = np.array([np.prod(tiling_dims[0:i]) for i in range(len(tiles_per_dim))])
        self._n_tiles = tiles * np.prod(tiling_dims)

    def __getitem__(self, x):  # lets an instance t be indexed as t[x]
        off_coords = ((x - self._limits[:, 0]) * self._norm_dims + self._offsets).astype(int)
        return self._tile_base_ind + np.dot(off_coords, self._hash_vec)

    @property
    def n_tiles(self):
        return self._n_tiles
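
A minimal usage sketch of this coder (the MountainCar-style limits below are my own example values, not from the original): indexing the coder with a raw state returns one active tile index per tiling.

# 8 tiles per dimension, 8 tilings, 2-D state (e.g. position and velocity)
T = TileCoder(tiles_per_dim=[8, 8],
              value_limits=[(-1.2, 0.6), (-0.07, 0.07)],
              tiles=8)
print(T.n_tiles)                  # 8 tilings * 9 * 9 = 648 features in total
print(T[np.array([-0.5, 0.01])])  # 8 active tile indices, one per tiling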

C13_AC

Vanilla Actor-Critic (one-step TD bootstrapping)

import gym
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.functional as F
import torch.nn as nn
from torch.autograd import Variable

class Actor_Critic(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)
        self.fc4 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        action = F.log_softmax(self.fc3(x), dim=-1)
        V = F.relu(self.fc4(x))
        return action, V

device = 'cpu'
env = gym.make('CartPole-v0')
obs = env.reset()
actor_critic = Actor_Critic()
actor_critic.to(device)
optimizer_a_c = torch.optim.Adam(actor_critic.parameters(), lr = 0.0004)
gamma = 0.99
alpha_c = 0.01
steps = []
A_loss = []
C_loss = []

for episode in range(10000):
    I = 1
    for step in range(200):
        obs = np.reshape(obs, [1, -1])
        input_obs = Variable(torch.from_numpy(obs).float()).to(device)
        action_probability, V = actor_critic(input_obs)  # the network outputs the (log) policy and the value estimate

        action = np.random.choice(2, p=torch.exp(action_probability[0]).detach().cpu().numpy())
        obs, reward, done, info = env.step(action)
        obs = np.reshape(obs, [1, -1])
        next_input_obs = Variable(torch.from_numpy(obs).float()).to(device)  # next state
        _, next_V = actor_critic(next_input_obs)  # the one-step TD error needs the value estimate of the next state

        if done:
            delta = reward - V
        else:
            delta = reward + gamma * next_V.detach() - V
        # actor (policy) loss
        Actor_loss = -action_probability[0][action] * I * delta.detach()
        A_loss.append(Actor_loss.item())
        # critic (state-value) loss
        Critic_loss = -alpha_c * delta.detach() * V
        C_loss.append(Critic_loss.item())
        I *= gamma
        loss = Actor_loss + Critic_loss

        actor_critic.zero_grad()
        loss.backward()
        optimizer_a_c.step()

        if done:
            steps.append(step)  # steps survived this episode (the return)
            print(f'episode {episode}, step {step}', end='\r')
            obs = env.reset()
            break
    if np.mean(steps[-20:]) > 180:
        break

mid = []
interval = 30

for i in range(len(steps) - interval):
    mid.append(np.mean(steps[i:i + interval + 1]))
plt.figure(figsize=(10, 10))
plt.title('Performance of vanilla Actor-Critic on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Rewards', fontsize='xx-large')
x_fit = list(range(len(steps) - interval))
plt.plot(x_fit, steps[interval:], '-', c='gray', label='Episode-Wise data')
plt.plot(mid, '-', c='green', linewidth=5, label='Moving Average')
plt.legend(loc="best", prop={'size': 12})
plt.show()
plt.figure(figsize=(10, 10))
plt.title('Error Analysis of vanilla Actor-Critic on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Loss', fontsize='xx-large')
plt.plot(A_loss, c='blue', label='Actor_Loss')
plt.plot(C_loss, c='red', label='Critic_Loss')
plt.legend(loc="best", prop={'size': 12})
plt.show()

Vanilla Actor-Critic (Monte Carlo)

import gym 
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

class Actor_Critic(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 128)
        self.fc2 = nn.Linear(128, 128)
        self.fc3 = nn.Linear(128, 2)
        self.fc4 = nn.Linear(128, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        action = F.log_softmax(self.fc3(x), dim=-1)
        V = F.relu(self.fc4(x))
        return action, V

device = 'cpu'
env = gym.make('CartPole-v0')
obs = env.reset()
ac = Actor_Critic()
ac.to(device)
optimizer_ac = torch.optim.Adam(ac.parameters(), lr=0.003)
gamma = 0.99
steps = []
A_loss = []
C_loss = []
eps = np.finfo(np.float32).eps.item()  # smallest representable float, to avoid division by zero

for episode in range(1000):
    log_action_trajectory = []
    V_trajectory = []
    for step in range(200):
        # run the policy until done (or 200 steps); update only when the episode ends (Monte Carlo)
        obs = np.reshape(obs, [1, -1])
        input_obs = Variable(torch.from_numpy(obs).float()).to(device)
        log_action_probability, V = ac(input_obs)
        action = np.random.choice(2, p=torch.exp(log_action_probability[0]).detach().cpu().numpy())
        log_action_trajectory.append(log_action_probability[0][action])
        V_trajectory.append(V)
        obs, reward, done, info = env.step(action)

        if done:
            ac.zero_grad()
            steps.append(step)
            print(f'episode {episode}, step {step}', end='\r')
            obs = env.reset()
            reward_list = np.ones((step + 1,))
            for i in range(len(reward_list) - 2, -1, -1):
                reward_list[i] += reward_list[i + 1] * gamma
            reward_list -= np.mean(reward_list)
            reward_list /= (np.std(reward_list) + eps)
            critic_loss = 0
            Delta = []

            # walk back over the returns and value estimates saved during the episode
            for discount_G, V in zip(reward_list, V_trajectory):
                # Monte Carlo already gives the discounted return, so the G-V gap is used directly as the loss
                critic_loss += F.smooth_l1_loss(V, torch.tensor([[discount_G]], dtype=torch.float32))
                Delta.append(discount_G - V.detach())
            Actor_loss = 0
            # walk back over the saved log-probabilities and the deltas derived from the value estimates
            for delta, log_prob in zip(Delta, log_action_trajectory):
                Actor_loss -= log_prob * delta.detach()
            # average the errors over the episode length below

            # # adding an entropy regularization term to the policy loss encourages exploration:
            # entropy = 0
            # beta = 0.01
            # for log_p in log_action_trajectory:
            #     entropy -= log_p * torch.exp(log_p)
            # Actor_loss = Actor_loss * (1 - beta) + entropy * beta

            A_loss.append(Actor_loss.item() / step)
            C_loss.append(critic_loss.item() / step)
            loss = critic_loss + Actor_loss
            # backpropagate and update
            loss.backward()
            optimizer_ac.step()
            break
    if np.mean(steps[-20:]) > 190:
        break

mid = []
interval = 30
plt.style.use('dark_background')
for i in range(len(steps) - interval):
    mid.append(np.mean(steps[i:i + interval + 1]))
plt.figure(figsize=(10, 10))
plt.title('Performance of Episode-Wise Advantage Actor-Critic on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Rewards', fontsize='xx-large')
x_fit = list(range(len(steps) - interval))
plt.plot(x_fit, steps[interval:], '-', c='gray', label='Episode-Wise data')
plt.plot(mid, '-', c='green', linewidth=5, label='Moving Average')
plt.legend(loc="best", prop={'size': 12})
plt.show()
plt.figure(figsize=(10, 10))
plt.title('Error Analysis of Episode-Wise Advantage Actor-Critic on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Loss', fontsize='xx-large')
T_loss = [sum(x) for x in zip(A_loss, C_loss)]
plt.plot(A_loss, c='blue', label='Actor_Loss')
plt.plot(C_loss, c='red', label='Critic_Loss')
plt.plot(T_loss, c='green', label='total_Loss')
plt.legend(loc="best", prop={'size': 12})
plt.show()

C13_PG

REINFORCE (on-policy version)

import gym
import torch
import numpy as np
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.autograd import Variable

class RE_NET(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(4, 24)
        self.fc2 = nn.Linear(24, 48)
        self.fc3 = nn.Linear(48, 24)
        self.fc4 = nn.Linear(24, 2)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        # log_softmax avoids numerical underflow; probabilities are recovered with exp when needed,
        # and the gradient is taken with respect to the log-probability anyway
        x = F.log_softmax(self.fc4(x), dim=1)
        return x

env = gym.make('CartPole-v0')
observation = env.reset()

model = RE_NET()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
gamma = 0.99
steps = []
for episode in range(100000):  # loop over episodes
    obs_list = []
    log_prob = []
    for step in range(200):  # loop over the steps of one episode
        obs_list.append(observation)
        observation = np.reshape(observation, [1, -1])  # 1-D array -> row matrix
        observation = Variable(torch.from_numpy(observation).float())
        action_probability = model(observation)  # the network parameterizes the policy pi

        with torch.no_grad():
            action = np.random.choice(2, p=torch.exp(action_probability)[0].detach().numpy())

        log_prob.append(action_probability[0][action])  # save the log-probability of the chosen action
        observation, reward, done, info = env.step(action)

        if done:
            steps.append(step)  # steps survived this episode (the return)
            print(f'episode {episode}, step {step}', end='\r')
            reward_list = np.ones((step + 1,))  # the reward is 1 at every step
            for i in range(len(reward_list) - 2, -1, -1):
                reward_list[i] += reward_list[i + 1] * gamma
            reward_list -= np.mean(reward_list)
            reward_list /= np.std(reward_list)  # standardize the returns to reduce variance

            loss = 0
            for a, b in zip(log_prob, reward_list):  # zip log-probabilities with returns
                loss -= a * b
            # loss

            model.zero_grad()
            loss.backward()  # compute gradients
            optimizer.step()  # update the policy parameters
            observation = env.reset()  # reset for the next episode
            break
    if np.mean(steps[-20:]) > 180:
        break

REINFORCE (off-policy version)

import gym 
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.nn import Linear, ReLU
import torch.nn.functional as F
from torch.autograd import Variable


class REINFORCE(torch.nn.Module):
    def __init__(self):
        super(REINFORCE, self).__init__()
        self.fc1 = Linear(4, 24)
        self.fc2 = Linear(24, 48)
        self.fc3 = Linear(48, 24)
        self.fc4 = Linear(24, 2)
        self.steps = []

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = F.log_softmax(self.fc4(x), dim=1)
        return x

    # evaluation uses the target policy directly
    def evaluate(self):
        env = gym.make('CartPole-v0')
        obs = env.reset()
        for step in range(200):
            obs = np.reshape(obs, [1, -1])
            obs = Variable(torch.from_numpy(obs).float())
            action_probability = self(obs)
            action = np.random.choice(2, p=torch.exp(action_probability)[0].detach().numpy())
            obs, reward, done, info = env.step(action)
            if done:
                self.steps.append(step)
                break


env = gym.make('CartPole-v0')
obs = env.reset()
model = REINFORCE()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
gamma = 0.99
steps = []
eps = 0.5
for episode in range(100000):
    log_prob = []
    for step in range(200):
        obs = np.reshape(obs, [1, -1])
        obs = Variable(torch.from_numpy(obs).float())
        action_probability = model(obs)
        # soft behavior policy: with probability eps act uniformly at random,
        # otherwise sample from the target policy
        if np.random.random() < eps:
            action = np.random.randint(2)
        else:
            with torch.no_grad():
                action = np.random.choice(2, p=torch.exp(action_probability)[0].detach().numpy())
        log_prob.append(action_probability[0][action])
        obs, reward, done, info = env.step(action)

        if done:
            steps.append(step)
            print(f'episode {episode}, step {step}', end='\r')
            reward_list = np.ones((step + 1,))
            for i in range(len(reward_list) - 2, -1, -1):
                reward_list[i] += reward_list[i + 1] * gamma
            reward_list -= np.mean(reward_list)
            reward_list /= np.std(reward_list)
            # ------calculate loss start------
            loss = 0
            # We are computing log(pi) and summing it together here.
            # Note the loss is negated, since we need gradient ascent.
            for a, b in zip(log_prob, reward_list):
                p = np.exp(a.detach())  # probability of this action under the target policy
                rho = p / (eps * 0.5 + (1 - eps) * p)  # importance ratio pi/b; b mixes uniform (prob eps) with pi
                loss -= rho * a * b  # gradient ascent, so the loss term is negated
            # ------calculate loss end------
            model.zero_grad()
            loss.backward()
            optimizer.step()
            # ------evaluate------
            with torch.no_grad():
                model.evaluate()
            obs = env.reset()
            break
    if np.mean(model.steps[-20:]) > 190:
        break


mid = []
interval = 30
print(steps)
print(model.steps)
# plt.style.use('dark_background')
for i in range(len(steps) - interval):
    mid.append(np.mean(steps[i:i + interval + 1]))
plt.figure(figsize=(10, 10))
plt.title('Behaviour Policy in off-policy REINFORCE on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Rewards', fontsize='xx-large')
x_fit = list(range(len(steps) - interval))
plt.plot(x_fit, steps[interval:], '-', c='gray', label='Episode-Wise data')
plt.plot(mid, '-', c='green', linewidth=5, label='Moving Average')
plt.legend(loc="best", prop={'size': 12})
plt.show()
# ------- below is the evaluation graph -------
mid = []
interval = 30
# plt.style.use('dark_background')
for i in range(len(model.steps) - interval):
    mid.append(np.mean(model.steps[i:i + interval + 1]))
plt.figure(figsize=(10, 10))
plt.title('Target Policy off-policy REINFORCE on CartPole_V0', fontsize='xx-large')
plt.xlabel('Episodes', fontsize='xx-large')
plt.ylabel('Rewards', fontsize='xx-large')
x_fit = list(range(len(model.steps) - interval))
plt.plot(x_fit, model.steps[interval:], '-', c='gray', label='Episode-Wise data')
plt.plot(mid, '-', c='green', linewidth=5, label='Moving Average')
plt.legend(loc="best", prop={'size': 12})
plt.show()