def test(self, dataset):
    """Evaluate the model on every sample of ``dataset``.

    Each sample is unpacked as ``(ltree, lsent, rtree, rsent, label)``.
    The model is switched to eval mode and the inputs are wrapped as
    volatile variables (moved to the GPU when ``self.args.cuda`` is set).

    Returns:
        A tuple ``(mean_loss, predictions)`` where ``mean_loss`` is the
        summed criterion value divided by ``len(dataset)`` and
        ``predictions[i]`` is the dot product of the class ids
        ``1..num_classes`` with ``exp(output)`` for sample ``i``.
    """
    self.model.eval()
    loss_sum = 0
    predictions = torch.zeros(len(dataset))
    class_ids = torch.arange(1, dataset.num_classes + 1)
    progress = tqdm(range(len(dataset)),
                    desc='Testing epoch ' + str(self.epoch))
    for sample_no in progress:
        ltree, lsent, rtree, rsent, label = dataset[sample_no]
        linput = Var(lsent, volatile=True)
        rinput = Var(rsent, volatile=True)
        target = Var(map_label_to_target(label, dataset.num_classes),
                     volatile=True)
        if self.args.cuda:
            linput = linput.cuda()
            rinput = rinput.cuda()
            target = target.cuda()
        output = self.model(ltree, linput, rtree, rinput)
        loss_sum += self.criterion(output, target).data[0]
        # NOTE(review): presumably ``output`` holds log-probabilities, so this
        # is an expected class score -- confirm against the model.
        scores = torch.exp(output.data.squeeze().cpu())
        predictions[sample_no] = torch.dot(class_ids, scores)
    return loss_sum / len(dataset), predictions
# python类arange()的实例源码 (example source code using Python's arange())
def testClassErrorMeter(self):
    """Check the top-1 ``ClassErrorMeter`` after a correct and a wrong batch."""
    mtr = meter.ClassErrorMeter(topk=[1])
    output = torch.eye(3)
    # torch.range is the fallback for builds without torch.arange.
    target = torch.arange(0, 3) if hasattr(torch, "arange") else torch.range(0, 2)

    # First batch: the identity scores match targets 0, 1, 2.
    mtr.add(output, target)
    self.assertEqual(mtr.value(), [0], "All should be correct")

    # Second batch: overwrite the targets in place with 1, 0, 0.
    for position, new_label in enumerate((1, 0, 0)):
        target[position] = new_label
    mtr.add(output, target)
    self.assertEqual(mtr.value(), [50.0], "Half should be correct")
def testTensorDataset(self):
    """Exercise ``TensorDataset`` with dict, tensor and list-of-tensor inputs."""
    # dict input
    fields = {
        # 'input': torch.arange(0,8),
        'input': np.arange(0, 8),
        'target': np.arange(0, 8),
    }
    from_dict = dataset.TensorDataset(fields)
    self.assertEqual(len(from_dict), 8)
    self.assertEqual(from_dict[2], {'input': 2, 'target': 2})

    # tensor input
    a = torch.randn(8)
    from_tensor = dataset.TensorDataset(a)
    self.assertEqual(len(a), len(from_tensor))
    self.assertEqual(a[1], from_tensor[1])

    # list of tensors input: each item is a list with one entry per tensor
    from_list = dataset.TensorDataset([a])
    self.assertEqual(len(a), len(from_list))
    self.assertEqual(a[1], from_list[1][0])
def reverse_sequence(self, x, x_lens):
    """Reverse ``x`` along dim 1, then shift each row by its padding amount.

    ``x`` must be 3-D (its size is unpacked into three values) and
    ``x_lens`` holds one length per batch row.  After flipping dim 1, row
    ``b`` is gathered at positions ``i + (seq_len - x_lens[b])`` clamped to
    ``[0, seq_len - 1]``, so positions past the row's length repeat the last
    gathered entry.

    Returns:
        A tensor with the same shape as ``x``.
    """
    _, seq_len, _ = x.size()
    descending = Variable(torch.arange(seq_len - 1, -1, -1).long())
    ascending = Variable(torch.arange(0, seq_len).long())
    if x.is_cuda:
        device = x.get_device()
        descending = descending.cuda(device)
        ascending = ascending.cuda(device)

    def over_batch_and_features(step_index):
        # (seq_len,) -> broadcast to the full shape of x
        return step_index.unsqueeze(0).unsqueeze(-1).expand_as(x)

    flip_index = over_batch_and_features(descending)
    pad_amount = seq_len + (-1 * x_lens)
    pad_amount = pad_amount.unsqueeze(-1).unsqueeze(-1).expand_as(x)
    shift_index = over_batch_and_features(ascending) + pad_amount
    shift_index = shift_index.clamp(0, seq_len - 1)

    flipped = x.gather(1, flip_index)
    return flipped.gather(1, shift_index)
