def batch_log_pdf(self, x):
"""
Diagonal Normal log-likelihood
Ref: :py:meth:`pyro.distributions.distribution.Distribution.batch_log_pdf`
"""
# expand to batch size of input
mu = self.mu.expand(self.shape(x))
sigma = self.sigma.expand(self.shape(x))
log_pxs = -1 * (torch.log(sigma) + 0.5 * np.log(2.0 * np.pi) + 0.5 * torch.pow((x - mu) / sigma, 2))
# XXX this allows for the user to mask out certain parts of the score, for example
# when the data is a ragged tensor. also useful for KL annealing. this entire logic
# will likely be done in a better/cleaner way in the future
if self.log_pdf_mask is not None:
log_pxs = log_pxs * self.log_pdf_mask
batch_log_pdf = torch.sum(log_pxs, -1)
batch_log_pdf_shape = self.batch_shape(x) + (1,)
return batch_log_pdf.contiguous().view(batch_log_pdf_shape)
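For reference, a minimal plain-torch sketch of the same diagonal-Normal log-density that the method above sums over the last dimension; the numbers are hypothetical.

import numpy as np
import torch

# hypothetical batch of one 2-dimensional observation
x = torch.Tensor([[0.5, -1.0]])
mu = torch.Tensor([[0.0, 0.0]])
sigma = torch.Tensor([[1.0, 2.0]])
log_pxs = -(torch.log(sigma) + 0.5 * np.log(2.0 * np.pi) + 0.5 * torch.pow((x - mu) / sigma, 2))
batch_log_pdf = torch.sum(log_pxs, -1)  # one log-probability per batch element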
def log_gamma(xx):
if isinstance(xx, torch.Tensor):
xx = Variable(xx)
ttype = xx.data.type()
gamma_coeff = [
76.18009172947146,
-86.50532032941677,
24.01409824083091,
-1.231739572450155,
0.1208650973866179e-2,
-0.5395239384953e-5,
]
magic1 = 1.000000000190015
magic2 = 2.5066282746310005
x = xx - 1.0
t = x + 5.5
t = t - (x + 0.5) * torch.log(t)
ser = Variable(torch.ones(x.size()).type(ttype)) * magic1
for c in gamma_coeff:
x = x + 1.0
# torch.pow(x / c, -1) == c / x, the standard Lanczos series term
ser = ser + torch.pow(x / c, -1)
return torch.log(ser * magic2) - t
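A quick sanity check of log_gamma against Python's math.lgamma; the test values and tolerance are illustrative, and the old Variable-based PyTorch API used throughout these snippets is assumed.

import math
import torch
from torch.autograd import Variable

vals = [0.5, 1.0, 2.5, 7.0]  # arbitrary test points
approx = log_gamma(Variable(torch.Tensor(vals)))
for i, v in enumerate(vals):
    assert abs(approx.data[i] - math.lgamma(v)) < 1e-3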
def pairwise_distance(features, query=None, gallery=None, metric=None):
if query is None and gallery is None:
n = len(features)
x = torch.cat(list(features.values()))
x = x.view(n, -1)
if metric is not None:
x = metric.transform(x)
dist = torch.pow(x, 2).sum(dim=1, keepdim=True) * 2
dist = dist.expand(n, n) - 2 * torch.mm(x, x.t())
return dist
x = torch.cat([features[f].unsqueeze(0) for f, _, _ in query], 0)
y = torch.cat([features[f].unsqueeze(0) for f, _, _ in gallery], 0)
m, n = x.size(0), y.size(0)
x = x.view(m, -1)
y = y.view(n, -1)
if metric is not None:
x = metric.transform(x)
y = metric.transform(y)
dist = torch.pow(x, 2).sum(dim=1, keepdim=True).expand(m, n) + \
torch.pow(y, 2).sum(dim=1, keepdim=True).expand(n, m).t()
dist.addmm_(1, -2, x, y.t())
return dist
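The query/gallery branch above relies on the expansion ||x_i - y_j||^2 = ||x_i||^2 + ||y_j||^2 - 2 <x_i, y_j>; below is a small sketch with hypothetical shapes, checked against an explicit loop.

import torch

x = torch.randn(4, 8)  # e.g. query features (hypothetical sizes)
y = torch.randn(6, 8)  # e.g. gallery features
m, n = x.size(0), y.size(0)
dist = torch.pow(x, 2).sum(dim=1, keepdim=True).expand(m, n) + \
    torch.pow(y, 2).sum(dim=1, keepdim=True).expand(n, m).t()
dist.addmm_(1, -2, x, y.t())  # subtracts 2 * <x_i, y_j> from every entry
brute = torch.Tensor(m, n)
for i in range(m):
    for j in range(n):
        brute[i][j] = torch.pow(x[i] - y[j], 2).sum()
# dist and brute should agree up to floating-point error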
def forward(self, inputs, targets):
n = inputs.size(0)
# Compute pairwise distance, replace by the official when merged
dist = torch.pow(inputs, 2).sum(dim=1, keepdim=True).expand(n, n)
dist = dist + dist.t()
dist.addmm_(1, -2, inputs, inputs.t())
dist = dist.clamp(min=1e-12).sqrt() # for numerical stability
# For each anchor, find the hardest positive and negative
mask = targets.expand(n, n).eq(targets.expand(n, n).t())
dist_ap, dist_an = [], []
for i in range(n):
dist_ap.append(dist[i][mask[i]].max())
dist_an.append(dist[i][mask[i] == 0].min())
dist_ap = torch.cat(dist_ap)
dist_an = torch.cat(dist_an)
# Compute ranking hinge loss
y = dist_an.data.new()
y.resize_as_(dist_an.data)
y.fill_(1)
y = Variable(y)
loss = self.ranking_loss(dist_an, dist_ap, y)
prec = (dist_an.data > dist_ap.data).sum() * 1. / y.size(0)
return loss, prec
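A tiny sketch (hypothetical labels) of the label mask used above for hard-example mining: mask[i][j] is 1 exactly when samples i and j share a label, so dist[i][mask[i]] picks out positives and dist[i][mask[i] == 0] the negatives.

import torch

targets = torch.LongTensor([0, 0, 1, 1])  # hypothetical labels for 4 samples
n = targets.size(0)
mask = targets.expand(n, n).eq(targets.expand(n, n).t())
# mask is the 4x4 ByteTensor
# 1 1 0 0
# 1 1 0 0
# 0 0 1 1
# 0 0 1 1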
def th_corrcoef(x):
"""
mimics np.corrcoef
"""
# calculate covariance matrix of rows
mean_x = th.mean(x, 1)
xm = x.sub(mean_x.expand_as(x))
c = xm.mm(xm.t())
c = c / (x.size(1) - 1)
# normalize covariance matrix
d = th.diag(c)
stddev = th.pow(d, 0.5)
c = c.div(stddev.expand_as(c))
c = c.div(stddev.expand_as(c).t())
# clamp between -1 and 1
c = th.clamp(c, -1.0, 1.0)
return c
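A hedged sanity check of th_corrcoef against np.corrcoef (rows are variables, columns are observations); it assumes the older PyTorch used in these snippets, where th.mean(x, 1) keeps the reduced dimension.

import numpy as np
import torch as th

data = np.random.randn(5, 100).astype('float32')  # 5 variables, 100 observations
c_torch = th_corrcoef(th.from_numpy(data))
c_numpy = np.corrcoef(data)
# the two 5x5 correlation matrices should agree to a few decimal places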
def forward(self, input):
mask_le_mone = input.le(-1).type_as(input)
self.mask_ge_one = input.ge(1).type_as(input)
index_gt_mone = input.gt(-1).type_as(input)
index_lt_one = input.lt(1).type_as(input)
self.mask_mone_one = index_lt_one * index_gt_mone
mone = input.new().resize_as_(input).fill_(-1)
mone = mone * mask_le_mone
between_one = torch.pow(1 + input, 2) - 1
between_one = between_one * self.mask_mone_one
ge_one = input * 4 - 1
ge_one = ge_one * self.mask_ge_one
between_one = mone + between_one
ge_one = between_one + ge_one
self.input = input
return ge_one
def forward(self, inds):
state_inp = self.state_inp.index_select(0, inds)
state_out = self.state_model.forward(state_inp)
goal_out = self.goal_model.forward(self.goal_inp)
recon = torch.mm(state_out, goal_out.t())
mask_select = self.mask.index_select(0, inds)
true_select = self.mat.index_select(0, inds)
# pdb.set_trace()
diff = torch.pow(recon - true_select, 2)
mse = diff.sum()
return mse
def train(epoch):
color_model.train()
try:
for batch_idx, (data, classes) in enumerate(train_loader):
messagefile = open('./message.txt', 'a')
original_img = data[0].unsqueeze(1).float()
img_ab = data[1].float()
if have_cuda:
original_img = original_img.cuda()
img_ab = img_ab.cuda()
classes = classes.cuda()
original_img = Variable(original_img)
img_ab = Variable(img_ab)
classes = Variable(classes)
optimizer.zero_grad()
class_output, output = color_model(original_img, original_img)
ems_loss = torch.pow((img_ab - output), 2).sum() / torch.from_numpy(np.array(list(output.size()))).prod()
cross_entropy_loss = 1. / 300 * F.cross_entropy(class_output, classes)  # float division; 1/300 truncates to 0 under Python 2
loss = ems_loss + cross_entropy_loss
lossmsg = 'loss: %.9f\n' % (loss.data[0])
messagefile.write(lossmsg)
ems_loss.backward(retain_variables=True)
cross_entropy_loss.backward()
optimizer.step()
if batch_idx % 500 == 0:
message = 'Train Epoch:%d\tPercent:[%d/%d (%.0f%%)]\tLoss:%.9f\n' % (
epoch, batch_idx * len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.data[0])
messagefile.write(message)
torch.save(color_model.state_dict(), 'colornet_params.pkl')
messagefile.close()
# print('Train Epoch: {}[{}/{}({:.0f}%)]\tLoss: {:.9f}\n'.format(
# epoch, batch_idx * len(data), len(train_loader.dataset),
# 100. * batch_idx / len(train_loader), loss.data[0]))
except Exception:
logfile = open('log.txt', 'w')
logfile.write(traceback.format_exc())
logfile.close()
finally:
torch.save(color_model.state_dict(), 'colornet_params.pkl')
def forward(self, a, b):
self.save_for_backward(a, b)
return a.pow(b)
def backward(self, grad_output):
a, b = self.saved_tensors
return grad_output.mul(b).mul_(a.pow(b-1)), grad_output.mul(a.pow(b)).mul_(a.log())
def forward(self, a):
if self.tensor_power:
self.fw_result = torch.pow(self.constant, a)
return self.fw_result
else:
self.save_for_backward(a)
return a.pow(self.constant)
def backward(self, grad_output):
if self.tensor_power:
return grad_output.mul(self.fw_result).mul_(math.log(self.constant))
else:
a = self.saved_tensors[0]
return grad_output.mul(self.constant).mul_(a.pow(self.constant-1))
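A small check (old Variable-based autograd assumed) of the gradients these Pow/PowConstant functions encode: d(a^b)/da = b * a^(b-1) and d(a^b)/db = a^b * log(a).

import torch
from torch.autograd import Variable

a = Variable(torch.Tensor([2.0, 3.0]), requires_grad=True)  # hypothetical values
b = Variable(torch.Tensor([3.0, 2.0]), requires_grad=True)
a.pow(b).sum().backward()
# a.grad should be b * a^(b-1) = [12, 6]
# b.grad should be a^b * log(a) = [8*log(2), 9*log(3)]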
def test_cinv(self):
a = torch.randn(100,89)
zeros = torch.Tensor().resize_as_(a).zero_()
res_pow = torch.pow(a, -1)
res_inv = a.clone()
res_inv.cinv_()
self.assertEqual(res_inv, res_pow)
def test_pow(self):
# [res] torch.pow([res,] x)
# base - tensor, exponent - number
# contiguous
m1 = torch.randn(100,100)
res1 = torch.pow(m1[4], 3)
res2 = res1.clone().zero_()
for i in range(res2.size(0)):
res2[i] = math.pow(m1[4][i], 3)
self.assertEqual(res1, res2)
# non-contiguous
m1 = torch.randn(100,100)
res1 = torch.pow(m1[:,4], 3)
res2 = res1.clone().zero_()
for i in range(res2.size(0)):
res2[i] = math.pow(m1[i,4], 3)
self.assertEqual(res1, res2)
# base - number, exponent - tensor
# contiguous
m1 = torch.randn(100,100)
res1 = torch.pow(3, m1[4])
res2 = res1.clone().zero_()
for i in range(res2.size(0)):
res2[i] = math.pow(3, m1[4,i])
self.assertEqual(res1, res2)
# non-contiguous
m1 = torch.randn(100,100)
res1 = torch.pow(3, m1[:,4])
res2 = res1.clone().zero_()
for i in range(res2.size(0)):
res2[i] = math.pow(3, m1[i][4])
self.assertEqual(res1, res2)
def forward(self, input_x):
if torch.has_cudnn:
# Initialization of the hidden states
h_t_fr = Variable(torch.zeros(self._B, self._F).cuda(), requires_grad=False)
h_t_bk = Variable(torch.zeros(self._B, self._F).cuda(), requires_grad=False)
H_enc = Variable(torch.zeros(self._B, self._T - (2 * self._L), 2 * self._F).cuda(), requires_grad=False)
# Input is of the shape : (B (batches), T (time-sequence), N(frequency sub-bands))
# Cropping some "unnecessary" frequency sub-bands
cxin = Variable(torch.pow(torch.from_numpy(input_x[:, :, :self._F]).cuda(), self._alpha))
else:
# Initialization of the hidden states
h_t_fr = Variable(torch.zeros(self._B, self._F), requires_grad=False)
h_t_bk = Variable(torch.zeros(self._B, self._F), requires_grad=False)
H_enc = Variable(torch.zeros(self._B, self._T - (2 * self._L), 2 * self._F), requires_grad=False)
# Input is of the shape : (B (batches), T (time-sequence), N(frequency sub-bands))
# Cropping some "unnecessary" frequency sub-bands
cxin = Variable(torch.pow(torch.from_numpy(input_x[:, :, :self._F]), self._alpha))
for t in range(self._T):
# Bi-GRU Encoding
h_t_fr = self.gruEncF((cxin[:, t, :]), h_t_fr)
h_t_bk = self.gruEncB((cxin[:, self._T - t - 1, :]), h_t_bk)
# Residual connections
h_t_fr += cxin[:, t, :]
h_t_bk += cxin[:, self._T - t - 1, :]
# Remove context and concatenate
if (t >= self._L) and (t < self._T - self._L):
h_t = torch.cat((h_t_fr, h_t_bk), dim=1)
H_enc[:, t - self._L, :] = h_t
return H_enc
def forward(self, x0, x1, y):
# euclidian distance
diff = x0 - x1
dist_sq = torch.sum(torch.pow(diff, 2), 1)
dist = torch.sqrt(dist_sq)
mdist = self.margin - dist
dist = torch.clamp(mdist, min=0.0)
loss = y * dist_sq + (1 - y) * torch.pow(dist, 2)
loss = torch.sum(loss) / 2.0 / x0.size()[0]
return loss
def param_mse(name, target):
return torch.sum(torch.pow(target - pyro.param(name), 2.0)).data.cpu().numpy()[0]
def do_elbo_test(self, reparameterized, n_steps):
pyro.clear_param_store()
def model():
mu_latent = pyro.sample("mu_latent", dist.normal,
self.mu0, torch.pow(self.lam0, -0.5))
pyro.map_data("aaa", self.data, lambda i,
x: pyro.observe(
"obs_%d" % i, dist.normal,
x, mu_latent, torch.pow(self.lam, -0.5)),
batch_size=self.batch_size)
return mu_latent
def guide():
mu_q = pyro.param("mu_q", Variable(self.analytic_mu_n.data + 0.134 * torch.ones(2),
requires_grad=True))
log_sig_q = pyro.param("log_sig_q", Variable(
self.analytic_log_sig_n.data - 0.14 * torch.ones(2),
requires_grad=True))
sig_q = torch.exp(log_sig_q)
pyro.sample("mu_latent", dist.Normal(mu_q, sig_q, reparameterized=reparameterized))
pyro.map_data("aaa", self.data, lambda i, x: None,
batch_size=self.batch_size)
adam = optim.Adam({"lr": .001})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=False)
for k in range(n_steps):
svi.step()
mu_error = param_mse("mu_q", self.analytic_mu_n)
log_sig_error = param_mse("log_sig_q", self.analytic_log_sig_n)
self.assertEqual(0.0, mu_error, prec=0.05)
self.assertEqual(0.0, log_sig_error, prec=0.05)
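Throughout these Pyro tests, torch.pow(tau, -0.5) converts a precision tau into a standard deviation sigma = 1/sqrt(tau); a hypothetical example:

import torch
from torch.autograd import Variable

tau = Variable(torch.Tensor([4.0, 0.25]))  # hypothetical precisions
sigma = torch.pow(tau, -0.5)               # [0.5, 2.0]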
def do_elbo_test(self, reparameterized, n_steps):
pyro.clear_param_store()
pt_guide = LogNormalNormalGuide(self.log_mu_n.data + 0.17,
self.log_tau_n.data - 0.143)
def model():
mu_latent = pyro.sample("mu_latent", dist.normal,
self.mu0, torch.pow(self.tau0, -0.5))
sigma = torch.pow(self.tau, -0.5)
pyro.observe("obs0", dist.lognormal, self.data[0], mu_latent, sigma)
pyro.observe("obs1", dist.lognormal, self.data[1], mu_latent, sigma)
return mu_latent
def guide():
pyro.module("mymodule", pt_guide)
mu_q, tau_q = torch.exp(pt_guide.mu_q_log), torch.exp(pt_guide.tau_q_log)
sigma = torch.pow(tau_q, -0.5)
pyro.sample("mu_latent", dist.Normal(mu_q, sigma, reparameterized=reparameterized))
adam = optim.Adam({"lr": .0005, "betas": (0.96, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=False)
for k in range(n_steps):
svi.step()
mu_error = param_abs_error("mymodule$$$mu_q_log", self.log_mu_n)
tau_error = param_abs_error("mymodule$$$tau_q_log", self.log_tau_n)
self.assertEqual(0.0, mu_error, prec=0.07)
self.assertEqual(0.0, tau_error, prec=0.07)
def test_elbo_with_transformed_distribution(self):
pyro.clear_param_store()
def model():
zero = Variable(torch.zeros(1))
one = Variable(torch.ones(1))
mu_latent = pyro.sample("mu_latent", dist.normal,
self.mu0, torch.pow(self.tau0, -0.5))
bijector = AffineExp(torch.pow(self.tau, -0.5), mu_latent)
x_dist = TransformedDistribution(dist.normal, bijector)
pyro.observe("obs0", x_dist, self.data[0], zero, one)
pyro.observe("obs1", x_dist, self.data[1], zero, one)
return mu_latent
def guide():
mu_q_log = pyro.param(
"mu_q_log",
Variable(
self.log_mu_n.data +
0.17,
requires_grad=True))
tau_q_log = pyro.param("tau_q_log", Variable(self.log_tau_n.data - 0.143,
requires_grad=True))
mu_q, tau_q = torch.exp(mu_q_log), torch.exp(tau_q_log)
pyro.sample("mu_latent", dist.normal, mu_q, torch.pow(tau_q, -0.5))
adam = optim.Adam({"lr": .0005, "betas": (0.96, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=False)
for k in range(12001):
svi.step()
mu_error = param_abs_error("mu_q_log", self.log_mu_n)
tau_error = param_abs_error("tau_q_log", self.log_tau_n)
self.assertEqual(0.0, mu_error, prec=0.05)
self.assertEqual(0.0, tau_error, prec=0.05)
def param_mse(name, target):
return torch.sum(torch.pow(target - pyro.param(name), 2.0)).data.cpu().numpy()[0]
def do_elbo_test(self, reparameterized, n_steps):
if self.verbose:
print(" - - - - - DO NORMALNORMAL ELBO TEST [reparameterized = %s] - - - - - " % reparameterized)
pyro.clear_param_store()
def model():
mu_latent = pyro.sample(
"mu_latent",
dist.Normal(self.mu0, torch.pow(self.lam0, -0.5), reparameterized=reparameterized))
for i, x in enumerate(self.data):
pyro.observe("obs_%d" % i, dist.normal, x, mu_latent,
torch.pow(self.lam, -0.5))
return mu_latent
def guide():
mu_q = pyro.param("mu_q", Variable(self.analytic_mu_n.data + 0.334 * torch.ones(2),
requires_grad=True))
log_sig_q = pyro.param("log_sig_q", Variable(
self.analytic_log_sig_n.data - 0.29 * torch.ones(2),
requires_grad=True))
sig_q = torch.exp(log_sig_q)
mu_latent = pyro.sample("mu_latent",
dist.Normal(mu_q, sig_q, reparameterized=reparameterized),
baseline=dict(use_decaying_avg_baseline=True))
return mu_latent
adam = optim.Adam({"lr": .0015, "betas": (0.97, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=True)
for k in range(n_steps):
svi.step()
mu_error = param_mse("mu_q", self.analytic_mu_n)
log_sig_error = param_mse("log_sig_q", self.analytic_log_sig_n)
if k % 250 == 0 and self.verbose:
print("mu error, log(sigma) error: %.4f, %.4f" % (mu_error, log_sig_error))
self.assertEqual(0.0, mu_error, prec=0.03)
self.assertEqual(0.0, log_sig_error, prec=0.03)
def test_elbo_nonreparameterized(self):
if self.verbose:
print(" - - - - - DO BERNOULLI-BETA ELBO TEST - - - - - ")
pyro.clear_param_store()
def model():
p_latent = pyro.sample("p_latent", dist.beta, self.alpha0, self.beta0)
for i, x in enumerate(self.data):
pyro.observe("obs_{}".format(i), dist.bernoulli, x,
torch.pow(torch.pow(p_latent, 2.0), 0.5))
return p_latent
def guide():
alpha_q_log = pyro.param("alpha_q_log",
Variable(self.log_alpha_n.data + 0.17, requires_grad=True))
beta_q_log = pyro.param("beta_q_log",
Variable(self.log_beta_n.data - 0.143, requires_grad=True))
alpha_q, beta_q = torch.exp(alpha_q_log), torch.exp(beta_q_log)
p_latent = pyro.sample("p_latent", dist.beta, alpha_q, beta_q,
baseline=dict(use_decaying_avg_baseline=True))
return p_latent
adam = optim.Adam({"lr": .0007, "betas": (0.96, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=True)
for k in range(3000):
svi.step()
alpha_error = param_abs_error("alpha_q_log", self.log_alpha_n)
beta_error = param_abs_error("beta_q_log", self.log_beta_n)
if k % 500 == 0 and self.verbose:
print("alpha_error, beta_error: %.4f, %.4f" % (alpha_error, beta_error))
self.assertEqual(0.0, alpha_error, prec=0.03)
self.assertEqual(0.0, beta_error, prec=0.04)
def do_elbo_test(self, reparameterized, n_steps, beta1, lr):
if self.verbose:
print(" - - - - - DO LOGNORMAL-NORMAL ELBO TEST [repa = %s] - - - - - " % reparameterized)
pyro.clear_param_store()
pt_guide = LogNormalNormalGuide(self.log_mu_n.data + 0.17,
self.log_tau_n.data - 0.143)
def model():
mu_latent = pyro.sample("mu_latent", dist.normal,
self.mu0, torch.pow(self.tau0, -0.5))
sigma = torch.pow(self.tau, -0.5)
pyro.observe("obs0", dist.lognormal, self.data[0], mu_latent, sigma)
pyro.observe("obs1", dist.lognormal, self.data[1], mu_latent, sigma)
return mu_latent
def guide():
pyro.module("mymodule", pt_guide)
mu_q, tau_q = torch.exp(pt_guide.mu_q_log), torch.exp(pt_guide.tau_q_log)
sigma = torch.pow(tau_q, -0.5)
pyro.sample("mu_latent",
dist.Normal(mu_q, sigma, reparameterized=reparameterized),
baseline=dict(use_decaying_avg_baseline=True))
adam = optim.Adam({"lr": lr, "betas": (beta1, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=True)
for k in range(n_steps):
svi.step()
mu_error = param_abs_error("mymodule$$$mu_q_log", self.log_mu_n)
tau_error = param_abs_error("mymodule$$$tau_q_log", self.log_tau_n)
if k % 500 == 0 and self.verbose:
print("mu_error, tau_error = %.4f, %.4f" % (mu_error, tau_error))
self.assertEqual(0.0, mu_error, prec=0.05)
self.assertEqual(0.0, tau_error, prec=0.05)
def test_elbo_with_transformed_distribution(self):
if self.verbose:
print(" - - - - - DO LOGNORMAL-NORMAL ELBO TEST [uses TransformedDistribution] - - - - - ")
pyro.clear_param_store()
def model():
mu_latent = pyro.sample("mu_latent", dist.normal,
self.mu0, torch.pow(self.tau0, -0.5))
bijector = AffineExp(torch.pow(self.tau, -0.5), mu_latent)
x_dist = TransformedDistribution(dist.normal, bijector)
pyro.observe("obs0", x_dist, self.data[0], ng_zeros(1), ng_ones(1))
pyro.observe("obs1", x_dist, self.data[1], ng_zeros(1), ng_ones(1))
return mu_latent
def guide():
mu_q_log = pyro.param(
"mu_q_log",
Variable(
self.log_mu_n.data +
0.17,
requires_grad=True))
tau_q_log = pyro.param("tau_q_log", Variable(self.log_tau_n.data - 0.143,
requires_grad=True))
mu_q, tau_q = torch.exp(mu_q_log), torch.exp(tau_q_log)
pyro.sample("mu_latent", dist.normal, mu_q, torch.pow(tau_q, -0.5))
adam = optim.Adam({"lr": 0.001, "betas": (0.95, 0.999)})
svi = SVI(model, guide, adam, loss="ELBO", trace_graph=True)
for k in range(7000):
svi.step()
mu_error = param_abs_error("mu_q_log", self.log_mu_n)
tau_error = param_abs_error("tau_q_log", self.log_tau_n)
if k % 500 == 0 and self.verbose:
print("mu_error, tau_error = %.4f, %.4f" % (mu_error, tau_error))
self.assertEqual(0.0, mu_error, prec=0.05)
self.assertEqual(0.0, tau_error, prec=0.05)
def batch_log_pdf(self, x):
"""
Ref: :py:meth:`pyro.distributions.distribution.Distribution.batch_log_pdf`
"""
# expand to batch size of input
mu = self.mu.expand(self.shape(x))
gamma = self.gamma.expand(self.shape(x))
x_0 = torch.pow((x - mu) / gamma, 2)
px = 2 / (np.pi * gamma * (1 + x_0))  # half-Cauchy density for x >= mu
batch_log_pdf_shape = self.batch_shape(x) + (1,)
return torch.sum(torch.log(px), -1).contiguous().view(batch_log_pdf_shape)
def analytic_var(self):
"""
Ref: :py:meth:`pyro.distributions.distribution.Distribution.analytic_var`
"""
return torch.pow(self.sigma, 2)
def analytic_var(self):
"""
Ref: :py:meth:`pyro.distributions.distribution.Distribution.analytic_var`
"""
return torch.pow(self.analytic_mean(), 2.0) * self.beta / \
(self.alpha * (self.alpha + self.beta + Variable(torch.ones([1]))))
def batch_log_pdf(self, x):
"""
Ref: :py:meth:`pyro.distributions.distribution.Distribution.batch_log_pdf`
"""
# expand to batch size of input
mu = self.mu.expand(self.shape(x))
gamma = self.gamma.expand(self.shape(x))
x_0 = torch.pow((x - mu) / gamma, 2)
px = np.pi * gamma * (1 + x_0)
log_pdf = -1 * torch.sum(torch.log(px), -1)
batch_log_pdf_shape = self.batch_shape(x) + (1,)
return log_pdf.contiguous().view(batch_log_pdf_shape)
def analytic_var(self):
"""
Ref: :py:meth:`pyro.distributions.distribution.Distribution.analytic_var`
"""
return self.alpha / torch.pow(self.beta, 2.0)