def test_hooks_cycle(self):
import gc
counter = [0]
class GradHook(object):
def __init__(self, var):
self.var = var
def __del__(self):
counter[0] += 1
def __call__(self, *args):
pass
def run_test():
x = Variable(torch.ones(5, 5), requires_grad=True)
y = x * 2
x.register_hook(GradHook(x))
y.register_hook(GradHook(y))
y._backward_hooks[1] = GradHook(y)
run_test()
gc.collect()
self.assertEqual(counter[0], 3)
# Python source-code examples that use var()
def var(x, axis=None, keepdims=False):
    """Variance of ``x`` along ``axis``, wrapped as a backend op via ``get_op``.

    Args:
        x: input passed through to the op built by ``get_op``.
        axis: dimension to reduce, or ``None`` to reduce over every element.
        keepdims: when true, the reduced dimension is kept with size 1.

    Returns:
        Whatever ``get_op(...)(x)`` returns (``get_op`` is defined elsewhere
        in the project; presumably a wrapped tensor -- verify against it).
    """
    def _var(x, axis, keepdims):
        if axis is None:
            # Full reduction: there is no single dimension to squeeze, so do
            # not route through torch.squeeze(y, None).
            return torch.var(x)
        y = torch.var(x, axis)
        # Depending on the torch version the reduced dimension is either kept
        # (size 1) or dropped. Detect which happened instead of assuming, so
        # that the requested ``keepdims`` is honoured in both cases.
        kept = y.dim() == x.dim()
        if keepdims and not kept:
            return y.unsqueeze(axis)
        if kept and not keepdims:
            return torch.squeeze(y, axis)
        return y

    def _compute_output_shape(x, axis, keepdims):
        if axis is None:
            return ()
        shape = list(_get_shape(x))
        if keepdims:
            shape[axis] = 1
        else:
            del shape[axis]
        return tuple(shape)

    return get_op(_var, output_shape=_compute_output_shape, arguments=[axis, keepdims])(x)
def forward(self, x):
    """Normalize each (sample, channel) plane of ``x`` and apply scale/shift.

    ``x`` is indexed with ``size(0)..size(3)``, so it must have at least four
    dimensions; the view below requires exactly ``size(2) * size(3)`` elements
    per plane (assumes an (N, C, H, W) layout -- confirm with callers).
    ``self.scale`` and ``self.shift`` are broadcast along dimension 1.

    Returns:
        A tensor shaped like ``x``:
        ``(x - mean) / sqrt(var + eps) * scale + shift`` with per-plane
        mean and biased variance.
    """
    n = x.size(2) * x.size(3)
    t = x.view(x.size(0), x.size(1), n)
    # Explicit keepdim makes the shapes independent of the library default:
    # (N, C, 1) -> (N, C, 1, 1), which broadcasts against x.
    mean = torch.mean(t, 2, keepdim=True).unsqueeze(3)
    # Ask for the biased variance directly. Rescaling the unbiased estimate
    # by (n - 1) / n produces NaN when a plane holds a single element.
    var = torch.var(t, 2, unbiased=False, keepdim=True).unsqueeze(3)
    scale_broadcast = self.scale.view(1, -1, 1, 1)
    shift_broadcast = self.shift.view(1, -1, 1, 1)
    out = (x - mean) / torch.sqrt(var + self.eps)
    return out * scale_broadcast + shift_broadcast
