def make_sprite(label_img, save_path):
    """Tile a batch of images into one square sprite sheet, ``sprite.png``.

    The batch is padded with random-noise images until it holds
    ``nrow ** 2`` entries, where ``nrow = ceil(sqrt(number of images))``.

    Args:
        label_img: image batch tensor; dimension 0 indexes the images
            (assumed N x C x H x W -- TODO confirm against callers).
        save_path: directory in which ``sprite.png`` is written.
    """
    import math
    import os  # needed for the output path; was used without being imported here
    import torch
    import torchvision
    # this ensures the sprite image has correct dimension as described in
    # https://www.tensorflow.org/get_started/embedding_viz
    nrow = int(math.ceil((label_img.size(0)) ** 0.5))
    # augment images so that #images equals nrow*nrow
    n_missing = nrow ** 2 - label_img.size(0)
    if n_missing > 0:
        # Nothing to pad for perfect squares, so no zero-sized tensor is built.
        filler = torch.randn(n_missing, *label_img.size()[1:]) * 255
        label_img = torch.cat((label_img, filler), 0)
    sprite_file = os.path.join(save_path, 'sprite.png')
    # Dirty fix: no pixel are appended by make_grid call in save_image
    # (https://github.com/pytorch/vision/issues/206).  Probe make_grid with a
    # dummy 32x32 image to see whether this torchvision adds an extra pixel.
    xx = torchvision.utils.make_grid(torch.Tensor(1, 3, 32, 32), padding=0)
    if xx.size(2) == 33:
        sprite = torchvision.utils.make_grid(label_img, nrow=nrow, padding=0)
        # Drop the first row and column that make_grid added.
        sprite = sprite[:, 1:, 1:]
        torchvision.utils.save_image(sprite, sprite_file)
    else:
        torchvision.utils.save_image(label_img, sprite_file, nrow=nrow, padding=0)
# Example source code using Python's randn()
def test_parallel_apply(self):
    """Compare ``dp.parallel_apply`` on two GPUs with calling each module directly."""
    devices = (0, 1)
    modules = tuple(nn.Linear(10, 5).float().cuda(dev) for dev in devices)
    inputs = tuple(Variable(torch.randn(2, 10).float().cuda(dev)) for dev in devices)
    # Reference results: run every module on its own input.
    expected_outputs = tuple(mod(inp).data for mod, inp in zip(modules, inputs))

    outputs = dp.parallel_apply(modules, inputs)
    for got, want in zip(outputs, expected_outputs):
        self.assertEqual(got.data, want)

    # Rebuilt with an empty second input / expectation; nothing in this block
    # consumes them afterwards.
    inputs = (inputs[0], Variable(inputs[1].data.new()))
    expected_outputs = (expected_outputs[0], expected_outputs[1].new())
def _test_preserve_sharing(self):
    """Send a tensor, its storage and views through an ``mp.Queue`` and check they still share storage."""
    def do_test():
        base = torch.randn(5, 5)
        payload = [base.storage(), base.storage()[1:4], base, base[2], base[:, 1]]
        queue = mp.Queue()
        queue.put(payload)
        received = queue.get()
        self.assertEqual(received, payload, 0)
        shared_cdata = payload[0]._cdata
        self.assertEqual(received[0]._cdata, shared_cdata)
        # Every tensor that came back must point at the same storage handle.
        for tensor in received[2:]:
            self.assertEqual(tensor.storage()._cdata, shared_cdata)
        # TODO: enable after fixing #46
        # received[0].fill_(10)
        # self.assertEqual(received[1], received[0][1:4], 0)

    with leak_checker(self):
        do_test()
