def calc_loss(self, states, actions, rewards, next_states, episode_ends):
    qv = self.agent.q(states)
    q_t = self.target(next_states)  # Q(s', *)
    max_q_prime = np.array(list(map(np.max, q_t.data)), dtype=np.float32)  # max_a Q(s', a)
    target = cuda.to_cpu(qv.data.copy())
    for i in range(self.replay_size):
        if episode_ends[i][0]:
            _r = np.sign(rewards[i])
        else:
            _r = np.sign(rewards[i]) + self.gamma * max_q_prime[i]
        target[i, actions[i]] = _r
    td = Variable(self.target.arr_to_gpu(target)) - qv  # TD error
    td_tmp = td.data + 1000.0 * (abs(td.data) <= 1)  # avoid division by zero
    td_clip = td * (abs(td.data) <= 1) + td / abs(td_tmp) * (abs(td.data) > 1)  # clip |TD| to <= 1
    zeros = Variable(self.target.arr_to_gpu(
        np.zeros((self.replay_size, self.target.n_action), dtype=np.float32)))
    loss = F.mean_squared_error(td_clip, zeros)
    self._loss = loss.data
    self._qv = np.max(qv.data)
    return loss
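The td_clip construction above is the classic DQN error-clipping trick: the squared loss acts on the TD error clipped into [-1, 1]. A minimal alternative sketch using Chainer's built-in Huber loss, which is the usual replacement for manual clipping; it reuses the target array built above and is a substitution, not part of the original code:

# Sketch, assuming the same `target` array as above: Huber loss with
# delta=1 is quadratic for |error| <= 1 and linear beyond, approximating
# the manual TD-error clipping.
t = Variable(self.target.arr_to_gpu(target))
loss = F.sum(F.huber_loss(qv, t, delta=1.0)) / self.replay_size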
def visualize_layer_activations(model, im, layer_idx):
    """Compute the activations of each feature map of the given layer for
    this particular image. Note that the input im should be a mini-batch
    of size one, i.e. a single image.
    """
    if model._device_id is not None and model._device_id >= 0:  # Using GPU
        im = cuda.cupy.array(im)
    activations = model.activations(Variable(im), layer_idx)
    if isinstance(activations, cuda.ndarray):
        activations = cuda.cupy.asnumpy(activations)
    # Rescale to [0, 255]
    activations -= activations.min()
    activations /= activations.max()
    activations *= 255
    return activations.astype(np.uint8)
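A hypothetical usage sketch for the helper above; the input shape and layer index are assumptions:

# Assumes `model` exposes the activations() method used above.
im = np.random.rand(1, 3, 224, 224).astype(np.float32)  # mini-batch of one
maps = visualize_layer_activations(model, im, layer_idx=0)
print(maps.shape, maps.dtype)  # e.g. (1, 64, 224, 224) uint8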
def test_backward():
    # Construct test data
    x = Variable(np.array([5., 3., 3., 1., 0.]))
    g = Variable(np.ones(5))
    expected_result = np.array([0.7717692057972512, 0.562087881852882,
                                1.4058826163342215, 0.9213241007090265,
                                1.3389361953066183])
    # Generate object
    lcse = LogCumsumExp()
    # Run forward and backward pass
    lcse.forward((x.data,))
    result = lcse.backward((x.data,), (g.data,))
    # Assert that the result equals the expected result
    assert_true(np.array_equal(result[0], expected_result))
def __call__(self, x, update=True):
    """Normalize mean and variance of values based on empirical values.

    Args:
        x (ndarray or Variable): Input values
        update (bool): Flag to learn the input values

    Returns:
        ndarray or Variable: Normalized output values
    """
    xp = self.xp
    mean = xp.broadcast_to(self._mean, x.shape)
    std_inv = xp.broadcast_to(self._std_inverse, x.shape)
    if update:
        self.experience(x)
    return (x - mean) * std_inv
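A minimal usage sketch, assuming this method belongs to a running-statistics link such as ChainerRL's EmpiricalNormalization (the constructor shown is an assumption):

# Hypothetical usage: normalize observations with running statistics.
normalizer = EmpiricalNormalization(shape=(4,))       # assumed class
obs = np.random.randn(32, 4).astype(np.float32)
y = normalizer(obs)                      # training: updates mean/std
y_eval = normalizer(obs, update=False)   # evaluation: statistics frozen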
def bound_by_tanh(x, low, high):
    """Bound a given value into [low, high] by tanh.

    Args:
        x (chainer.Variable): value to bound
        low (numpy.ndarray): lower bound
        high (numpy.ndarray): upper bound

    Returns: chainer.Variable
    """
    assert isinstance(x, chainer.Variable)
    assert low is not None
    assert high is not None
    xp = cuda.get_array_module(x.data)
    x_scale = (high - low) / 2
    x_scale = xp.expand_dims(xp.asarray(x_scale), axis=0)
    x_mean = (high + low) / 2
    x_mean = xp.expand_dims(xp.asarray(x_mean), axis=0)
    return F.tanh(x) * x_scale + x_mean
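A short usage sketch; the bounds below are made-up examples:

# Squash unbounded network outputs into a per-dimension action range.
low = np.array([-1.0, -2.0], dtype=np.float32)
high = np.array([1.0, 2.0], dtype=np.float32)
raw = chainer.Variable(np.random.randn(4, 2).astype(np.float32))
bounded = bound_by_tanh(raw, low, high)  # column i lies in [low[i], high[i]]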
def compute_policy_gradient_sample_correction(
        action_distrib, action_distrib_mu, action_value, v,
        truncation_threshold):
    """Compute off-policy bias correction term wrt a sampled action."""
    assert np.isscalar(v)
    assert truncation_threshold is not None
    with chainer.no_backprop_mode():
        sample_action = action_distrib.sample().data
        # Inverse importance weight: mu(a) / pi(a)
        rho_dash_inv = compute_importance(
            action_distrib_mu, action_distrib, sample_action)
    if (truncation_threshold > 0 and
            rho_dash_inv >= 1 / truncation_threshold):
        return chainer.Variable(np.asarray([0], dtype=np.float32))
    # ACER bias-correction coefficient: [1 - c / rho']_+
    correction_weight = max(0, 1 - truncation_threshold * rho_dash_inv)
    assert correction_weight <= 1
    q = float(action_value.evaluate_actions(sample_action).data[0])
    correction_advantage = q - v
    return -(correction_weight *
             action_distrib.log_prob(sample_action) *
             correction_advantage)
def test_compute_advantage(self):
    sample_actions = np.random.randint(self.action_size,
                                       size=self.batch_size)
    greedy_actions = self.q_values.argmax(axis=1)
    ret = self.qout.compute_advantage(sample_actions)
    self.assertIsInstance(ret, chainer.Variable)
    for b in range(self.batch_size):
        if sample_actions[b] == greedy_actions[b]:
            self.assertAlmostEqual(ret.data[b], 0)
        else:
            # An advantage relative to the greedy action must be negative
            self.assertLess(ret.data[b], 0)
            q = self.q_values[b, sample_actions[b]]
            v = self.q_values[b, greedy_actions[b]]
            adv = q - v
            self.assertAlmostEqual(ret.data[b], adv)
def test_max_unbounded(self):
    n_batch = 7
    ndim_action = 3
    mu = np.random.randn(n_batch, ndim_action).astype(np.float32)
    mat = np.broadcast_to(
        np.eye(ndim_action, dtype=np.float32)[None],
        (n_batch, ndim_action, ndim_action))
    v = np.random.randn(n_batch).astype(np.float32)
    q_out = action_value.QuadraticActionValue(
        chainer.Variable(mu),
        chainer.Variable(mat),
        chainer.Variable(v))

    v_out = q_out.max
    self.assertIsInstance(v_out, chainer.Variable)
    v_out = v_out.data
    np.testing.assert_almost_equal(v_out, v)
def setUp(self):
    def evaluator(actions):
        # negative square norm of actions
        return -F.sum(actions ** 2, axis=1)
    self.evaluator = evaluator

    if self.has_maximizer:
        def maximizer():
            return chainer.Variable(np.zeros(
                (self.batch_size, self.action_size), dtype=np.float32))
    else:
        maximizer = None
    self.maximizer = maximizer
    self.av = action_value.SingleActionValue(
        evaluator=evaluator, maximizer=maximizer)
def _test_call_given_model(self, model, gpu):
    # This method only checks if a given model can receive random input
    # data and return output data with the correct interface.
    batch_size = 7
    obs = np.random.rand(batch_size, self.n_dim_obs).astype(np.float32)
    action = np.random.rand(
        batch_size, self.n_dim_action).astype(np.float32)
    if gpu >= 0:
        model.to_gpu(gpu)
        obs = chainer.cuda.to_gpu(obs)
        action = chainer.cuda.to_gpu(action)
    y = model(obs, action)
    self.assertTrue(isinstance(y, chainer.Variable))
    self.assertEqual(y.shape, (batch_size, 1))
    self.assertEqual(chainer.cuda.get_array_module(y),
                     chainer.cuda.get_array_module(obs))
def test_copy_param(self):
    a = L.Linear(1, 5)
    b = L.Linear(1, 5)
    s = chainer.Variable(np.random.rand(1, 1).astype(np.float32))
    a_out = list(a(s).data.ravel())
    b_out = list(b(s).data.ravel())
    self.assertNotEqual(a_out, b_out)

    # Copy b's parameters to a
    copy_param.copy_param(a, b)

    a_out_new = list(a(s).data.ravel())
    b_out_new = list(b(s).data.ravel())
    self.assertEqual(a_out_new, b_out)
    self.assertEqual(b_out_new, b_out)
def test_boltzmann(self):
    # T=1
    q_values = chainer.Variable(np.asarray([[-1, 1, 0]], dtype=np.float32))
    action_count = count_actions_selected_by_boltzmann(1, q_values)
    print('T=1', action_count)
    # Actions with larger values must be selected more often
    self.assertGreater(action_count[1], action_count[2])
    self.assertGreater(action_count[2], action_count[0])

    # T=0.5
    action_count_t05 = count_actions_selected_by_boltzmann(0.5, q_values)
    print('T=0.5', action_count_t05)
    # Actions with larger values must be selected more often
    self.assertGreater(action_count_t05[1], action_count_t05[2])
    self.assertGreater(action_count_t05[2], action_count_t05[0])

    # T=0.5 must be greedier than T=1
    self.assertGreater(action_count_t05[1], action_count[1])
def validate(test_data, test_labels, model, batchsize, silent, gpu):
    N_test = test_data.shape[0]
    pbar = ProgressBar(0, N_test)
    sum_accuracy = 0
    sum_loss = 0
    for i in range(0, N_test, batchsize):
        x_batch = test_data[i:i + batchsize]
        y_batch = test_labels[i:i + batchsize]
        if gpu >= 0:
            x_batch = cuda.to_gpu(x_batch.astype(np.float32))
            y_batch = cuda.to_gpu(y_batch.astype(np.int32))
        x = Variable(x_batch)
        t = Variable(y_batch)
        loss, acc = model(x, t, train=False)
        sum_loss += float(cuda.to_cpu(loss.data)) * y_batch.size
        sum_accuracy += float(cuda.to_cpu(acc.data)) * y_batch.size
        if not silent:
            pbar.update(i + y_batch.size)
    return sum_loss, sum_accuracy
def test_call(self):
    xp = chainer.cuda.cupy
    x = chainer.Variable(xp.asarray(self.x, dtype=xp.float32))
    gt_boxes = self.gt_boxes
    im_info = self.im_info
    labels, bbox_targets, bbox_inside_weights, bbox_outside_weights = \
        self.anchor_target_layer(x, gt_boxes, im_info)
    n_anchors = self.anchor_target_layer.n_anchors
    self.assertEqual(labels.shape,
                     (1, n_anchors, self.height, self.width))
    self.assertEqual(bbox_targets.shape,
                     (1, n_anchors * 4, self.height, self.width))
    self.assertEqual(bbox_inside_weights.shape,
                     (1, n_anchors * 4, self.height, self.width))
    self.assertEqual(bbox_outside_weights.shape,
                     (1, n_anchors * 4, self.height, self.width))
def test_forward_cpu_VGG16(self):
    print('test_forward_cpu_VGG16')
    gpu = -1
    trunk = VGG16
    rpn_in_ch = 512
    rpn_out_ch = 512
    n_anchors = 9
    feat_stride = 16
    anchor_scales = [8, 16, 32]
    num_classes = 21
    spatial_scale = 0.0625
    model = FasterRCNN(
        gpu, trunk, rpn_in_ch, rpn_out_ch, n_anchors, feat_stride,
        anchor_scales, num_classes, spatial_scale)
    model.train = False
    ret = model(chainer.Variable(self.x, volatile=True), self.im_info)
    assert len(ret) == 2
    assert isinstance(ret[0], chainer.Variable)
    assert isinstance(ret[1], np.ndarray)
def check_transform_grad(inds, w, transformer, dtype, toll):
    from chainer import gradient_check
    inds = cuda.to_gpu(inds)
    W = Variable(w.astype(dtype))
    R = transformer(inds)
    RW = R(W)
    RW.grad = cp.random.randn(*RW.data.shape).astype(dtype)
    RW.backward(retain_grad=True)

    func = RW.creator
    fn = lambda: func.forward((W.data,))
    gW, = gradient_check.numerical_grad(fn, (W.data,), (RW.grad,))

    gan = cuda.to_cpu(gW)      # numerical gradient
    gat = cuda.to_cpu(W.grad)  # analytical gradient
    relerr = np.max(np.abs(gan - gat) / np.maximum(np.abs(gan), np.abs(gat)))
    print(dtype, toll, relerr)
    assert relerr < toll
def check_equivariance(im, layers, input_array, output_array, point_group):
    # Transform the image
    f = input_array(im)
    g = point_group.rand()
    gf = g * f
    im1 = gf.v

    # Apply layers to both images
    im = Variable(cuda.to_gpu(im))
    im1 = Variable(cuda.to_gpu(im1))
    fmap = im
    fmap1 = im1
    for layer in layers:
        layer.to_gpu()
        fmap = layer(fmap)
        fmap1 = layer(fmap1)

    # Transform the computed feature maps
    fmap1_garray = output_array(cuda.to_cpu(fmap1.data))
    r_fmap1_data = (g.inv() * fmap1_garray).v

    fmap_data = cuda.to_cpu(fmap.data)
    assert np.allclose(fmap_data, r_fmap1_data, rtol=1e-5, atol=1e-3)
def test_save_normal_graphs(self):
    x = np.random.uniform(-1, 1, self.x_shape)
    x = Variable(x.astype(np.float32))

    for depth in six.moves.range(1, self.n_encdec + 1):
        model = segnet.SegNet(
            n_encdec=self.n_encdec, in_channel=self.x_shape[1])
        y = model(x, depth)
        cg = build_computational_graph(
            [y],
            variable_style=_var_style,
            function_style=_func_style
        ).dump()
        for e in range(1, self.n_encdec + 1):
            self.assertTrue('encdec{}'.format(e) in model._children)

        fn = 'tests/SegNet_x_depth-{}_{}.dot'.format(self.n_encdec, depth)
        if os.path.exists(fn):
            continue
        with open(fn, 'w') as f:
            f.write(cg)
        subprocess.call(
            'dot -Tpng {} -o {}'.format(
                fn, fn.replace('.dot', '.png')), shell=True)
def nearest_neighbor_patch(x, patch, patch_norm):
    assert patch.data.shape[0] == 1, 'mini batch size of patch must be 1'
    assert patch_norm.data.shape[0] == 1, 'mini batch size of patch_norm must be 1'
    xp = cuda.get_array_module(x.data)
    z = x.data
    b, ch, h, w = z.shape
    z = z.transpose((1, 0, 2, 3)).reshape((ch, -1))
    norm = xp.expand_dims(xp.sum(z ** 2, axis=0) ** 0.5, 0)
    z = z / xp.broadcast_to(norm, z.shape)
    p = patch.data
    p_norm = patch_norm.data
    p = p.reshape((ch, -1))
    p_norm = p_norm.reshape((1, -1))
    p_normalized = p / xp.broadcast_to(p_norm, p.shape)
    correlation = z.T.dot(p_normalized)
    # Index of the most correlated (nearest-neighbor) patch column
    best_index = xp.argmax(correlation, axis=1)
    nearest_neighbor = p.take(best_index, axis=1).reshape(
        (ch, b, h, w)).transpose((1, 0, 2, 3))
    return Variable(nearest_neighbor)
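A hedged usage sketch for the patch matcher above; all shapes and names are assumptions:

# Replace each spatial position of x with its most correlated
# (cosine-similar) column of the style patch.
ch = 64
x = Variable(np.random.randn(2, ch, 8, 8).astype(np.float32))
patch = Variable(np.random.randn(1, ch, 4, 4).astype(np.float32))
p = patch.data.reshape(ch, -1)
patch_norm = Variable(
    (np.sum(p ** 2, axis=0) ** 0.5).reshape(1, -1).astype(np.float32))
nn = nearest_neighbor_patch(x, patch, patch_norm)  # shape (2, 64, 8, 8)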
def update_core(self):
    batch = self._iterators['main'].next()
    in_arrays = self.converter(batch, self.device)
    loss_detail = self.loss_maker.calc_loss(
        *tuple(chainer.Variable(x) for x in in_arrays), test=False)

    # main network
    main_optimizer = self.main_optimizer
    main_optimizer.update(self.main_lossfun, loss_detail)

    # reinput network
    reinput_optimizer_list = self.reinput_optimizer
    if reinput_optimizer_list is not None:
        for i_reinput, reinput_optimizer in enumerate(reinput_optimizer_list):
            reinput_optimizer.update(self.reinput_lossfun, i_reinput, loss_detail)

    if self.discriminator_optimizer is not None:
        self.discriminator_optimizer.update(self.discriminator_lossfun, loss_detail)
def __call__(
        self,
        h,
        one_dimension_feature_list,
        test=False,
):
    # type: (chainer.Variable, typing.List[chainer.Variable], bool) -> any
    batchsize = h.data.shape[0]
    height = h.data.shape[2]
    width = h.data.shape[3]

    h_global = chainer.functions.concat(one_dimension_feature_list)
    channel = h_global.data.shape[1]
    h_global = chainer.functions.broadcast_to(h_global, (height, width, batchsize, channel))
    h_global = chainer.functions.transpose(h_global, (2, 3, 0, 1))

    h = chainer.functions.concat((h, h_global))
    h = chainer.functions.relu(self.conv(h))
    return h
def _forward_reinput(self, color, image_rgb, image_real, test):
    outputs = []
    for i_reinput in range(len(self.args.loss_blend_ratio_reinput)):
        model_reinput = self.model_reinput_list[i_reinput]
        image_input = model_reinput.xp.copy(color.data)
        image_input_residual = model_reinput.xp.copy(image_input)

        # convert the gray channel from the output range back to the input range
        image_input[:, 0, :, :] = comicolorization.utility.color.normalize(
            image_input[:, 0, :, :],
            in_min=self.range_output_luminance[0], in_max=self.range_output_luminance[1],
            out_min=self.range_input_luminance[0], out_max=self.range_input_luminance[1],
        )

        color, other, disc_real, disc_gen = \
            self._forward_model(model_reinput, image_input, image_rgb, image_real, test=test)
        if self.args.use_residual_reinput:
            color += chainer.Variable(image_input_residual)
        outputs.append([color, other, disc_real, disc_gen])
    return outputs
def update_core(self):
    gen_optimizer = self.get_optimizer('gen')
    dis_optimizer = self.get_optimizer('dis')

    batch = self.get_iterator('main').next()
    x_real = Variable(self.converter(batch, self.device)) / 255.
    xp = chainer.cuda.get_array_module(x_real.data)

    gen, dis = self.gen, self.dis
    batchsize = len(batch)

    y_real = dis(x_real)

    z = Variable(xp.asarray(gen.make_hidden(batchsize)))
    x_fake = gen(z)
    y_fake = dis(x_fake)

    dis_optimizer.update(self.loss_dis, dis, y_fake, y_real)
    gen_optimizer.update(self.loss_gen, gen, y_fake)
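loss_dis and loss_gen are not shown in this snippet; a sketch of the standard non-saturating softplus GAN objective (as in Chainer's DCGAN example) that would match the optimizer.update() calls above:

def loss_dis(self, dis, y_fake, y_real):
    # Discriminator: score real samples high, fake samples low.
    batchsize = len(y_fake)
    L1 = F.sum(F.softplus(-y_real)) / batchsize
    L2 = F.sum(F.softplus(y_fake)) / batchsize
    loss = L1 + L2
    chainer.report({'loss': loss}, dis)
    return loss

def loss_gen(self, gen, y_fake):
    # Generator: fool the discriminator.
    batchsize = len(y_fake)
    loss = F.sum(F.softplus(-y_fake)) / batchsize
    chainer.report({'loss': loss}, gen)
    return loss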
def eval_single_run(env, model, phi, deterministic=False):
    model.reset_state()
    test_r = 0
    obs = env.reset()
    done = False
    while not done:
        s = chainer.Variable(np.expand_dims(phi(obs), 0))
        pout = model.pi_and_v(s)[0]
        model.unchain_backward()
        if deterministic:
            a = pout.most_probable_actions[0]
        else:
            a = pout.action_indices[0]
        obs, r, done, info = env.step(a)
        test_r += r
    return test_r
def eval_performance(rom, p_func, n_runs):
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        env = ale.ALE(rom, treat_life_lost_as_terminal=False)
        test_r = 0
        while not env.is_terminal:
            s = chainer.Variable(np.expand_dims(dqn_phi(env.state), 0))
            pout = p_func(s)
            a = pout.action_indices[0]
            test_r += env.receive_action(a)
        scores.append(test_r)
        print('test_{}:'.format(i), test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def eval_performance(rom, model, deterministic=False, use_sdl=False,
                     record_screen_dir=None):
    env = ale.ALE(rom, treat_life_lost_as_terminal=False, use_sdl=use_sdl,
                  record_screen_dir=record_screen_dir)
    model.reset_state()
    test_r = 0
    while not env.is_terminal:
        s = chainer.Variable(np.expand_dims(dqn_phi(env.state), 0))
        pout = model.pi_and_v(s)[0]
        model.unchain_backward()
        if deterministic:
            a = pout.most_probable_actions[0]
        else:
            a = pout.action_indices[0]
        test_r += env.receive_action(a)
    return test_r
def eval_performance(process_idx, make_env, model, phi, n_runs):
    assert n_runs > 1, 'Computing stdev requires at least two runs'
    scores = []
    for i in range(n_runs):
        model.reset_state()
        env = make_env(process_idx, test=True)
        obs = env.reset()
        done = False
        test_r = 0
        while not done:
            s = chainer.Variable(np.expand_dims(phi(obs), 0))
            pout, _ = model.pi_and_v(s)
            a = pout.action_indices[0]
            obs, r, done, info = env.step(a)
            test_r += r
        scores.append(test_r)
        print('test_{}:'.format(i), test_r)
    mean = statistics.mean(scores)
    median = statistics.median(scores)
    stdev = statistics.stdev(scores)
    return mean, median, stdev
def __call__(self, xs):
    """
    xs: (batchsize, hidden_dim)
    """
    if self.h is not None:
        h = self.h
        c = self.c
    else:
        xp = chainer.cuda.get_array_module(xs.data)
        batchsize = xs.shape[0]
        h = Variable(xp.zeros((batchsize, self.outsize), 'f'), volatile='AUTO')
        c = Variable(xp.zeros((batchsize, self.outsize), 'f'), volatile='AUTO')

    # Coupled input/forget gate: the forget gate is (1 - in_gate)
    in_gate = F.sigmoid(self.linear_in(F.concat([xs, h, c])))
    new_in = F.tanh(self.linear_c(F.concat([xs, h])))
    self.c = in_gate * new_in + (1. - in_gate) * c
    out_gate = F.sigmoid(self.linear_out(F.concat([xs, h, self.c])))
    self.h = F.tanh(self.c) * out_gate
    return self.h
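A minimal usage sketch for the stateful cell above; the reset convention is an assumption:

# Run the cell over a sequence of feature batches.
cell.h = cell.c = None   # reset state before a new sequence (assumed API)
for x_t in sequence:     # each x_t: (batchsize, hidden_dim)
    h_t = cell(x_t)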
def __call__(self, xs, ts):
    """
    Inputs:
        xs (Variable): (batchsize, 21); columns 0-6 are word ids,
            7-13 suffix ids, and 14-20 capitalization ids
        ts (Variable): (batchsize,) target labels
    """
    words, suffixes, caps = xs[:, :7], xs[:, 7:14], xs[:, 14:]
    h_w = self.emb_word(words)
    h_c = self.emb_caps(caps)
    h_s = self.emb_suffix(suffixes)
    h = F.concat([h_w, h_c, h_s], 2)
    batchsize, ntokens, hidden = h.data.shape
    h = F.reshape(h, (batchsize, ntokens * hidden))
    ys = self.linear(h)
    loss = F.softmax_cross_entropy(ys, ts)
    acc = F.accuracy(ys, ts)
    chainer.report({
        "loss": loss,
        "accuracy": acc,
    }, self)
    return loss
def _calculate_loss(self, sent):
    # sent is a batch of sentences.
    sent_arr = self.xp.asarray(sent, dtype=np.int32)

    sent_y = self._contexts_rep(sent_arr)

    sent_x = []
    for i in range(sent_arr.shape[1]):
        x = chainer.Variable(sent_arr[:, i])
        sent_x.append(x)

    accum_loss = None
    for y, x in zip(sent_y, sent_x):  # itertools.izip in the Python 2 original
        loss = self.loss_func(y, x)
        accum_loss = accum_loss + loss if accum_loss is not None else loss
    return accum_loss