def _access(self, memory_vb): # write
"""
variables needed:
wl_curr_vb: [batch_size x num_heads x mem_hei]
erase_vb: [batch_size x num_heads x mem_wid]
-> in (0, 1)
add_vb: [batch_size x num_heads x mem_wid]
-> w/ no restrictions in range
memory_vb: [batch_size x mem_hei x mem_wid]
returns:
memory_vb: [batch_size x mem_hei x mem_wid]
NOTE: IMPORTANT: https://github.com/deepmind/dnc/issues/10
"""
# first, let's do the erase step
weighted_erase_vb = torch.bmm(self.wl_curr_vb.contiguous().view(-1, self.mem_hei, 1),
self.erase_vb.contiguous().view(-1, 1, self.mem_wid)).view(-1, self.num_heads, self.mem_hei, self.mem_wid)
keep_vb = torch.prod(1. - weighted_erase_vb, dim=1)
memory_vb = memory_vb * keep_vb
# finally let's write (do addition)
return memory_vb + torch.bmm(self.wl_curr_vb.transpose(1, 2), self.add_vb)
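# --- A minimal sketch of the erase/add write above with plain tensors.
# Names and shapes below are illustrative, not taken from the class; assumes a
# reasonably recent PyTorch (reshape and torch.sigmoid available as shown).
import torch

batch_size, num_heads, mem_hei, mem_wid = 2, 2, 4, 3
w = torch.rand(batch_size, num_heads, mem_hei)                    # write weights
e = torch.sigmoid(torch.randn(batch_size, num_heads, mem_wid))    # erase vectors in (0, 1)
a = torch.randn(batch_size, num_heads, mem_wid)                   # add vectors, unrestricted
memory = torch.randn(batch_size, mem_hei, mem_wid)

# per-head outer product w_h e_h^T: [batch x heads x mem_hei x mem_wid]
weighted_erase = torch.bmm(w.reshape(-1, mem_hei, 1),
                           e.reshape(-1, 1, mem_wid)).view(batch_size, num_heads, mem_hei, mem_wid)
keep = torch.prod(1. - weighted_erase, dim=1)                     # combine heads multiplicatively
memory = memory * keep + torch.bmm(w.transpose(1, 2), a)          # erase, then add
print(memory.shape)                                               # torch.Size([2, 4, 3])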
def fake_cumprod(vb):
"""
args:
vb: [hei x wid]
-> NOTE: currently only supports cumprod along wid
"""
# real_cumprod = torch.cumprod(vb.data, 1)
vb = vb.unsqueeze(0)
mul_mask_vb = Variable(torch.zeros(vb.size(2), vb.size(1), vb.size(2))).type_as(vb)
for i in range(vb.size(2)):
mul_mask_vb[i, :, :i+1] = 1
add_mask_vb = 1 - mul_mask_vb
vb = vb.expand_as(mul_mask_vb) * mul_mask_vb + add_mask_vb
# vb = torch.prod(vb, 2).transpose(0, 2) # 0.1.12
vb = torch.prod(vb, 2, keepdim=True).transpose(0, 2) # 0.2.0
# print(real_cumprod - vb.data) # NOTE: checked, ==0
return vb
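# Quick check of the masking trick above against torch.cumprod (which the
# commented-out reference line suggests is available); sizes are illustrative.
import torch

x = torch.rand(3, 5)                          # [hei x wid]
ref = torch.cumprod(x, dim=1)                 # cumprod along wid

wid = x.size(1)
mul_mask = torch.zeros(wid, x.size(0), wid)
for i in range(wid):
    mul_mask[i, :, :i + 1] = 1                # row i keeps entries up to column i
masked = x.unsqueeze(0).expand_as(mul_mask) * mul_mask + (1 - mul_mask)
out = torch.prod(masked, 2, keepdim=True).transpose(0, 2).squeeze(0)
print(torch.allclose(ref, out))               # True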
def prod(x, axis=None, keepdims=False):
def _prod(x, axis, keepdims):
y = torch.prod(x, axis)
# torch's keepdims argument is not functional here, so emulate it by squeezing
return y if keepdims else torch.squeeze(y, axis)
def _compute_output_shape(x, axis, keepdims):
if axis is None:
return ()
shape = list(_get_shape(x))
if keepdims:
shape[axis] = 1
else:
del shape[axis]
return tuple(shape)
return get_op(_prod, output_shape=_compute_output_shape, arguments=[axis, keepdims])(x)
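# The get_op/_get_shape helpers above come from the surrounding backend and are
# not shown here; the keepdims bookkeeping being emulated corresponds to this
# behaviour in a recent PyTorch (sketch with made-up shapes):
import torch

x = torch.rand(2, 3, 4)
axis = 1
print(torch.prod(x, axis).shape)                # torch.Size([2, 4])   (axis reduced away)
print(torch.prod(x, axis, keepdim=True).shape)  # torch.Size([2, 1, 4])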
def reshape(x, shape):
def _reshape(x, shape=shape):
return x.view(shape)
def _compute_output_shape(x, shape=shape):
if -1 not in shape:
return shape
else:
n_elems = np.prod(list(_get_shape(x)))
new_shape = list(shape)
new_shape.remove(-1)
new_axis = n_elems // np.prod(new_shape)
s = list(shape)
s[s.index(-1)] = new_axis
return tuple(s)
return get_op(_reshape, output_shape=_compute_output_shape, arguments=shape)(x)
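# The -1 inference in _compute_output_shape mirrors what view() does internally:
# the missing dimension is the total element count divided by the product of the
# known dimensions. A small check with illustrative values:
import numpy as np
import torch

x = torch.rand(4, 6)
shape = (2, -1, 3)
n_elems = int(np.prod(list(x.shape)))                 # 24
known = int(np.prod([s for s in shape if s != -1]))   # 6
inferred = n_elems // known                           # 4
print(inferred, x.view(shape).shape)                  # 4 torch.Size([2, 4, 3])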
def test_prod(self):
x = torch.rand(100, 100)
res1 = torch.prod(x, 1)
res2 = torch.Tensor()
torch.prod(res2, x, 1)  # legacy out-first call form; newer releases use torch.prod(x, 1, out=res2)
self.assertEqual(res1, res2)
def _consecutive(self, size, start=1):
sequence = torch.ones(int(torch.Tensor(size).prod(0)[0])).cumsum(0)
sequence.add_(start - 1)
return sequence.resize_(*size)
def test_prod(self):
x = torch.rand(100, 100)
res1 = torch.prod(x, 1)
res2 = torch.Tensor()
torch.prod(x, 1, out=res2)
self.assertEqual(res1, res2)
def _consecutive(self, size, start=1):
sequence = torch.ones(int(torch.Tensor(size).prod(0)[0])).cumsum(0)
sequence.add_(start - 1)
return sequence.resize_(*size)
def test_dim_reduction(self):
dim_red_fns = [
"mean", "median", "mode", "norm", "prod",
"std", "sum", "var", "max", "min"]
def normfn_attr(t, dim, keepdim=True):
attr = getattr(torch, "norm")
return attr(t, 2, dim, keepdim)
for fn_name in dim_red_fns:
x = torch.randn(3, 4, 5)
fn_attr = getattr(torch, fn_name) if fn_name != "norm" else normfn_attr
def fn(t, dim, keepdim=True):
ans = fn_attr(x, dim, keepdim)
return ans if not isinstance(ans, tuple) else ans[0]
dim = random.randint(0, 2)
self.assertEqual(fn(x, dim, False).unsqueeze(dim), fn(x, dim))
self.assertEqual(x.ndimension() - 1, fn(x, dim, False).ndimension())
self.assertEqual(x.ndimension(), fn(x, dim, True).ndimension())
# check 1-d behavior
x = torch.randn(1)
dim = 0
self.assertEqual(fn(x, dim), fn(x, dim, True))
self.assertEqual(x.ndimension(), fn(x, dim).ndimension())
self.assertEqual(x.ndimension(), fn(x, dim, True).ndimension())
def vector_to_parameters(vec, parameters):
"""Convert one vector to the parameters
Arguments:
vec (Variable): a single vector representing the parameters of a model.
parameters (Iterable[Variable]): an iterator of Variables that are the
parameters of a model.
"""
# Ensure vec of type Variable
if not isinstance(vec, Variable):
raise TypeError('expected torch.autograd.Variable, but got: {}'
.format(torch.typename(vec)))
# Flag for the device where the parameter is located
param_device = None
# Pointer for slicing the vector for each parameter
pointer = 0
for param in parameters:
# Ensure the parameters are located in the same device
param_device = _check_param_device(param, param_device)
# The length of the parameter
num_param = torch.prod(torch.LongTensor(list(param.size())))
# Slice the vector, reshape it, and replace the old data of the parameter
param.data = vec[pointer:pointer + num_param].view(param.size()).data
# Increment the pointer
pointer += num_param
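# A minimal round trip for the helper above, assuming the companion
# parameters_to_vector is importable from torch.nn.utils (as in recent PyTorch);
# the toy model is illustrative.
import torch
import torch.nn as nn
from torch.nn.utils import parameters_to_vector, vector_to_parameters

model = nn.Linear(3, 2)
vec = parameters_to_vector(model.parameters())        # 3*2 weights + 2 biases = 8 values
vector_to_parameters(torch.zeros_like(vec), model.parameters())
print(vec.numel(), model.weight.abs().sum().item())   # 8 0.0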
def _test_dim_reduction(self, cast):
dim_red_fns = [
"mean", "median", "mode", "norm", "prod",
"std", "sum", "var", "max", "min"]
def normfn_attr(t, dim, keepdim=False):
attr = getattr(torch, "norm")
return attr(t, 2, dim, keepdim)
for fn_name in dim_red_fns:
fn_attr = getattr(torch, fn_name) if fn_name != "norm" else normfn_attr
def fn(x, dim, keepdim=False):
ans = fn_attr(x, dim, keepdim=keepdim)
return ans if not isinstance(ans, tuple) else ans[0]
def test_multidim(x, dim):
self.assertEqual(fn(x, dim).unsqueeze(dim), fn(x, dim, keepdim=True))
self.assertEqual(x.ndimension() - 1, fn(x, dim).ndimension())
self.assertEqual(x.ndimension(), fn(x, dim, keepdim=True).ndimension())
# general case
x = cast(torch.randn(3, 4, 5))
dim = random.randint(0, 2)
test_multidim(x, dim)
# check 1-d behavior
x = cast(torch.randn(1))
dim = 0
self.assertEqual(fn(x, dim), fn(x, dim, keepdim=True))
self.assertEqual(x.ndimension(), fn(x, dim).ndimension())
self.assertEqual(x.ndimension(), fn(x, dim, keepdim=True).ndimension())
# check reducing of a singleton dimension
dims = [3, 4, 5]
singleton_dim = random.randint(0, 2)
dims[singleton_dim] = 1
x = cast(torch.randn(dims))
test_multidim(x, singleton_dim)
def _update_usage(self, hidden_vb, prev_usage_vb):
"""
calculates the new usage after reading and freeing from memory
variables needed:
hidden_vb: [batch_size x hidden_dim]
prev_usage_vb: [batch_size x mem_hei]
free_gate_vb: [batch_size x num_heads x 1]
wl_prev_vb: [batch_size x num_heads x mem_hei]
returns:
usage_vb: [batch_size x mem_hei]
"""
self.free_gate_vb = F.sigmoid(self.hid_2_free_gate(hidden_vb)).view(-1, self.num_heads, 1)
free_read_weights_vb = self.free_gate_vb.expand_as(self.wl_prev_vb) * self.wl_prev_vb
psi_vb = torch.prod(1. - free_read_weights_vb, 1)
return prev_usage_vb * psi_vb
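# The product above is the DNC retention vector psi = prod_i (1 - f_i * w_i);
# a standalone numeric sketch with made-up tensors (assumes a recent PyTorch
# where torch.softmax/torch.sigmoid are tensor functions):
import torch

batch_size, num_heads, mem_hei = 2, 3, 4
free_gate = torch.sigmoid(torch.randn(batch_size, num_heads, 1))                   # f in (0, 1)
read_weights = torch.softmax(torch.randn(batch_size, num_heads, mem_hei), dim=-1)
prev_usage = torch.rand(batch_size, mem_hei)

psi = torch.prod(1. - free_gate * read_weights, dim=1)   # [batch_size x mem_hei]
usage = prev_usage * psi                                  # freed slots shrink toward 0
print(usage.shape)                                        # torch.Size([2, 4])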
def _update_usage(self, prev_usage_vb):
"""
calculates the new usage after writing to memory
variables needed:
prev_usage_vb: [batch_size x mem_hei]
wl_prev_vb: [batch_size x num_write_heads x mem_hei]
returns:
usage_vb: [batch_size x mem_hei]
"""
# calculate the aggregated effect of all write heads
# NOTE: how multiple write heads are dealt with is not discussed in the paper
# NOTE: this part is only shown in the source code
write_weights_vb = 1. - torch.prod(1. - self.wl_prev_vb, 1)
return prev_usage_vb + (1. - prev_usage_vb) * write_weights_vb
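# The matching write-side update: the heads are combined as phi = 1 - prod_i (1 - w_i),
# and usage then grows monotonically. Standalone sketch (made-up tensors, recent PyTorch):
import torch

batch_size, num_write_heads, mem_hei = 2, 2, 4
write_weights = torch.softmax(torch.randn(batch_size, num_write_heads, mem_hei), dim=-1)
prev_usage = torch.rand(batch_size, mem_hei)

phi = 1. - torch.prod(1. - write_weights, dim=1)   # aggregated write effect in (0, 1)
usage = prev_usage + (1. - prev_usage) * phi
print((usage >= prev_usage).all())                 # usage never decreases after a write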
def vector_to_parameters(vec, parameters):
r"""Convert one vector to the parameters
Arguments:
vec (Variable): a single vector representing the parameters of a model.
parameters (Iterable[Variable]): an iterator of Variables that are the
parameters of a model.
"""
# Ensure vec of type Variable
if not isinstance(vec, Variable):
raise TypeError('expected torch.autograd.Variable, but got: {}'
.format(torch.typename(vec)))
# Flag for the device where the parameter is located
param_device = None
# Pointer for slicing the vector for each parameter
pointer = 0
for param in parameters:
# Ensure the parameters are located in the same device
param_device = _check_param_device(param, param_device)
# The length of the parameter
num_param = torch.prod(torch.LongTensor(list(param.size())))
# Slice the vector, reshape it, and replace the old data of the parameter
param.data = vec[pointer:pointer + num_param].view(param.size()).data
# Increment the pointer
pointer += num_param
def forward(ctx, input, dim=None, keepdim=None):
ctx.dim = dim
ctx.keepdim = False if keepdim is None else keepdim
ctx.input_size = input.size()
if dim is None:
ctx.result = input.prod()
ctx.save_for_backward(input)
return input.new((ctx.result,))
else:
if keepdim is not None:
output = input.prod(dim, keepdim=keepdim)
else:
output = input.prod(dim)
ctx.save_for_backward(input, output)
return output
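# For intuition about what the matching backward must do: when no input element
# is zero, d(prod)/dx_i is the product of the other elements, i.e. output / x_i.
# Sketch using the current autograd API (requires_grad_), not the Variable-era one above:
import torch

x = torch.rand(5) + 0.1          # keep inputs away from zero
x.requires_grad_(True)
y = torch.prod(x)
y.backward()
print(torch.allclose(x.grad, y.detach() / x.detach()))   # True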
def count_params(x):
return np.prod(x.eval().size())
def flatten(x):
def _flatten(x):
return x.view([-1])
def _compute_output_shape(x):
return (np.prod(list(_get_shape(x))),)
return get_op(_flatten, output_shape=_compute_output_shape)(x)
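# flatten's output shape is just the total element count; with plain torch the
# same thing is a single view (sizes below are illustrative):
import numpy as np
import torch

x = torch.rand(2, 3, 4)
print(x.view(-1).shape, (int(np.prod(list(x.shape))),))   # torch.Size([24]) (24,)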
def test_keepdim_warning(self):
torch.utils.backcompat.keepdim_warning.enabled = True
x = Variable(torch.randn(3, 4), requires_grad=True)
def run_backward(y):
y_ = y
if type(y) is tuple:
y_ = y[0]
# check that backward runs smooth
y_.backward(y_.data.new(y_.size()).normal_())
def keepdim_check(f):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
y = f(x, 1)
self.assertTrue(len(w) == 1)
self.assertTrue(issubclass(w[-1].category, UserWarning))
self.assertTrue("keepdim" in str(w[-1].message))
run_backward(y)
self.assertEqual(x.size(), x.grad.size())
# check against explicit keepdim
y2 = f(x, 1, keepdim=False)
self.assertEqual(y, y2)
run_backward(y2)
y3 = f(x, 1, keepdim=True)
if type(y3) == tuple:
y3 = (y3[0].squeeze(1), y3[1].squeeze(1))
else:
y3 = y3.squeeze(1)
self.assertEqual(y, y3)
run_backward(y3)
keepdim_check(torch.sum)
keepdim_check(torch.prod)
keepdim_check(torch.mean)
keepdim_check(torch.max)
keepdim_check(torch.min)
keepdim_check(torch.mode)
keepdim_check(torch.median)
keepdim_check(torch.kthvalue)
keepdim_check(torch.var)
keepdim_check(torch.std)
torch.utils.backcompat.keepdim_warning.enabled = False
def summary(self, input_size):
def register_hook(module):
def hook(module, input, output):
class_name = str(module.__class__).split('.')[-1].split("'")[0]
module_idx = len(summary)
m_key = '%s-%i' % (class_name, module_idx+1)
summary[m_key] = OrderedDict()
summary[m_key]['input_shape'] = list(input[0].size())
summary[m_key]['input_shape'][0] = -1
summary[m_key]['output_shape'] = list(output.size())
summary[m_key]['output_shape'][0] = -1
params = 0
if hasattr(module, 'weight'):
params += th.prod(th.LongTensor(list(module.weight.size())))
if module.weight.requires_grad:
summary[m_key]['trainable'] = True
else:
summary[m_key]['trainable'] = False
if hasattr(module, 'bias'):
params += th.prod(th.LongTensor(list(module.bias.size())))
summary[m_key]['nb_params'] = params
if not isinstance(module, nn.Sequential) and \
not isinstance(module, nn.ModuleList) and \
not (module == self.model):
hooks.append(module.register_forward_hook(hook))
# create properties
summary = OrderedDict()
hooks = []
# register forward hooks
self.model.apply(register_hook)
if isinstance(input_size[0], (list, tuple)):
x = [Variable(th.rand(1,*in_size)) for in_size in input_size]
self.model(*x)
else:
x = Variable(th.rand(1,*input_size))
self.model(x)
# remove these hooks
for h in hooks:
h.remove()
return summary
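# The parameter counting inside the hook is just a product over each tensor's
# shape; a standalone sketch for a single layer (recent PyTorch, .item() pulls
# the scalar out of the 0-dim result):
import torch
import torch.nn as nn

layer = nn.Conv2d(3, 8, kernel_size=3)
n_weight = torch.prod(torch.LongTensor(list(layer.weight.size()))).item()   # 8*3*3*3 = 216
n_bias = torch.prod(torch.LongTensor(list(layer.bias.size()))).item()       # 8
print(n_weight + n_bias)                                                     # 224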
def dap_deploy(m, x, labels, data, att_crit=None):
"""
Deploy DAP
:param m:
:param x:
:param labels:
:param data:
:param att_crit:
:return: Pandas series
"""
res = m(x)
if res.embed_pred is not None:
embed_logits = res.embed_pred @ data.attributes.embeds.t()
att_probs = [torch.sigmoid(embed_logits)]
else:
att_probs = []
# Start off with the embedding probabilities
if res.att_pred is None:
domains = []
else:
domains = att_crit.domains_per_att
start_col = 0
for gt_col, d_size in enumerate(domains):
# Get the attributes per verb
atts_by_verb = data.attributes.atts_matrix[:, gt_col]
if d_size == 1:
# Get the right indexing by taking the outer product between the
# [batch_size] attributes \in {+1, -1} and the logits
# This gives us a [batch_size x num_labels] matrix.
raw_ap = torch.ger(
res.att_pred[:, start_col],
2*(atts_by_verb.float() - 0.5),
)
att_probs.append(torch.sigmoid(raw_ap))
else:
# [batch_size x attribute domain_size] matrix
ap = F.softmax(res.att_pred[:, start_col:(start_col+d_size)])
#[batch_size x num_labels]
prob_contrib_by_label = torch.index_select(ap, 1, atts_by_verb)
att_probs.append(prob_contrib_by_label)
start_col += d_size
#[batch_size x num labels x num attributes]
probs_by_att = torch.stack(att_probs, 2)
# [batch_size x num_labels]
probs_prod = torch.prod(probs_by_att + 1e-12, 2).squeeze(2)
denom = probs_prod.sum(1) # [batch_size, 1]
probs = probs_prod / denom.expand_as(probs_prod)
return probs
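# The final combination above multiplies per-attribute probabilities for each
# label and renormalizes (a naive-Bayes-style product). In isolation, with
# made-up sizes and without the .squeeze(2) needed under older keepdim defaults:
import torch

batch_size, num_labels, num_atts = 2, 5, 3
probs_by_att = torch.rand(batch_size, num_labels, num_atts)

probs_prod = torch.prod(probs_by_att + 1e-12, dim=2)   # [batch_size x num_labels]
probs = probs_prod / probs_prod.sum(1, keepdim=True)   # rows sum to 1
print(probs.sum(1))                                     # tensor([1., 1.]) up to fp error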