def test_type_conversions(self):
x = Variable(torch.randn(5, 5))
self.assertIs(type(x.float().data), torch.FloatTensor)
self.assertIs(type(x.int().data), torch.IntTensor)
if torch.cuda.is_available():
self.assertIs(type(x.float().cuda().data), torch.cuda.FloatTensor)
self.assertIs(type(x.int().cuda().data), torch.cuda.IntTensor)
self.assertIs(type(x.int().cuda().cpu().data), torch.IntTensor)
if torch.cuda.device_count() >= 2:
x2 = x.float().cuda(1)
self.assertIs(type(x2.data), torch.cuda.FloatTensor)
self.assertIs(x2.get_device(), 1)
x2 = x.float().cuda()
self.assertIs(type(x2.data), torch.cuda.FloatTensor)
self.assertIs(x2.get_device(), 0)
x2 = x2.cuda(1)
self.assertIs(type(x2.data), torch.cuda.FloatTensor)
self.assertIs(x2.get_device(), 1)
y = Variable(torch.randn(5).cuda(1), requires_grad=True)
y.cpu().sum().backward()
self.assertIs(y.grad.get_device(), 1)
self.assertIs(y.long().data.get_device(), 1)
for t in [torch.DoubleTensor, torch.FloatTensor, torch.IntTensor, torch.ByteTensor]:
for var in (True, False):
y = torch.randn(5, 5).type(t)
if var:
y = Variable(y)
self.assertIs(type(x.type_as(y).data), t)
def forward(self, x):
    """Normalize each (sample, channel) plane of ``x`` and apply weight/bias.

    ``x`` is indexed with ``size(0)..size(3)``, so it must have at least four
    dimensions; the view below requires exactly ``size(2) * size(3)`` elements
    per plane (assumes an (N, C, H, W) layout -- confirm with callers).
    ``self.weight`` and ``self.bias`` are broadcast along dimension 1.

    Returns:
        A tensor shaped like ``x``:
        ``(x - mean) / sqrt(var + eps) * weight + bias`` with per-plane
        mean and biased variance.
    """
    n = x.size(2) * x.size(3)
    t = x.view(x.size(0), x.size(1), n)
    # Request keepdim explicitly. Relying on the reduction's default left a
    # 3-D tensor that cannot be expanded to the 4-D input when the reduced
    # dimension is dropped.
    mean = torch.mean(t, 2, keepdim=True).unsqueeze(3)
    # Biased variance computed directly; scaling the unbiased estimate by
    # (n - 1) / n yields NaN for single-element planes.
    var = torch.var(t, 2, unbiased=False, keepdim=True).unsqueeze(3)
    scale_broadcast = self.weight.view(1, -1, 1, 1)
    shift_broadcast = self.bias.view(1, -1, 1, 1)
    out = (x - mean) / torch.sqrt(var + self.eps)
    return out * scale_broadcast + shift_broadcast
def forward(self, x):
    """Per-plane normalization of ``x`` followed by an affine transform.

    Each ``x[i, j]`` plane (``size(2) * size(3)`` elements; assumes a 4-D
    (N, C, H, W) input -- confirm with callers) is shifted by its mean and
    divided by ``sqrt(biased_variance + self.eps)``. The result is then
    multiplied by ``self.weight`` and offset by ``self.bias``, both
    broadcast along dimension 1.

    Returns:
        A tensor with the same shape as ``x``.
    """
    n = x.size(2) * x.size(3)
    t = x.view(x.size(0), x.size(1), n)
    # keepdim is spelled out so the statistics are (N, C, 1) regardless of
    # the reduction default; one unsqueeze then gives (N, C, 1, 1).
    mean = torch.mean(t, 2, keepdim=True).unsqueeze(3)
    # unbiased=False replaces the manual (n - 1) / n correction, which turns
    # into NaN * 0 when a plane has exactly one element.
    var = torch.var(t, 2, unbiased=False, keepdim=True).unsqueeze(3)
    scale_broadcast = self.weight.view(1, -1, 1, 1)
    shift_broadcast = self.bias.view(1, -1, 1, 1)
    out = (x - mean) / torch.sqrt(var + self.eps)
    return out * scale_broadcast + shift_broadcast
def test_keepdim_warning(self):
torch.utils.backcompat.keepdim_warning.enabled = True
x = Variable(torch.randn(3, 4), requires_grad=True)
def run_backward(y):
y_ = y
if type(y) is tuple:
y_ = y[0]
# check that backward runs smooth
y_.backward(y_.data.new(y_.size()).normal_())
def keepdim_check(f):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
y = f(x, 1)
self.assertTrue(len(w) == 1)
self.assertTrue(issubclass(w[-1].category, UserWarning))
self.assertTrue("keepdim" in str(w[-1].message))
run_backward(y)
self.assertEqual(x.size(), x.grad.size())
# check against explicit keepdim
y2 = f(x, 1, keepdim=False)
self.assertEqual(y, y2)
run_backward(y2)
y3 = f(x, 1, keepdim=True)
if type(y3) == tuple:
y3 = (y3[0].squeeze(1), y3[1].squeeze(1))
else:
y3 = y3.squeeze(1)
self.assertEqual(y, y3)
run_backward(y3)
keepdim_check(torch.sum)
keepdim_check(torch.prod)
keepdim_check(torch.mean)
keepdim_check(torch.max)
keepdim_check(torch.min)
keepdim_check(torch.mode)
keepdim_check(torch.median)
keepdim_check(torch.kthvalue)
keepdim_check(torch.var)
keepdim_check(torch.std)
torch.utils.backcompat.keepdim_warning.enabled = False