def evaluate(model, testloader, use_cuda=False, max_batches=10):
    """Measure the classification accuracy of ``model`` on ``testloader``.

    Args:
        model: callable applied to each input batch; the predicted class is
            the arg-max of its output along dimension 1.
        testloader: iterable yielding ``(inputs, targets)`` pairs. Inputs get
            an extra dimension inserted at position 1; targets are converted
            with ``target_onehot_to_classnum_tensor``.
        use_cuda: move each batch to the GPU when True and the module-level
            ``cuda_ava`` flag is truthy.
        max_batches: number of leading batches to evaluate. Defaults to 10,
            the limit that used to be hard-coded; ``None`` evaluates all.

    Returns:
        The fraction of correctly classified samples as a float, or 0.0 when
        no sample was evaluated.
    """
    correct = 0
    total = 0
    for i, (inputs, targets) in enumerate(testloader):
        if max_batches is not None and i >= max_batches:
            break
        inputs = inputs.unsqueeze(1)
        targets = target_onehot_to_classnum_tensor(targets)
        if use_cuda and cuda_ava:
            inputs = Variable(inputs.float().cuda())
            targets = targets.cuda()
        else:
            inputs = Variable(inputs.float())
        outputs = model(inputs)
        _, predicted = torch.max(outputs.data, 1)
        total += targets.size(0)
        # int() keeps the counter a plain Python number whether sum() returns
        # a number or a 0-dim tensor.
        correct += int((predicted == targets).sum())
    # Explicit float division: avoids Python 2 integer truncation and the
    # ZeroDivisionError on an empty loader.
    accuracy = float(correct) / total if total else 0.0
    print("Accuracy of the network is: %.5f %%" % (accuracy * 100))
    return accuracy
# Example source code for the Python class Variable()
def train(self, dataset):
    """Run one training epoch over ``dataset`` in random order.

    Each item is unpacked as ``(ltree, lsent, rtree, rsent, label)``.
    Gradients are accumulated one sample at a time and applied every
    ``self.args.batchsize`` samples. Gradients still pending when the epoch
    ends are applied as a final, smaller batch instead of being discarded by
    the next epoch's ``zero_grad()``.

    Args:
        dataset: indexable collection supporting ``len()`` and exposing
            ``num_classes``.

    Returns:
        The summed loss divided by the number of samples (0.0 for an empty
        dataset).
    """
    self.model.train()
    self.optimizer.zero_grad()
    total_loss = 0.0
    num_samples = len(dataset)
    indices = torch.randperm(num_samples)
    pending = 0  # samples whose gradients are accumulated but not yet applied
    for idx in tqdm(range(num_samples), desc='Training epoch ' + str(self.epoch + 1) + ''):
        ltree, lsent, rtree, rsent, label = dataset[indices[idx]]
        linput, rinput = Var(lsent), Var(rsent)
        target = Var(map_label_to_target(label, dataset.num_classes))
        if self.args.cuda:
            linput, rinput = linput.cuda(), rinput.cuda()
            target = target.cuda()
        output = self.model(ltree, linput, rtree, rinput)
        loss = self.criterion(output, target)
        total_loss += loss.data[0]
        loss.backward()
        pending += 1
        if pending >= self.args.batchsize:
            self.optimizer.step()
            self.optimizer.zero_grad()
            pending = 0
    if pending:
        # Flush the trailing partial batch so its gradients are not lost.
        self.optimizer.step()
        self.optimizer.zero_grad()
    self.epoch += 1
    return total_loss / num_samples if num_samples else 0.0
