def test_spectral_mixture_gp_mean_abs_error():
    gp_model = SpectralMixtureGPModel()
    # Optimize the model
    gp_model.train()
    optimizer = optim.Adam(gp_model.parameters(), lr=0.1)
    optimizer.n_iter = 0
    gpytorch.functions.fastest = False
    for i in range(50):
        optimizer.zero_grad()
        output = gp_model(train_x)
        loss = -gp_model.marginal_log_likelihood(output, train_y)
        loss.backward()
        optimizer.n_iter += 1
        optimizer.step()
    # Test the model
    gp_model.eval()
    gp_model.condition(train_x, train_y)
    test_preds = gp_model(test_x).mean()
    mean_abs_error = torch.mean(torch.abs(test_y - test_preds))
    # The spectral mixture kernel should be trivially able to extrapolate the sine function.
    assert mean_abs_error.data.squeeze()[0] < 0.05
Python mean() usage examples
Source file: kissgp_kronecker_product_regression_test.py
Project: gpytorch
Author: jrg365
def test_kissgp_gp_mean_abs_error():
    gp_model = GPRegressionModel()
    # Optimize the model
    gp_model.train()
    optimizer = optim.Adam(gp_model.parameters(), lr=0.2)
    optimizer.n_iter = 0
    for i in range(20):
        optimizer.zero_grad()
        output = gp_model(train_x)
        loss = -gp_model.marginal_log_likelihood(output, train_y)
        loss.backward()
        optimizer.n_iter += 1
        optimizer.step()
    # Test the model
    gp_model.eval()
    gp_model.condition(train_x, train_y)
    test_preds = gp_model(test_x).mean()
    mean_abs_error = torch.mean(torch.abs(test_y - test_preds))
    assert mean_abs_error.data.squeeze()[0] < 0.1
def test_kissgp_gp_mean_abs_error():
    train_x, train_y, test_x, test_y = make_data()
    gp_model = GPRegressionModel()
    # Optimize the model
    gp_model.train()
    optimizer = optim.Adam(gp_model.parameters(), lr=0.1)
    optimizer.n_iter = 0
    for i in range(25):
        optimizer.zero_grad()
        output = gp_model(train_x)
        loss = -gp_model.marginal_log_likelihood(output, train_y)
        loss.backward()
        optimizer.n_iter += 1
        optimizer.step()
    # Test the model
    gp_model.eval()
    gp_model.condition(train_x, train_y)
    test_preds = gp_model(test_x).mean()
    mean_abs_error = torch.mean(torch.abs(test_y - test_preds))
    assert mean_abs_error.data.squeeze()[0] < 0.05
def test_kissgp_gp_mean_abs_error_cuda():
    if torch.cuda.is_available():
        train_x, train_y, test_x, test_y = make_data(cuda=True)
        gp_model = GPRegressionModel().cuda()
        # Optimize the model
        gp_model.train()
        optimizer = optim.Adam(gp_model.parameters(), lr=0.1)
        optimizer.n_iter = 0
        for i in range(25):
            optimizer.zero_grad()
            output = gp_model(train_x)
            loss = -gp_model.marginal_log_likelihood(output, train_y)
            loss.backward()
            optimizer.n_iter += 1
            optimizer.step()
        # Test the model
        gp_model.eval()
        gp_model.condition(train_x, train_y)
        test_preds = gp_model(test_x).mean()
        mean_abs_error = torch.mean(torch.abs(test_y - test_preds))
        assert mean_abs_error.data.squeeze()[0] < 0.02
def th_corrcoef(x):
    """
    mimics np.corrcoef
    """
    # calculate covariance matrix of rows
    mean_x = th.mean(x, 1)
    xm = x.sub(mean_x.expand_as(x))
    c = xm.mm(xm.t())
    c = c / (x.size(1) - 1)
    # normalize covariance matrix
    d = th.diag(c)
    stddev = th.pow(d, 0.5)
    c = c.div(stddev.expand_as(c))
    c = c.div(stddev.expand_as(c).t())
    # clamp between -1 and 1
    c = th.clamp(c, -1.0, 1.0)
    return c
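For reference, a minimal sketch of the same row-wise correlation written against current PyTorch reduction semantics (where torch.mean drops the reduced dimension unless keepdim=True is passed); the function and variable names below are illustrative, not taken from the original project.

import torch

def corrcoef_rows(x):
    # rows are variables, columns are observations (np.corrcoef convention)
    xm = x - x.mean(dim=1, keepdim=True)
    c = xm @ xm.t() / (x.size(1) - 1)
    std = c.diag().sqrt()
    return (c / std.unsqueeze(1) / std.unsqueeze(0)).clamp(-1.0, 1.0)

x = torch.randn(4, 100)
print(corrcoef_rows(x).shape)  # torch.Size([4, 4])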
def th_matrixcorr(x, y):
    """
    Return a correlation matrix between the columns of x and the columns of y.
    If X.size() == (1000, 4) and Y.size() == (1000, 5), the result will be of
    size (4, 5), with the (i, j) entry equal to the Pearson correlation
    coefficient between column i of X and column j of Y.
    """
    mean_x = th.mean(x, 0)
    mean_y = th.mean(y, 0)
    xm = x.sub(mean_x.expand_as(x))
    ym = y.sub(mean_y.expand_as(y))
    r_num = xm.t().mm(ym)
    r_den1 = th.norm(xm, 2, 0)
    r_den2 = th.norm(ym, 2, 0)
    r_den = r_den1.t().mm(r_den2)
    r_mat = r_num.div(r_den)
    return r_mat
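A hedged usage sketch of the column-wise Pearson correlation described in the docstring, again rewritten with keepdim=True so it runs on current PyTorch; the helper name matrix_corr is made up for the example.

import torch

def matrix_corr(x, y):
    # Pearson correlation between every column of x and every column of y.
    xm = x - x.mean(dim=0, keepdim=True)
    ym = y - y.mean(dim=0, keepdim=True)
    num = xm.t() @ ym                                          # (x_cols, y_cols)
    den = xm.norm(dim=0).unsqueeze(1) * ym.norm(dim=0).unsqueeze(0)
    return num / den

x, y = torch.randn(1000, 4), torch.randn(1000, 5)
print(matrix_corr(x, y).shape)  # torch.Size([4, 5])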
def normalized_cross_correlation(self):
    w = self.weight.view(self.weight.size(0), -1)
    t_norm = torch.norm(w, p=2, dim=1)
    if self.in_channels == 1 and sum(self.kernel_size) == 1:
        ncc = w.squeeze() / torch.norm(self.t0_norm, p=2)
        ncc = ncc - self.start_ncc
        return ncc
    # mean = torch.mean(w, dim=1).unsqueeze(1).expand_as(w)
    mean = torch.mean(w, dim=1).unsqueeze(1)  # 0.2 broadcasting
    t_factor = w - mean
    h_product = self.t0_factor * t_factor
    cov = torch.sum(h_product, dim=1)  # (w.size(1) - 1)
    # had normalization code commented out
    denom = self.t0_norm * t_norm
    ncc = cov / denom
    ncc = ncc - self.start_ncc
    return ncc
def test_Dropout(self):
    p = 0.2
    input = torch.Tensor(1000).fill_(1 - p)
    module = nn.Dropout(p)
    output = module.forward(input)
    self.assertLess(abs(output.mean() - (1 - p)), 0.05)
    gradInput = module.backward(input, input)
    self.assertLess(abs(gradInput.mean() - (1 - p)), 0.05)
    module = nn.Dropout(p, True)
    output = module.forward(input.clone())
    self.assertLess(abs(output.mean() - (1 - p)), 0.05)
    gradInput = module.backward(input.clone(), input.clone())
    self.assertLess(abs(gradInput.mean() - (1 - p)), 0.05)
    # Check that these don't raise errors
    module.__repr__()
    str(module)
def test_SpatialDropout(self):
    p = 0.2
    b = random.randint(1, 5)
    w = random.randint(1, 5)
    h = random.randint(1, 5)
    nfeats = 1000
    input = torch.Tensor(b, nfeats, w, h).fill_(1)
    module = nn.SpatialDropout(p)
    module.training()
    output = module.forward(input)
    self.assertLess(abs(output.mean() - (1 - p)), 0.05)
    gradInput = module.backward(input, input)
    self.assertLess(abs(gradInput.mean() - (1 - p)), 0.05)
    # Check that these don't raise errors
    module.__repr__()
    str(module)
def test_VolumetricDropout(self):
    p = 0.2
    bsz = random.randint(1, 5)
    t = random.randint(1, 5)
    w = random.randint(1, 5)
    h = random.randint(1, 5)
    nfeats = 1000
    input = torch.Tensor(bsz, nfeats, t, w, h).fill_(1)
    module = nn.VolumetricDropout(p)
    module.training()
    output = module.forward(input)
    self.assertLess(abs(output.mean() - (1 - p)), 0.05)
    gradInput = module.backward(input, input)
    self.assertLess(abs(gradInput.mean() - (1 - p)), 0.05)
    # Check that these don't raise errors
    module.__repr__()
    str(module)
def forward(self, y, weights, mean, std):
    """
    Negative log-likelihood objective for a set of predicted means, mixture weights,
    and standard deviations modeling a given ground-truth 'y'.
    :param y: Non-linear target.
    :param weights: Predicted mixture components.
    :param mean: Predicted mixture means.
    :param std: Predicted mixture standard deviations.
    :return: Negative log-likelihood averaged over the batch.
    """
    normalization = 1.0 / ((2.0 * math.pi) ** 0.5)
    gaussian_sample = (y.expand_as(mean) - mean) * torch.reciprocal(std)
    gaussian_sample = normalization * torch.reciprocal(std) * torch.exp(-0.5 * gaussian_sample ** 2)
    return -torch.mean(torch.log(torch.sum(weights * gaussian_sample, dim=1)))
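A minimal, self-contained sketch of the same mixture negative log-likelihood, assuming weights, mean, and std have shape (batch, K) and y has shape (batch, 1); every name below is invented for illustration.

import math
import torch

batch, K = 8, 5
y = torch.randn(batch, 1)
weights = torch.softmax(torch.randn(batch, K), dim=1)  # mixture weights sum to 1 per row
mean = torch.randn(batch, K)
std = torch.rand(batch, K) + 0.1

z = (y.expand_as(mean) - mean) / std
pdf = torch.exp(-0.5 * z ** 2) / (std * math.sqrt(2.0 * math.pi))
nll = -torch.mean(torch.log(torch.sum(weights * pdf, dim=1)))
print(nll.item())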
def test_model_custom_loss():
    x = torch.rand(20, 4)
    y = torch.rand(20, 10)
    model = Model(
        Dense(10, input_dim=x.size()[-1]),
        Activation('relu'),
        Dense(5),
        Activation('relu'),
        Dense(y.size()[-1])
    )
    opt = SGD(lr=0.01, momentum=0.9)

    def mae(y_true, y_pred):
        return torch.mean(torch.abs(y_true - y_pred))

    history = model.fit(x, y, loss=mae, optimizer=opt, epochs=10)
    assert len(history['loss']) == 10
    assert all(type(v) is float for v in history['loss'])
    assert history['loss'] == sorted(history['loss'], reverse=True)
def forward(self, anchor, positive, negative):
    # euclidean distance
    # dist = torch.sum((anchor - positive) ** 2 - (anchor - negative) ** 2, dim=1) \
    #     + self.margin
    if self.dist_type == 0:
        dist_p = F.pairwise_distance(anchor, positive)
        dist_n = F.pairwise_distance(anchor, negative)
    if self.dist_type == 1:
        dist_p = cosine_similarity(anchor, positive)
        dist_n = cosine_similarity(anchor, negative)
    dist_hinge = torch.clamp(dist_p - dist_n + self.margin, min=0.0)
    if self.use_ohem:
        v, idx = torch.sort(dist_hinge, descending=True)
        loss = torch.mean(v[0:self.ohem_bs])
    else:
        loss = torch.mean(dist_hinge)
    return loss
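A small standalone illustration of the hinge used above (Euclidean branch, no online hard example mining); the tensors and margin value are made up for the example.

import torch
import torch.nn.functional as F

anchor, positive, negative = torch.randn(3, 16, 128).unbind(0)
margin = 0.2
d_p = F.pairwise_distance(anchor, positive)   # (16,)
d_n = F.pairwise_distance(anchor, negative)   # (16,)
loss = torch.mean(torch.clamp(d_p - d_n + margin, min=0.0))
print(loss.item())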
def mean(x, axis=None, keepdims=False):
    def _mean(x, axis=axis, keepdims=keepdims):
        y = torch.mean(x, axis)
        # The keepdims argument of torch.mean is not functional here, so squeeze manually
        return y if keepdims else torch.squeeze(y, axis)

    def _compute_output_shape(x, axis=axis, keepdims=keepdims):
        if axis is None:
            return ()
        shape = list(_get_shape(x))
        if keepdims:
            shape[axis] = 1
        else:
            del shape[axis]
        return tuple(shape)

    return get_op(_mean, output_shape=_compute_output_shape, arguments=[axis, keepdims])(x)
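Note that current PyTorch's torch.mean accepts keepdim directly, which would make the manual squeeze above unnecessary; a quick check:

import torch

x = torch.randn(4, 5)
print(torch.mean(x, dim=1, keepdim=True).shape)   # torch.Size([4, 1])
print(torch.mean(x, dim=1, keepdim=False).shape)  # torch.Size([4])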
def loss(self, x):
    self.forward(x)
    criterion = nn.BCELoss()
    x_recons = self.sigmoid(self.cs[-1])
    Lx = criterion(x_recons, x) * self.A * self.B
    Lz = 0
    kl_terms = [0] * self.T
    for t in xrange(self.T):
        mu_2 = self.mus[t] * self.mus[t]
        sigma_2 = self.sigmas[t] * self.sigmas[t]
        logsigma = self.logsigmas[t]
        # Lz += (0.5 * (mu_2 + sigma_2 - 2 * logsigma))  # 11
        kl_terms[t] = 0.5 * torch.sum(mu_2 + sigma_2 - 2 * logsigma, 1) - self.T * 0.5
        Lz += kl_terms[t]
    # Lz -= self.T / 2
    Lz = torch.mean(Lz)
    loss = Lz + Lx  # 12
    return loss
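For reference, the latent term the loop above accumulates (cf. the '# 11' and '# 12' comments, which appear to refer to equations in the DRAW paper) works out, per example, to

$$ L_z = \sum_{t=1}^{T} \left[ \frac{1}{2} \sum_i \left( \mu_{t,i}^2 + \sigma_{t,i}^2 - 2\log\sigma_{t,i} \right) - \frac{T}{2} \right], $$

which torch.mean then averages over the batch before the reconstruction term Lx is added.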
# correct
def compute_loss(self, input, e, b, clusters, it=0):
    Loss = Variable(torch.zeros((self.batch_size))).type(dtype)
    Ls = Variable(torch.zeros((self.batch_size))).type(dtype)
    for cl in range(clusters // 2):
        L, m1, m2 = self.compute_diameter(input, e, cl, it=it)
        mask = ((e / 2).type(dtype_l) == cl).type(dtype)
        # print('mask', mask[0])
        n = mask.sum(1).squeeze()
        n += (n == 0).type(dtype)
        # print('mask', mask[0])
        log_probs = torch.log((1 - b) * m1 + b * m2 + (1 - mask) + 1e-8)
        Loss += L * log_probs.sum(1) / n
        Ls += L
    Ls = Ls.mean(0)
    Loss = Loss.mean(0)
    return Loss, Ls
###########################################################################
# Split Phase #
###########################################################################
def GAN_loss(self, x):
    x = x.view(x.size(0), -1)
    if isinstance(x, torch.cuda.FloatTensor):
        eps = torch.cuda.FloatTensor(x.size(0), self.nz).normal_()
    else:
        eps = torch.FloatTensor(x.size(0), self.nz).normal_()
    alpha = torch.FloatTensor(x.size(0), 1).uniform_(0, 1).type_as(x.data)  # match x's device
    alpha = alpha.expand(x.size(0), x.size(1))
    recon_pz = self.decode(Variable(eps))
    interpolates = alpha * x.data + (1 - alpha) * recon_pz.data
    interpolates = Variable(interpolates, requires_grad=True)
    D_interpolates = self.D(interpolates)
    gradients = grad(D_interpolates, interpolates, create_graph=True)[0]
    slopes = torch.sum(gradients ** 2, 1).sqrt()
    gradient_penalty = torch.mean((slopes - 1.) ** 2)  # (||grad||_2 - 1)^2, averaged over the batch
    return self.D(x) - self.D(recon_pz) - 10 * gradient_penalty
def bpr_loss(x, dynamic_user, item_embedding, config):
    '''
    Bayesian personalized ranking loss for implicit feedback.
    Parameters:
    - x: batch of users' baskets
    - dynamic_user: batch of users' dynamic representations
    - item_embedding: item embedding matrix
    - config: model configuration
    '''
    nll = 0
    ub_seqs = []
    for u, du in zip(x, dynamic_user):
        du_p_product = torch.mm(du, item_embedding.t())  # shape: max_len, num_item
        nll_u = []  # nll for user
        for t, basket_t in enumerate(u):
            if basket_t[0] != 0 and t != 0:
                pos_idx = torch.cuda.LongTensor(basket_t) if config.cuda else torch.LongTensor(basket_t)
                # Sample negative products
                neg = [random.choice(range(1, config.num_product)) for _ in range(len(basket_t))]  # with replacement
                # neg = random.sample(range(1, config.num_product), len(basket_t))  # without replacement
                neg_idx = torch.cuda.LongTensor(neg) if config.cuda else torch.LongTensor(neg)
                # Score p(u, t, v > v')
                score = du_p_product[t - 1][pos_idx] - du_p_product[t - 1][neg_idx]
                # Average negative log-likelihood for basket_t
                nll_u.append(-torch.mean(torch.nn.LogSigmoid()(score)))
        nll += torch.mean(torch.cat(nll_u))
    return nll
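A toy, self-contained sketch of the core BPR step used above for a single time step; shapes and names are invented for illustration.

import torch
import torch.nn.functional as F

num_items, emb_dim = 100, 32
du_t = torch.randn(emb_dim)                       # user's dynamic representation at step t
item_embedding = torch.randn(num_items, emb_dim)  # item embedding matrix
pos_idx = torch.tensor([3, 17, 42])               # items actually in the basket
neg_idx = torch.randint(1, num_items, (3,))       # sampled negatives

scores = item_embedding @ du_t                    # score of every item, shape (num_items,)
nll_t = -torch.mean(F.logsigmoid(scores[pos_idx] - scores[neg_idx]))
print(nll_t.item())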
def pool_avg(tensor, dim):
    return torch.mean(tensor, dim)