def test_serialization(self):
    """Round-trip CUDA tensors and a storage through ``torch.save``/``torch.load``.

    Checks that values, types and storage sharing between list entries
    survive the round trip.
    """
    x = torch.randn(5, 5).cuda()
    y = torch.IntTensor(2, 5).fill_(0).cuda()
    q = [x, y, x, y.storage()]
    with tempfile.NamedTemporaryFile() as f:
        torch.save(q, f)
        f.seek(0)
        q_copy = torch.load(f)
    self.assertEqual(q_copy, q, 0)
    # Entries 0 and 2 were the same tensor, so a write to one must show in the other.
    q_copy[0].fill_(5)
    self.assertEqual(q_copy[0], q_copy[2], 0)
    self.assertTrue(isinstance(q_copy[0], torch.cuda.DoubleTensor))
    self.assertTrue(isinstance(q_copy[1], torch.cuda.IntTensor))
    self.assertTrue(isinstance(q_copy[2], torch.cuda.DoubleTensor))
    self.assertTrue(isinstance(q_copy[3], torch.cuda.IntStorage))
    q_copy[1].fill_(10)
    # Was assertTrue(a, b), which treats b as the failure message and never
    # compares; assertEqual actually checks the storage contents.
    self.assertEqual(q_copy[3], torch.cuda.IntStorage(10).fill_(10), 0)
def _test_gather(self, dim):
    """Gather two tensors from GPU 0 and GPU 1 along ``dim`` and check device, size and contents."""
    if torch.cuda.device_count() < 2:
        raise unittest.SkipTest("only one GPU detected")
    first = torch.randn(2, 5).cuda(0)
    second = torch.randn(2, 5).cuda(1)
    gathered = comm.gather((first, second), dim)

    # Size grows only along the gathered dimension.
    want_shape = list(first.size())
    want_shape[dim] += second.size(dim)
    want_shape = torch.Size(want_shape)
    self.assertEqual(gathered.get_device(), 0)
    self.assertEqual(gathered.size(), want_shape)

    # The first slice along `dim` must equal `first`, the rest must equal `second`.
    selector = [slice(None, None), slice(None, None)]
    selector[dim] = slice(0, first.size(dim))
    self.assertEqual(gathered[tuple(selector)], first)
    selector[dim] = slice(first.size(dim), first.size(dim) + second.size(dim))
    self.assertEqual(gathered[tuple(selector)], second)
def test_Copy(self):
    """Check that ``nn.Copy`` casts double to float going forward and back to double going backward."""
    source = torch.randn(3, 4).double()
    caster = nn.Copy(torch.DoubleTensor, torch.FloatTensor)

    result = caster.forward(source)
    self.assertEqual(torch.typename(result), 'torch.FloatTensor')
    self.assertEqual(result, source.float(), 1e-6)

    grad_in = caster.backward(source, result.fill_(1))
    self.assertEqual(torch.typename(grad_in), 'torch.DoubleTensor')
    self.assertEqual(grad_in, result.double(), 1e-6)

    # With dontCast set, converting the module must leave the earlier output's type alone.
    caster.dontCast = True
    caster.double()
    self.assertEqual(torch.typename(result), 'torch.FloatTensor')

    # Check that these don't raise errors
    caster.__repr__()
    str(caster)
def test_Parallel(self):
    """Run ``nn.Parallel(0, 2)`` with three View branches and compare against a double transpose."""
    source = torch.randn(3, 4, 5)
    module = nn.Parallel(0, 2)
    for _ in range(3):
        module.add(nn.View(4, 5, 1))

    # Check that these don't raise errors
    module.__repr__()
    str(module)

    result = module.forward(source)
    reference = source.transpose(0, 2).transpose(0, 1)
    self.assertEqual(reference, result)

    grad_in = module.backward(source, reference)
    self.assertEqual(grad_in, source)
def test_ParallelTable(self):
    """Run SplitTable -> ParallelTable of Views -> JoinTable and compare against a double transpose."""
    source = torch.randn(3, 4, 5)

    branches = nn.ParallelTable()
    for _ in range(3):
        branches.add(nn.View(4, 5, 1))

    pipeline = nn.Sequential()
    pipeline.add(nn.SplitTable(0))
    pipeline.add(branches)
    pipeline.add(nn.JoinTable(2))

    # Check that these don't raise errors
    branches.__repr__()
    str(branches)

    result = pipeline.forward(source)
    reference = source.transpose(0, 2).transpose(0, 1)
    self.assertEqual(reference, result)

    grad_in = pipeline.backward(source, reference)
    self.assertEqual(grad_in, source)
