接下来,我们要做一个比较有挑战性的工作,那么就是使用pytorch实现强化学习网络,即DQN.目前,已经有tensorflow的实现

,所以,涉及到游戏的python代码,非我原创。

关于DQN,算法伪代码:




关于游戏的介绍,我这里就还不多说了

https://blog.csdn.net/songrotek/article/details/50951537

github上有人放出使用DQN玩Flappy Bird的代码,
https://github.com/yenchenlin1994/DeepLearningFlappyBird
<https://github.com/yenchenlin1994/DeepLearningFlappyBird>【1】 

这个在tensorflow下的实现,这里,我主要是对其深度网络模块的设计,采用了pytorch重新设计了一遍。其余模块不变

,所以变量名基本不变。

深度学习网络设计:




所以,我们关于该网络模块的设计代码实现为:


class DeepNetWork(nn.Module): def __init__(self): super(DeepNetWork, self).
__init__() # 需要将事先训练好的词向量载入 self.conv1 = nn.Sequential( nn.Conv2d(in_channels=4,
out_channels=32, kernel_size=8,stride=4,padding=2), nn.ReLU(inplace=True),
nn.MaxPool2d(kernel_size=2) # ) self.conv2 = nn.Sequential( nn.Conv2d(
in_channels=32, out_channels=64, kernel_size=4, stride=2,padding=1), nn.ReLU(
inplace=True), ) self.conv3 = nn.Sequential( nn.Conv2d(in_channels=64,
out_channels=64, kernel_size=3, stride=1, padding=1), nn.ReLU(inplace=True), )
self.fc1=nn.Sequential( nn.Linear(1600,256), nn.ReLU(), ) self.out = nn.Linear(
256,2) def forward(self,x): #btach channel width,weight x = self.conv1(x) x =
self.conv2(x) x = self.conv3(x) x = x.view(x.size(0), -1) # 将(batch,
outchanel,w,h)展平为(batch,outchanel*w*h) x = self.fc1(x) output = self.out(x)
returnoutputtrain模块

def train(self): # Step 1: obtain random minibatch from replay memory
minibatch = random.sample(self.replayMemory, BATCH_SIZE) state_batch = [data[0]
fordata in minibatch] action_batch = [data[1] for data in minibatch]
reward_batch = [data[2] for data in minibatch] nextState_batch = [data[3] for
datain minibatch] # Step 2: calculate y y_batch = np.zeros([BATCH_SIZE,1])
nextState_batch=np.array(nextState_batch)#print("train next state shape")
#print(nextState_batch.shape) nextState_batch=torch.Tensor(nextState_batch)
action_batch=np.array(action_batch) index=action_batch.argmax(axis=1) print(
"action "+str(index)) index=np.reshape(index,[BATCH_SIZE,1])
action_batch_tensor=torch.LongTensor(index) QValue_batch =self
.Q_netT(nextState_batch) QValue_batch=QValue_batch.detach().numpy()for i in
range(0, BATCH_SIZE): terminal = minibatch[i][4] if terminal: y_batch[i][0
]=reward_batch[i]else: # 这里的QValue_batch[i]为数组,大小为所有动作集合大小,QValue_batch[i],代表 #
做所有动作的Q值数组,y计算为如果游戏停止,y=rewaerd[i],如果没停止,则y=reward[i]+gamma*np.max(Qvalue[i]) #
代表当前y值为当前reward+未来预期最大值*gamma(gamma:经验系数) y_batch[i][0]=reward_batch[i] + GAMMA
* np.max(QValue_batch[i]) y_batch=np.array(y_batch) y_batch=np.reshape(y_batch,
[BATCH_SIZE,1]) state_batch_tensor=Variable(torch.Tensor(state_batch))
y_batch_tensor=Variable(torch.Tensor(y_batch)) y_predict=self
.Q_net(state_batch_tensor).gather(1,action_batch_tensor) loss=self
.loss_func(y_predict,y_batch_tensor) print("loss is "+str(loss)) self
.optimizer.zero_grad() loss.backward()self.optimizer.step() if self.timeStep %
UPDATE_TIME ==0: self.Q_netT.load_state_dict(self.Q_net.state_dict()) self
.save()

这里,Q值网络针对每个输入的状态输出的为一个大小为action的数组。所以,代表输入状态执行不同动作的的Q值。所以在计算损失时,要根据训练时动作和深度网络计算出Q值数组计算出神经网络关于某种状态下做出某种动作的Q值,与训练集中的Q值做损失计算。