def sequence_mask(lens, max_len=None):
    """Build a ``(batch, max_len)`` mask that is true where ``pos < lens[b]``.

    Args:
        lens: 1-D integer Variable/tensor with one length per batch row.
        max_len: number of mask columns; defaults to the largest value
            in ``lens``.

    Returns:
        The element-wise result of ``ranges < lens`` with shape
        ``(batch, max_len)``, placed on the same device as ``lens``.
    """
    batch_size = lens.size(0)
    if max_len is None:
        # int(tensor.max()) works whether max() yields a Python number or a
        # zero-dimensional tensor, unlike indexing the result with [0].
        max_len = int(lens.data.max())
    ranges = torch.arange(0, max_len).long()
    ranges = ranges.unsqueeze(0).expand(batch_size, max_len)
    ranges = Variable(ranges)
    if lens.data.is_cuda:
        # Follow the device of ``lens`` rather than the current default device.
        ranges = ranges.cuda(lens.data.get_device())
    lens_exp = lens.unsqueeze(1).expand_as(ranges)
    return ranges < lens_exp
def value(self):
    """Returns the model's average precision for each class

    Return:
        ap (FloatTensor): 1xK tensor, with avg precision for each class k.
        When no scores have been recorded (``self.scores`` is empty) the
        plain integer ``0`` is returned instead.
    """
    if self.scores.numel() == 0:
        return 0
    num_classes = self.scores.size(1)
    ap = torch.zeros(num_classes)
    # compute average precision for each class (one column per class)
    for k in range(num_classes):
        ap[k] = AveragePrecisionMeter.average_precision(
            self.scores[:, k], self.targets[:, k], self.difficult_examples)
    return ap
def __init__(self, inputsize, outputsize, bias=True):
    """Set up the layer's inner network, optional bias and column partition.

    Args:
        inputsize: stored as ``self.inputsize``; second argument of the
            ``LookupTable``.
        outputsize: stored as ``self.outputsize``; first argument of the
            ``LookupTable`` and width of the bias tensors.
        bias: when truthy, ``bias`` and ``gradBias`` are ``1 x outputsize``
            zero tensors; otherwise both are ``None``.
    """
    super(PartialLinear, self).__init__()

    # The layer is a small network: (Identity | LookupTable) feeding MM.
    branches = ParallelTable()
    branches.add(Identity()).add(LookupTable(outputsize, inputsize))
    self.network = Sequential().add(branches).add(MM(False, True))

    if not bias:
        self.bias = self.gradBias = None
    else:
        self.bias = torch.zeros(1, outputsize)
        self.gradBias = torch.zeros(1, outputsize)

    # Partition state: start from the indices of all output columns.
    self.inputsize = inputsize
    self.outputsize = outputsize
    self.allcolumns = torch.arange(0, self.outputsize).long()
    self.resetPartition()

    self.addBuffer = None
    self.buffer = None
def test_cuda_small_tensors(self):
    """Send five small CUDA tensors to a spawned process and check its replies.

    The tensors are small enough that they will likely share the same
    underlying cached allocation.  The child runs ``sum_tensors`` and each
    reply is unpacked as ``(sum, device, tensor_size, storage_size)``.
    """
    ctx = mp.get_context('spawn')
    tensors = [torch.arange(i * 5, (i + 1) * 5).cuda() for i in range(5)]

    inq = ctx.Queue()
    outq = ctx.Queue()
    inq.put(tensors)

    worker = ctx.Process(target=sum_tensors, args=(inq, outq))
    worker.start()
    results = [outq.get() for _ in range(5)]
    worker.join()

    for i, reply in enumerate(results):
        v, device, tensor_size, storage_size = reply
        self.assertEqual(v, torch.arange(i * 5, (i + 1) * 5).sum())
        self.assertEqual(device, 0)
        self.assertEqual(tensor_size, 5)
        self.assertEqual(storage_size, 5)