def test_MaskedSelect(self):
    """Check ``nn.MaskedSelect`` forward against ``masked_select`` and its backward on a fixed 2x2 case."""
    # Forward: random values, random mask.
    values = torch.randn(4, 5)
    selector = torch.ByteTensor(4, 5).bernoulli_()
    module = nn.MaskedSelect()
    selected = module.forward([values, selector])
    self.assertEqual(values.masked_select(selector), selected)

    # Backward: the gradient must land only where the mask is set.
    grad_out = torch.Tensor((20, 80))
    values = torch.Tensor(((10, 20), (30, 40)))
    want_grad = torch.Tensor(((20, 0), (0, 80)))
    selector = torch.ByteTensor(((1, 0), (0, 1)))
    module = nn.MaskedSelect()
    module.forward([values, selector])
    grad_in = module.backward([values, selector], grad_out)
    self.assertEqual(want_grad, grad_in[0])

    # Check that these don't raise errors
    module.__repr__()
    str(module)
def _testMath(self, torchfn, mathfn):
    """Compare ``torchfn`` on a 1-D slice with ``mathfn`` applied element by element.

    The check runs once on a row slice and once on a column slice of a
    fresh random 10x5 matrix.
    """
    size = (10, 5)
    selectors = (
        lambda m: m[4],     # contiguous
        lambda m: m[:, 4],  # non-contiguous
    )
    for select in selectors:
        matrix = torch.randn(*size)
        actual = torchfn(select(matrix))
        expected = actual.clone().zero_()
        for pos, value in enumerate(select(matrix)):
            expected[pos] = mathfn(value)
        self.assertEqual(actual, expected)
def test_csub(self):
    """Check that in-place ``sub_`` matches ``torch.add`` with a negated operand."""
    # with a tensor
    lhs = torch.randn(100, 90)
    rhs = lhs.clone().normal_()
    via_add = torch.add(lhs, -1, rhs)
    via_sub = lhs.clone()
    via_sub.sub_(rhs)
    self.assertEqual(via_add, via_sub)

    # with a scalar
    lhs = torch.randn(100, 100)
    scalar = 123.5
    via_add = torch.add(lhs, -scalar)
    via_sub = lhs.clone()
    via_sub.sub_(scalar)
    self.assertEqual(via_add, via_sub)
def _test_cop(self, torchfn, mathfn):
    """Compare a binary ``torchfn`` on a 2-D and a 1-D operand with an element-wise ``mathfn`` loop."""
    def reference_implementation(lhs, rhs, out):
        # rhs is 1-D, so each (i, j) of lhs is mapped to a flat index into it.
        for i, j in iter_indices(lhs):
            flat = i * lhs.size(0) + j
            out[i, j] = mathfn(lhs[i, j], rhs[flat])
        return out

    # contiguous
    cube = torch.randn(10, 10, 10)
    table = torch.randn(10, 10 * 10)
    lhs, rhs = cube[4], table[4]
    actual = torchfn(lhs, rhs)
    expected = reference_implementation(lhs, rhs, actual.clone())
    self.assertEqual(actual, expected)

    # non-contiguous
    cube = torch.randn(10, 10, 10)
    table = torch.randn(10 * 10, 10 * 10)
    lhs, rhs = cube[:, 4], table[:, 4]
    actual = torchfn(lhs, rhs)
    expected = reference_implementation(lhs, rhs, actual.clone())
    self.assertEqual(actual, expected)
def test_inverse(self):
    """Check ``torch.inverse`` both as a returned result and written into a given output tensor."""
    matrix = torch.randn(5, 5)
    inv = torch.inverse(matrix)
    identity = torch.eye(5)
    self.assertFalse(inv.is_contiguous(), 'MI is contiguous')
    self.assertEqual(identity, torch.mm(matrix, inv), 1e-8, 'inverse value')
    self.assertEqual(identity, torch.mm(inv, matrix), 1e-8, 'inverse value')

    out = torch.Tensor(5, 5)
    # Two passes: the second call runs now that the output tensor is transposed.
    for _ in range(2):
        torch.inverse(out, matrix)
        self.assertFalse(out.is_contiguous(), 'MII is contiguous')
        self.assertEqual(out, inv, 0, 'inverse value in-place')
