Markov Processes

The Bellman equation for a Markov reward process is:

$$v(s) = R_s + \gamma \sum_{s' \in S} P_{ss'}\, v(s')$$

This equation can be expressed compactly in matrix form:

$$v = R + \gamma P v$$

For a value function over $n$ states, it can also be obtained directly by solving the matrix equation:

$$v = (I - \gamma P)^{-1} R$$

```python
import numpy as np

def compute_value(Pss, rewards, gamma=0.5):
    # Convert the reward list into a column vector
    rewards = np.array(rewards).reshape((-1, 1))
    # Solve v = (I - gamma * P)^(-1) * R directly
    values = np.dot(np.linalg.inv(np.eye(7, 7) - gamma * Pss), rewards)
    return values

# Student MRP: with only 7 states, the Bellman equation can be solved by inverting the matrix.
```
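A minimal usage sketch follows; the transition matrix and rewards below are uniform placeholders for illustration, not the actual student MRP from the lecture:

```python
import numpy as np

# Illustrative 7x7 row-stochastic matrix and per-state rewards (placeholders only)
Pss = np.full((7, 7), 1.0 / 7)
rewards = [-2, -2, -2, 10, 1, -1, 0]
print(compute_value(Pss, rewards, gamma=0.9))
```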

Grid world problem (policy evaluation)

(figure: the 5×5 grid world with special states A and B)
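For reference, the iterative backup that `bellman_val` below implements can be written as follows (this grid is deterministic, so each action $a$ in state $s$ leads to a single next state $s'(s,a)$ with reward $r(s,a)$, and the policy is uniformly random):

$$v_{k+1}(s) = \sum_{a} \pi(a \mid s)\,\big[\, r(s,a) + \gamma\, v_k\big(s'(s,a)\big) \,\big], \qquad \pi(a \mid s) = \tfrac{1}{4}$$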

```python
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.table import Table

# Constants
WORLD_SIZE = 5
A_POS = [0, 1]
A_NEXT_POS = [4, 1]
B_POS = [0, 3]
B_NEXT_POS = [2, 3]
gamma = 0.9

# Action list: left, up, right, down
ACTIONS = np.array([[0, -1],
                    [-1, 0],
                    [0, 1],
                    [1, 0]])
PI_prob = 1 / 4.

def next(state, action):  # simulate one step of the environment
    if state == A_POS:
        return A_NEXT_POS, 10
    if state == B_POS:
        return B_NEXT_POS, 5

    next_state = (np.array(state) + action).tolist()
    x, y = next_state
    if x < 0 or x >= WORLD_SIZE or y < 0 or y >= WORLD_SIZE:
        next_state = state
        reward = -1.
    else:
        reward = 0
    return next_state, reward

def draw_tab(data):  # render a value table with matplotlib
    fig, ax = plt.subplots()
    ax.set_axis_off()
    tb = Table(ax, bbox=[0, 0, 1, 1])

    width, height = 1, 1

    for (i, j), val in np.ndenumerate(data):
        tb.add_cell(i, j, width, height, text=val, loc='center')

    ax.add_table(tb)
    plt.show()
    plt.close()

def bellman_val():  # iteratively solve the Bellman expectation equation
    value = np.zeros((WORLD_SIZE, WORLD_SIZE))
    while True:
        new_value = np.zeros_like(value)  # empty array with the same shape as the value function
        for i in range(WORLD_SIZE):
            for j in range(WORLD_SIZE):  # sweep over every state
                for action in ACTIONS:  # take every action in this state
                    (ne_i, ne_j), reward = next([i, j], action)
                    new_value[i, j] += PI_prob * (reward + gamma * value[ne_i, ne_j])  # expectation over actions

        if np.sum(np.abs(value - new_value)) < 1e-5:  # check convergence
            draw_tab(np.round(new_value, decimals=2))  # display the result
            break
        value = new_value  # not converged yet, keep iterating


def optimal_bellman_val():  # Bellman optimality equation
    value = np.zeros((WORLD_SIZE, WORLD_SIZE))
    while True:
        new_value = np.zeros_like(value)
        for i in range(WORLD_SIZE):
            for j in range(WORLD_SIZE):
                values = []
                for action in ACTIONS:
                    (ne_i, ne_j), reward = next([i, j], action)
                    values.append(reward + gamma * value[ne_i, ne_j])
                new_value[i, j] = np.max(values)  # take the action with the largest value
        if np.sum(np.abs(value - new_value)) < 1e-5:
            draw_tab(np.round(new_value, decimals=2))
            break
        value = new_value

# Updates are synchronous (not in-place).
if __name__ == "__main__":
    bellman_val()
    optimal_bellman_val()
```

Value function obtained by solving the Bellman equation under the uniformly random policy:


Value function obtained by solving the Bellman optimality equation:
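For reference, the corresponding optimality backup implemented by `optimal_bellman_val` replaces the expectation over actions with a maximum:

$$v_{k+1}(s) = \max_{a}\,\big[\, r(s,a) + \gamma\, v_k\big(s'(s,a)\big) \,\big]$$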


4×4 grid-walk problem (policy evaluation)

(figure: the 4×4 grid world with terminal states in two opposite corners)

```python
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.table import Table

WORLD_SIZE = 4

# Actions: left, up, right, down
ACTIONS = np.array([[0, -1],
                    [-1, 0],
                    [0, 1],
                    [1, 0]])
PI_prob = 0.25

def is_terminal(state):
    x, y = state
    return (x == 0 and y == 0) or (x == WORLD_SIZE - 1 and y == WORLD_SIZE - 1)

def next(state, action):
    if is_terminal(state):
        return state, 0
    next_state = (np.array(state) + action).tolist()
    x, y = next_state

    if x < 0 or x >= WORLD_SIZE or y < 0 or y >= WORLD_SIZE:
        next_state = state

    reward = -1
    return next_state, reward

def draw_tab(data):
    fig, ax = plt.subplots()
    ax.set_axis_off()
    tb = Table(ax, bbox=[0, 0, 1, 1])

    width, height = 1, 1

    # Add cells
    for (i, j), val in np.ndenumerate(data):
        tb.add_cell(i, j, width, height, text=val, loc='center', facecolor='white')

    ax.add_table(tb)
    plt.show()
    plt.close()

def compute_Svalue(in_place=True, gamma=1):
    new_value = np.zeros((WORLD_SIZE, WORLD_SIZE))
    iteration = 0
    while True:
        if in_place:  # update in place?
            state_value = new_value
        else:
            state_value = new_value.copy()
        old_value = state_value.copy()  # keep the previous value function to measure the error

        for i in range(WORLD_SIZE):
            for j in range(WORLD_SIZE):
                value = 0
                for action in ACTIONS:
                    (ne_i, ne_j), reward = next([i, j], action)
                    value += PI_prob * (reward + gamma * state_value[ne_i, ne_j])
                new_value[i, j] = value

        delta = abs(old_value - new_value).max()
        if delta < 1e-4:
            break
        iteration += 1
    return new_value, iteration

_, async_it = compute_Svalue()
values, sync_it = compute_Svalue(in_place=False)
draw_tab(np.round(values, decimals=2))
print('In-place: {} iterations'.format(async_it))
print('Synchronous: {} iterations'.format(sync_it))
```

The results are as follows; the in-place (asynchronous) sweep converges in fewer iterations because updated values are used immediately within the same sweep:

```
In-place: 113 iterations
Synchronous: 172 iterations
```

(figure: converged state-value table for the 4×4 grid)

This grid problem uses the same method as the previous one (iterative policy evaluation); only the environment differs slightly.

4×4 grid problem (two methods)

```python
# 4x4 grid problem: policy iteration and value iteration.
# Policy iteration is run with both a uniform random policy and a greedy policy.
S = [i for i in range(16)]        # state space
A = ["n", "e", "s", "w"]          # action space

ds_actions = {"n": -4, "e": 1, "s": 4, "w": -1}  # how each action changes the state index

def dynamics(s, a):  # produce the next state and the immediate reward
    s_next = s
    if (s % 4 == 0 and a == "w") or (s < 4 and a == "n") \
            or ((s + 1) % 4 == 0 and a == "e") or (s > 11 and a == "s") \
            or s in [0, 15]:  # illegal move, or already in a terminal state
        pass
    else:
        ds = ds_actions[a]
        s_next = s + ds
    reward = 0 if s in [0, 15] else -1  # every step is -1 except in terminal states 0 and 15
    return s_next, reward

def P(s, a, s1):  # transition probability; the dynamics are deterministic, so it is either 0 or 1
    s_next, _ = dynamics(s, a)
    return s1 == s_next

def R(s, a):  # reward function
    _, r = dynamics(s, a)
    return r

gamma = 1.00

def uniform_random_pi(V=None, s=None, a=None):  # uniformly random policy
    return 1.0 / 4

def greedy_pi(V, s, a):  # greedy policy
    max_v, a_max_v = -float('inf'), []
    for a_opt in A:  # find the largest successor value and the action(s) reaching it (ties possible)
        s_next, _ = dynamics(s, a_opt)
        v_s_next = V[s_next]
        if v_s_next > max_v:
            max_v = v_s_next
            a_max_v = [a_opt]
        elif v_s_next == max_v:
            a_max_v.append(a_opt)
    n = len(a_max_v)
    if n == 0:
        return 0.0
    return 1.0 / n if a in a_max_v else 0.0  # choose uniformly among the value-maximizing actions

def get_pi(Pi, s, a, V=None):
    return Pi(V, s, a)

def display_V(V):  # print the state values as a 4x4 grid
    for i in range(16):
        print('{0:>6.2f}'.format(V[i]), end=" ")
        if (i + 1) % 4 == 0:
            print("")
    print()

def compute_q(V, s, a):
    '''Compute the action value q(s, a) from the value function V of the given MDP.'''
    q_sa = 0
    for s_next in S:  # q(s,a) = R(s,a) + gamma * sum_s' P(s'|s,a) * V[s']
        q_sa += P(s, a, s_next) * (R(s, a) + gamma * V[s_next])
    return q_sa

def compute_v(V, Pi, s):
    '''Compute the value of state s under policy Pi, given the current value function V.'''
    v_s = 0
    for a in A:  # expectation over actions: sum_a pi(a|s) * q(s,a)
        v_s += get_pi(Pi, s, a, V) * compute_q(V, s, a)
    return v_s

def update_V(V, Pi):
    '''One sweep of the value-function update under policy Pi.'''
    V_prime = V.copy()  # keep the previous value function
    for s in S:
        V_prime[s] = compute_v(V, Pi, s)
    return V_prime

def policy_evaluate(V, Pi, n):
    '''Evaluate policy Pi with n sweeps of iterative policy evaluation, starting from V.'''
    for _ in range(n):
        V = update_V(V, Pi)
    return V


# Value iteration: compute the optimal state values directly.
def compute_v_from_max_q(V, s):
    '''Set the value of state s to the maximum action value over all actions.'''
    v_s = -float('inf')
    for a in A:
        qsa = compute_q(V, s, a)
        if qsa >= v_s:
            v_s = qsa
    return v_s

def update_V_without_pi(V):
    '''Update the state values directly from successor values, without any policy.'''
    V_prime = V.copy()
    for s in S:
        V_prime[s] = compute_v_from_max_q(V, s)
    return V_prime

def value_iterate(V, n):
    '''Value iteration for n sweeps.'''
    for _ in range(n):
        V = update_V_without_pi(V)
    return V

V = [0 for _ in range(16)]  # initialize the value function

V_pi = policy_evaluate(V, uniform_random_pi, 160)  # uniformly random policy
display_V(V_pi)

V = [0 for _ in range(16)]
V_pi = policy_evaluate(V, greedy_pi, 100)  # greedy policy
display_V(V_pi)

V = [0 for _ in range(16)]
V_star = value_iterate(V, 4)  # value iteration
display_V(V_star)
```
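As a small follow-up sketch (not part of the original script), a deterministic greedy policy can be read off from `V_star` with the `compute_q` helper defined above:

```python
# Read off a greedy action for every non-terminal state from the computed values.
def greedy_policy_from_V(V):
    policy = {}
    for s in S:
        if s in [0, 15]:  # terminal states need no action
            continue
        q_values = {a: compute_q(V, s, a) for a in A}
        policy[s] = max(q_values, key=q_values.get)
    return policy

print(greedy_policy_from_V(V_star))
```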

Jack's car rental problem (policy iteration)
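Roughly, the code below scores each state-action pair with a truncated expectation over the Poisson rental demand. A sketch of what `expected_return` computes when returns are treated as constants:

$$q\big((n_A, n_B), a\big) = -2\,|a| + \sum_{r_A=0}^{10}\sum_{r_B=0}^{10} p(r_A)\,p(r_B)\,\Big[\,10\,(\hat r_A + \hat r_B) + \gamma\, v(n_A', n_B')\,\Big]$$

where $\hat r_A, \hat r_B$ are the cars actually rented out and $(n_A', n_B')$ is the next-day state after moved and returned cars are accounted for.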

```python
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns
from scipy.stats import poisson

## Problem constants for Jack's car rental
MAX_CARS = 20
MAX_MOVE = 5
LAMBDA_RENT_A = 3
LAMBDA_RENT_B = 4
LAMBDA_RETURN_A = 3
LAMBDA_RETURN_B = 2

gamma = 0.9

RENTAL_EARNED = 10
MOVE_COST = 2
# The value of an action is the number of cars moved from location A to location B
ACTIONS = np.arange(-MAX_MOVE, MAX_MOVE + 1)

# Upper bound used when truncating the Poisson distribution
POISSON_UPPER_BOUND = 11
# Cache of Poisson probabilities
poisson_cache = dict()

def poisson_probability(n, lamb):
    global poisson_cache
    key = n * 10 + lamb  # hash key
    if key not in poisson_cache:
        poisson_cache[key] = poisson.pmf(n, lamb)
    return poisson_cache[key]


def expected_return(state, action, state_value, constant_returned_cars):
    returns = 0.0
    # First subtract the cost of moving cars
    returns -= MOVE_COST * abs(action)
    # Number of cars at each location after the move
    NUM_CAR_A = min(state[0] - action, MAX_CARS)
    NUM_CAR_B = min(state[1] + action, MAX_CARS)

    # Enumerate all possible rental requests and compute the expected return of taking the action
    for request_A in range(POISSON_UPPER_BOUND):
        for request_B in range(POISSON_UPPER_BOUND):
            prob = poisson_probability(request_A, LAMBDA_RENT_A) * poisson_probability(request_B, LAMBDA_RENT_B)

            num_car_A = NUM_CAR_A
            num_car_B = NUM_CAR_B

            # Number of cars actually rented out
            actual_rental_A = min(num_car_A, request_A)
            actual_rental_B = min(num_car_B, request_B)

            # Rental income
            reward = (actual_rental_A + actual_rental_B) * RENTAL_EARNED
            num_car_A -= actual_rental_A
            num_car_B -= actual_rental_B

            # Returned cars become rentable the next day, so they are added to the successor state
            if constant_returned_cars:
                # Assume a constant number of returns (no Poisson distribution)
                num_car_A = min(LAMBDA_RETURN_A + num_car_A, MAX_CARS)
                num_car_B = min(LAMBDA_RETURN_B + num_car_B, MAX_CARS)
                # Bellman backup
                returns += prob * (reward + gamma * state_value[num_car_A, num_car_B])
            else:
                # Model returns with a Poisson distribution
                for returned_car_A in range(POISSON_UPPER_BOUND):
                    for returned_car_B in range(POISSON_UPPER_BOUND):
                        prob_return = poisson_probability(returned_car_A, LAMBDA_RETURN_A) * poisson_probability(returned_car_B, LAMBDA_RETURN_B)
                        num_car_A_ = min(num_car_A + returned_car_A, MAX_CARS)
                        num_car_B_ = min(num_car_B + returned_car_B, MAX_CARS)
                        prob_ = prob * prob_return
                        returns += prob_ * (reward + gamma * state_value[num_car_A_, num_car_B_])
    return returns

def figure(constant_returned_cars=True):
    value = np.zeros((MAX_CARS + 1, MAX_CARS + 1))
    # The policy array is used for indexing later, so keep its dtype as int
    policy = np.zeros_like(value, dtype=int)

    iterations = 0
    _, ax = plt.subplots(2, 3, figsize=(40, 20))
    plt.subplots_adjust(wspace=0.1, hspace=0.2)
    ax = ax.flatten()

    while True:
        fig = sns.heatmap(np.flipud(policy), cmap="YlGnBu", ax=ax[iterations])
        fig.set_ylabel("# cars at A", fontsize=20)
        fig.set_xlabel("# cars at B", fontsize=20)
        fig.set_yticks(list(reversed(range(MAX_CARS + 1))))
        fig.set_title('policy {}'.format(iterations), fontsize=30)

        # Policy evaluation (in place)
        while True:
            old_value = value.copy()
            for i in range(MAX_CARS + 1):
                for j in range(MAX_CARS + 1):
                    new_value = expected_return([i, j], policy[i, j], value, constant_returned_cars)
                    value[i, j] = new_value
            delta = abs(old_value - value).max()
            print('delta {}'.format(delta))
            if delta < 1e-4:
                break

        # Policy improvement
        policy_stable = True
        for i in range(MAX_CARS + 1):
            for j in range(MAX_CARS + 1):
                old_action = policy[i, j]
                action_returns = []
                # Enumerate every action that is feasible in this state
                for action in ACTIONS:
                    if -j <= action <= i:
                        action_returns.append(expected_return([i, j], action, value, constant_returned_cars))
                    else:
                        action_returns.append(-np.inf)
                new_action = ACTIONS[np.argmax(action_returns)]
                policy[i, j] = new_action
                if policy_stable and old_action != new_action:
                    policy_stable = False
        print('policy stable {}'.format(policy_stable))

        if policy_stable:
            fig = sns.heatmap(np.flipud(value), cmap='YlGnBu', ax=ax[-1])
            fig.set_ylabel('# cars at A', fontsize=30)
            fig.set_yticks(list(reversed(range(MAX_CARS + 1))))
            fig.set_xlabel('# cars at B', fontsize=30)
            fig.set_title('optimal value', fontsize=30)
            break
        iterations += 1

    plt.show()
    plt.close()

if __name__ == "__main__":
    figure()
```

Output:

(figure: policy heatmaps after each policy-iteration step and the final optimal value function)

The figure shows that the policy converges to the optimal policy after 4 iterations.

Gambler's problem
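The value-iteration backup the code below implements, with heads probability $p_h = 0.4$, a goal of 100, and a reward of 1 only for reaching the goal, is:

$$v(s) \leftarrow \max_{a \in \{1, \dots, \min(s,\, 100 - s)\}} \Big[\, p_h\, v(s + a) + (1 - p_h)\, v(s - a) \,\Big]$$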

```python
import matplotlib.pyplot as plt
import numpy as np

GOAL = 100

STATES = np.arange(GOAL + 1)

PROB = 0.4  # probability of the coin coming up heads

def solve():
    state_value = np.zeros(GOAL + 1)
    state_value[GOAL] = 1.0
    policy = np.zeros(GOAL + 1)
    sweeps_history = []

    # Value iteration
    while True:
        old_state_value = state_value.copy()
        sweeps_history.append(old_state_value)

        for state in STATES[1:GOAL]:
            # The stakes available in this state (the feasible actions)
            actions = np.arange(1, min(state, GOAL - state) + 1)
            action_returns = []
            for action in actions:
                action_returns.append(0 + PROB * state_value[state + action] + (1 - PROB) * state_value[state - action])
            max_value = np.max(action_returns)
            # When picking the best action the values are rounded first: two actions can have values
            # that are almost identical while one stake is large and the other small.
            # To reproduce the figure in the book we prefer the smaller stake, and np.argmax returns
            # the first index among ties, so rounding makes near-equal values compare as equal.
            max_action = np.argmax(np.round(action_returns, 9))
            state_value[state] = max_value       # keep the value of the best action
            policy[state] = actions[max_action]  # and update the current best policy
        delta = abs(state_value - old_state_value).max()
        if delta < 1e-9:
            sweeps_history.append(state_value)
            break

    plt.figure(figsize=(10, 20))
    plt.subplot(2, 1, 1)
    for sweep, state_value in enumerate(sweeps_history):
        plt.plot(state_value, label='sweep {}'.format(sweep))
    plt.xlabel('Capital')
    plt.ylabel('Value estimates')
    plt.legend(loc='best')

    plt.subplot(2, 1, 2)
    plt.scatter(STATES, policy)
    plt.xlabel('Capital')
    plt.ylabel('Final policy')
    plt.show()
    plt.close()

if __name__ == "__main__":
    solve()
```

(figure: value estimates over successive sweeps and the final policy versus capital)