Python select_item() example source code
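For reference, F.select_item(x, t) picks one element per row of a 2-D input: it returns x[i, t[i]] for every row i, which is how the DQN snippets below read off Q(s, a) for the actions actually taken. A minimal stand-alone demonstration (the array values are only illustrative):

import numpy as np
import chainer.functions as F

x = np.arange(6, dtype=np.float32).reshape(2, 3)  # [[0, 1, 2], [3, 4, 5]]
t = np.asarray([2, 0], dtype=np.int32)            # one column index per row
y = F.select_item(x, t)                           # y[i] = x[i, t[i]]
print(y.data)                                     # [2. 3.]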
def meanQvalue(Q, samples):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    mean_Q = (F.sum(y) / minibatch_size).data
    return mean_Q
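The snippets on this page come from larger training scripts and rely on module-level imports and hyperparameters that are not shown. A sketch of the assumed context (the names match the code above and below; the concrete values are only illustrative guesses):

import numpy as np
import chainer
import chainer.functions as F
from chainer import cuda, Variable

minibatch_size = 32   # transitions sampled per update (illustrative)
STATE_LENGTH = 4      # stacked frames per state (illustrative)
FRAME_WIDTH = 84      # preprocessed frame width (illustrative)
FRAME_HEIGHT = 84     # preprocessed frame height (illustrative)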
def max(self):
    with chainer.force_backprop_mode():
        return F.select_item(self.q_values, self.greedy_actions)
def evaluate_actions(self, actions):
    return F.select_item(self.q_values, actions)
def prob(self, x):
    return F.select_item(self.all_prob, x)
def log_prob(self, x):
    return F.select_item(self.all_log_prob, x)
def sampled_actions_log_probs(self):
    return F.select_item(
        self.log_probs,
        chainer.Variable(np.asarray(self.action_indices, dtype=np.int32)))
def update(Q, target_Q, opt, samples, gamma=0.99, target_type='double_dqn'):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    r = np.asarray([sample[2] for sample in samples], dtype=np.float32)
    done = np.asarray([sample[3] for sample in samples], dtype=np.float32)
    s_next = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
        s_next[i] = samples[i][4]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    r = xp.asarray(r)
    done = xp.asarray(done)
    s_next = xp.asarray(s_next)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    # Target: r + gamma * max_b Q(s', b)
    with chainer.no_backprop_mode():
        if target_type == 'dqn':
            t = r + gamma * (1 - done) * F.max(target_Q(s_next), axis=1)
        elif target_type == 'double_dqn':
            t = r + gamma * (1 - done) * F.select_item(
                target_Q(s_next), F.argmax(Q(s_next), axis=1))
        else:
            raise ValueError('Unsupported target_type: {}'.format(target_type))
    loss = mean_clipped_loss(y, t)
    Q.cleargrads()
    loss.backward()
    opt.update()
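mean_clipped_loss is called above but not defined in this listing. A minimal sketch, assuming it is the usual DQN error clipping, i.e. a per-sample Huber loss with delta=1 averaged over the minibatch:

def mean_clipped_loss(y, t):
    # Hypothetical helper: Huber loss (delta=1) averaged over the batch,
    # which clips the gradient of the TD error to [-1, 1].
    return F.sum(F.huber_loss(F.expand_dims(y, axis=1),
                              F.expand_dims(t, axis=1), delta=1.0)) / y.shape[0]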
def update(Q, target_Q, opt, samples, gamma=0.99, target_type='double_dqn'):
    xp = Q.xp
    s = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    a = np.asarray([sample[1] for sample in samples], dtype=np.int32)
    r = np.asarray([sample[2] for sample in samples], dtype=np.float32)
    done = np.asarray([sample[3] for sample in samples], dtype=np.float32)
    s_next = np.ndarray(shape=(minibatch_size, STATE_LENGTH, FRAME_WIDTH, FRAME_HEIGHT), dtype=np.float32)
    for i in xrange(minibatch_size):
        s[i] = samples[i][0]
        s_next[i] = samples[i][4]
    # to gpu if available
    s = xp.asarray(s)
    a = xp.asarray(a)
    r = xp.asarray(r)
    done = xp.asarray(done)
    s_next = xp.asarray(s_next)
    # Prediction: Q(s,a)
    y = F.select_item(Q(s), a)
    f0 = Q.conv1.data
    print f0.shape
    # Target: r + gamma * max_b Q(s', b)
    with chainer.no_backprop_mode():
        if target_type == 'dqn':
            t = r + gamma * (1 - done) * F.max(target_Q(s_next), axis=1)
        elif target_type == 'double_dqn':
            t = r + gamma * (1 - done) * F.select_item(
                target_Q(s_next), F.argmax(Q(s_next), axis=1))
        else:
            raise ValueError('Unsupported target_type: {}'.format(target_type))
    loss = mean_clipped_loss(y, t)
    Q.cleargrads()
    loss.backward()
    opt.update()
def check_forward(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    y = functions.select_item(x, t)
    y_exp = cuda.to_cpu(x_data)[range(t_data.size), cuda.to_cpu(t_data)]
    self.assertEqual(y.data.dtype, self.dtype)
    numpy.testing.assert_equal(cuda.to_cpu(y.data), y_exp)
def check_value_check(self, x_data, t_data):
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    if self.valid:
        # Check that it raises nothing
        functions.select_item(x, t)
    else:
        with self.assertRaises(ValueError):
            functions.select_item(x, t)
def calc_loss_recurrent(self, frames, actions, rewards, done_list, size_list):
    # TODO self.max_step -> max_step
    s = Variable(frames.astype(np.float32))
    self.model_target.reset_state()  # Refresh model_target's state
    self.model_target.q_function(s[0])  # Update target model initial state
    target_q = self.xp.zeros((self.max_step, self.replay_batch_size), dtype=np.float32)
    selected_q_tuple = [None for _ in range(self.max_step)]
    for frame in range(0, self.max_step):
        q = self.model.q_function(s[frame])
        q_dash = self.model_target.q_function(s[frame + 1])  # Q(s',*): shape is (batch_size, action_num)
        max_q_dash = q_dash.data.max(axis=1)  # max_a Q(s',a): shape is (batch_size,)
        if self.clipping:
            rs = self.xp.sign(rewards[frame])
        else:
            rs = rewards[frame]
        target_q[frame] = rs + self.xp.logical_not(done_list[frame]).astype(np.int) * (self.gamma * max_q_dash)
        selected_q_tuple[frame] = F.select_item(q, actions[frame].astype(np.int))
    enable = self.xp.broadcast_to(self.xp.arange(self.max_step), (self.replay_batch_size, self.max_step))
    size_list = self.xp.expand_dims(cuda.to_gpu(size_list), -1)
    enable = (enable < size_list).T
    selected_q = F.concat(selected_q_tuple, axis=0)
    # element-wise huber loss
    huber_loss = F.huber_loss(
        F.expand_dims(F.flatten(target_q), axis=1),
        F.expand_dims(selected_q, axis=1), delta=1.0)
    huber_loss = F.reshape(huber_loss, enable.shape)
    zeros = self.xp.zeros(enable.shape, dtype=np.float32)
    loss = F.sum(F.where(enable, huber_loss, zeros))  # / self.replay_batch_size
    # print("loss", loss.data)
    return loss