def test_index_add(self):
    """Compare ``index_add_`` with a manual per-index accumulation, for 3-D and 1-D tensors."""
    num_copy, num_dest = 3, 3

    # 3-D case: rows are accumulated in place on the reference copy.
    dest = torch.randn(num_dest, 4, 5)
    src = torch.randn(num_copy, 4, 5)
    idx = torch.randperm(num_dest).narrow(0, 0, num_copy).long()
    reference = dest.clone()
    dest.index_add_(0, idx, src)
    for row in range(idx.size(0)):
        reference[idx[row]].add_(src[row])
    self.assertEqual(dest, reference)

    # 1-D case: elements are scalars, so the reference is updated by assignment.
    dest = torch.randn(num_dest)
    src = torch.randn(num_copy)
    idx = torch.randperm(num_dest).narrow(0, 0, num_copy).long()
    reference = dest.clone()
    dest.index_add_(0, idx, src)
    for row in range(idx.size(0)):
        reference[idx[row]] = reference[idx[row]] + src[row]
    self.assertEqual(dest, reference)
# Fill idx with valid indices.
def test_masked_copy(self):
    """Compare ``masked_copy_`` with a manual copy, and check that a too-short source raises."""
    num_copy, num_dest = 3, 10
    dest = torch.randn(num_dest)
    src = torch.randn(num_copy)
    mask = torch.ByteTensor((0, 0, 0, 0, 1, 0, 1, 0, 1, 0))
    reference = dest.clone()
    dest.masked_copy_(mask, src)

    # Walk the mask, consuming one source element per set position.
    src_pos = 0
    for dst_pos in range(num_dest):
        if not mask[dst_pos]:
            continue
        reference[dst_pos] = src[src_pos]
        src_pos += 1
    self.assertEqual(dest, reference, 0)

    # make source bigger than number of 1s in mask
    src = torch.randn(num_dest)
    dest.masked_copy_(mask, src)

    # make src smaller. this should fail
    src = torch.randn(num_copy - 1)
    with self.assertRaises(RuntimeError):
        dest.masked_copy_(mask, src)
def test_serialization(self):
    """Round-trip tensors and storages through ``torch.save``/``torch.load``.

    The round trip is done once with a file object and once with a file
    name; values, types and sharing between entries are checked each time.
    """
    tensors = [torch.randn(5, 5).float() for i in range(2)]
    payload = [tensors[i % 2] for i in range(4)]
    payload.append(tensors[0].storage())
    payload.append(tensors[0].storage()[1:4])
    for use_name in (False, True):
        with tempfile.NamedTemporaryFile() as f:
            handle = f.name if use_name else f
            torch.save(payload, handle)
            f.seek(0)
            loaded = torch.load(handle)
        self.assertEqual(payload, loaded, 0)
        for restored in loaded[:4]:
            self.assertTrue(isinstance(restored, torch.FloatTensor))
        self.assertTrue(isinstance(loaded[4], torch.FloatStorage))
        # Entries 0/2 and 1/3 were the same tensors, so writes must be shared.
        loaded[0].fill_(10)
        self.assertEqual(loaded[0], loaded[2], 0)
        self.assertEqual(loaded[4], torch.FloatStorage(25).fill_(10), 0)
        loaded[1].fill_(20)
        self.assertEqual(loaded[1], loaded[3], 0)
        self.assertEqual(loaded[4], loaded[5][1:4], 0)