def __init__(self, inputsize, outputsize, bias=True):
    """Set up the layer's inner network, optional bias and column partition.

    Args:
        inputsize: stored as ``self.inputsize``; second argument of the
            ``LookupTable``.
        outputsize: stored as ``self.outputsize``; first argument of the
            ``LookupTable`` and width of the bias tensors.
        bias: when truthy, ``bias`` and ``gradBias`` are ``1 x outputsize``
            zero tensors; otherwise both are ``None``.
    """
    super(PartialLinear, self).__init__()

    # The layer is a small network: (Identity | LookupTable) feeding MM.
    branches = ParallelTable()
    branches.add(Identity()).add(LookupTable(outputsize, inputsize))
    self.network = Sequential().add(branches).add(MM(False, True))

    if not bias:
        self.bias = self.gradBias = None
    else:
        self.bias = torch.zeros(1, outputsize)
        self.gradBias = torch.zeros(1, outputsize)

    # Partition state: start from the indices of all output columns.
    self.inputsize = inputsize
    self.outputsize = outputsize
    self.allcolumns = torch.arange(0, self.outputsize).long()
    self.resetPartition()

    self.addBuffer = None
    self.buffer = None
def test_cuda_small_tensors(self):
    """Send five small CUDA tensors to a spawned process and check its replies.

    The tensors are small enough that they will likely share the same
    underlying cached allocation.  The child runs ``sum_tensors`` and each
    reply is unpacked as ``(sum, device, tensor_size, storage_size)``.
    """
    ctx = mp.get_context('spawn')
    tensors = [torch.arange(i * 5, (i + 1) * 5).cuda() for i in range(5)]

    inq = ctx.Queue()
    outq = ctx.Queue()
    inq.put(tensors)

    worker = ctx.Process(target=sum_tensors, args=(inq, outq))
    worker.start()
    results = [outq.get() for _ in range(5)]
    worker.join()

    for i, reply in enumerate(results):
        v, device, tensor_size, storage_size = reply
        self.assertEqual(v, torch.arange(i * 5, (i + 1) * 5).sum())
        self.assertEqual(device, 0)
        self.assertEqual(tensor_size, 5)
        self.assertEqual(storage_size, 5)
def _feature_reorg(self, input, stride=2):
N, C, H, W = input.size()
assert H == W, "H and W is not equal"
w_new = int(W / 2)
idx_left = torch.arange(0, w_new).long().cuda()
idx_right = torch.arange(w_new, W).long().cuda()
idx_left = Variable(idx_left)
idx_right = Variable(idx_right)
output_left = input.index_select(dim=3, index=idx_left)
output_right = input.index_select(dim=3, index=idx_right)
output_left = output_left.view(N, -1, w_new, w_new)
output_right = output_right.view(N, -1, w_new, w_new)
output_cat = torch.cat((output_left, output_right), dim=2)
output = output_cat.view(N, -1, w_new, w_new)
return output
def __init__(self):
    """Create the parameters and sub-modules of the chunking layer.

    All sizes come from module-level configuration names
    (``embedding_size``, ``nb_postags``, ``postag_hn_size``,
    ``chunking_nb_layers``, ``max_sentence_size``, ``chunking_hn_size``,
    ``chunking_postag_emb_size``, ``nb_chunktags``).
    """
    super(Chunking, self).__init__()

    self.input_size = embedding_size + nb_postags + postag_hn_size * 2

    # Two randomly initialised parameters of identical shape; the leading
    # dimension is doubled to match the bidirectional LSTM below.
    state_shape = (chunking_nb_layers * 2, max_sentence_size, chunking_hn_size)
    self.w = nn.Parameter(torch.randn(*state_shape))
    self.h = nn.Parameter(torch.randn(*state_shape))

    self.embedding = nn.Embedding(nb_postags, chunking_postag_emb_size)

    # Indices 0..nb_postags-1 as a long Variable.
    self.aux_emb = Variable(torch.arange(0, nb_postags)).long()

    self.bi_lstm = nn.LSTM(self.input_size,
                           chunking_hn_size,
                           chunking_nb_layers,
                           bidirectional=True)

    self.fc = nn.Linear(chunking_hn_size * 2, nb_chunktags)
