# Imports assumed by the snippets below (Chainer-based DQN code):
from functools import reduce

import numpy as np
import chainer
import chainer.functions as F
from chainer import Variable, cuda


def calc_loss(self, state, state_dash, actions, rewards, done_list):
    assert state.shape == state_dash.shape
    # Flatten every state to a (batch_size, feature_size) float32 array.
    s = state.reshape(
        (state.shape[0], reduce(lambda x, y: x * y, state.shape[1:]))).astype(np.float32)
    s_dash = state_dash.reshape(
        (state.shape[0], reduce(lambda x, y: x * y, state.shape[1:]))).astype(np.float32)
    q = self.model.q_function(s)                   # Q(s, *)
    q_dash = self.model_target.q_function(s_dash)  # Q(s', *)
    max_q_dash = np.asarray(list(map(np.max, q_dash.data)), dtype=np.float32)  # max_a Q(s', a)
    target = q.data.copy()
    assert self.replay_batch_size == len(done_list)
    assert self.replay_batch_size == len(actions)
    for i in range(self.replay_batch_size):
        r = np.sign(rewards[i]) if self.clipping else rewards[i]
        if done_list[i]:
            # Terminal transition: the target is the (clipped) reward alone.
            discounted_sum = r
        else:
            # Non-terminal: one-step bootstrapped TD target.
            discounted_sum = r + self.gamma * max_q_dash[i]
        # Only the entry for the action actually taken is overwritten.
        target[i, actions[i]] = discounted_sum
    loss = F.sum(F.huber_loss(Variable(target), q, delta=1.0))  # / self.replay_batch_size
    return loss, q
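# --- Usage sketch (toy values, not from the original source): because `target`
# starts as a copy of q.data, the element-wise Huber loss above is zero
# everywhere except the entries of the actions actually taken, so the sum only
# accumulates the TD errors of those actions.
import numpy as np
import chainer.functions as F
from chainer import Variable

q_data = np.array([[1.0, 2.0, 3.0],
                   [0.5, 0.0, -1.0]], dtype=np.float32)
q = Variable(q_data.copy())
target = q_data.copy()
target[0, 1] = 2.4   # action 1 taken in transition 0, TD target 2.4
target[1, 2] = -3.0  # action 2 taken in transition 1, TD target -3.0

loss = F.sum(F.huber_loss(Variable(target), q, delta=1.0))
# |2.4 - 2.0| = 0.4 < 1   -> 0.5 * 0.4**2 = 0.08  (quadratic branch)
# |-3.0 - (-1.0)| = 2 >= 1 -> 2.0 - 0.5 = 1.5     (linear branch)
print(loss.data)  # ~1.58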
def compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean'):
    """Compute a loss for a value prediction problem.

    Args:
        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        clip_delta (bool): Use the Huber loss function if set to True.
        batch_accumulator (str): 'mean' or 'sum'. 'mean' will use the mean of
            the loss values in a batch. 'sum' will use the sum.
    Returns:
        (Variable) scalar loss
    """
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        loss_sum = F.sum(F.huber_loss(y, t, delta=1.0))
        if batch_accumulator == 'mean':
            loss = loss_sum / y.shape[0]
        elif batch_accumulator == 'sum':
            loss = loss_sum
    else:
        # Squared error / 2 matches the quadratic branch of the Huber loss.
        loss_mean = F.mean_squared_error(y, t) / 2
        if batch_accumulator == 'mean':
            loss = loss_mean
        elif batch_accumulator == 'sum':
            loss = loss_mean * y.shape[0]
    return loss
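# --- Usage sketch (toy values, not from the original source); assumes
# compute_value_loss defined above and the numpy import at the top.
y = np.array([0.0, 2.0, -1.5], dtype=np.float32)  # predicted values
t = np.array([0.3, 0.0, -1.0], dtype=np.float32)  # bootstrapped targets
# Huber terms: 0.045, 1.5, 0.125 -> sum 1.67, mean ~0.5567
print(compute_value_loss(y, t, clip_delta=True, batch_accumulator='mean').data)
# Without clipping: squared errors (0.09, 4.0, 0.25) / 2, summed -> 2.17
print(compute_value_loss(y, t, clip_delta=False, batch_accumulator='sum').data)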
def compute_weighted_value_loss(y, t, weights,
                                clip_delta=True, batch_accumulator='mean'):
    """Compute a weighted loss for a value prediction problem.

    Args:
        y (Variable or ndarray): Predicted values.
        t (Variable or ndarray): Target values.
        weights (ndarray): Per-sample weights for y, t.
        clip_delta (bool): Use the Huber loss function if set to True.
        batch_accumulator (str): 'mean' or 'sum'. 'mean' will divide the
            weighted loss sum by the batch size. 'sum' will use the sum.
    Returns:
        (Variable) scalar loss
    """
    assert batch_accumulator in ('mean', 'sum')
    y = F.reshape(y, (-1, 1))
    t = F.reshape(t, (-1, 1))
    if clip_delta:
        losses = F.huber_loss(y, t, delta=1.0)
    else:
        losses = F.square(y - t) / 2
    losses = F.reshape(losses, (-1,))
    loss_sum = F.sum(losses * weights)
    if batch_accumulator == 'mean':
        loss = loss_sum / y.shape[0]
    elif batch_accumulator == 'sum':
        loss = loss_sum
    return loss
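# --- Usage sketch (toy values, not from the original source): the weights are
# typically the importance-sampling corrections from prioritized replay.
y = np.array([0.0, 2.0], dtype=np.float32)
t = np.array([0.3, 0.0], dtype=np.float32)
w = np.array([1.0, 0.5], dtype=np.float32)  # down-weight the second sample
# 1.0 * 0.045 + 0.5 * 1.5 = 0.795; 'mean' divides by the batch size (2)
print(compute_weighted_value_loss(y, t, w, batch_accumulator='mean').data)  # 0.3975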
def mean_clipped_loss(y, t):
    # Add an axis because F.huber_loss only accepts arrays with ndim >= 2.
    y = F.expand_dims(y, axis=-1)
    t = F.expand_dims(t, axis=-1)
    return F.sum(F.huber_loss(y, t, 1.0)) / y.shape[0]
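# --- Usage sketch (toy values, not from the original source):
y = np.array([0.0, 2.0], dtype=np.float32)
t = np.array([0.3, 0.0], dtype=np.float32)
print(mean_clipped_loss(y, t).data)  # (0.045 + 1.5) / 2 = 0.7725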
def check_forward(self, x_data, t_data):
    # Test imports assumed by this snippet:
    #   import numpy
    #   import chainer
    #   from chainer import cuda, functions, gradient_check
    x = chainer.Variable(x_data)
    t = chainer.Variable(t_data)
    loss = functions.huber_loss(x, t, delta=1)
    self.assertEqual(loss.data.dtype, numpy.float32)
    loss_value = cuda.to_cpu(loss.data)
    # Reference implementation of the element-wise Huber loss (delta = 1).
    diff_data = cuda.to_cpu(x_data) - cuda.to_cpu(t_data)
    expected_result = numpy.zeros(self.shape)
    mask = numpy.abs(diff_data) < 1
    expected_result[mask] = 0.5 * diff_data[mask] ** 2
    expected_result[~mask] = numpy.abs(diff_data[~mask]) - 0.5
    loss_expect = numpy.sum(expected_result, axis=1)
    gradient_check.assert_allclose(loss_value, loss_expect)
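# The expected values above follow the Huber definition with delta = 1:
# for d = x - t, the per-element loss is 0.5 * d**2 when |d| < 1 and
# |d| - 0.5 otherwise; summing along axis 1 matches F.huber_loss's default
# reduce='sum_along_second_axis' behavior.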
def calc_loss_recurrent(self, frames, actions, rewards, done_list, size_list):
    # TODO self.max_step -> max_step
    s = Variable(frames.astype(np.float32))
    self.model_target.reset_state()     # Refresh model_target's state
    self.model_target.q_function(s[0])  # Update target model's initial state
    target_q = self.xp.zeros((self.max_step, self.replay_batch_size), dtype=np.float32)
    selected_q_tuple = [None for _ in range(self.max_step)]
    for frame in range(0, self.max_step):
        q = self.model.q_function(s[frame])
        q_dash = self.model_target.q_function(s[frame + 1])  # Q(s',*): shape is (batch_size, action_num)
        max_q_dash = q_dash.data.max(axis=1)  # max_a Q(s',a): shape is (batch_size,)
        if self.clipping:
            rs = self.xp.sign(rewards[frame])
        else:
            rs = rewards[frame]
        # Zero out the bootstrap term for terminal transitions.
        target_q[frame] = rs + self.xp.logical_not(done_list[frame]).astype(np.int32) * (self.gamma * max_q_dash)
        selected_q_tuple[frame] = F.select_item(q, actions[frame].astype(np.int32))
    # Mask out steps beyond each episode's actual length (size_list).
    enable = self.xp.broadcast_to(self.xp.arange(self.max_step),
                                  (self.replay_batch_size, self.max_step))
    size_list = self.xp.expand_dims(cuda.to_gpu(size_list), -1)
    enable = (enable < size_list).T
    selected_q = F.concat(selected_q_tuple, axis=0)
    # Element-wise Huber loss between the TD targets and the selected Q-values.
    huber_loss = F.huber_loss(
        F.expand_dims(F.flatten(target_q), axis=1),
        F.expand_dims(selected_q, axis=1), delta=1.0)
    huber_loss = F.reshape(huber_loss, enable.shape)
    zeros = self.xp.zeros(enable.shape, dtype=np.float32)
    loss = F.sum(F.where(enable, huber_loss, zeros))  # / self.replay_batch_size
    return loss
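# --- Sketch of the `enable` mask above (toy values, not from the original
# source; numpy stands in for self.xp). Steps past each episode's recorded
# length contribute no loss.
max_step, batch = 4, 3
size_list = np.array([4, 2, 3])  # valid steps per episode in the batch
enable = np.broadcast_to(np.arange(max_step), (batch, max_step))
enable = (enable < np.expand_dims(size_list, -1)).T  # shape (max_step, batch)
print(enable.astype(int))
# [[1 1 1]
#  [1 1 1]
#  [1 0 1]
#  [1 0 0]]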
def clear(self):
    self.loss = None
    # self.accuracy = None

# def forward(self, x, t):
#     self.clear()
#     #x = chainer.Variable(x_data)  # x_data.astype(np.float32)
#     #t = chainer.Variable(t_data)  # [Note]: x_data, t_data must be np.float32 type
#
#     #self.loss = F.huber_loss(h, t, delta=1 / 255.)
#     self.loss = F.mean_squared_error(self(x), t)
#     # self.accuracy = F.accuracy(h, t)  # type incompatible
#     return self.loss