def testTensorDataset(self):
    """Check ``dataset.TensorDataset`` length and indexing for dict, tensor and list inputs."""
    # dict input
    arrays = {
        # 'input': torch.arange(0,8),
        'input': np.arange(0, 8),
        'target': np.arange(0, 8),
    }
    wrapped = dataset.TensorDataset(arrays)
    self.assertEqual(len(wrapped), 8)
    self.assertEqual(wrapped[2], {'input': 2, 'target': 2})

    # tensor input
    vector = torch.randn(8)
    wrapped = dataset.TensorDataset(vector)
    self.assertEqual(len(vector), len(wrapped))
    self.assertEqual(vector[1], wrapped[1])

    # list of tensors input
    wrapped = dataset.TensorDataset([vector])
    self.assertEqual(len(vector), len(wrapped))
    self.assertEqual(vector[1], wrapped[1][0])
def test_wrapper_works_when_passed_state_with_zero_length_sequences(self):
    """Call the wrapped LSTM with initial states when one mask row is entirely zero."""
    lstm = LSTM(bidirectional=True, num_layers=3, input_size=3, hidden_size=7, batch_first=True)
    encoder = PytorchSeq2SeqWrapper(lstm)
    tensor = torch.rand([5, 7, 3])
    mask = torch.ones(5, 7)
    # Zero each listed row from the given position on; row 2 is zeroed entirely.
    for row, start in ((0, 3), (1, 4), (2, 0), (3, 6)):
        mask[row, start:] = 0

    # Initial states are of shape (num_layers * num_directions, batch_size, hidden_dim)
    first_state = Variable(torch.randn(6, 5, 7))
    second_state = Variable(torch.randn(6, 5, 7))
    initial_states = (first_state, second_state)

    input_tensor = Variable(tensor)
    mask = Variable(mask)
    _ = encoder(input_tensor, mask, initial_states)
def guide(data):
    """Build Normal guide distributions over the linear layer's weight and bias and sample a regressor.

    Args:
        data: Variable whose ``.data`` tensor type is used for the initial
            parameter values.

    Returns:
        The result of calling the lifted ``regression_model`` module.
    """
    like = data.data

    def _trainable(values):
        # Match the data's tensor type and mark the value as learnable.
        return Variable(values.type_as(like), requires_grad=True)

    w_mu = _trainable(torch.randn(p, 1))
    w_log_sig = _trainable(-3.0 * torch.ones(p, 1) + 0.05 * torch.randn(p, 1))
    b_mu = _trainable(torch.randn(1))
    b_log_sig = _trainable(-3.0 * torch.ones(1) + 0.05 * torch.randn(1))

    # register learnable params in the param store
    weight_mean = pyro.param("guide_mean_weight", w_mu)
    weight_scale = softplus(pyro.param("guide_log_sigma_weight", w_log_sig))
    bias_mean = pyro.param("guide_mean_bias", b_mu)
    bias_scale = softplus(pyro.param("guide_log_sigma_bias", b_log_sig))

    # gaussian guide distributions for w and b
    dists = {
        'linear.weight': Normal(weight_mean, weight_scale),
        'linear.bias': Normal(bias_mean, bias_scale),
    }

    # overloading the parameters in the module with random samples from the guide distributions
    lifted_module = pyro.random_module("module", regression_model, dists)
    # sample a regressor
    return lifted_module()
# instantiate optim and inference objects
def _test_jacobian(self, input_dim, hidden_dim, multiplier):
    """Estimate the network's Jacobian by finite differences and check its sparsity pattern.

    For each output block the Jacobian is reordered by the network's
    permutation; its lower triangle, diagonal included, must be all zero.
    """
    jacobian = torch.zeros(input_dim, input_dim)
    arn = AutoRegressiveNN(input_dim, hidden_dim, multiplier)

    def nonzero(x):
        # 1 where x is non-zero, 0 elsewhere.
        return torch.sign(torch.abs(x))

    for output_index in range(multiplier):
        offset = output_index * input_dim
        for j in range(input_dim):
            for k in range(input_dim):
                x = Variable(torch.randn(1, input_dim))
                # Perturb only input j by epsilon.
                epsilon_vector = torch.zeros(1, input_dim)
                epsilon_vector[0, j] = self.epsilon
                delta = (arn(x + Variable(epsilon_vector)) - arn(x)) / self.epsilon
                jacobian[j, k] = float(delta[0, k + offset].data.cpu().numpy()[0])

        permutation = arn.get_permutation()
        permuted_jacobian = jacobian.clone()
        for j in range(input_dim):
            for k in range(input_dim):
                permuted_jacobian[j, k] = jacobian[permutation[j], permutation[k]]

        lower_sum = torch.sum(torch.tril(nonzero(permuted_jacobian), diagonal=0))
        self.assertTrue(lower_sum == float(0.0))