# helper function for testing
def test(self, dataset):
    """Evaluate the model on every item of ``dataset`` in order.

    Each item is unpacked as ``(ltree, lsent, rtree, rsent, label)``. For
    item ``i`` the prediction is the dot product of the class indices
    ``1..dataset.num_classes`` with ``exp(output)``.

    Returns:
        A tuple ``(summed loss / len(dataset), predictions)``.
    """
    self.model.eval()
    loss_sum = 0
    num_items = len(dataset)
    predictions = torch.zeros(num_items)
    class_ids = torch.arange(1, dataset.num_classes + 1)
    progress = tqdm(range(num_items), desc='Testing epoch ' + str(self.epoch) + '')
    for idx in progress:
        ltree, lsent, rtree, rsent, label = dataset[idx]
        linput = Var(lsent, volatile=True)
        rinput = Var(rsent, volatile=True)
        target = Var(map_label_to_target(label, dataset.num_classes), volatile=True)
        if self.args.cuda:
            linput = linput.cuda()
            rinput = rinput.cuda()
            target = target.cuda()
        output = self.model(ltree, linput, rtree, rinput)
        loss_sum += self.criterion(output, target).data[0]
        scores = output.data.squeeze().cpu()
        predictions[idx] = torch.dot(class_ids, torch.exp(scores))
    return loss_sum / num_items, predictions
def query(self, images):
    """Return a batch built from ``images`` and previously stored images.

    With ``self.pool_size == 0`` the input is returned untouched. Otherwise
    each image of ``images.data`` is handled in turn: while the pool holds
    fewer than ``pool_size`` images it is stored and returned as is; once
    the pool is full, with probability about one half it is swapped with a
    randomly chosen stored image (which is returned instead), else it is
    returned directly.
    """
    if self.pool_size == 0:
        return images
    selected = []
    for img in images.data:
        img = torch.unsqueeze(img, 0)
        if self.num_imgs < self.pool_size:
            # Pool still filling up: remember the image and pass it through.
            self.num_imgs += 1
            self.images.append(img)
            selected.append(img)
            continue
        if random.uniform(0, 1) > 0.5:
            slot = random.randint(0, self.pool_size - 1)
            previous = self.images[slot].clone()
            self.images[slot] = img
            selected.append(previous)
        else:
            selected.append(img)
    return Variable(torch.cat(selected, 0))
def get_target_tensor(self, input, target_is_real):
    """Return a constant label tensor with the same size as ``input``.

    The label is filled with ``self.real_label`` or ``self.fake_label`` and
    cached on ``self.real_label_var`` / ``self.fake_label_var``. The cache
    is rebuilt whenever it is missing or its size differs from ``input``'s.
    Comparing sizes rather than element counts prevents handing back a
    cached label of the wrong shape (e.g. 2x3 for a 3x2 input).

    Args:
        input: tensor-like object providing ``size()``.
        target_is_real: selects the real label when truthy, else the fake one.

    Returns:
        The cached ``Variable`` (``requires_grad=False``) for that label.
    """
    cached = self.real_label_var if target_is_real else self.fake_label_var
    if cached is None or cached.size() != input.size():
        fill_value = self.real_label if target_is_real else self.fake_label
        label_tensor = self.Tensor(input.size()).fill_(fill_value)
        cached = Variable(label_tensor, requires_grad=False)
        if target_is_real:
            self.real_label_var = cached
        else:
            self.fake_label_var = cached
    return cached
def train(self):
    """Run one pass over ``self.dataset``, taking one optimizer step per batch.

    Batches are numbered continuing from ``self.iterations``. For each batch
    the 'batch' plugins are called, the optimizer is stepped with a closure
    that runs the forward/backward pass, and then the 'iteration' and
    'update' plugins are called. The 'iteration' plugins receive the output
    and loss data captured on the first closure call.

    Afterwards ``self.iterations`` holds the number of the last processed
    batch. It is left unchanged when the dataset yields nothing.
    """
    last_iteration = self.iterations
    for i, data in enumerate(self.dataset, self.iterations + 1):
        batch_input, batch_target = data
        self.call_plugins('batch', i, batch_input, batch_target)
        input_var = Variable(batch_input)
        target_var = Variable(batch_target)
        plugin_data = [None, None]

        def closure():
            batch_output = self.model(input_var)
            loss = self.criterion(batch_output, target_var)
            loss.backward()
            # The optimizer may call the closure several times; keep only
            # the results of the first evaluation for the plugins.
            if plugin_data[0] is None:
                plugin_data[0] = batch_output.data
                plugin_data[1] = loss.data
            return loss

        self.optimizer.zero_grad()
        self.optimizer.step(closure)
        self.call_plugins('iteration', i, batch_input, batch_target,
                          *plugin_data)
        self.call_plugins('update', i, self.model)
        last_iteration = i
    # ``i`` is already an absolute counter (enumerate started at
    # iterations + 1), so assign it instead of adding it again.
    self.iterations = last_iteration
