def _mix_rbf_kernel(X, Y, sigma_list):
    """Mixture-of-RBF kernel blocks for MMD: sums exp(-||z_i - z_j||^2 / (2 * sigma^2)) over sigma_list."""
    assert X.size(0) == Y.size(0)
    m = X.size(0)
    Z = torch.cat((X, Y), 0)
    ZZT = torch.mm(Z, Z.t())
    diag_ZZT = torch.diag(ZZT).unsqueeze(1)
    Z_norm_sqr = diag_ZZT.expand_as(ZZT)
    # squared pairwise distances: ||z_i||^2 - 2 z_i . z_j + ||z_j||^2
    exponent = Z_norm_sqr - 2 * ZZT + Z_norm_sqr.t()
    K = 0.0
    for sigma in sigma_list:
        gamma = 1.0 / (2 * sigma**2)
        K += torch.exp(-gamma * exponent)
    return K[:m, :m], K[:m, m:], K[m:, m:], len(sigma_list)
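A minimal usage sketch (not part of the original source): the three kernel blocks returned above combine into the standard biased MMD^2 estimate. Shapes and sigma values here are illustrative assumptions.

import torch

X = torch.randn(64, 10)  # samples from P
Y = torch.randn(64, 10)  # samples from Q (same batch size required by the assert)
K_XX, K_XY, K_YY, n_kernels = _mix_rbf_kernel(X, Y, sigma_list=[1.0, 2.0, 4.0])
mmd2 = K_XX.mean() + K_YY.mean() - 2 * K_XY.mean()  # biased MMD^2 estimate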
def eval_pred(dr_model, ub):
'''
evaluate dream model for predicting next basket on all training users
in batches
'''
item_embedding = dr_model.encode.weight
dr_model.eval()
dr_hidden = dr_model.init_hidden(dr_model.config.batch_size)
start_time = time()
id_u, score_u = [], [] # user's id, user's score
    num_batches = ceil(len(ub) / dr_model.config.batch_size)
for i,x in enumerate(batchify(ub, dr_model.config.batch_size)):
print(i)
baskets, lens, uids = x
        _, dynamic_user, _ = dr_model(baskets, lens, dr_hidden)  # shape: batch_size, max_len, embedding_size
        dr_hidden = repackage_hidden(dr_hidden)
        for uid, l, du in zip(uids, lens, dynamic_user):
            du_latest = du[l - 1].unsqueeze(0)  # shape: 1, embedding_size
            score_up = torch.mm(du_latest, item_embedding.t())  # shape: 1, num_item
            score_u.append(score_up.cpu().data.numpy())
            id_u.append(uid)
elapsed = time() - start_time
    print('[Predicting] Elapsed: {:02.2f}'.format(elapsed))
    return score_u, id_u
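The core scoring step of eval_pred in isolation (illustrative shapes, not the original config): a user's latest dynamic representation is matched against every item embedding by inner product.

import torch

du_latest = torch.randn(1, 32)          # 1 x embedding_size (hypothetical)
item_embedding = torch.randn(1000, 32)  # num_item x embedding_size (hypothetical)
score = torch.mm(du_latest, item_embedding.t())  # 1 x num_item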
def reorder_bpr_loss(re_x, his_x, dynamic_user, item_embedding, config):
    '''
    Loss function for reorder prediction.
    re_x  : padded reorder baskets
    his_x : padded history of bought items
    '''
nll = 0
ub_seqs = []
for u, h, du in zip(re_x, his_x, dynamic_user):
du_p_product = torch.mm(du, item_embedding.t()) # shape: max_len, num_item
nll_u = [] # nll for user
for t, basket_t in enumerate(u):
if basket_t[0] != 0:
pos_idx = torch.cuda.LongTensor(basket_t) if config.cuda else torch.LongTensor(basket_t)
# Sample negative products
neg = [random.choice(h[t]) for _ in range(len(basket_t))] # replacement
# neg = random.sample(range(1, config.num_product), len(basket_t)) # without replacement
neg_idx = torch.cuda.LongTensor(neg) if config.cuda else torch.LongTensor(neg)
# Score p(u, t, v > v')
score = du_p_product[t - 1][pos_idx] - du_p_product[t - 1][neg_idx]
# Average Negative log likelihood for basket_t
nll_u.append(- torch.mean(torch.nn.LogSigmoid()(score)))
nll += torch.mean(torch.cat(nll_u))
return nll
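A stand-alone sketch of the BPR term used above: for one basket, the loss is the negative mean log-sigmoid of the score difference between positive and sampled negative items (all tensors hypothetical).

import torch
import torch.nn.functional as F

scores = torch.randn(100)      # scores for 100 items at one time step
pos = torch.tensor([3, 7])     # items actually bought
neg = torch.tensor([42, 55])   # sampled negatives
loss = -F.logsigmoid(scores[pos] - scores[neg]).mean()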
def backward(self, grad_output):
tensors = self.saved_tensors
if len(tensors) == 2:
input, weight = tensors
bias = None
else:
input, weight, bias = tensors
grad_input = grad_weight = grad_bias = None
if self.needs_input_grad[0]:
grad_input = torch.mm(grad_output, weight)
if self.needs_input_grad[1]:
grad_weight = torch.mm(grad_output.t(), input)
    if bias is not None and self.needs_input_grad[2]:
        # self.add_buffer is a vector of ones, so this sums grad_output over the batch
        grad_bias = torch.mv(grad_output.t(), self.add_buffer)
if bias is not None:
return grad_input, grad_weight, grad_bias
else:
return grad_input, grad_weight
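The gradients above are the standard linear-layer identities grad_input = grad_output @ W and grad_weight = grad_output.t() @ input; a quick autograd cross-check with assumed shapes:

import torch

inp = torch.randn(4, 3, requires_grad=True)  # batch x in_features
w = torch.randn(5, 3, requires_grad=True)    # out_features x in_features
out = inp.mm(w.t())
g = torch.randn_like(out)
out.backward(g)
assert torch.allclose(inp.grad, g.mm(w))
assert torch.allclose(w.grad, g.t().mm(inp))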
def backward(self, grad_output):
matrix1, matrix2 = self.saved_tensors
grad_add_matrix = grad_matrix1 = grad_matrix2 = None
if self.needs_input_grad[0]:
grad_add_matrix = grad_output
if self.alpha != 1:
grad_add_matrix = grad_add_matrix.mul(self.alpha)
if self.needs_input_grad[1]:
grad_matrix1 = torch.mm(grad_output, matrix2.t())
if self.beta != 1:
grad_matrix1 *= self.beta
if self.needs_input_grad[2]:
grad_matrix2 = torch.mm(matrix1.t(), grad_output)
if self.beta != 1:
grad_matrix2 *= self.beta
return grad_add_matrix, grad_matrix1, grad_matrix2
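A sanity check of the same Addmm gradients with the modern torch.addmm, which computes beta * M + alpha * m1 @ m2 (note the Function above uses the opposite convention: alpha scales the added matrix, beta the product):

import torch

M = torch.randn(4, 5, requires_grad=True)
m1 = torch.randn(4, 3, requires_grad=True)
m2 = torch.randn(3, 5, requires_grad=True)
out = torch.addmm(M, m1, m2, beta=0.5, alpha=2.0)  # 0.5 * M + 2.0 * m1 @ m2
g = torch.randn_like(out)
out.backward(g)
assert torch.allclose(M.grad, 0.5 * g)
assert torch.allclose(m1.grad, 2.0 * g.mm(m2.t()))
assert torch.allclose(m2.grad, 2.0 * m1.t().mm(g))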
def backward(self, grad_output):
vector1, vector2 = self.saved_tensors
grad_add_matrix = grad_vector1 = grad_vector2 = None
if self.needs_input_grad[0]:
grad_add_matrix = grad_output
if self.alpha != 1:
grad_add_matrix = grad_add_matrix.mul(self.alpha)
if self.needs_input_grad[1]:
grad_vector1 = torch.mv(grad_output, vector2)
if self.beta != 1:
grad_vector1 *= self.beta
    if self.needs_input_grad[2]:
        # TODO: maybe it's better to do transpose + mv + transpose
        grad_vector2 = torch.mm(vector1.unsqueeze(0), grad_output).squeeze(0)
if self.beta != 1:
grad_vector2 *= self.beta
return grad_add_matrix, grad_vector1, grad_vector2
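The analogous check for the rank-1 (Addr) case, written with torch.outer: for out = beta * v1 (outer) v2 plus the added matrix, grad_v1 = beta * g @ v2 and grad_v2 = beta * g.t() @ v1.

import torch

M = torch.randn(4, 5, requires_grad=True)
v1 = torch.randn(4, requires_grad=True)
v2 = torch.randn(5, requires_grad=True)
out = 0.5 * M + 2.0 * torch.outer(v1, v2)
g = torch.randn_like(out)
out.backward(g)
assert torch.allclose(v1.grad, 2.0 * g.mv(v2))
assert torch.allclose(v2.grad, 2.0 * g.t().mv(v1))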
def updateOutput(self, input):
assert len(input) == 2
a, b = input
assert a.ndimension() == 2 or a.ndimension() == 3
assert a.dim() == b.dim()
if a.ndimension() == 2:
if self.transA:
a = a.t()
if self.transB:
b = b.t()
        self.output.resize_(a.size(0), b.size(1))
        torch.mm(a, b, out=self.output)
    else:
        if self.transA:
            a = a.transpose(1, 2)
        if self.transB:
            b = b.transpose(1, 2)
        self.output.resize_(a.size(0), a.size(1), b.size(2))
        torch.bmm(a, b, out=self.output)
return self.output
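A functional, modern equivalent of the batched branch above (sketch with random shapes):

import torch

a = torch.randn(8, 4, 3)   # batch x m x n
b = torch.randn(8, 4, 5)   # batch x m x p
out = torch.bmm(a.transpose(1, 2), b)  # transA=True, transB=False -> batch x n x p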
def updateOutput(self, input):
self._assertInput(input)
# set up buffer:
    if self.buff2 is None:
        self.buff2 = input[0].new()
    self.buff2.resize_as_(input[1])
    # compute output scores:
    self.output.resize_(input[0].size(0), self.weight.size(0))
    for k in range(self.weight.size(0)):
        torch.mm(input[0], self.weight[k], out=self.buff2)
        self.buff2.mul_(input[1])
        torch.sum(self.buff2, 1, out=self.output.narrow(1, k, 1))
    if self.bias is not None:
self.output.add_(self.bias.view(1, self.bias.nelement()).expand_as(self.output))
return self.output
def test_ormqr(self):
mat1 = torch.randn(10, 10)
mat2 = torch.randn(10, 10)
q, r = torch.qr(mat1)
m, tau = torch.geqrf(mat1)
res1 = torch.mm(q, mat2)
res2, _ = torch.ormqr(m, tau, mat2)
self.assertEqual(res1, res2)
res1 = torch.mm(mat2, q)
res2, _ = torch.ormqr(m, tau, mat2, False)
self.assertEqual(res1, res2)
res1 = torch.mm(q.t(), mat2)
res2, _ = torch.ormqr(m, tau, mat2, True, True)
self.assertEqual(res1, res2)
res1 = torch.mm(mat2, q.t())
res2, _ = torch.ormqr(m, tau, mat2, False, True)
self.assertEqual(res1, res2)
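The same check against the current API (a sketch, assuming default left-multiplication): torch.qr became torch.linalg.qr, and torch.ormqr now returns a single tensor instead of a tuple.

import torch

mat1, mat2 = torch.randn(10, 10), torch.randn(10, 10)
q, r = torch.linalg.qr(mat1)
m, tau = torch.geqrf(mat1)
assert torch.allclose(torch.ormqr(m, tau, mat2), q.mm(mat2), atol=1e-4)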
def test_cholesky(self):
x = torch.rand(10, 10) + 1e-1
A = torch.mm(x, x.t())
# default Case
C = torch.potrf(A)
B = torch.mm(C.t(), C)
self.assertEqual(A, B, 1e-14)
# test Upper Triangular
U = torch.potrf(A, True)
B = torch.mm(U.t(), U)
self.assertEqual(A, B, 1e-14, 'potrf (upper) did not allow rebuilding the original matrix')
# test Lower Triangular
L = torch.potrf(A, False)
B = torch.mm(L, L.t())
self.assertEqual(A, B, 1e-14, 'potrf (lower) did not allow rebuilding the original matrix')
def test_potrs(self):
    a = torch.Tensor(((6.80, -2.11, 5.66, 5.97, 8.23),
(-6.05, -3.30, 5.36, -4.44, 1.08),
(-0.45, 2.58, -2.70, 0.27, 9.04),
(8.32, 2.71, 4.35, -7.17, 2.14),
(-9.67, -5.14, -7.26, 6.08, -6.87))).t()
    b = torch.Tensor(((4.02, 6.19, -8.22, -7.57, -3.03),
(-1.56, 4.00, -8.67, 1.75, 2.86),
(9.81, -4.09, -4.57, -8.61, 8.99))).t()
# make sure 'a' is symmetric PSD
a = torch.mm(a, a.t())
# upper Triangular Test
U = torch.potrf(a)
x = torch.potrs(b, U)
self.assertLessEqual(b.dist(torch.mm(a, x)), 1e-12)
# lower Triangular Test
L = torch.potrf(a, False)
x = torch.potrs(b, L, False)
self.assertLessEqual(b.dist(torch.mm(a, x)), 1e-12)
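Modern equivalents of the calls above: torch.potrf became torch.linalg.cholesky (lower triangular by default) and torch.potrs became torch.cholesky_solve. A brief sketch:

import torch

A = torch.randn(5, 5)
A = A @ A.t() + 5 * torch.eye(5)  # symmetric positive definite
b = torch.randn(5, 3)
L = torch.linalg.cholesky(A)      # lower-triangular factor
x = torch.cholesky_solve(b, L)    # solves A x = b
assert torch.allclose(A @ x, b, atol=1e-4)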
def sym_distance_matrix(A, B, eps=1e-18, self_similarity=False):
"""
Defines the symbolic matrix that contains the distances between the vectors of A and B
:param A: the first data matrix
:param B: the second data matrix
:param self_similarity: zeros the diagonial to improve the stability
:params eps: the minimum distance between two vectors (set to a very small number to improve stability)
:return:
"""
# Compute the squared distances
AA = torch.sum(A * A, 1).view(-1, 1)
BB = torch.sum(B * B, 1).view(1, -1)
AB = torch.mm(A, B.transpose(0, 1))
D = AA + BB - 2 * AB
    # Zero the diagonal
if self_similarity:
D = D.view(-1)
D[::B.size(0) + 1] = 0
D = D.view(A.size(0), B.size(0))
# Return the square root
D = torch.sqrt(torch.clamp(D, min=eps))
return D
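Usage sketch with random data; with self_similarity=True the diagonal is zeroed, which assumes A and B have the same number of rows:

import torch

A = torch.randn(100, 16)
D = sym_distance_matrix(A, A, self_similarity=True)  # 100 x 100, zero diagonal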
def forward(self, input1, input2):
if (
not self.training and # test mode
self.beam_size is not None and # beam size is set
input1.dim() == 3 and # only support batched input
input1.size(1) == 1 # single time step update
):
bsz, beam = input1.size(0), self.beam_size
# bsz x 1 x nhu --> bsz/beam x beam x nhu
input1 = input1[:, 0, :].unfold(0, beam, beam).transpose(2, 1)
# bsz x sz2 x nhu --> bsz/beam x sz2 x nhu
input2 = input2.unfold(0, beam, beam)[:, :, :, 0]
# use non batched operation if bsz = beam
if input1.size(0) == 1:
output = torch.mm(input1[0, :, :], input2[0, :, :])
else:
output = input1.bmm(input2)
return output.view(bsz, 1, -1)
else:
return input1.bmm(input2)
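The unfold trick above regroups consecutive beam hypotheses into one batch dimension; a small illustration with bsz=6, beam=3:

import torch

x = torch.arange(12.).view(6, 1, 2)                  # bsz x 1 x nhu
blocks = x[:, 0, :].unfold(0, 3, 3).transpose(2, 1)  # (bsz/beam) x beam x nhu
assert blocks.shape == (2, 3, 2)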
def pairwise_distance(features, query=None, gallery=None, metric=None):
if query is None and gallery is None:
n = len(features)
x = torch.cat(list(features.values()))
x = x.view(n, -1)
if metric is not None:
x = metric.transform(x)
        # squared distances: ||x_i||^2 + ||x_j||^2 - 2 x_i . x_j (matches the query/gallery branch below)
        dist = torch.pow(x, 2).sum(dim=1, keepdim=True)
        dist = dist.expand(n, n) + dist.expand(n, n).t() - 2 * torch.mm(x, x.t())
return dist
x = torch.cat([features[f].unsqueeze(0) for f, _, _ in query], 0)
y = torch.cat([features[f].unsqueeze(0) for f, _, _ in gallery], 0)
m, n = x.size(0), y.size(0)
x = x.view(m, -1)
y = y.view(n, -1)
if metric is not None:
x = metric.transform(x)
y = metric.transform(y)
dist = torch.pow(x, 2).sum(dim=1, keepdim=True).expand(m, n) + \
torch.pow(y, 2).sum(dim=1, keepdim=True).expand(n, m).t()
    dist.addmm_(1, -2, x, y.t())  # legacy signature: dist = 1 * dist + (-2) * x @ y.t()
return dist
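A usage sketch for the first branch with a hypothetical feature dict (the query/gallery branch additionally expects (fname, pid, cam) tuples):

from collections import OrderedDict
import torch

features = OrderedDict(('img%d.jpg' % i, torch.randn(128)) for i in range(10))
dist = pairwise_distance(features)  # 10 x 10 matrix of squared distances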
def _combine_last(self, r, h_t):
'''
inputs:
r : batch x n_dim
h_t : batch x n_dim (this is the output from the gru unit)
params :
W_x : n_dim x n_dim
W_p : n_dim x n_dim
out :
h_star : batch x n_dim
'''
W_p_r = torch.mm(r, self.W_p) # batch x n_dim
W_x_h = torch.mm(h_t, self.W_x) # batch x n_dim
h_star = F.tanh(W_p_r + W_x_h) # batch x n_dim
return h_star
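In isolation, the combination above is h_star = tanh(r @ W_p + h_t @ W_x); a minimal sketch with hypothetical parameter shapes:

import torch

batch, n_dim = 4, 8
r, h_t = torch.randn(batch, n_dim), torch.randn(batch, n_dim)
W_p, W_x = torch.randn(n_dim, n_dim), torch.randn(n_dim, n_dim)
h_star = torch.tanh(r.mm(W_p) + h_t.mm(W_x))  # batch x n_dim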
def new_att_module(self):
class NewAttModule(nn.Module):
def __init__(self):
super(NewAttModule, self).__init__()
def forward(self, linput, rinput):
            self.lPad = linput.view(-1, linput.size(0), linput.size(1))
            self.lPad = linput  # overwrites the view above; self.lPad = Padding(0, 0)(linput) TODO: figure out why padding?
self.M_r = torch.mm(self.lPad, rinput.t())
self.alpha = F.softmax(self.M_r.transpose(0, 1))
self.Yl = torch.mm(self.alpha, self.lPad)
return self.Yl
att_module = NewAttModule()
if getattr(self, "att_module_master", None):
for (tar_param, src_param) in zip(att_module.parameters(), self.att_module_master.parameters()):
tar_param.grad.data = src_param.grad.data.clone()
return att_module
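The attention above in functional form (sketch): similarity scores between the two sequences, a softmax over the left positions, then a weighted sum of the left vectors.

import torch
import torch.nn.functional as F

linput, rinput = torch.randn(7, 16), torch.randn(5, 16)  # seq_len x dim
M_r = torch.mm(linput, rinput.t())                       # 7 x 5
alpha = F.softmax(M_r.t(), dim=1)                        # 5 x 7, rows sum to 1
Yl = torch.mm(alpha, linput)                             # 5 x 16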
def forward(self, input1, input2, weight, bias=None):
self.save_for_backward(input1, input2, weight, bias)
output = input1.new(input1.size(0), weight.size(0))
buff = input1.new()
# compute output scores:
for k, w in enumerate(weight):
torch.mm(input1, w, out=buff)
buff.mul_(input2)
torch.sum(buff, 1, out=output.narrow(1, k, 1))
if bias is not None:
output.add_(bias.expand_as(output))
return output
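The loop above computes output[n, k] = input1[n] @ weight[k] @ input2[n]; a vectorized equivalent via einsum (sketch):

import torch

x1, x2 = torch.randn(4, 3), torch.randn(4, 5)
w = torch.randn(7, 3, 5)                        # out_features x in1 x in2
out = torch.einsum('ni,kij,nj->nk', x1, w, x2)  # 4 x 7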
def backward(ctx, grad_output):
matrix1, matrix2 = ctx.saved_variables
grad_add_matrix = grad_matrix1 = grad_matrix2 = None
if ctx.needs_input_grad[0]:
grad_add_matrix = grad_output
if ctx.alpha != 1:
grad_add_matrix = grad_add_matrix.mul(ctx.alpha)
if ctx.needs_input_grad[1]:
grad_matrix1 = torch.mm(grad_output, matrix2.t())
if ctx.beta != 1:
grad_matrix1 *= ctx.beta
if ctx.needs_input_grad[2]:
grad_matrix2 = torch.mm(matrix1.t(), grad_output)
if ctx.beta != 1:
grad_matrix2 *= ctx.beta
return grad_add_matrix, grad_matrix1, grad_matrix2, None, None, None
def backward(ctx, grad_output):
vector1, vector2 = ctx.saved_variables
grad_add_matrix = grad_vector1 = grad_vector2 = None
if ctx.needs_input_grad[0]:
grad_add_matrix = grad_output
if ctx.alpha != 1:
grad_add_matrix = grad_add_matrix.mul(ctx.alpha)
if ctx.needs_input_grad[1]:
grad_vector1 = torch.mv(grad_output, vector2)
if ctx.beta != 1:
grad_vector1 *= ctx.beta
if ctx.needs_input_grad[2]:
# TODO: maybe it's better to do transpose + mv + transpose
grad_vector2 = torch.mm(vector1.unsqueeze(0), grad_output).squeeze(0)
if ctx.beta != 1:
grad_vector2 *= ctx.beta
return grad_add_matrix, grad_vector1, grad_vector2, None, None, None
def updateOutput(self, input):
self._assertInput(input)
# set up buffer:
if self.buff2 is None:
self.buff2 = input[0].new()
self.buff2.resize_as_(input[1])
# compute output scores:
self.output.resize_(input[0].size(0), self.weight.size(0))
for k in range(self.weight.size(0)):
torch.mm(input[0], self.weight[k], out=self.buff2)
self.buff2.mul_(input[1])
torch.sum(self.buff2, 1, out=self.output.narrow(1, k, 1))
if self.bias is not None:
self.output.add_(self.bias.view(1, self.bias.nelement()).expand_as(self.output))
return self.output
def test_inverse(self):
M = torch.randn(5, 5)
MI = torch.inverse(M)
E = torch.eye(5)
self.assertFalse(MI.is_contiguous(), 'MI is contiguous')
self.assertEqual(E, torch.mm(M, MI), 1e-8, 'inverse value')
self.assertEqual(E, torch.mm(MI, M), 1e-8, 'inverse value')
MII = torch.Tensor(5, 5)
torch.inverse(M, out=MII)
self.assertFalse(MII.is_contiguous(), 'MII is contiguous')
self.assertEqual(MII, MI, 0, 'inverse value in-place')
# second call, now that MII is transposed
torch.inverse(M, out=MII)
self.assertFalse(MII.is_contiguous(), 'MII is contiguous')
self.assertEqual(MII, MI, 0, 'inverse value in-place')
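For reference, the modern spelling of the same inverse check (torch.linalg.inv is the current API; the matrix here is made well conditioned on purpose):

import torch

M = torch.randn(5, 5)
M = M @ M.t() + 5 * torch.eye(5)
MI = torch.linalg.inv(M)
assert torch.allclose(M @ MI, torch.eye(5), atol=1e-4)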
def forward(self, x, hidden):
h, c = hidden
h = h.view(h.size(1), -1)
c = c.view(c.size(1), -1)
x = x.view(x.size(1), -1)
# Linear mappings
i_t = th.mm(x, self.w_xi) + th.mm(h, self.w_hi) + self.b_i
f_t = th.mm(x, self.w_xf) + th.mm(h, self.w_hf) + self.b_f
o_t = th.mm(x, self.w_xo) + th.mm(h, self.w_ho) + self.b_o
# activations
i_t.sigmoid_()
f_t.sigmoid_()
o_t.sigmoid_()
# cell computations
c_t = th.mm(x, self.w_xc) + th.mm(h, self.w_hc) + self.b_c
c_t.tanh_()
c_t = th.mul(c, f_t) + th.mul(i_t, c_t)
h_t = th.mul(o_t, th.tanh(c_t))
# Reshape for compatibility
h_t = h_t.view(1, h_t.size(0), -1)
c_t = c_t.view(1, c_t.size(0), -1)
if self.dropout > 0.0:
F.dropout(h_t, p=self.dropout, training=self.training, inplace=True)
return h_t, (h_t, c_t)
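The gate equations above in isolation, with hypothetical weight shapes (the module stores them as self.w_x*, self.w_h* and self.b_*):

import torch as th

n_in, n_hid, batch = 3, 5, 2
x, h = th.randn(batch, n_in), th.randn(batch, n_hid)
w_xi, w_hi, b_i = th.randn(n_in, n_hid), th.randn(n_hid, n_hid), th.randn(n_hid)
i_t = th.sigmoid(th.mm(x, w_xi) + th.mm(h, w_hi) + b_i)  # input gate, batch x n_hid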
def updateOutput(self, input):
self._assertInput(input)
# set up buffer:
if self.buff2 is None:
self.buff2 = input[0].new()
self.buff2.resize_as_(input[1])
# compute output scores:
self.output.resize_(input[0].size(0), self.weight.size(0))
for k in range(self.weight.size(0)):
torch.mm(input[0], self.weight[k], out=self.buff2)
self.buff2.mul_(input[1])
torch.sum(self.buff2, 1, True, out=self.output.narrow(1, k, 1))
if self.bias is not None:
self.output.add_(self.bias.view(1, self.bias.nelement()).expand_as(self.output))
return self.output