def positional_embedding(x, min_timescale=1.0, max_timescale=1.0e4):
    """Return a sinusoidal position signal shaped like ``x``.

    Half of the channels hold ``sin(position * inv_timescale)`` and the
    other half hold ``cos(position * inv_timescale)``, where the inverse
    timescales are geometrically spaced starting at ``min_timescale``.

    Args:
        x: 3-D input of size ``(batch, length, channels)``; only its size
            and device are used.  ``channels`` must be even.
        min_timescale: scale applied to the first timescale.
        max_timescale: together with ``min_timescale`` sets the geometric
            step between timescales.

    Returns:
        A tensor of size ``(batch, length, channels)`` on the same device
        as ``x`` (an expanded view over the batch dimension).

    Raises:
        AssertionError: if ``channels`` is odd.
    """
    batch, length, channels = list(x.size())
    assert (channels % 2 == 0)
    num_timescales = channels // 2
    # With a single timescale there is no step to take; clamping the
    # denominator avoids a ZeroDivisionError when channels == 2.
    log_timescale_increment = (
        math.log(float(max_timescale) / float(min_timescale)) /
        max(float(num_timescales) - 1., 1.))
    position = torch.arange(0, length).float()
    inv_timescales = torch.arange(0, num_timescales).float()
    if x.is_cuda:
        # Follow the device of ``x`` rather than the current default device.
        device = x.get_device()
        position = position.cuda(device)
        inv_timescales = inv_timescales.cuda(device)
    inv_timescales.mul_(-log_timescale_increment).exp_().mul_(min_timescale)
    # scaled_time is length x num_timescales
    scaled_time = position.unsqueeze(1).expand(
        length, num_timescales) * inv_timescales.unsqueeze(0).expand(length, num_timescales)
    # signal is length x channels
    signal = torch.cat([scaled_time.sin(), scaled_time.cos()], 1)
    return signal.unsqueeze(0).expand(batch, length, channels)
def forward(self, input, offsets=None):
    """Apply the embedding-bag backend function to ``input``.

    Args:
        input: 1-D tensor of indices, or a 2-D tensor treated as a
            mini-batch of fixed-length sequences.
        offsets: required for 1-D input; must be ``None`` for 2-D input,
            where it is generated as ``0, row_len, 2 * row_len, ...`` and
            the input is flattened.

    Returns:
        The result of calling the backend ``EmbeddingBag`` function with
        ``(self.weight, input, offsets)``.

    Raises:
        ValueError: if ``input`` is neither 1-D nor 2-D, if ``offsets`` is
            given with 2-D input, or if ``offsets`` is missing for 1-D input.
    """
    ndim = input.dim()
    if ndim == 2:
        if offsets is not None:
            raise ValueError("if input is 2D, then offsets has to be None"
                             ", as input is treated is a mini-batch of"
                             " fixed length sequences. However, found "
                             "offsets of type {}".format(type(offsets)))
        # One bag per row: the start offsets are multiples of the row length.
        row_length = input.size(1)
        offset_buffer = input.data.new().long()
        offsets = Variable(torch.arange(0, input.numel(), row_length,
                                        out=offset_buffer))
        input = input.view(-1)
    elif ndim != 1:
        raise ValueError("input has to be 1D or 2D Tensor,"
                         " but got Tensor of dimension {}".format(input.dim()))

    if offsets is None:
        raise ValueError("offsets has to be a 1D Tensor but got None")

    bag_function = self._backend.EmbeddingBag(
        self.max_norm, self.norm_type,
        self.scale_grad_by_freq, mode=self.mode
    )
    return bag_function(self.weight, input, offsets)