def test_batch_dim(batch_dim):
    """Trace a ``pyro.map_data`` model and check the sampled site's size for the given ``batch_dim``."""
    data = Variable(torch.randn(4, 5, 7))

    def local_model(ixs, _xs):
        # Flatten everything but the last dimension before sampling.
        flat = _xs.view(-1, _xs.size(2))
        unit_scale = Variable(torch.ones(flat.size()))
        return pyro.sample("xs", dist.normal, flat, unit_scale)

    def model():
        return pyro.map_data("md", data, local_model,
                             batch_size=1, batch_dim=batch_dim)

    tr = poutine.trace(model).get_trace()
    sampled = tr.nodes["xs"]["value"]
    assert sampled.size(0) == data.size(1 - batch_dim)
    sampled = tr.nodes["xs"]["value"]
    assert sampled.size(1) == data.size(2)
def setUp(self):
    """Create nested random observations and the analytic normal-normal posterior parameters."""
    # normal-normal; known covariance
    self.lam0 = Variable(torch.Tensor([0.1, 0.1]))  # precision of prior
    self.mu0 = Variable(torch.Tensor([0.0, 0.5]))  # prior mean
    # known precision of observation noise
    self.lam = Variable(torch.Tensor([6.0, 4.0]))
    self.n_outer = 3
    self.n_inner = 3
    self.n_data = Variable(torch.Tensor([self.n_outer * self.n_inner]))
    self.data = []
    self.sum_data = ng_zeros(2)
    # Draw n_outer lists of n_inner observations, keeping a running sum.
    for _out in range(self.n_outer):
        inner_batch = []
        for _in in range(self.n_inner):
            noise = torch.randn(2) / torch.sqrt(self.lam.data)
            inner_batch.append(Variable(torch.Tensor([-0.1, 0.3]) + noise))
            self.sum_data += inner_batch[-1]
        self.data.append(inner_batch)
    self.analytic_lam_n = self.lam0 + self.n_data.expand_as(self.lam) * self.lam
    self.analytic_log_sig_n = -0.5 * torch.log(self.analytic_lam_n)
    data_term = self.sum_data * (self.lam / self.analytic_lam_n)
    prior_term = self.mu0 * (self.lam0 / self.analytic_lam_n)
    self.analytic_mu_n = data_term + prior_term
    self.verbose = True
# this tests rao-blackwellization in elbo for nested list map_datas
def sample(self):
    """Load the saved generator and discriminator, generate images from noise and save them as a grid."""
    # Load trained parameters
    generator_file = os.path.join(self.model_path, 'generator-%d.pkl' % (self.num_epochs))
    discriminator_file = os.path.join(self.model_path, 'discriminator-%d.pkl' % (self.num_epochs))
    self.generator.load_state_dict(torch.load(generator_file))
    self.discriminator.load_state_dict(torch.load(discriminator_file))
    self.generator.eval()
    self.discriminator.eval()

    # Sample the images
    latent = torch.randn(self.sample_size, self.z_dim)
    fake_images = self.generator(self.to_variable(latent))
    sample_path = os.path.join(self.sample_path, 'fake_samples-final.png')
    grid_source = self.denorm(fake_images.data)
    torchvision.utils.save_image(grid_source, sample_path, nrow=12)

    print("Saved sampled images to '%s'" % sample_path)