def __setattr__(self, name, value):
    """Route attribute assignment through the ``_parameters`` registry.

    * A ``Parameter`` value is stored in ``_parameters``. It is rejected
      when the registry does not exist yet or when ``value.creator`` is
      truthy.
    * For a name already present in ``_parameters`` only ``None`` may be
      assigned; any other value raises ``TypeError``.
    * Everything else is set as a regular attribute.
    """
    registry = self.__dict__.get('_parameters')

    if isinstance(value, Parameter):
        if registry is None:
            raise AttributeError(
                "cannot assign parameter before Module.__init__() call")
        if value.creator:
            message = (
                "Cannot assign non-leaf Variable to parameter '{0}'. Model "
                "parameters must be created explicitly. To express '{0}' "
                "as a function of another variable, compute the value in "
                "the forward() method."
            )
            raise ValueError(message.format(name))
        registry[name] = value
        return

    if registry and name in registry:
        if value is not None:
            template = ("cannot assign '{}' object to parameter '{}' "
                        "(torch.nn.Parameter or None required)")
            raise TypeError(template.format(torch.typename(value), name))
        registry[name] = value
        return

    object.__setattr__(self, name, value)
def _test_dropout(self, cls, input):
    """Check that a dropout-style module keeps output and gradient means.

    ``input`` is filled in place with ``1 - p`` (p = 0.2). The module is
    built once as ``cls(p)`` and once as ``cls(p, True)`` (presumably the
    in-place variant; confirm against ``cls``). In both cases the mean of
    the output and the mean of the input gradient must stay within 0.05 of
    ``1 - p``.
    """
    p = 0.2
    expected_mean = 1 - p
    input.fill_(expected_mean)

    def check_means(output, leaf):
        self.assertLess(abs(output.data.mean() - expected_mean), 0.05)
        output.backward(input)
        self.assertLess(abs(leaf.grad.mean() - expected_mean), 0.05)

    module = cls(p)
    input_var = Variable(input, requires_grad=True)
    check_means(module(input_var), input_var)

    module = cls(p, True)
    input_var = Variable(input.clone(), requires_grad=True)
    # "+ 0" feeds the module a non-leaf variable instead of the leaf itself.
    check_means(module(input_var + 0), input_var)

    # Check that these don't raise errors
    module.__repr__()
    str(module)
def test_parallel_apply(self):
    """Check ``dp.parallel_apply`` against running each module directly.

    Two linear layers and their inputs are placed on CUDA devices 0 and 1.
    The outputs from ``parallel_apply`` must equal the outputs obtained by
    calling each layer on its own input.
    """
    modules = (
        nn.Linear(10, 5).float().cuda(0),
        nn.Linear(10, 5).float().cuda(1),
    )
    i1 = Variable(torch.randn(2, 10).float().cuda(0))
    i2 = Variable(torch.randn(2, 10).float().cuda(1))
    inputs = (i1, i2)
    expected1 = modules[0](i1).data
    expected2 = modules[1](i2).data
    expected_outputs = (expected1, expected2)

    outputs = dp.parallel_apply(modules, inputs)
    for actual, expected in zip(outputs, expected_outputs):
        self.assertEqual(actual.data, expected)

    # NOTE(review): the values below are built but never used in the visible
    # code; presumably a check with an empty second input was intended.
    inputs = (i1, Variable(i2.data.new()))
    expected_outputs = (expected1, expected2.new())