def __init__(self, inputsize, outputsize, bias=True):
    """Set up the layer's inner network, optional bias and column partition.

    Args:
        inputsize: stored as ``self.inputsize``; second argument of the
            ``LookupTable``.
        outputsize: stored as ``self.outputsize``; first argument of the
            ``LookupTable`` and width of the bias tensors.
        bias: when truthy, ``bias`` and ``gradBias`` are ``1 x outputsize``
            zero tensors; otherwise both are ``None``.
    """
    super(PartialLinear, self).__init__()

    # The layer is a small network: (Identity | LookupTable) feeding MM.
    branches = ParallelTable()
    branches.add(Identity()).add(LookupTable(outputsize, inputsize))
    self.network = Sequential().add(branches).add(MM(False, True))

    if not bias:
        self.bias = self.gradBias = None
    else:
        self.bias = torch.zeros(1, outputsize)
        self.gradBias = torch.zeros(1, outputsize)

    # Partition state: start from the indices of all output columns.
    self.inputsize = inputsize
    self.outputsize = outputsize
    self.allcolumns = torch.arange(0, self.outputsize).long()
    self.resetPartition()

    self.addBuffer = None
    self.buffer = None
def test_exact_posterior():
    """Compare the posterior mean of a lazy sum with its evaluated form.

    Two interpolated Toeplitz lazy variables are added; the posterior mean
    computed through the lazy sum must match the one computed from the
    evaluated sum to within 1e-4 (norm of the difference).
    """
    train_mean = Variable(torch.randn(4))
    train_y = Variable(torch.randn(4))
    test_mean = Variable(torch.randn(4))

    # Test case
    c1_var = Variable(torch.Tensor([5, 1, 2, 0]), requires_grad=True)
    c2_var = Variable(torch.Tensor([6, 0, 1, -1]), requires_grad=True)
    indices = Variable(torch.arange(0, 4).long().view(4, 1))
    values = Variable(torch.ones(4).view(4, 1))

    def interpolated_toeplitz(column):
        # Same interpolation indices/values on the left and on the right.
        return InterpolatedLazyVariable(ToeplitzLazyVariable(column),
                                        indices, values, indices, values)

    toeplitz_1 = interpolated_toeplitz(c1_var)
    toeplitz_2 = interpolated_toeplitz(c2_var)
    sum_lv = toeplitz_1 + toeplitz_2

    # Actual case
    actual = sum_lv.evaluate()

    # Test forward
    dense_alpha = gpytorch.posterior_strategy(actual).exact_posterior_alpha(train_mean, train_y)
    dense_mean = gpytorch.posterior_strategy(actual).exact_posterior_mean(test_mean, dense_alpha)
    lazy_alpha = sum_lv.posterior_strategy().exact_posterior_alpha(train_mean, train_y)
    lazy_mean = sum_lv.posterior_strategy().exact_posterior_mean(test_mean, lazy_alpha)

    difference = torch.norm(dense_mean.data - lazy_mean.data)
    assert(difference < 1e-4)
def diag(self):
    """Accumulate interpolation-weighted entries of the base lazy variable.

    For every (batch, data point) the outer product of the left and right
    interpolation values is multiplied element-wise with the base entries
    fetched at the matching (batch, left index, right index) triples; the
    products are then summed over dims 3, 2 and 0.
    """
    batch_size, n_data, n_interp = self.left_interp_indices.size()
    pair_shape = (batch_size, n_data, n_interp, n_interp)

    # Outer products w_left^k w_right^k^T of the interpolation values.
    weight_pairs = torch.matmul(self.left_interp_values.unsqueeze(3),
                                self.right_interp_values.unsqueeze(2))

    # Row / column indices of the base entries paired with those weights.
    row_indices = self.left_interp_indices.unsqueeze(3).expand(*pair_shape).contiguous()
    col_indices = self.right_interp_indices.unsqueeze(2).expand(*pair_shape).contiguous()

    # Batch index 0..batch_size-1, written in place into a buffer of the same
    # tensor type as the interpolation indices, then broadcast to pair_shape.
    batch_indices = Variable(row_indices.data.new(batch_size))
    torch.arange(0, batch_size, out=batch_indices.data)
    batch_indices = batch_indices.view(batch_size, 1, 1, 1)
    batch_indices = batch_indices.expand(*pair_shape).contiguous()

    base_entries = self.base_lazy_variable._batch_get_indices(
        batch_indices.view(-1), row_indices.view(-1), col_indices.view(-1))
    base_entries = base_entries.view(row_indices.size())

    return (weight_pairs * base_entries).sum(3).sum(2).sum(0)