def test_forward(self):
    """Forward a random batch through ``InceptionNet`` and check output size and per-row 2-norm."""
    import torch
    from torch.autograd import Variable
    from reid.models.inception import InceptionNet

    # model = Inception(num_classes=5, num_features=256, dropout=0.5)
    # x = Variable(torch.randn(10, 3, 144, 56), requires_grad=False)
    # y = model(x)
    # self.assertEquals(y.size(), (10, 5))

    model = InceptionNet(num_features=8, norm=True, dropout=0)
    x = Variable(torch.randn(10, 3, 144, 56), requires_grad=False)
    y = model(x)
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(y.size(), (10, 8))
    # Largest and smallest row norms both equal 1.
    self.assertEqual(y.norm(2, 1).max(), 1)
    self.assertEqual(y.norm(2, 1).min(), 1)
def test_forward_backward(self):
    """Check ``OIMLoss`` with an identity lookup table against a softmax reference.

    The loss must equal minus the summed log-probabilities on the diagonal,
    and the input gradient must equal ``softmax(x) - I``.
    """
    import torch
    import torch.nn.functional as F
    from torch.autograd import Variable
    from reid.loss import OIMLoss

    criterion = OIMLoss(3, 3, scalar=1.0, size_average=False)
    criterion.lut = torch.eye(3)
    x = Variable(torch.randn(3, 3), requires_grad=True)
    y = Variable(torch.range(0, 2).long())
    loss = criterion(x, y)
    loss.backward()

    probs = F.softmax(x)
    grads = probs.data - torch.eye(3)
    abs_diff = torch.abs(grads - x.grad.data)
    # assertEqual replaces the deprecated assertEquals alias (removed in Python 3.12).
    self.assertEqual(torch.log(probs).diag().sum(), -loss)
    self.assertTrue(torch.max(abs_diff) < 1e-6)
def _make_layers(self, cfg):
    """Build the layer stack described by ``cfg`` as an ``nn.Sequential``.

    Args:
        cfg: iterable whose entries are either ``'M'`` (a 2x2 max-pool) or
            an output channel count (a 3x3 conv, batch-norm and ReLU).

    Returns:
        ``nn.Sequential`` of the listed layers followed by a 1x1 average pool.
    """
    modules = []
    channels_in = 3
    for spec in cfg:
        if spec == 'M':
            modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            continue
        modules.extend((
            nn.Conv2d(channels_in, spec, kernel_size=3, padding=1),
            nn.BatchNorm2d(spec),
            nn.ReLU(inplace=True),
        ))
        # The next conv consumes what this one produced.
        channels_in = spec
    modules.append(nn.AvgPool2d(kernel_size=1, stride=1))
    return nn.Sequential(*modules)
# net = VGG('VGG11')
# x = torch.randn(2,3,32,32)
# print(net(Variable(x)).size())
def forward(self, x):
    """Pass ``x`` through the stages in order, flatten per sample, then apply the linear layer."""
    stage_names = (
        'pre_layers',
        'a3', 'b3', 'maxpool',
        'a4', 'b4', 'c4', 'd4', 'e4', 'maxpool',
        'a5', 'b5', 'avgpool',
    )
    features = x
    for stage in stage_names:
        features = getattr(self, stage)(features)
    # Collapse everything but the first dimension before the linear layer.
    flattened = features.view(features.size(0), -1)
    return self.linear(flattened)
# net = GoogLeNet()
# x = torch.randn(1,3,32,32)
# y = net(Variable(x))
# print(y.size())
def __init__(self, vocab_size, tag_to_ix, embedding_dim, hidden_dim):
    """Create the embedding, bidirectional LSTM, tag projection and transition matrix.

    Args:
        vocab_size: number of rows in the word embedding table.
        tag_to_ix: mapping whose length gives the number of tags.
        embedding_dim: size of each word embedding.
        hidden_dim: input size of the tag projection; each LSTM direction
            uses ``hidden_dim // 2`` units.
    """
    super(BiLSTM_CRF, self).__init__()
    self.embedding_dim = embedding_dim
    self.hidden_dim = hidden_dim
    self.vocab_size = vocab_size
    self.tag_to_ix = tag_to_ix
    n_tags = len(tag_to_ix)
    self.tagset_size = n_tags

    self.word_embeds = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
    per_direction = hidden_dim // 2
    self.lstm = nn.LSTM(embedding_dim, per_direction,
                        num_layers=1, bidirectional=True)

    # Maps the output of the LSTM into tag space.
    self.hidden2tag = nn.Linear(hidden_dim, n_tags)

    # Matrix of transition parameters. Entry i,j is the score of
    # transitioning *to* i *from* j.
    transition_scores = torch.randn(n_tags, n_tags)
    self.transitions = nn.Parameter(transition_scores)

    self.hidden = self.init_hidden()
