Theoretical Background of Deep-EBQL
Original paper: Ensemble Bootstrapping for Q-Learning
Deep-EBQL is the deep-learning version of EBQL: it builds on DQN and introduces an ensemble of Q-networks to address DQN's overestimation problem. The deep version of EBQL performs very well on Atari environments.
For the theoretical background of EBQL and a tabular reproduction, see this article: Ensemble Bootstrapping for Q-Learning (EBQL) [paper reproduction].
Below are the Atari experiments from the original paper, which show that EBQL indeed delivers impressive performance.
Deep-EBQL Code Implementation
This section walks through the implementation of Deep-EBQL, which only requires a few modifications on top of DQN. The environment is Pendulum-v0, where the overestimation of DQN is easy to observe.
Replay Buffer
The replay buffer is implemented with a deque:
from collections import deque
import random
import numpy as np

class ReplayBuffer:
    """Experience replay buffer"""
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)  # FIFO queue

    # Add a transition to the buffer
    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    # Sample batch_size transitions from the buffer
    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        state, action, reward, next_state, done = zip(*transitions)
        return np.array(state), action, reward, np.array(next_state), done

    # Current number of transitions in the buffer
    def size(self):
        return len(self.buffer)
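A quick sanity check of the buffer (a small usage sketch with illustrative values; not part of the original snippet):

buffer = ReplayBuffer(capacity=5000)
buffer.add(state=np.zeros(3), action=0, reward=-1.0, next_state=np.zeros(3), done=False)
print(buffer.size())  # 1
# Once enough transitions are stored, draw a training batch:
# states, actions, rewards, next_states, dones = buffer.sample(batch_size=64)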
Network Setup
The environment used here is Pendulum-v0, where the state and action dimensions are small, so a single hidden layer is sufficient.
import torch
import torch.nn as nn

class Qnet(nn.Module):
    """A single Q-network: one hidden layer is enough for Pendulum-v0"""
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(Qnet, self).__init__()
        self.layer = nn.Sequential(
            nn.Linear(state_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, s):
        return self.layer(s)
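For Pendulum-v0 the observation is 3-dimensional and the action space is discretized into 11 actions below, so the network can be instantiated like this (illustrative only; the hidden size of 128 matches the default argument later):

net = Qnet(state_dim=3, hidden_dim=128, action_dim=11)
q_values = net(torch.randn(1, 3))  # shape (1, 11): one Q-value per discrete action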
Deep-EBQL Implementation
The idea is exactly the same as tabular EBQL; the only change is that the Q-tables are replaced with neural networks!
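Concretely, each update draws one ensemble member $k_t$ uniformly at random and bootstraps its TD target from the remaining $K-1$ target networks, which is exactly what the update() method below computes:

$$y = r + \gamma \cdot \frac{1}{K-1}\sum_{j \neq k_t} \max_{a'} Q^{j}_{\text{target}}(s', a')$$

Because the member being updated never bootstraps from its own estimates, the maximization bias of standard Q-learning is reduced, in the same spirit as Double Q-learning.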
import copy
import random
import gym
import numpy as np
import torch
import torch.nn.functional as F
from torch.optim import Adam

class Deep_EBQL:
    def __init__(self, args):
        self.args = args
        self.K = args.K  # number of Q-networks in the ensemble
        self.hidden_dim = args.hidden_dim
        self.batch_size = args.batch_size
        self.lr = args.lr
        self.gamma = args.gamma  # discount factor
        self.epsilon = args.epsilon  # epsilon-greedy exploration rate
        self.target_update = args.target_update  # target-network update frequency
        self.count = 0  # counter of the number of updates
        self.num_episodes = args.num_episodes
        self.minimal_size = args.minimal_size
        self.env = gym.make(args.env_name)
        random.seed(args.seed)
        np.random.seed(args.seed)
        self.env.seed(args.seed)
        torch.manual_seed(args.seed)
        self.replay_buffer = ReplayBuffer(args.buffer_size)
        self.state_dim = self.env.observation_space.shape[0]
        self.action_dim = 11  # discretize the continuous action into 11 discrete actions
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.q_net = []
        self.optimizer = []
        for i in range(self.K):
            self.q_net.append(Qnet(self.state_dim, self.hidden_dim, self.action_dim).to(self.device))
            self.optimizer.append(Adam(self.q_net[i].parameters(), lr=self.lr))
        self.target_q_net = copy.deepcopy(self.q_net)
    def select_action(self, state):
        # epsilon-greedy action selection over the ensemble-averaged Q-values
        if np.random.random() < self.epsilon:
            action = np.random.randint(self.action_dim)
        else:
            state = torch.tensor([state], dtype=torch.float).to(self.device)
            action_Q_values = torch.zeros(state.shape[0], self.action_dim).to(self.device)
            for k in range(self.K):
                action_Q_values += self.q_net[k](state)
            action_Q_values = action_Q_values / self.K
            action = action_Q_values.argmax().item()
        return action
    def max_q_value(self, state):
        # Record the (ensemble-averaged) maximum Q-value to visualize overestimation
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        q_max = 0.0
        for i in range(self.K):
            q_max += self.q_net[i](state).max().item()
        return q_max / self.K
    def update(self, transition):
        states = torch.tensor(transition["states"], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition["actions"]).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition["rewards"], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition["next_states"], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition["dones"], dtype=torch.float).view(-1, 1).to(self.device)
        ######################################################################
        kt = np.random.randint(0, self.K)  # ensemble member updated at this step
        q_values = self.q_net[kt](states).gather(1, actions)  # Q-values of the chosen member
        max_next_q_values = torch.zeros(self.batch_size, 1).to(self.device)
        for k in range(self.K):
            if k != kt:
                max_next_q_values += self.target_q_net[k](next_states).max(1)[0].view(-1, 1)  # max Q-value at the next state
        max_next_q_values = max_next_q_values / (self.K - 1)  # average over the other K - 1 members
        q_targets = rewards + self.gamma * max_next_q_values * (1 - dones)  # TD target
        ######################################################################
        loss = torch.mean(F.mse_loss(q_values, q_targets))  # mean-squared-error loss
        self.optimizer[kt].zero_grad()  # PyTorch accumulates gradients by default, so clear them explicitly
        loss.backward()  # backpropagate and update the parameters
        self.optimizer[kt].step()
        if self.count % self.target_update == 0:  # sync the target networks
            for k in range(self.K):
                self.target_q_net[k].load_state_dict(self.q_net[k].state_dict())
        self.count += 1
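One detail left implicit is how the discrete action index produced by select_action is turned back into a continuous torque when stepping Pendulum-v0. A common approach is a linear mapping over the action bounds; the helper below (dis_to_con is my own name, not part of the original code) is a sketch of that idea:

def dis_to_con(discrete_action, env, action_dim):
    # Map an index in [0, action_dim - 1] linearly onto [low, high]
    low = env.action_space.low[0]    # -2.0 for Pendulum-v0
    high = env.action_space.high[0]  #  2.0 for Pendulum-v0
    return low + (discrete_action / (action_dim - 1)) * (high - low)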
Parameter Settings
import argparse

def define_args():
    parser = argparse.ArgumentParser(description='Deep EBQL parameter settings')
    parser.add_argument('--batch_size', type=int, default=64, metavar='N', help='batch size')
    parser.add_argument('--lr', type=float, default=1e-2, help='learning rate for the networks')
    parser.add_argument('--num_episodes', type=int, default=200, help='number of training episodes')
    parser.add_argument('--seed', type=int, default=0, metavar='S', help='random seed')
    parser.add_argument('--gamma', type=float, default=0.9, metavar='S', help='discount factor')
    parser.add_argument('--epsilon', type=float, default=0.01, metavar='S', help='epsilon-greedy exploration rate')
    parser.add_argument('--K', type=int, default=5, metavar='S', help='number of Q-networks in the ensemble')
    parser.add_argument('--target_update', type=int, default=10, metavar='S', help='update frequency of the target networks')
    parser.add_argument('--buffer_size', type=int, default=5000, metavar='S', help='capacity of the replay buffer')
    parser.add_argument('--minimal_size', type=int, default=500, metavar='S', help='minimal buffer size before learning starts')
    parser.add_argument('--hidden_dim', type=int, default=128, metavar='S', help='size of the hidden layer')
    parser.add_argument('--env_name', type=str, default="Pendulum-v0", metavar='S', help='name of the environment')
    args = parser.parse_args()
    return args
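To tie everything together, here is a minimal training-loop sketch, assuming the dis_to_con helper above is used to convert actions; the variable names (return_list, max_q_value_list) are illustrative rather than taken from the original code:

if __name__ == "__main__":
    args = define_args()
    agent = Deep_EBQL(args)
    return_list, max_q_value_list = [], []

    for episode in range(args.num_episodes):
        state = agent.env.reset()
        episode_return, done = 0.0, False
        while not done:
            action = agent.select_action(state)
            max_q_value_list.append(agent.max_q_value(state))  # track the Q-value estimates
            # Pendulum-v0 expects a continuous torque, so convert the discrete index
            next_state, reward, done, _ = agent.env.step([dis_to_con(action, agent.env, agent.action_dim)])
            agent.replay_buffer.add(state, action, reward, next_state, done)
            state = next_state
            episode_return += reward
            # Start learning once the buffer holds enough transitions
            if agent.replay_buffer.size() > args.minimal_size:
                b_s, b_a, b_r, b_ns, b_d = agent.replay_buffer.sample(args.batch_size)
                transition = {"states": b_s, "actions": b_a, "rewards": b_r,
                              "next_states": b_ns, "dones": b_d}
                agent.update(transition)
        return_list.append(episode_return)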
Results
Comparison of Deep-EBQL, Double DQN, and DQN
Since Deep-EBQL is supposed to fix Q-value overestimation, we compare it with DQN and Double DQN to see how well it estimates Q-values.
The theory and code for DQN and Double DQN can be found via the following links:
In the comparison, the shared hyperparameters are kept identical, e.g., the replay-buffer size. Running 200 episodes on Pendulum-v0 gives the following results:
As can be seen, Deep-EBQL does a good job of mitigating Q-value overestimation.