def check_type_mismatch(self, x_data):
    xp = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        label = 'dummy_function'

        def forward(self, inputs):
            return xp.array(1, np.float32),

        def backward(self, inputs, grads):
            return [1]

    x = chainer.Variable(x_data)
    y = DummyFunction()(x)
    with six.assertRaisesRegex(self, TypeError, 'dummy_function'):
        y.backward()
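These check_* helpers all follow one pattern: define a DummyFunction whose backward returns an ill-formed gradient, then assert that y.backward() raises with the function's label ('dummy_function') in the message. A minimal sketch of how such a helper is typically driven from the owning unittest.TestCase (the method name and input shape here are illustrative assumptions, not from the source):

import numpy as np

# e.g., inside the TestCase subclass that defines check_type_mismatch:
def test_type_mismatch_cpu(self):
    self.check_type_mismatch(np.zeros((3,), dtype=np.float32))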
def check_dtype_mismatch(self, x_data):
    xp = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        label = 'dummy_function'

        def forward(self, inputs):
            return xp.array(1, np.float32),

        def backward(self, inputs, grads):
            return xp.array([1], np.int32),

    x = chainer.Variable(x_data)
    y = DummyFunction()(x)
    with six.assertRaisesRegex(self, TypeError, 'dummy_function'):
        y.backward()
def check_shape_mismatch(self, x_data):
    xp = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        label = 'dummy_function'

        def forward(self, inputs):
            return xp.array(1, np.float32),

        def backward(self, inputs, grads):
            return xp.array([1, 2], np.float32),

    x = chainer.Variable(x_data)
    y = DummyFunction()(x)
    with six.assertRaisesRegex(self, ValueError, 'dummy_function'):
        y.backward()
def check_traceback(self, x_data):
    xp = cuda.get_array_module(x_data)

    class DummyFunction(chainer.Function):
        label = 'dummy_function'

        def forward(self, inputs):
            return xp.array(1, np.float32),

        def backward(self, inputs, grads):
            return xp.array([1, 2], np.float32),

    x = chainer.Variable(x_data)
    line = inspect.currentframe().f_lineno + 1
    y = DummyFunction()(x)  # `line` is THIS line
    try:
        y.backward()
        self.fail()
    except ValueError as e:
        self.assertIn('Stacktrace', str(e))
        self.assertIn('line %d' % line, str(e))
def check_forward(self, f_data, f_p_data, l2_reg):
    xp = cuda.get_array_module(f_data)
    num_examples = len(f_data)
    f = chainer.Variable(f_data)
    f_p = chainer.Variable(f_p_data)
    loss = n_pair_mc_loss(f, f_p, l2_reg)
    loss_for_each = []
    for i in range(num_examples):
        exps = []
        for j in set(range(num_examples)) - {i}:
            exp_ij = xp.exp(f_data[i].dot(f_p_data[j]) -
                            f_data[i].dot(f_p_data[i]))
            exps.append(exp_ij)
        loss_i = xp.log(1.0 + sum(exps))
        loss_for_each.append(loss_i)
    loss_expected = xp.asarray(loss_for_each, dtype=np.float32).mean()
    testing.assert_allclose(loss.data, loss_expected, atol=1e-2)
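The expected value computed above is the N-pair multi-class loss, L = (1/N) Σ_i log(1 + Σ_{j≠i} exp(f_i·f⁺_j − f_i·f⁺_i)). A self-contained NumPy sketch of the same reference computation (function name is illustrative; the l2_reg term is omitted for brevity):

import numpy as np

def n_pair_mc_loss_reference(f, f_p):
    # Plain-NumPy version of the double loop in check_forward above.
    n = len(f)
    losses = []
    for i in range(n):
        logits = [f[i].dot(f_p[j]) - f[i].dot(f_p[i])
                  for j in range(n) if j != i]
        losses.append(np.log(1.0 + np.sum(np.exp(logits))))
    return np.float32(np.mean(losses))

f = np.random.randn(4, 8).astype(np.float32)
f_p = np.random.randn(4, 8).astype(np.float32)
print(n_pair_mc_loss_reference(f, f_p))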
def forward(self, inputs):
    xp = cuda.get_array_module(*inputs)
    anchor, positive, negative = inputs
    N = anchor.shape[0]

    def euclidean_d(v1, v2):
        # Euclidean distance per row: square root of the summed squared
        # differences. (The original referenced the non-callable module
        # chainer.functions.math.sqrt and summed element-wise square
        # roots, which is not the Euclidean norm.)
        return chainer.functions.sqrt(
            chainer.functions.sum((v1 - v2) ** 2, axis=1))

    d_ap = euclidean_d(anchor, positive)
    d_an = euclidean_d(anchor, negative)
    dist_p = chainer.functions.exp(d_ap) \
        / (chainer.functions.exp(d_ap) + chainer.functions.exp(d_an))
    loss = chainer.functions.sum(dist_p * dist_p) / N
    # `loss` is a chainer.Variable; unwrap its data before returning an
    # array from forward().
    return xp.asarray(loss.data, dtype=numpy.float32),
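dist_p above is a softmax over the two distances, so the squared penalty decays toward zero as the negative moves farther from the anchor than the positive (the ratio-loss idea from triplet networks). A scalar sanity check in plain NumPy:

import numpy as np

# One triplet: the positive is closer (d_ap < d_an), so the ratio and
# its square are small.
d_ap, d_an = 0.5, 2.0
dist_p = np.exp(d_ap) / (np.exp(d_ap) + np.exp(d_an))
print(dist_p ** 2)  # ~0.033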
def entropy_filter(self, x, b, ent_T):
    xp = cuda.get_array_module(b)
    eb = entropy(F.softmax(b)) / np.log(b.shape[1])
    eb.to_cpu()
    if hasattr(eb.data, 'get'):
        with cuda.get_device(eb.data):
            exited = eb.data < ent_T
        exited = exited.get()
    else:
        exited = eb.data < ent_T
    y_exit = []
    y_cont = []
    for i, idx in enumerate(exited):
        if idx:
            y_exit.append(b[i:i + 1])
        else:
            y_cont.append(x[i:i + 1])
    if len(y_exit) > 0:
        y_exit = F.vstack(y_exit)
    if len(y_cont) > 0:
        y_cont = F.vstack(y_cont)
    return y_exit, y_cont, exited
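entropy is used above but not defined in this snippet. Given that eb is normalized by np.log(b.shape[1]) into [0, 1], one plausible Chainer-style implementation is the per-row Shannon entropy (this helper is an assumption, not from the source):

import chainer.functions as F

def entropy(p):
    # Assumed helper: Shannon entropy of each row of a batch of
    # probability vectors; the epsilon guards against log(0).
    return -F.sum(p * F.log(p + 1e-12), axis=1)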
def backward_cpu(self, inputs, grad_outputs):
    x, V, g = inputs[:3]
    b = inputs[3] if len(inputs) == 4 else None
    if b is None:
        gb = None
        gx, gW = super(Convolution2DFunction, self).backward_cpu(
            (x, self.W), grad_outputs)
    else:
        gx, gW, gb = super(Convolution2DFunction, self).backward_cpu(
            (x, self.W, b), grad_outputs)
    xp = cuda.get_array_module(x)
    gg = xp.sum(gW * self.normalizedV, axis=(1, 2, 3),
                keepdims=True).astype(g.dtype, copy=False)
    gV = g * (gW - gg * self.normalizedV) / self.normV
    gV = gV.astype(V.dtype, copy=False)
    if b is None:
        return gx, gV, gg
    else:
        return gx, gV, gg, gb
def backward_cpu(self, inputs, grad_outputs):
    x, V, g = inputs[:3]
    b = inputs[3] if len(inputs) == 4 else None
    if b is None:
        gb = None
        gx, gW = super(Deconvolution2DFunction, self).backward_cpu(
            (x, self.W), grad_outputs)
    else:
        gx, gW, gb = super(Deconvolution2DFunction, self).backward_cpu(
            (x, self.W, b), grad_outputs)
    xp = cuda.get_array_module(x)
    gg = xp.sum(gW * self.normalizedV, axis=(0, 2, 3),
                keepdims=True).astype(g.dtype, copy=False)
    gV = g * (gW - gg * self.normalizedV) / self.normV
    gV = gV.astype(V.dtype, copy=False)
    if b is None:
        return gx, gV, gg
    else:
        return gx, gV, gg, gb
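Both backward_cpu overrides implement the weight-normalization gradients of Salimans & Kingma (2016), where W = g · V/‖V‖; only the reduction axes differ because Convolution2D weights have shape (out, in, kh, kw) while Deconvolution2D weights have shape (in, out, kh, kw). A small NumPy sketch of the same identities in the vector case:

import numpy as np

# Weight normalization: W = g * V / ||V||. Given an upstream gradient
# gW, the parameter gradients used above are
#   gg = sum(gW * V/||V||)   and   gV = (g/||V||) * (gW - gg * V/||V||).
V = np.random.randn(5).astype(np.float32)
g = np.float32(1.3)
gW = np.random.randn(5).astype(np.float32)
normV = np.linalg.norm(V)
normalizedV = V / normV
gg = np.sum(gW * normalizedV)
gV = g * (gW - gg * normalizedV) / normV
# The direction gradient is orthogonal to V, as expected.
print(np.allclose(np.dot(gV, V), 0.0, atol=1e-4))  # True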
def _enumerate_shifted_anchor(anchor_base, feat_stride, height, width):
    # Enumerate all shifted anchors:
    #
    # add A anchors (1, A, 4) to
    # cell K shifts (K, 1, 4) to get
    # shift anchors (K, A, 4)
    # reshape to (K*A, 4) shifted anchors
    xp = cuda.get_array_module(anchor_base)
    shift_y = xp.arange(0, height * feat_stride, feat_stride)
    shift_x = xp.arange(0, width * feat_stride, feat_stride)
    shift_x, shift_y = xp.meshgrid(shift_x, shift_y)
    shift = xp.stack((shift_y.ravel(), shift_x.ravel(),
                      shift_y.ravel(), shift_x.ravel()), axis=1)
    A = anchor_base.shape[0]
    K = shift.shape[0]
    anchor = anchor_base.reshape((1, A, 4)) + \
        shift.reshape((1, K, 4)).transpose((1, 0, 2))
    anchor = anchor.reshape((K * A, 4)).astype(np.float32)
    return anchor
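A quick shape check with illustrative values (assuming the function above is in scope): with A = 9 base anchors, a feature stride of 16, and a 2×2 feature map (K = 4 cells), the function returns K·A = 36 anchors:

import numpy as np

anchor_base = np.zeros((9, 4), dtype=np.float32)
anchors = _enumerate_shifted_anchor(anchor_base, 16, 2, 2)
print(anchors.shape)  # (36, 4)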
def bbox_transform(ex_rois, gt_rois):
    xp = get_array_module(ex_rois)
    ex_widths = ex_rois[:, 2] - ex_rois[:, 0] + 1.0
    ex_heights = ex_rois[:, 3] - ex_rois[:, 1] + 1.0
    ex_ctr_x = ex_rois[:, 0] + 0.5 * ex_widths
    ex_ctr_y = ex_rois[:, 1] + 0.5 * ex_heights
    gt_widths = gt_rois[:, 2] - gt_rois[:, 0] + 1.0
    gt_heights = gt_rois[:, 3] - gt_rois[:, 1] + 1.0
    gt_ctr_x = gt_rois[:, 0] + 0.5 * gt_widths
    gt_ctr_y = gt_rois[:, 1] + 0.5 * gt_heights
    targets_dx = (gt_ctr_x - ex_ctr_x) / ex_widths
    targets_dy = (gt_ctr_y - ex_ctr_y) / ex_heights
    targets_dw = xp.log(gt_widths / ex_widths)
    targets_dh = xp.log(gt_heights / ex_heights)
    targets = xp.vstack(
        (targets_dx, targets_dy, targets_dw, targets_dh)).transpose()
    return targets
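As a sanity check on the target encoding (assuming the function above is in scope), a proposal that coincides with its ground-truth box maps to all-zero targets:

import numpy as np

ex = np.array([[0.0, 0.0, 9.0, 9.0]])
gt = np.array([[0.0, 0.0, 9.0, 9.0]])
print(bbox_transform(ex, gt))  # [[0. 0. 0. 0.]]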
def add_noise(h, test, sigma=0.2):
    xp = cuda.get_array_module(h.data)
    if test:
        return h
    else:
        return h + sigma * xp.random.randn(*h.data.shape)
def multi_overlap(x1, len1, x2, len2):
    len1_half = len1 / 2
    len2_half = len2 / 2
    xp = cuda.get_array_module(x1)
    left = xp.maximum(x1 - len1_half, x2 - len2_half)
    right = xp.minimum(x1 + len1_half, x2 + len2_half)
    return right - left
def multi_box_intersection(a, b):
    w = multi_overlap(a.x, a.w, b.x, b.w)
    h = multi_overlap(a.y, a.h, b.y, b.h)
    xp = cuda.get_array_module(w)
    zeros = xp.zeros_like(w)
    w = xp.maximum(w, zeros)
    h = xp.maximum(h, zeros)
    area = w * h
    return area
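These two helpers are the usual building blocks of a YOLO-style IoU. A sketch of the companion function they would feed (an assumption, not part of the source):

def multi_box_iou(a, b):
    # Intersection over union from the helpers above; a and b are box
    # structs with center coordinates x, y and sizes w, h.
    i = multi_box_intersection(a, b)
    u = a.w * a.h + b.w * b.h - i
    return i / u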
def forward(self, inputs):
    xp = cuda.get_array_module(*inputs)
    y, t = inputs

    # Assert arrays have the same shape
    if t.shape != y.shape:
        raise ValueError("Input arrays have different shapes")

    # Computing nDCG on empty array should just return 0.0
    if t.shape[0] == 0:
        return xp.asarray(0.0),

    # Compute predicted indices by arg sorting
    predicted_indices = xp.argsort(y)
    best_indices = xp.argsort(t)

    # Predicted and theoretically best relevance labels
    predicted_relevance = xp.flip(t[predicted_indices], axis=0)
    best_relevance = xp.flip(t[best_indices], axis=0)

    # Compute needed statistics
    length = predicted_relevance.shape[0]
    arange = xp.arange(length)
    last = min(self.k, length)
    if last < 1:
        last = length

    # Compute regular DCG
    dcg_numerator = 2 ** predicted_relevance[:last] - 1
    dcg_denominator = xp.log2(arange[:last] + 2)
    dcg = xp.sum(dcg_numerator / dcg_denominator)

    # Compute iDCG for normalization
    idcg_numerator = (2 ** best_relevance[:last] - 1)
    idcg_denominator = (xp.log2(arange[:last] + 2))
    idcg = xp.sum(idcg_numerator / idcg_denominator)

    if idcg == 0.0:
        return xp.asarray(1.0),

    return xp.asarray(dcg / idcg),
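A hand-computed check of the DCG formula used above, DCG = Σ_i (2^rel_i − 1)/log2(i + 2), for a three-item list whose predicted ranking swaps the last two items:

import numpy as np

rel_in_predicted_order = np.array([3.0, 0.0, 1.0])
rel_in_best_order = np.array([3.0, 1.0, 0.0])
discounts = np.log2(np.arange(3) + 2)  # [1.0, 1.585, 2.0]
dcg = np.sum((2 ** rel_in_predicted_order - 1) / discounts)
idcg = np.sum((2 ** rel_in_best_order - 1) / discounts)
print(dcg / idcg)  # ~0.98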
def forward(self, inputs):
    xp = cuda.get_array_module(*inputs)
    x, = inputs
    m = x.max(axis=0, keepdims=True)
    y = x - m
    xp.exp(y, out=y)
    y_sum = xp.flip(xp.cumsum(xp.flip(y, axis=0)), axis=0)
    self.y = xp.transpose(xp.asarray(xp.log(y_sum) + m))
    return self.y,
def backward(self, inputs, grads):
    xp = cuda.get_array_module(*inputs)
    x, = inputs
    gy, = grads
    y = self.y
    gx = gy * xp.exp(x) * xp.cumsum(xp.exp(-y), axis=0)
    return gx,
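Together, this forward/backward pair computes a reversed log-cumsum-exp, y_i = log Σ_{j≥i} exp(x_j), with the usual max-subtraction for numerical stability; the derivative identity behind the backward is ∂y_j/∂x_i = exp(x_i − y_j) for j ≤ i. A NumPy sketch showing the stabilized form matches the naive one:

import numpy as np

x = np.array([0.1, 2.0, -1.0, 0.5])
naive = np.log(np.flip(np.cumsum(np.flip(np.exp(x), axis=0)), axis=0))
m = x.max()
stable = np.log(np.flip(np.cumsum(np.flip(np.exp(x - m), axis=0)),
                        axis=0)) + m
print(np.allclose(naive, stable))  # True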
def _pl_sample(t, α):
    """
    Sample from the Plackett-Luce distribution directly.

    :param t: The target labels
    :return: A random permutation from the Plackett-Luce distribution
             parameterized by the target labels
    """
    xp = cuda.get_array_module(t)
    t = t[:, 0]
    probs = xp.exp(t * α)
    probs /= xp.sum(probs)
    return xp.random.choice(probs.shape[0], probs.shape[0], replace=False,
                            p=probs)
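An illustrative draw (assuming NumPy inputs; the column vector holds per-document relevance labels): higher-labeled documents are more likely to land early in the returned permutation, and larger α sharpens the distribution:

import numpy as np

t = np.array([[3.0], [1.0], [0.0]])
print(_pl_sample(t, 2.0))  # e.g. [0 1 2]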
def __init__(self, q_values, q_values_formatter=lambda x: x):
    assert isinstance(q_values, chainer.Variable)
    self.xp = cuda.get_array_module(q_values.data)
    self.q_values = q_values
    self.n_actions = q_values.data.shape[1]
    self.q_values_formatter = q_values_formatter
def __init__(self, mu, mat, v, min_action=None, max_action=None):
    self.xp = cuda.get_array_module(mu.data)
    self.mu = mu
    self.mat = mat
    self.v = v
    if min_action is None:
        self.min_action = None
    else:
        self.min_action = self.xp.asarray(min_action, dtype=np.float32)
    if max_action is None:
        self.max_action = None
    else:
        self.max_action = self.xp.asarray(max_action, dtype=np.float32)
    self.batch_size = self.mu.data.shape[0]