最后,每次我们选择动作时,选择使得Q值最大的动作,这是强化学习的知识,这里就不用仔细讨论了。

我这里贴出这部分代码:

这是flappy_bird.py,代表主函数执行:


import sys import cv2 sys.path.append("game/") import wrapped_flappy_bird as
gameimport BrainDQN import numpy as np # preprocess raw image to 80*80 gray
imagedef preprocess(observation): observation =
cv2.cvtColor(cv2.resize(observation, (80, 80)), cv2.COLOR_BGR2GRAY) ret,
observation = cv2.threshold(observation,1,255,cv2.THRESH_BINARY) return
np.reshape(observation,(1,80,80)) def playFlappyBird(): # Step 1: init BrainDQN
actions =2 brain = BrainDQN.BrainDQNMain(actions) # Step 2: init Flappy Bird
Game flappyBird = game.GameState() # Step 3: play game # Step 3.1: obtain init
state action0 = np.array([1,0]) # do nothing observation0, reward0, terminal =
flappyBird.frame_step(action0) observation0 =
cv2.cvtColor(cv2.resize(observation0, (80, 80)), cv2.COLOR_BGR2GRAY) ret,
observation0 = cv2.threshold(observation0,1,255,cv2.THRESH_BINARY)
brain.setInitState(observation0)print(brain.currentState.shape) # Step 3.2: run
the game while 1!= 0: action = brain.getAction() nextObservation,reward,
terminal = flappyBird.frame_step(action) nextObservation =
preprocess(nextObservation)#print(nextObservation.shape)
brain.setPerception(nextObservation,action,reward,terminal) def main():
playFlappyBird()if __name__ == '__main__': main()
然后,关于深度网络调用和学习的代码:


from collections import deque import torch import numpy as np from
torch.autogradimport Variable import torch.nn as nn # Hyper Parameters:
FRAME_PER_ACTION =1 GAMMA = 0.99 # decay rate of past observations OBSERVE =
1000.# timesteps to observe before training EXPLORE = 200000. # frames over
which to anneal epsilonFINAL_EPSILON = 0#0.001 # final value of epsilon
INITIAL_EPSILON =0#0.01 # starting value of epsilon REPLAY_MEMORY = 50000 #
number of previous transitions to rememberBATCH_SIZE = 32 # size of minibatch
UPDATE_TIME =100 width=80 height=80; import random class DeepNetWork(nn.Module):
def__init__(self): super(DeepNetWork, self).__init__() self.conv1 =
nn.Sequential( nn.Conv2d(in_channels=4, out_channels=32, kernel_size=8,stride=4,
padding=2), nn.ReLU(inplace=True), nn.MaxPool2d(kernel_size=2) # ) self.conv2 =
nn.Sequential( nn.Conv2d(in_channels=32, out_channels=64, kernel_size=4, stride=
2,padding=1), nn.ReLU(inplace=True), ) self.conv3 = nn.Sequential( nn.Conv2d(
in_channels=64, out_channels=64, kernel_size=3, stride=1, padding=1), nn.ReLU(
inplace=True), ) self.fc1=nn.Sequential( nn.Linear(1600,256), nn.ReLU(), ) self
.out = nn.Linear(256,2) def forward(self,x): #btach channel width,weight x =
self.conv1(x) x = self.conv2(x) x = self.conv3(x) x = x.view(x.size(0), -1) # 将(
batch,outchanel,w,h)展平为(batch,outchanel*w*h) x = self.fc1(x) output = self
.out(x)return output import os class BrainDQNMain: def save(self): print("save
model param") torch.save(self.Q_net.state_dict(), 'params3.pth') def load(self):
ifos.path.exists("params3.pth"): print("load model param") self
.Q_net.load_state_dict(torch.load('params3.pth')) self
.Q_netT.load_state_dict(torch.load('params3.pth')) def __init__(self,actions):
self.replayMemory = deque() # init some parameters self.timeStep = 0 self
.epsilon = INITIAL_EPSILONself.actions = actions self.Q_net=DeepNetWork() self
.Q_netT=DeepNetWork();self.load() self.loss_func=nn.MSELoss() LR=1e-6 self
.optimizer = torch.optim.Adam(self.Q_net.parameters(), lr=LR) def train(self):
# Step 1: obtain random minibatch from replay memory minibatch = random.sample(
self.replayMemory, BATCH_SIZE) state_batch = [data[0] for data in minibatch]
action_batch = [data[1] for data in minibatch] reward_batch = [data[2] for data
inminibatch] nextState_batch = [data[3] for data in minibatch] # Step 2:
calculate y y_batch = np.zeros([BATCH_SIZE,1])
nextState_batch=np.array(nextState_batch)#print("train next state shape")
#print(nextState_batch.shape) nextState_batch=torch.Tensor(nextState_batch)
action_batch=np.array(action_batch) index=action_batch.argmax(axis=1) print(
"action "+str(index)) index=np.reshape(index,[BATCH_SIZE,1])
action_batch_tensor=torch.LongTensor(index) QValue_batch =self
.Q_netT(nextState_batch) QValue_batch=QValue_batch.detach().numpy()for i in
range(0, BATCH_SIZE): terminal = minibatch[i][4] if terminal: y_batch[i][0
]=reward_batch[i]else: # 这里的QValue_batch[i]为数组,大小为所有动作集合大小,QValue_batch[i],代表 #
做所有动作的Q值数组,y计算为如果游戏停止,y=rewaerd[i],如果没停止,则y=reward[i]+gamma*np.max(Qvalue[i]) #
代表当前y值为当前reward+未来预期最大值*gamma(gamma:经验系数) y_batch[i][0]=reward_batch[i] + GAMMA
* np.max(QValue_batch[i]) y_batch=np.array(y_batch) y_batch=np.reshape(y_batch,
[BATCH_SIZE,1]) state_batch_tensor=Variable(torch.Tensor(state_batch))
y_batch_tensor=Variable(torch.Tensor(y_batch)) y_predict=self
.Q_net(state_batch_tensor).gather(1,action_batch_tensor) loss=self
.loss_func(y_predict,y_batch_tensor) print("loss is "+str(loss)) self
.optimizer.zero_grad() loss.backward()self.optimizer.step() if self.timeStep %
UPDATE_TIME ==0: self.Q_netT.load_state_dict(self.Q_net.state_dict()) self
.save()def setPerception(self,nextObservation,action,reward,terminal):
#print(nextObservation.shape) newState = np.append(self.currentState[1:,:,:],
nextObservation,axis = 0) # newState =
np.append(nextObservation,self.currentState[:,:,1:],axis = 2) self
.replayMemory.append((self.currentState,action,reward,newState,terminal)) if len
(self.replayMemory) > REPLAY_MEMORY: self.replayMemory.popleft() if self
.timeStep > OBSERVE:# Train the network self.train() # print info state = "" if
self.timeStep <= OBSERVE: state = "observe" elif self.timeStep > OBSERVE and
self.timeStep <= OBSERVE + EXPLORE: state = "explore" else: state = "train"
print("TIMESTEP", self.timeStep, "/ STATE", state, \ "/ EPSILON", self.epsilon)
self.currentState = newState self.timeStep += 1 def getAction(self):
currentState=torch.Tensor([self.currentState]) QValue = self
.Q_net(currentState)[0] action = np.zeros(self.actions) if self.timeStep %
FRAME_PER_ACTION ==0: if random.random() <= self.epsilon: action_index =
random.randrange(self.actions) print("choose random action "+str(action_index))
action[action_index] =1 else: action_index = np.argmax(QValue.detach().numpy())
print("choose qnet value action " + str(action_index)) action[action_index] = 1
else: action[0] = 1 # do nothing # change episilon if self.epsilon >
FINAL_EPSILONand self.timeStep > OBSERVE: self.epsilon -= (INITIAL_EPSILON -
FINAL_EPSILON) / EXPLOREreturn action def setInitState(self, observation): self
.currentState = np.stack((observation, observation, observation, observation),
axis=0) print(self.currentState.shape)此代码是
https://github.com/yenchenlin1994/DeepLearningFlappyBird
<https://github.com/yenchenlin1994/DeepLearningFlappyBird>
的pytorch实现版本。所以关于其他细节设计,我尽量靠近原版,包括变量名。

友情链接
KaDraw流程图
API参考文档
OK工具箱
云服务器优惠
阿里云优惠券
腾讯云优惠券
华为云优惠券
站点信息
问题反馈
邮箱:[email protected]
QQ群:637538335
关注微信