def __init__(self, question_size, passage_size, hidden_size, attn_size=None,
             cell_type=nn.GRUCell, num_layers=1, dropout=0, residual=False, **kwargs):
    """Create the question/passage attention pooling layers, the ``V_q`` parameter and the stacked cell.

    Args:
        question_size: feature size of the question side; also the input
            and hidden size given to the stacked cell.
        passage_size: feature size given to the passage pooling layer.
        hidden_size: accepted but not read in this constructor.
        attn_size: attention size; falls back to ``question_size`` when None.
        cell_type: cell class handed to ``StackedCell`` as ``rnn_cell``.
        num_layers: number of stacked cell layers.
        dropout: dropout value forwarded to ``StackedCell``.
        residual: residual flag forwarded to ``StackedCell``.
        **kwargs: extra keyword arguments forwarded to ``StackedCell``.
    """
    super().__init__()
    self.num_layers = num_layers
    attn_size = question_size if attn_size is None else attn_size

    # TODO: what is V_q? (section 3.4)
    v_q_size = question_size
    self.question_pooling = AttentionPooling(
        question_size, v_q_size, attn_size=attn_size)
    self.passage_pooling = AttentionPooling(
        passage_size, question_size, attn_size=attn_size)

    initial_v_q = torch.randn(1, 1, v_q_size)
    self.V_q = nn.Parameter(initial_v_q, requires_grad=True)
    self.cell = StackedCell(
        question_size, question_size,
        num_layers=num_layers, dropout=dropout,
        rnn_cell=cell_type, residual=residual, **kwargs)
def make_sprite(label_img, save_path):
    """Tile a batch of images into one square sprite sheet, ``sprite.png``.

    The batch is padded with random-noise images until it holds
    ``nrow ** 2`` entries, where ``nrow = ceil(sqrt(number of images))``.

    Args:
        label_img: image batch tensor; dimension 0 indexes the images
            (assumed N x C x H x W -- TODO confirm against callers).
        save_path: directory in which ``sprite.png`` is written.
    """
    import math
    import os  # needed for the output path; was used without being imported here
    import torch
    import torchvision
    # this ensures the sprite image has correct dimension as described in
    # https://www.tensorflow.org/get_started/embedding_viz
    nrow = int(math.ceil((label_img.size(0)) ** 0.5))
    # augment images so that #images equals nrow*nrow
    n_missing = nrow ** 2 - label_img.size(0)
    if n_missing > 0:
        # Nothing to pad for perfect squares, so no zero-sized tensor is built.
        filler = torch.randn(n_missing, *label_img.size()[1:]) * 255
        label_img = torch.cat((label_img, filler), 0)
    sprite_file = os.path.join(save_path, 'sprite.png')
    # Dirty fix: no pixel are appended by make_grid call in save_image
    # (https://github.com/pytorch/vision/issues/206).  Probe make_grid with a
    # dummy 32x32 image to see whether this torchvision adds an extra pixel.
    xx = torchvision.utils.make_grid(torch.Tensor(1, 3, 32, 32), padding=0)
    if xx.size(2) == 33:
        sprite = torchvision.utils.make_grid(label_img, nrow=nrow, padding=0)
        # Drop the first row and column that make_grid added.
        sprite = sprite[:, 1:, 1:]
        torchvision.utils.save_image(sprite, sprite_file)
    else:
        torchvision.utils.save_image(label_img, sprite_file, nrow=nrow, padding=0)