def test(self, dataset):
    """Evaluate the model on every sample of ``dataset``.

    Each sample is unpacked as ``(ltree, lsent, rtree, rsent, label)``;
    inputs and target are wrapped as volatile variables and moved to the
    GPU when ``self.args.cuda`` is set.

    Returns:
        A tuple ``(mean_loss, predictions)``: the accumulated criterion
        value divided by ``len(dataset)``, and a tensor whose entry ``i``
        is the dot product of class ids ``1..num_classes`` with
        ``exp(output)`` for sample ``i``.
    """
    self.model.eval()
    running_loss = 0
    predictions = torch.zeros(len(dataset))
    class_ids = torch.arange(1, dataset.num_classes + 1)
    sample_range = tqdm(range(len(dataset)),
                        desc='Testing epoch ' + str(self.epoch))
    for position in sample_range:
        ltree, lsent, rtree, rsent, label = dataset[position]
        linput = Var(lsent, volatile=True)
        rinput = Var(rsent, volatile=True)
        target = Var(map_label_to_target(label, dataset.num_classes),
                     volatile=True)
        if self.args.cuda:
            linput = linput.cuda()
            rinput = rinput.cuda()
            target = target.cuda()
        output = self.model(ltree, linput, rtree, rinput)
        running_loss += self.criterion(output, target).data[0]
        # NOTE(review): presumably ``output`` holds log-probabilities, so this
        # is an expected class score -- confirm against the model.
        class_weights = torch.exp(output.data.cpu())
        predictions[position] = torch.dot(class_ids, class_weights)
    return running_loss / len(dataset), predictions