def test_load_parameter_dict(self):
    """Check that ``load_parameter_dict`` installs the given objects.

    The network registers the same linear layer under two names and holds a
    nested container with a bias-free convolution. After loading, the
    addressed attributes must be the very objects from the dictionary.
    """
    shared_linear = nn.Linear(5, 5)
    conv_block = nn.Container(
        conv=nn.Conv2d(3, 3, 3, bias=False)
    )
    net = nn.Container(
        linear1=shared_linear,
        linear2=shared_linear,
        block=conv_block,
        empty=None,
    )
    param_dict = {}
    param_dict['linear1.weight'] = Variable(torch.ones(5, 5))
    param_dict['block.conv.bias'] = Variable(torch.range(1, 3))

    net.load_parameter_dict(param_dict)

    self.assertIs(net.linear1.weight, param_dict['linear1.weight'])
    self.assertIs(net.block.conv.bias, param_dict['block.conv.bias'])
def test_MaxUnpool2d_output_size(self):
    """Check which ``output_size`` values ``MaxUnpool2d`` accepts.

    Unpooling the result of pooling a 6x6 input is expected to raise
    ``RuntimeError``. For a 5x5 input, explicit output sizes with height
    and width both in 4..6 must be accepted (given as a tuple or as a
    ``LongStorage``, with or without the two leading dimensions); every
    other size in 3..9 must raise ``ValueError``.
    """
    pool = nn.MaxPool2d(3, stride=2, return_indices=True)
    unpool = nn.MaxUnpool2d(3, stride=2)

    big_t = torch.rand(1, 1, 6, 6)
    big_t[0][0][4][4] = 100
    output_big, indices_big = pool(Variable(big_t))
    self.assertRaises(RuntimeError, lambda: unpool(output_big, indices_big))

    small_t = torch.rand(1, 1, 5, 5)
    for row in range(0, 4, 2):
        for col in range(0, 4, 2):
            small_t[:, :, row, col] = 100
    output_small, indices_small = pool(Variable(small_t))

    for h in range(3, 10):
        for w in range(3, 10):
            if not (4 <= h <= 6 and 4 <= w <= 6):
                self.assertRaises(
                    ValueError,
                    lambda: unpool(output_small, indices_small, (h, w)))
                continue
            # Vary how the size is expressed depending on the height.
            size = (h, w)
            if h == 5:
                size = torch.LongStorage(size)
            elif h == 6:
                size = torch.LongStorage((1, 1) + size)
            unpool(output_small, indices_small, output_size=size)
def _test_basic_cases_template(self, weight, bias, input, constructor):
    """Check that 200 optimizer steps do not increase a simple objective.

    The objective is ``sum(abs(weight.mv(input) + bias))``. ``constructor``
    is called with the wrapped weight and bias and must return an object
    with a ``step()`` method.
    """
    weight = Variable(weight, requires_grad=True)
    bias = Variable(bias, requires_grad=True)
    input = Variable(input, requires_grad=False)
    optimizer = constructor(weight, bias)

    def objective():
        product = weight.mv(input)
        on_different_devices = (
            product.is_cuda
            and bias.is_cuda
            and product.get_device() != bias.get_device()
        )
        if on_different_devices:
            product = product.cuda(bias.get_device())
        return (product + bias).abs().sum()

    initial_value = objective().data[0]
    for _ in range(200):
        weight.grad.zero_()
        bias.grad.zero_()
        objective().backward()
        optimizer.step()
    self.assertLessEqual(objective().data[0], initial_value)
def _numerical_jacobian(self, module, input, jacobian_input=True, jacobian_parameters=True):
    """Compute numerical Jacobians of ``module``'s forward pass.

    Returns a tuple holding, in this order and only when requested, the
    Jacobian with respect to the input and the Jacobians with respect to
    each parameter concatenated along dimension 0.
    """
    output = self._forward(module, input)
    output_size = output.nelement()

    if jacobian_parameters:
        param, d_param = self._get_parameters(module)

    def fw(input):
        result = self._forward(module, input)
        return result.data if isinstance(result, Variable) else result

    # TODO: enable non-contig tests
    input = contiguous(input)

    jacobians = []
    if jacobian_input:
        jacobians.append(get_numerical_jacobian(fw, input, input))
    if jacobian_parameters:
        per_param = [get_numerical_jacobian(fw, input, p) for p in param]
        jacobians.append(torch.cat(per_param, 0))
    return tuple(jacobians)
