def _context(self, p, fb_mat, fbe_mat):
batch_size, source_length, _ = fb_mat.data.shape
# {pe,e}_mat: shape = [batch * srclen, atten]
pe_mat = F.reshape(
F.broadcast_to(
F.expand_dims(self.p_e(p), 1),
[batch_size, source_length, self.atten_size]),
[batch_size * source_length, self.atten_size])
e_mat = F.tanh(fbe_mat + pe_mat)
# a_mat: shape = [batch, srclen]
a_mat = F.softmax(F.reshape(self.e_a(e_mat), [batch_size, source_length]))
# q: shape = [batch, 2 * hidden]
q = F.reshape(
F.batch_matmul(a_mat, fb_mat, transa=True),
[batch_size, 2 * self.hidden_size])
return q
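# Not part of the snippet above -- a minimal shape check with hypothetical sizes.
# F.batch_matmul promotes the 2-D a_mat of shape (batch, srclen) to
# (batch, srclen, 1), so transa=True pairs (batch, 1, srclen) with
# (batch, srclen, 2 * hidden) and yields the (batch, 2 * hidden) context.
import numpy as np
import chainer.functions as F
batch, srclen, hidden = 2, 5, 3
a_mat = np.random.rand(batch, srclen).astype(np.float32)               # attention weights
fb_mat = np.random.rand(batch, srclen, 2 * hidden).astype(np.float32)  # encoder states
q = F.reshape(F.batch_matmul(a_mat, fb_mat, transa=True), (batch, 2 * hidden))
assert q.shape == (batch, 2 * hidden)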
def attend(self, query, key, value, mask, minfs=None):
"""
Input shapes:
q=(b, units, dec_l), k=(b, units, enc_l),
v=(b, units, dec_l, enc_l), m=(b, dec_l, enc_l)
"""
# Calculate Attention Scores with Mask for Zero-padded Areas
pre_a = F.batch_matmul(query, key, transa=True) # (b, dec_l, enc_l)
minfs = self.xp.full(pre_a.shape, -np.inf, pre_a.dtype) \
if minfs is None else minfs
pre_a = F.where(mask, pre_a, minfs)
a = F.softmax(pre_a, axis=2)
# If all scores along axis=2 are -inf, the softmax yields NaN, so re-mask those rows to zero.
a = F.where(self.xp.isnan(a.data),
self.xp.zeros(a.shape, dtype=a.dtype), a)
reshaped_a = a[:, None] # (b, 1, dec_xl, enc_l)
# Calculate Weighted Sum
pre_c = F.broadcast_to(reshaped_a, value.shape) * value
c = F.sum(pre_c, axis=3, keepdims=True) # (b, units, dec_xl, 1)
return c
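# Not part of the snippet above -- a minimal sketch (hypothetical shapes and
# values) of the NaN re-masking trick: a query whose positions are all masked
# gets -inf scores everywhere, softmax turns that row into NaNs, and the
# second F.where replaces it with zeros.
import numpy as np
import chainer.functions as F
scores = np.zeros((1, 2, 3), dtype=np.float32)               # (b, dec_l, enc_l)
mask = np.array([[[True, True, False],
                  [False, False, False]]])                   # second query is all padding
pre_a = F.where(mask, scores, np.full(scores.shape, -np.inf, np.float32))
a = F.softmax(pre_a, axis=2)
a = F.where(np.isnan(a.data), np.zeros(a.shape, np.float32), a)
print(a.data)  # row 0 attends over two tokens, row 1 is all zeros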
def forward(self, data):
ep_list = [self.p_embed(d[0], d[1]) for d in data]
ec_list = [self.c_embed(d[0], d[1]) for d in data]
er_list = [self.r_embed(d[0], d[1]) for d in data]
p_list = self.p_encode(ep_list)
c_list = self.c_encode(ec_list)
r_list = self.r_encode(er_list)
P = functions.reshape(
functions.concat(p_list, 0),
(1, len(data), self.hidden_size))
C = functions.reshape(
functions.concat(c_list, 0),
(1, len(data), self.hidden_size))
R = functions.concat(r_list, 0)
parent_scores = functions.reshape(
functions.batch_matmul(C, P, transb=True),
(len(data), len(data)))
root_scores = functions.reshape(
self.r_scorer(R),
(1, len(data)))
return parent_scores, root_scores
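# Not part of the snippet above -- a shape check for the parent-score
# computation with hypothetical sizes: for C and P both (1, n, hidden),
# batch_matmul with transb=True gives the (1, n, n) matrix of pairwise dot
# products, reshaped to (n, n).
import numpy as np
import chainer.functions as F
n, hidden = 6, 8
C = np.random.rand(1, n, hidden).astype(np.float32)
P = np.random.rand(1, n, hidden).astype(np.float32)
scores = F.reshape(F.batch_matmul(C, P, transb=True), (n, n))
assert scores.shape == (n, n)
assert np.allclose(scores.data[1, 2], C[0, 1].dot(P[0, 2]))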
def __call__(self, a_list, state, batch_size, xp):
e_list = []
sum_e = xp.zeros((batch_size, 1), dtype=xp.float32)
for a in a_list:
w = reshape(batch_matmul(state['h2'], a, transa=True), (batch_size, 1))
w.data = xp.clip(w.data, -40, 40)
e = exp(w)
e_list.append(e)
sum_e = sum_e + e
context = xp.zeros((batch_size, self.hidden_size), dtype=xp.float32)
for a, e in zip(a_list, e_list):
e /= sum_e
context = context + reshape(batch_matmul(a, e), (batch_size, self.hidden_size))
return context, e_list, sum_e
def __call__(self, x, hs):
batch, dim = x.shape
alphas = 0
_sum = 0
for h in F.transpose_sequence(hs[:batch]):
size = h.shape[0]
if size < batch:
h = F.vstack([h, variable.Variable(
self.xp.zeros((batch - size, h.shape[1]), dtype='f'))])
score = self._score_func(x, h)
e = F.exp(score)
_sum += e
alphas += batch_matmul(h, e)
c = F.reshape(batch_matmul(F.reshape(alphas, (batch, dim)),
(1 / _sum)), (batch, dim))
return c
def __call__(self, x1, x2):
xp = self.xp
out_size = self.out_size
batch_size, len1, dim1 = x1.shape
if not self.nobias[0]:
x1 = F.concat((x1, xp.ones((batch_size, len1, 1),
dtype=xp.float32)), axis=2)
dim1 += 1
len2, dim2 = x2.shape[1:]
if not self.nobias[1]:
x2 = F.concat((x2, xp.ones((batch_size, len2, 1),
dtype=xp.float32)), axis=2)
dim2 += 1
x1_reshaped = F.reshape(x1, (batch_size * len1, dim1))
W_reshaped = F.reshape(F.transpose(self.W, (0, 2, 1)),
(dim1, out_size * dim2))
affine = F.reshape(F.matmul(x1_reshaped, W_reshaped),
(batch_size, len1 * out_size, dim2))
biaffine = F.transpose(
F.reshape(batch_matmul(affine, x2, transb=True),
(batch_size, len1, out_size, len2)),
(0, 1, 3, 2))
if not self.nobias[2]:
biaffine += F.broadcast_to(self.b, biaffine.shape)
return biaffine
def calculate_score(self, h, pos, neg, pos_score=None, neg_score=None, multipos=False):
#h_pro = self.act1(self.W_predict(h))
h_pro = h
if multipos:
# If multiple positive vectors are given, the maximum score is used
# (gradients are not propagated through the non-maximum ones).
pos_scoreL = [F.batch_matmul(h_pro, pos_one, transa=True) for pos_one in pos]
pos_score = F.max(F.concat(pos_scoreL, axis=1), axis=1, keepdims=True)
else:
pos_score = F.batch_matmul(h_pro, pos, transa=True)
neg_score = F.batch_matmul(h_pro, neg, transa=True)
return pos_score, neg_score
def matmul_v3(a, b, **kwargs):
if (a.ndim, b.ndim) == (3, 3):
return F.batch_matmul(a, b, **kwargs)
elif (a.ndim, b.ndim) == (2, 2):
return F.matmul(a, b, **kwargs)
else:
raise Exception("unsupported shapes: {}, {}".format(
a.shape, b.shape))
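# Not part of the snippet above -- hypothetical usage of matmul_v3, showing
# the dispatch on dimensionality: 3-D inputs go through F.batch_matmul,
# 2-D inputs through F.matmul.
import numpy as np
a3 = np.random.rand(4, 2, 3).astype(np.float32)
b3 = np.random.rand(4, 3, 5).astype(np.float32)
print(matmul_v3(a3, b3).shape)   # (4, 2, 5) via F.batch_matmul
a2 = np.random.rand(2, 3).astype(np.float32)
b2 = np.random.rand(3, 5).astype(np.float32)
print(matmul_v3(a2, b2).shape)   # (2, 5) via F.matmul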
def gram_matrix(x):
b, ch, h, w = x.data.shape
v = F.reshape(x, (b, ch, w * h))
return F.batch_matmul(v, v, transb=True) / np.float32(ch * w * h)
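# Not part of the snippet above -- a quick shape check for gram_matrix with
# hypothetical sizes: a (b, ch, h, w) feature map gives a (b, ch, ch) Gram
# matrix, normalized by ch * h * w.
import numpy as np
import chainer
feat = chainer.Variable(np.random.rand(2, 8, 4, 4).astype(np.float32))  # (b, ch, h, w)
g = gram_matrix(feat)
assert g.shape == (2, 8, 8)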
def forward_batch(self, x1, x2):
xp = cuda.get_array_module(x1.data)
batch, slen, hidden = x2.shape
return F.batch_matmul(
F.concat([x1, xp.ones((batch, slen, 1), 'f')], 2), # (batch, slen, hidden+1)
F.reshape(F.linear(F.reshape(x2, (batch * slen, -1)), self.W),
(batch, slen, -1)), transb=True)
def __call__(self, e1, e2):
ele2 = F.reshape(
F.batch_matmul(e1[:,:,None], e2[:,None,:]), (-1, self.in_size1 * self.in_size2))
res = F.matmul(ele2,
F.reshape(self.W, (self.in_size1 * self.in_size2, self.out_size))) + \
F.matmul(e1, self.V1) + \
F.matmul(e2, self.V2)
res, bias = F.broadcast(res, self.b)
return res + bias
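# Not part of the snippet above -- the batch_matmul call in __call__ builds a
# per-sample outer product; with hypothetical sizes, (b, d1, 1) x (b, 1, d2)
# gives (b, d1, d2).
import numpy as np
import chainer.functions as F
b, d1, d2 = 3, 4, 5
e1 = np.random.rand(b, d1).astype(np.float32)
e2 = np.random.rand(b, d2).astype(np.float32)
outer = F.batch_matmul(e1[:, :, None], e2[:, None, :])
assert outer.shape == (b, d1, d2)
assert np.allclose(outer.data[0], np.outer(e1[0], e2[0]))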
def __call__(self, p, train=True):
attention = self._attend(p)
if self.history is not None:
self.history.append(
chainer.cuda.to_cpu(attention.data[0, :, 0]).tolist())
ret = F.batch_matmul(F.swapaxes(self.source_hiddens, 2, 1), attention)
return F.reshape(ret, (self.batchsize, self.dim_out))
def _attend(self, p):
weight = F.batch_matmul(self.source_hiddens, p)
weight = F.where(self.mask, weight, self.minf)
attention = F.softmax(weight)
return attention
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, m, k)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (batch_size, k, n)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2[i])
for i in six.moves.range(batch_size)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, k, m)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (batch_size, k, n)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y, transa=True)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i].T, self.x2[i])
for i in six.moves.range(batch_size)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, m, k)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (batch_size, n, k)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y, transb=True)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2[i].T)
for i in six.moves.range(batch_size)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, k, m)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (batch_size, n, k)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y, transa=True, transb=True)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i].T, self.x2[i].T)
for i in six.moves.range(batch_size)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, m,)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (batch_size, m,)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, 1, 1)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y, transa=True)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2[i])
for i in six.moves.range(batch_size)]).reshape(batch_size, 1, 1)
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (1, m, k)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (1, k, n)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (1, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(x, y)
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2[i])
for i in six.moves.range(1)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, m, k)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (1, k, n)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(
x, F.broadcast_to(y, (batch_size, k, n)))
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2[0])
for i in six.moves.range(batch_size)])
def setUp(self):
self.x1 = numpy.random.uniform(
.5, 1, (batch_size, m, k)).astype(numpy.float32)
self.x2 = numpy.random.uniform(
.5, 1, (k, n)).astype(numpy.float32)
self.gy = numpy.random.uniform(
-1, 1, (batch_size, m, n)).astype(numpy.float32)
self.op = lambda x, y: F.batch_matmul(
x, F.broadcast_to(F.expand_dims(y, 0), (batch_size, k, n)))
self.forward_answer = numpy.array([
numpy.dot(self.x1[i], self.x2)
for i in six.moves.range(batch_size)])
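# Not part of the test above -- the broadcasting pattern it exercises, with
# hypothetical sizes: a single shared (k, n) matrix is expanded and broadcast
# so F.batch_matmul can pair it with every (m, k) matrix in the batch.
import numpy as np
import chainer.functions as F
batch_size, m, k, n = 4, 2, 3, 5
x = np.random.rand(batch_size, m, k).astype(np.float32)
w = np.random.rand(k, n).astype(np.float32)
y = F.batch_matmul(x, F.broadcast_to(F.expand_dims(w, 0), (batch_size, k, n)))
assert y.shape == (batch_size, m, n)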
def test_identity_cpu(self):
eye = _make_eye(self.x.shape)
x = chainer.Variable(self.x)
y = functions.batch_matmul(x, functions.batch_inv(x))
gradient_check.assert_allclose(y.data, eye,
**self.check_forward_options)
def test_identity_gpu(self):
eye = cuda.to_gpu(_make_eye(self.x.shape))
x = chainer.Variable(cuda.to_gpu(self.x))
y = functions.batch_matmul(x, functions.batch_inv(x))
gradient_check.assert_allclose(y.data, eye,
**self.check_forward_options)
def angular_mc_loss(f, f_p, alpha=45, in_degree=True):
'''
Args:
f (chainer.Variable or xp.ndarray):
Anchor vectors. Each vector in f must be L2-normalized.
f_p (chainer.Variable or xp.ndarray):
Positive vectors. Each vector in f_p must be L2-normalized.
'''
xp = cuda.get_array_module(f)
if in_degree:
alpha = np.deg2rad(alpha)
sq_tan_alpha = np.tan(alpha) ** 2
n_pairs = len(f)
# first and second term of f_{a,p,n}
term1 = 4 * sq_tan_alpha + matmul(f + f_p, transpose(f_p))
term2 = 2 * (1 + sq_tan_alpha) * F.sum(f * f_p, axis=1, keepdims=True)
# term2 = 2 * (1 + sq_tan_alpha) * F.batch_matmul(f, f_p, transa=True).reshape(n_pairs, 1)
f_apn = term1 - F.broadcast_to(term2, (n_pairs, n_pairs))
# zero out the diagonal components of f_apn
mask = xp.ones_like(f_apn.data) - xp.eye(n_pairs, dtype=f.dtype)
f_apn = f_apn * mask
return F.average(F.logsumexp(f_apn, axis=1))
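# Not part of the snippet above -- a hypothetical call of angular_mc_loss with
# L2-normalized toy embeddings, assuming the module-level imports of the
# original file (cuda, np, and matmul/transpose from chainer.functions).
import numpy as np
rng = np.random.RandomState(0)
f = rng.rand(8, 16).astype(np.float32)
f_p = rng.rand(8, 16).astype(np.float32)
f /= np.linalg.norm(f, axis=1, keepdims=True)
f_p /= np.linalg.norm(f_p, axis=1, keepdims=True)
loss = angular_mc_loss(f, f_p, alpha=45)
print(loss.data)  # scalar loss value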
def forward(self, data):
self.reset_state()
x_list = [XP.iarray([d[0]]) for d in data]
ep_list = [self.p_embed(x) for x in x_list]
ec_list = [self.c_embed(x) for x in x_list]
er_list = [self.r_embed(x) for x in x_list]
p_list = self.p_encode(ep_list)
c_list = self.c_encode(ec_list)
r_list = self.r_encode(er_list)
P = functions.reshape(
functions.concat(p_list, 0),
(1, len(data), self.hidden_size))
C = functions.reshape(
functions.concat(c_list, 0),
(1, len(data), self.hidden_size))
R = functions.concat(r_list, 0)
parent_scores = functions.reshape(
functions.batch_matmul(C, P, transb=True),
(len(data), len(data)))
root_scores = functions.reshape(
self.r_scorer(R),
(1, len(data)))
return parent_scores, root_scores
def __call__(self, S, h):
return F.squeeze(F.softmax(F.batch_matmul(S, h)), axis=2)
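# Not part of the snippet above -- a shape check for the dot-product attention
# with hypothetical sizes: h is promoted to (batch, hidden, 1), so the scores
# come out as (batch, src_len, 1) and squeeze(axis=2) leaves one weight per
# source position, summing to 1 over src_len.
import numpy as np
import chainer.functions as F
batch, src_len, hidden = 2, 7, 4
S = np.random.rand(batch, src_len, hidden).astype(np.float32)
h = np.random.rand(batch, hidden).astype(np.float32)
a = F.squeeze(F.softmax(F.batch_matmul(S, h)), axis=2)
assert a.shape == (batch, src_len)
assert np.allclose(a.data.sum(axis=1), 1.0)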
def __call__(self, S, h):
batch_size, src_len, hidden_size = S.data.shape
S = self.inner_weight(F.reshape(S, (batch_size * src_len, hidden_size)))
S = F.reshape(S, (batch_size, src_len, hidden_size))
a = F.softmax(F.squeeze(F.batch_matmul(S, h), axis = 2))
return a
# MLP layer, as in Bahdanau+ (2015)
def __call__(self, a_list, state, batch_size, xp):
e_list = []
sum_e = xp.zeros((batch_size, 1), dtype=xp.float32)
for a in a_list:
w = self.aw(a, state['h2'])
w.data = xp.clip(w.data, -20, 20)
e = exp(w)
e_list.append(e)
sum_e = sum_e + e
context = xp.zeros((batch_size, self.hidden_size), dtype=xp.float32)
for a, e in zip(a_list, e_list):
e /= sum_e
context = context + reshape(batch_matmul(a, e), (batch_size, self.hidden_size))
return context, e_list, sum_e