def get_test_aug(factor):
    """Return the list of test-time augmentation flag triples for ``factor``.

    Each entry is a list of three booleans (per the original comment:
    transpose, v-flip, h-flip).

    * falsy ``factor`` or ``1``: the single no-op entry.
    * ``4``: a fixed set of four entries.
    * ``8``: all eight combinations, ordered as the 3-bit binary codes 0..7.
    * anything else: prints a message and returns the single no-op entry.
    """
    if not factor or factor == 1:
        return [[False, False, False]]

    if factor == 4:
        # transpose, v-flip, h-flip
        return [
            [False, False, False],
            [False, False, True],
            [False, True, False],
            [True, True, True],
        ]

    if factor == 8:
        # Row i holds the three bits of i, most significant bit first.
        codes = np.arange(0, 8)[:, np.newaxis]
        bit_weights = 2**np.arange(2, -1, -1)
        return ((1 & codes // bit_weights) > 0).tolist()

    print('Invalid augmentation factor')
    return [[False, False, False]]
def test_dilate(self):
    """Check output size and the position of value 4 for several dilations."""
    source = Variable(torch.arange(0, 13).view(1, 1, 13))

    # (dilation, expected size, index at which the value 4 must appear)
    expectations = (
        (1, (1, 1, 13), (0, 0, 4)),
        (2, (2, 1, 7), (1, 0, 2)),
        (4, (4, 1, 4), (3, 0, 1)),
    )
    dilated = None
    for dilation, expected_size, where in expectations:
        dilated, _ = dilate(source, dilation)
        self.assertEqual(dilated.size(), expected_size)
        self.assertEqual(dilated[where].data[0], 4)

    # Going back to dilation 1 from the dilation-4 result.
    dilated, _ = dilate(dilated, 1)
    self.assertEqual(dilated.size(), (1, 1, 16))
    self.assertEqual(dilated[0, 0, 7].data[0], 4)
def make_length_mask(lengths):
    """
    Build a binary mask marking the valid positions of each sequence.

    lengths: Variable torch.LongTensor(batch) should be on the desired
        output device.

    Returns:
    --------
    mask: (batch x seq_len) Variable, true where ``position < length``;
        ``seq_len`` is the largest value in ``lengths``.
    """
    raw_lengths = lengths.data
    maxlen = raw_lengths.max()
    batch = len(lengths)
    # Writing the range into a fresh tensor of the same type as the lengths
    # keeps the positions on the same device.
    positions = torch.arange(0, maxlen, out=raw_lengths.new())
    mask = positions.repeat(batch, 1).lt(raw_lengths.unsqueeze(1))
    return Variable(mask, volatile=lengths.volatile)
def select_cols(t, vec):
    """
    Pick one entry per row of `t`: row i contributes column `vec[i]`.

    Parameters
    ----------
    - t: torch.Tensor (m x n)
    - vec: list or torch.LongTensor of size equal to number of rows in t

    >>> x = torch.Tensor([[1, 2], [3, 4]])
    >>> select_cols(x, [1, 0]).tolist()
    [2.0, 3.0]
    """
    column_index = torch.LongTensor(vec) if isinstance(vec, list) else vec
    picked = t.gather(1, column_index.unsqueeze(1))
    return picked.squeeze(1)
def __init__(self, inputsize, outputsize, bias=True):
    """Set up the layer's inner network, optional bias and column partition.

    Args:
        inputsize: stored as ``self.inputsize``; second argument of the
            ``LookupTable``.
        outputsize: stored as ``self.outputsize``; first argument of the
            ``LookupTable`` and width of the bias tensors.
        bias: when truthy, ``bias`` and ``gradBias`` are ``1 x outputsize``
            zero tensors; otherwise both are ``None``.
    """
    super(PartialLinear, self).__init__()

    # The layer is a small network: (Identity | LookupTable) feeding MM.
    branches = ParallelTable()
    branches.add(Identity()).add(LookupTable(outputsize, inputsize))
    self.network = Sequential().add(branches).add(MM(False, True))

    if not bias:
        self.bias = self.gradBias = None
    else:
        self.bias = torch.zeros(1, outputsize)
        self.gradBias = torch.zeros(1, outputsize)

    # Partition state: start from the indices of all output columns.
    self.inputsize = inputsize
    self.outputsize = outputsize
    self.allcolumns = torch.arange(0, self.outputsize).long()
    self.resetPartition()

    self.addBuffer = None
    self.buffer = None
def test_slice(self):
    """Check ``slice`` on a 4x4 Variable: clamping, empty results, steps."""
    # TODO: remove the Variable wrapper once we merge Variable and Tensor
    from torch.autograd import Variable
    empty = Variable(torch.Tensor())
    x = Variable(torch.arange(0, 16).view(4, 4))

    # No arguments, the exact range, and a stop clamped to the size of dim
    # all return the whole input.
    for bounds in ((), (0, 4), (0, 5)):
        self.assertEqual(x.slice(*bounds), x)

    # start >= stop, or a range that is entirely out of bounds, is empty.
    for bounds in ((2, 1), (2, 2), (10, 12)):
        self.assertEqual(x.slice(*bounds), empty)

    # additional correctness checks
    first_row = [[0, 1, 2, 3]]
    self.assertEqual(x.slice(0, 1).data.tolist(), first_row)
    self.assertEqual(x.slice(0, -3).data.tolist(), first_row)
    self.assertEqual(x.slice(-2, 3, dim=1).data.tolist(),
                     [[2], [6], [10], [14]])
    self.assertEqual(x.slice(0, -1, 2).data.tolist(),
                     [[0, 1, 2, 3], [8, 9, 10, 11]])
def test_serialization_backwards_compat(self):
    """Load a legacy serialized file and check contents and storage sharing."""
    a = [torch.arange(1 + i, 26 + i).view(5, 5).float() for i in range(2)]
    # Expected payload: the two tensors repeated twice, then the storage of
    # the first tensor and a slice of that storage.
    expected = [a[i % 2] for i in range(4)]
    expected.append(a[0].storage())
    expected.append(a[0].storage()[1:4])

    path = download_file('https://download.pytorch.org/test_data/legacy_serialized.pt')
    c = torch.load(path)
    self.assertEqual(expected, c, 0)

    for position in range(4):
        self.assertTrue(isinstance(c[position], torch.FloatTensor))
    self.assertTrue(isinstance(c[4], torch.FloatStorage))

    # Filling c[0] must be visible through c[2] and through the storage c[4].
    c[0].fill_(10)
    self.assertEqual(c[0], c[2], 0)
    self.assertEqual(c[4], torch.FloatStorage(25).fill_(10), 0)

    # Likewise c[1] and c[3]; c[5] must track the slice c[4][1:4].
    c[1].fill_(20)
    self.assertEqual(c[1], c[3], 0)
    self.assertEqual(c[4][1:4], c[5], 0)
def value(self):
    """Returns the model's average precision for each class

    Return:
        ap (FloatTensor): 1xK tensor, with avg precision for each class k.
        When no scores have been recorded (``self.scores`` is empty) the
        plain integer ``0`` is returned instead.
    """
    if self.scores.numel() == 0:
        return 0
    num_classes = self.scores.size(1)
    ap = torch.zeros(num_classes)
    # compute average precision for each class (one column per class)
    for k in range(num_classes):
        ap[k] = AveragePrecisionMeter.average_precision(
            self.scores[:, k], self.targets[:, k], self.difficult_examples)
    return ap
def train_batch(self, bs):
    """
    Get a batch of random images with their attributes.

    ``bs`` image ids are drawn at random from ``range(len(self.images))``;
    the selected images are moved to the GPU and normalised, and the
    matching attributes are moved to the GPU.  When ``self.v_flip`` /
    ``self.h_flip`` is set, the batch is reversed along dim 2 / dim 3 with
    probability 0.5 each.

    Returns a pair of Variables ``(batch_x, batch_y)``.
    """
    def reversed_along(images, dim):
        # Select the entries of ``dim`` in descending index order.
        order = torch.arange(images.size(dim) - 1, -1, -1).long().cuda()
        return images.index_select(dim, order)

    # image IDs
    idx = torch.LongTensor(bs).random_(len(self.images))

    # select images / attributes
    batch_x = normalize_images(self.images.index_select(0, idx).cuda())
    batch_y = self.attributes.index_select(0, idx).cuda()

    # data augmentation
    if self.v_flip and np.random.rand() <= 0.5:
        batch_x = reversed_along(batch_x, 2)
    if self.h_flip and np.random.rand() <= 0.5:
        batch_x = reversed_along(batch_x, 3)

    images = Variable(batch_x, volatile=False)
    attributes = Variable(batch_y, volatile=False)
    return images, attributes
def filterbank(self, gx, gy, sigma2, delta):
    """Build the pair of filterbank matrices ``(Fx, Fy)``.

    ``self.compute_mu`` is called for ``gx`` and ``gy`` with a ``1 x N``
    index row and ``delta``; the results are reshaped to ``(-1, N, 1)`` and
    passed to ``self.filterbank_matrices`` together with index ranges of
    length ``self.A`` (for x) and ``self.B`` (for y) shaped ``(1, 1, -1)``
    and ``sigma2`` reshaped to ``(-1, 1, 1)``.
    """
    grid_index = Variable(torch.arange(0, self.N).view(1, -1))
    centers_x = self.compute_mu(gx, grid_index, delta)
    centers_y = self.compute_mu(gy, grid_index, delta)

    a = Variable(torch.arange(0, self.A).view(1, 1, -1))
    b = Variable(torch.arange(0, self.B).view(1, 1, -1))

    centers_x = centers_x.view(-1, self.N, 1)
    centers_y = centers_y.view(-1, self.N, 1)
    variance = sigma2.view(-1, 1, 1)

    Fx = self.filterbank_matrices(a, centers_x, variance)
    Fy = self.filterbank_matrices(b, centers_y, variance)
    return Fx, Fy
def testBatchDataset(self):
    """Batch a 16-element list dataset by 8 and inspect the first batch."""
    # torch.range is the fallback for builds without torch.arange.
    values = torch.arange(0, 16) if hasattr(torch, "arange") else torch.range(0, 15)
    t = values.long()
    batchsize = 8

    samples = dataset.ListDataset(t, lambda x: {'input': x})
    batches = dataset.BatchDataset(samples, batchsize)

    first_inputs = batches[0]['input']
    self.assertEqual(len(first_inputs), batchsize)
    self.assertEqual(first_inputs[-1], batchsize - 1)
# def testTransformDataset(self):
# d = dataset.TransformDataset(dataset.TensorDataset()
def get_range_vector(size: int, is_cuda: bool) -> torch.Tensor:
    """
    Build the vector ``0, 1, ..., size - 1`` as a long Variable that does not
    require gradients.

    With ``is_cuda`` set, the range is produced directly on the GPU (as a
    cumulative sum of ones, minus one) so that no data has to be copied from
    CPU to GPU.
    """
    if not is_cuda:
        return Variable(torch.arange(0, size).long(), requires_grad=False)
    ones = torch.cuda.LongTensor(size).fill_(1)
    return Variable(ones.cumsum(0) - 1, requires_grad=False)