def __call__(self, test_case):
    """Run this module test against ``test_case``.

    Steps: build the module and its input; when a reference function is
    set, compare the module output with it; save the module to a temporary
    file, load it back and check both copies produce equal output; finally
    delegate to ``self._do_test``.
    """
    module = self.constructor(*self.constructor_args)
    input = self._get_input()

    if self.reference_fn is not None:
        actual = test_case._forward(module, input)
        if isinstance(actual, Variable):
            actual = actual.data
        ref_input = self._unpack_input(deepcopy(input))
        params = test_case._get_parameters(module)[0]
        expected = self.reference_fn(ref_input, params)
        test_case.assertEqual(actual, expected)

    # TODO: do this with in-memory files as soon as torch.save will support it
    with TemporaryFile() as f:
        test_case._forward(module, input)
        torch.save(module, f)
        f.seek(0)
        module_copy = torch.load(f)
        original_out = test_case._forward(module, input)
        restored_out = test_case._forward(module_copy, input)
        test_case.assertEqual(original_out, restored_out)

    self._do_test(test_case, module, input)
def __call__(self, test_case):
    """Run this criterion test against ``test_case``.

    Steps: build the module and its input; make sure ``repr``/``str`` work;
    when a reference function is set, compare the criterion output with it;
    finally check the criterion Jacobian via ``test_case``.
    """
    module = self.constructor(*self.constructor_args)
    input = self._get_input()

    # Check that these methods don't raise errors
    module.__repr__()
    str(module)

    if self.reference_fn is not None:
        actual = test_case._forward_criterion(module, input, self.target)
        raw_target = self.target
        if isinstance(raw_target, Variable):
            raw_target = raw_target.data
        # The reference function gets copies so it cannot alter the originals.
        ref_input = deepcopy(self._unpack_input(input))
        ref_target = deepcopy(raw_target)
        expected = self.reference_fn(ref_input, ref_target, module)
        test_case.assertEqual(actual, expected)

    test_case.check_criterion_jacobian(module, input, self.target)
def to_gpu(obj, type_map=None):
    """Return a copy of ``obj`` with tensors converted to mapped types.

    * Tensors are cloned and cast to ``type_map[type(tensor)]``, falling
      back to ``get_gpu_type(type(tensor))`` when the map has no entry.
    * Storages are copied into a new storage created with ``obj.new()``.
    * Variables (which must have no ``creator``) are rebuilt around their
      converted data, preserving ``requires_grad``.
    * Lists and tuples are converted element by element.
    * Anything else is deep-copied.

    Args:
        obj: the object to convert.
        type_map: optional mapping from source tensor type to target type.
            ``None`` (the default) means no overrides.
    """
    if type_map is None:
        # A fresh dict per call instead of a shared mutable default.
        type_map = {}
    missing = object()

    def target_type(source_type):
        # Consult the fallback only when the map really has no entry.
        mapped = type_map.get(source_type, missing)
        if mapped is missing:
            return get_gpu_type(source_type)
        return mapped

    if torch.is_tensor(obj):
        return obj.clone().type(target_type(type(obj)))
    elif torch.is_storage(obj):
        return obj.new().resize_(obj.size()).copy_(obj)
    elif isinstance(obj, Variable):
        assert obj.creator is None
        converted = obj.data.clone().type(target_type(type(obj.data)))
        return Variable(converted, requires_grad=obj.requires_grad)
    elif isinstance(obj, list):
        return [to_gpu(o, type_map) for o in obj]
    elif isinstance(obj, tuple):
        return tuple(to_gpu(o, type_map) for o in obj)
    else:
        return deepcopy(obj)
def forward(self, input):
    """Run ``self.model`` on a NumPy array and return NumPy output.

    The array is converted to a float tensor on both the CPU and GPU paths,
    so the model sees the same dtype wherever it runs. It is moved to the
    GPU when ``self.use_gpu`` is set.

    Args:
        input: NumPy array accepted by ``torch.from_numpy``.

    Returns:
        A NumPy array, or a list of NumPy arrays when the model returns a
        list. ``self.ready`` is set to True as a side effect.
    """
    input_torch = torch.from_numpy(input).float()
    if self.use_gpu:
        input_torch = input_torch.cuda()
    input_var = Variable(input_torch)

    # forward
    out = self.model.forward(input_var)
    if isinstance(out, list):
        out = [v.data.cpu().numpy() for v in out]
    else:
        out = out.data.cpu().numpy()

    self.ready = True
    return out
def __init__(self, phase, base, extras, head, num_classes):
    """Assemble the network from pre-built layer lists.

    Args:
        phase: when equal to ``'test'`` a softmax and a ``Detect`` object
            are attached as well.
        base: layers wrapped into ``self.vgg``.
        extras: layers wrapped into ``self.extras``.
        head: indexable; ``head[0]`` becomes ``self.loc`` and ``head[1]``
            becomes ``self.conf``.
        num_classes: stored and forwarded to ``Detect``.
    """
    super(SSD, self).__init__()
    self.phase = phase
    self.num_classes = num_classes

    # TODO: implement __call__ in PriorBox
    self.priorbox = PriorBox(v2)
    prior_boxes = self.priorbox.forward()
    self.priors = Variable(prior_boxes, volatile=True)
    self.size = 300

    # SSD network
    self.vgg = nn.ModuleList(base)
    # Layer learns to scale the l2 normalized features from conv4_3
    self.L2Norm = L2Norm(512, 20)
    self.extras = nn.ModuleList(extras)
    self.loc = nn.ModuleList(head[0])
    self.conf = nn.ModuleList(head[1])

    if phase != 'test':
        return
    self.softmax = nn.Softmax()
    self.detect = Detect(num_classes, 0, 200, 0.01, 0.45)
def fast_heat_similarity_matrix(X, sigma):
    """
    Compute a similarity matrix with PyTorch, on the GPU when one is present.

    :param X: the matrix with the data (converted to float32)
    :param sigma: scaling factor
    :return: the result of ``sym_heat_similarity_matrix`` as a NumPy array
    """
    # Use GPU if available
    use_gpu = torch.cuda.device_count() > 0

    data = Variable(torch.from_numpy(np.float32(X)))
    scale = Variable(torch.from_numpy(np.float32([sigma])))
    if use_gpu:
        data = data.cuda()
        scale = scale.cuda()

    similarity = sym_heat_similarity_matrix(data, scale)
    if use_gpu:
        # Bring the result back to host memory before converting to NumPy.
        similarity = similarity.cpu()
    return similarity.data.numpy()
def test(netG, opt):
    """Generate a ``.pls`` file for every ``.cmp`` file in the test directory.

    For each ``<name>.cmp`` in ``opt.testdata_dir`` the file is read with
    ``read_binary_file(..., dim=47)``, one noise vector of width ``nz``
    (module-level name) is drawn per row from a normal(0, 1) distribution,
    ``netG(noise, features)`` is evaluated and the result, reshaped to one
    row per input row, is written to ``<name>.pls`` in the same directory.
    The file stem is printed as progress output.
    """
    assert opt.netG != ''
    test_dir = opt.testdata_dir
    for entry in os.listdir(test_dir):
        stem, suffix = os.path.splitext(entry)
        if suffix != '.cmp':
            continue
        print(stem)
        cmp_path = os.path.join(test_dir, entry)
        ac_data = torch.FloatTensor(read_binary_file(cmp_path, dim=47))
        noise = torch.FloatTensor(ac_data.size(0), nz)
        if opt.cuda:
            ac_data = ac_data.cuda()
            noise = noise.cuda()
        ac_data = Variable(ac_data)
        noise = Variable(noise)
        # The noise buffer is allocated uninitialised; fill it in place.
        noise.data.normal_(0, 1)
        pulses = netG(noise, ac_data).data.cpu().numpy()
        pulses = pulses.reshape(ac_data.size(0), -1)
        pls_path = os.path.join(test_dir, stem + '.pls')
        with open(pls_path, 'wb') as fid:
            pulses.tofile(fid)