def _defuse_padding(self, IR_node, extra_str=""):
    input_node = self.parent_variable_name(IR_node)
    if IR_node.get_attr('auto_pad') == 'VALID':
        return input_node
    if is_valid_padding(IR_node.get_attr("pads")):
        return input_node

    padding = self._convert_padding(IR_node)
    input_node = IR_node.variable_name + '_pad'
    self.add_body(2, "{:<15} = F.pad({}, {}{})".format(
        input_node,
        self.parent_variable_name(IR_node),
        padding,
        extra_str
    ))
    return input_node
def emit_Pad(self, IR_node):
    if IR_node.get_attr('mode') == 'constant':
        mode = "mode = 'constant', value = {}".format(0)
    elif IR_node.get_attr('mode') == 'reflect':
        mode = "mode = 'reflect'"
    elif IR_node.get_attr('mode') == 'SYMMETRIC':
        mode = "mode = 'replicate'"
    else:
        assert False

    padding = self._convert_padding(IR_node)
    self.add_body(2, "{:<15} = F.pad({}, {}, {})".format(
        IR_node.variable_name,
        self.parent_variable_name(IR_node),
        padding,
        mode))
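# A minimal, hedged illustration (not part of the emitter above) of the three
# F.pad modes that emit_Pad maps the IR 'constant' / 'reflect' / 'SYMMETRIC'
# attributes to. PyTorch has no true symmetric mode, so 'replicate' is used
# as an approximation.
import torch
import torch.nn.functional as F

x = torch.arange(1., 5.).view(1, 1, 2, 2)                  # NCHW input
padded_const = F.pad(x, (1, 1, 1, 1), mode='constant', value=0)   # zero border
padded_refl = F.pad(x, (1, 1, 1, 1), mode='reflect')              # mirror, edge not repeated
padded_repl = F.pad(x, (1, 1, 1, 1), mode='replicate')            # repeat the edge row/column
assert padded_const.shape == padded_refl.shape == padded_repl.shape == (1, 1, 4, 4)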
def forward(self, embedding):
    def act(x):
        return F.relu(x, inplace=True)

    def up(x):
        m = nn.UpsamplingNearest2d(scale_factor=2)
        return m(x)

    x_ae = embedding                               # Bx256
    x_ae = act(self.ae_fc1_bn(self.ae_fc1(x_ae)))  # Bx(128*3*5)
    x_ae = x_ae.view(-1, 128, 3, 5)                # Bx128x3x5
    x_ae = up(x_ae)                                # 6x10
    x_ae = act(self.ae_c1_bn(self.ae_c1(x_ae)))    # 6x10
    x_ae = up(x_ae)                                # 12x20
    x_ae = act(self.ae_c2_bn(self.ae_c2(x_ae)))    # 12x20 -> 10x20
    x_ae = F.pad(x_ae, (0, 0, 1, 0))               # 11x20
    x_ae = up(x_ae)                                # 22x40
    x_ae = act(self.ae_c3_bn(self.ae_c3(x_ae)))    # 22x40
    x_ae = up(x_ae)                                # 44x80
    x_ae = F.pad(x_ae, (0, 0, 1, 0))               # add 1px at top (from 44 to 45)
    x_ae = F.sigmoid(self.ae_c4(x_ae))
    return x_ae
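# A small, hedged check of the padding trick used above: on an NCHW tensor,
# F.pad(x, (0, 0, 1, 0)) leaves the width untouched and adds a single row of
# zeros at the top of the height dimension.
import torch
import torch.nn.functional as F

x = torch.randn(1, 128, 10, 20)
y = F.pad(x, (0, 0, 1, 0))
assert y.shape == (1, 128, 11, 20)
assert torch.all(y[:, :, 0, :] == 0)   # the new top row is zero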
def pad_if_needed(input, padding, kind, k_h, k_w, s_h=1, s_w=1, dilation=1):
    if padding == 'VALID':
        return input
    elif padding == 'SAME' and kind in ('conv2d', 'pool2d'):
        # Reproduce TensorFlow's 'SAME' padding: pad so that
        # out = ceil(in / stride), putting any odd pixel on the bottom/right.
        in_height, in_width = input.size(2), input.size(3)
        out_height = int(np.ceil(float(in_height) / float(s_h)))
        out_width = int(np.ceil(float(in_width) / float(s_w)))
        pad_along_height = max((out_height - 1) * s_h + k_h - in_height, 0)
        pad_along_width = max((out_width - 1) * s_w + k_w - in_width, 0)
        pad_top = pad_along_height // 2
        pad_bottom = pad_along_height - pad_top
        pad_left = pad_along_width // 2
        pad_right = pad_along_width - pad_left
        input = F.pad(input, (pad_left, pad_right, pad_top, pad_bottom))
        return input
    elif kind in ('atrous_conv2d',):
        # Dilated convolutions pad according to the effective kernel size.
        effective_height = k_h + (k_h - 1) * (dilation - 1)
        effective_width = k_w + (k_w - 1) * (dilation - 1)
        return pad_if_needed(input, padding, 'conv2d', effective_height, effective_width, s_h, s_w, dilation=1)
    else:
        raise NotImplementedError
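# A quick, hedged numeric check of the 'SAME' branch above, mirroring
# TensorFlow's formula: for in=7, k=3, s=2 we expect out=ceil(7/2)=4 and a
# total pad of (4-1)*2 + 3 - 7 = 2, split as 1 on each side.
import numpy as np
import torch
import torch.nn.functional as F

x = torch.randn(1, 8, 7, 7)
y = pad_if_needed(x, 'SAME', 'conv2d', k_h=3, k_w=3, s_h=2, s_w=2)
assert y.shape == (1, 8, 9, 9)   # 7 + 1 + 1 on each spatial dim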
def forward(self, inputs1, inputs2):
    outputs2 = self.up(inputs2)
    offset = outputs2.size()[2] - inputs1.size()[2]
    padding = 2 * [offset // 2, offset // 2]
    outputs1 = F.pad(inputs1, padding)
    return self.conv(torch.cat([outputs1, outputs2], 1))
def _extract_patches(x, kernel_size, stride, padding):
    if padding[0] + padding[1] > 0:
        x = F.pad(x, (padding[1], padding[1], padding[0],
                      padding[0])).data  # Actually check dims
    x = x.unfold(2, kernel_size[0], stride[0])
    x = x.unfold(3, kernel_size[1], stride[1])
    x = x.transpose_(1, 2).transpose_(2, 3).contiguous()
    x = x.view(
        x.size(0), x.size(1), x.size(2), x.size(3) * x.size(4) * x.size(5))
    return x
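# A hedged shape check for _extract_patches: with a 3x3 kernel, stride 1 and
# padding 1 on an NCHW input, every spatial position should yield a flattened
# C*3*3 patch.
import torch

x = torch.randn(2, 3, 8, 8)
patches = _extract_patches(x, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
assert patches.shape == (2, 8, 8, 3 * 3 * 3)   # (N, H, W, C*kh*kw)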
def _convert_padding(IR_node):
    padding = IR_node.get_attr('pads')
    padding = convert_onnx_pad_to_tf(padding)[1:-1]
    new_padding = []
    for pad in padding:
        new_padding.insert(0, pad)
    return tuple(np.array(new_padding).reshape(-1).tolist())
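# F.pad orders its padding tuple from the last dimension inwards, i.e. for an
# NCHW tensor the tuple is (w_left, w_right, h_top, h_bottom); the reversal
# loop above exists to put per-dimension pad pairs into that order. A hedged
# illustration of the convention itself:
import torch
import torch.nn.functional as F

x = torch.randn(1, 1, 4, 5)        # N, C, H=4, W=5
y = F.pad(x, (2, 0, 0, 3))         # 2 columns on the left, 3 rows at the bottom
assert y.shape == (1, 1, 4 + 3, 5 + 2)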
def forward(self, input, lengths):
    N, T = input.size(0), input.size(1)
    conv_bank_out = []
    input_t = input.transpose(1, 2)  # NxTxH -> NxHxT
    for i in range(self.num_filters):
        tmp_input = input_t
        if i % 2 == 0:
            # Pad one extra frame on the time axis so this filter's output
            # length matches the others (via a temporary 4-D view, since
            # older F.pad only handled 4-D/5-D tensors).
            tmp_input = tmp_input.unsqueeze(-1)
            tmp_input = F.pad(tmp_input, (0, 0, 0, 1)).squeeze(-1)  # NxHxT
        conv_bank_out.append(self.conv_bank[i](tmp_input))

    residual = torch.cat(conv_bank_out, dim=1)  # NxHFxT
    residual = F.relu(self.bn_list[0](residual))
    residual = F.max_pool1d(residual, 2, stride=1)
    residual = self.conv1(residual)  # NxHxT
    residual = F.relu(self.bn_list[1](residual))
    residual = self.conv2(residual)  # NxHxT
    residual = self.bn_list[2](residual).transpose(1, 2)  # NxHxT -> NxTxH

    rnn_input = input
    if rnn_input.size() != residual.size():
        rnn_input = self.residual_proj(rnn_input)
    rnn_input = rnn_input + residual

    rnn_input = self.highway(rnn_input).view(N, T, -1)
    output = rnn.pack_padded_sequence(rnn_input, lengths, True)
    output, _ = self.BGRU(output)  # zero h_0 is used by default
    output, _ = rnn.pad_packed_sequence(output, True)  # NxTx2H
    return output
def test_pad(self):
    inputs = Variable(torch.randn(1, 3, 4, 4), requires_grad=True)
    self.assertTrue(gradcheck(lambda x: F.pad(x, (1, 1, 1, 1)), (inputs,)))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1)), (inputs,)))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1), value=2), (inputs,)))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1), mode='replicate'), (inputs,)))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1), mode='reflect'), (inputs,)))

    inputs = Variable(torch.randn(1, 2, 3, 4, 4), requires_grad=True)
    self.assertTrue(gradcheck(lambda x: F.pad(x, (1, 1, 1, 1, 1, 1), mode='replicate'), (inputs,)))
def test_pack_padded_sequence(self):
    def pad(tensor, length):
        return torch.cat([tensor, tensor.new(length - tensor.size(0), *tensor.size()[1:]).zero_()])
    lengths = [10, 8, 4, 2, 2, 2, 1]
    max_length = lengths[0]
    batch_sizes = [sum(map(bool, filter(lambda x: x >= i, lengths))) for i in range(1, max_length + 1)]
    offset = 0
    padded = torch.cat([pad(i * 100 + torch.arange(1, 5 * l + 1).view(l, 1, 5), max_length)
                        for i, l in enumerate(lengths, 1)], 1)
    padded = Variable(padded, requires_grad=True)
    expected_data = [[torch.arange(1, 6) + (i + 1) * 100 + 5 * n for i in range(batch_size)]
                     for n, batch_size in enumerate(batch_sizes)]
    expected_data = list(itertools.chain.from_iterable(expected_data))
    expected_data = torch.stack(expected_data, dim=0)
    for batch_first in (True, False):
        src = padded
        if batch_first:
            src = src.transpose(0, 1)

        # check output
        packed = rnn_utils.pack_padded_sequence(src, lengths, batch_first=batch_first)
        self.assertEqual(packed.data, expected_data)
        self.assertEqual(packed.batch_sizes, batch_sizes)

        # test inverse
        unpacked, unpacked_len = rnn_utils.pad_packed_sequence(packed, batch_first=batch_first)
        self.assertEqual(unpacked, src)
        self.assertEqual(unpacked_len, lengths)

        # check grad
        if padded.grad is not None:
            padded.grad.data.zero_()
        grad_output = unpacked.data.clone().normal_()
        unpacked.backward(grad_output)
        if batch_first:
            grad_output.transpose_(0, 1)
        for i, l in enumerate(lengths):
            self.assertEqual(padded.grad.data[:l, i], grad_output[:l, i])
            if l < 10:
                self.assertEqual(padded.grad.data[l:, i].abs().sum(), 0)
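# A hedged side note on the batch_sizes computed above: for descending
# lengths [10, 8, 4, 2, 2, 2, 1], the packed representation stores, for each
# time step, how many sequences are still "alive".
lengths = [10, 8, 4, 2, 2, 2, 1]
batch_sizes = [sum(1 for l in lengths if l >= i) for i in range(1, lengths[0] + 1)]
assert batch_sizes == [7, 6, 3, 3, 2, 2, 2, 2, 1, 1]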
def forward(self, x):
    h = self.up1(x)
    h = F.pad(h, (1, 1, 1, 1), mode='reflect')
    h = self.b3(self.c2(h))
    return F.relu(h)
def test_pack_padded_sequence(self):
    def pad(tensor, length):
        return torch.cat([tensor, tensor.new(length - tensor.size(0), *tensor.size()[1:]).zero_()])
    lengths = [10, 8, 4, 2, 2, 2, 1]
    max_length = lengths[0]
    batch_sizes = [sum(map(bool, filter(lambda x: x >= i, lengths))) for i in range(1, max_length + 1)]
    offset = 0
    padded = torch.cat([pad(i * 100 + torch.arange(1, 5 * l + 1).view(l, 1, 5), max_length)
                        for i, l in enumerate(lengths, 1)], 1)
    padded = Variable(padded, requires_grad=True)
    expected_data = [[torch.arange(1, 6) + (i + 1) * 100 + 5 * n for i in range(batch_size)]
                     for n, batch_size in enumerate(batch_sizes)]
    expected_data = list(itertools.chain.from_iterable(expected_data))
    expected_data = torch.stack(expected_data, dim=0)
    for batch_first in (True, False):
        src = padded
        if batch_first:
            src = src.transpose(0, 1)

        # check output
        packed = rnn_utils.pack_padded_sequence(src, lengths, batch_first=batch_first)
        self.assertEqual(packed.data.data, expected_data)
        self.assertEqual(packed.batch_sizes, batch_sizes)

        # test inverse
        unpacked, unpacked_len = rnn_utils.pad_packed_sequence(packed, batch_first=batch_first)
        self.assertEqual(unpacked, src)
        self.assertEqual(unpacked_len, lengths)

        # check grad
        if padded.grad is not None:
            padded.grad.data.zero_()
        grad_output = unpacked.data.clone().normal_()
        unpacked.backward(grad_output)
        if batch_first:
            grad_output.transpose_(0, 1)
        for i, l in enumerate(lengths):
            self.assertEqual(padded.grad.data[:l, i], grad_output[:l, i])
            if l < 10:
                self.assertEqual(padded.grad.data[l:, i].abs().sum(), 0)
def user_representation(self, item_sequences):
    """
    Compute user representation from a given sequence.

    Returns
    -------
    tuple (all_representations, final_representation)
        The first element contains all representations from step
        -1 (no items seen) to t - 1 (all but the last items seen).
        The second element contains the final representation
        at step t (all items seen). This final state can be used
        for prediction or evaluation.
    """
    # Make the embedding dimension the channel dimension
    sequence_embeddings = (self.item_embeddings(item_sequences)
                           .permute(0, 2, 1))
    # Add a trailing dimension of 1
    sequence_embeddings = (sequence_embeddings
                           .unsqueeze(3))
    # Pad it with zeros from the left
    sequence_embeddings = F.pad(sequence_embeddings,
                                (0, 0, 1, 0))
    # Average representations, ignoring padding.
    sequence_embedding_sum = torch.cumsum(sequence_embeddings, 2)
    non_padding_entries = (
        torch.cumsum((sequence_embeddings != 0.0).float(), 2)
        .expand_as(sequence_embedding_sum)
    )
    user_representations = (
        sequence_embedding_sum / (non_padding_entries + 1)
    ).squeeze(3)

    return user_representations[:, :, :-1], user_representations[:, :, -1]
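# A hedged shape sketch of the pooling trick above: left-padding the time
# axis by one step lets position t hold the average of the first t items, so
# slicing off the last step yields "the representation before seeing item t".
import torch
import torch.nn.functional as F

emb = torch.randn(4, 32, 10, 1)      # (batch, embedding, time, 1)
emb = F.pad(emb, (0, 0, 1, 0))       # one zero step at the start of the time axis
assert emb.shape == (4, 32, 11, 1)
assert torch.all(emb[:, :, 0, :] == 0)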
def user_representation(self, item_sequences):
    """
    Compute user representation from a given sequence.

    Returns
    -------
    tuple (all_representations, final_representation)
        The first element contains all representations from step
        -1 (no items seen) to t - 1 (all but the last items seen).
        The second element contains the final representation
        at step t (all items seen). This final state can be used
        for prediction or evaluation.
    """
    # Make the embedding dimension the channel dimension
    sequence_embeddings = (self.item_embeddings(item_sequences)
                           .permute(0, 2, 1))
    # Add a trailing dimension of 1
    sequence_embeddings = (sequence_embeddings
                           .unsqueeze(3))
    # Pad it with zeros from the left
    sequence_embeddings = (F.pad(sequence_embeddings,
                                 (0, 0, 1, 0))
                           .squeeze(3))
    sequence_embeddings = sequence_embeddings.permute(0, 2, 1)

    user_representations, _ = self.lstm(sequence_embeddings)
    user_representations = user_representations.permute(0, 2, 1)

    return user_representations[:, :, :-1], user_representations[:, :, -1]
def test_pad(self):
    inputs = Variable(torch.randn(1, 3, 4, 4), requires_grad=True)
    _assertGradAndGradgradChecks(self, lambda x: F.pad(x, (1, 1, 1, 1)), (inputs,))
    _assertGradAndGradgradChecks(self, lambda x: F.pad(x, (-1, 1, -2, 1)), (inputs,))
    _assertGradAndGradgradChecks(self, lambda x: F.pad(x, (-1, 1, -2, 1), value=2), (inputs,))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1), mode='replicate'), (inputs,)))
    self.assertTrue(gradcheck(lambda x: F.pad(x, (-1, 1, -2, 1), mode='reflect'), (inputs,)))

    inputs = Variable(torch.randn(1, 2, 3, 4, 4), requires_grad=True)
    self.assertTrue(gradcheck(lambda x: F.pad(x, (1, 1, 1, 1, 1, 1), mode='replicate'), (inputs,)))
def dilate(sigs, dilation):
    """
    Note this will fail if the dilation doesn't allow a whole number amount of padding.

    :param sigs: Tensor or Variable of size (N, C, L), where N is the input dilation, C is the number of channels, and L is the input length
    :param dilation: Target dilation. Will be the size of the first dimension of the output tensor.
    :return: The dilated Tensor or Variable of size (dilation, C, L*N / dilation). The output might be zero padded at the start.
    """
    n, c, l = sigs.size()
    dilation_factor = dilation / n
    if dilation_factor == 1:
        return sigs, 0.

    # zero padding for reshaping
    new_n = int(dilation)
    new_l = int(np.ceil(l * n / dilation))
    pad_len = (new_n * new_l - n * l) / n
    if pad_len > 0:
        print("Padding: {}, {}, {}".format(new_n, new_l, pad_len))
        # TODO: pad the output tensor unevenly for indivisible dilations
        assert pad_len == int(pad_len)
        # "squeeze" then "unsqueeze" due to a limitation of the pad function,
        # which only works with 4d/5d tensors
        padding = (int(pad_len), 0, 0, 0)  # (d3_start, d3_end, d2_start, d2_end); d0 and d1 unpadded
        sigs = pad1d(sigs, padding)

    # reshape according to dilation
    sigs = sigs.permute(1, 2, 0).contiguous()  # (n, c, l) -> (c, l, n)
    sigs = sigs.view(c, new_l, new_n)
    sigs = sigs.permute(2, 0, 1).contiguous()  # (c, l, n) -> (n, c, l)
    return sigs, pad_len
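# A hedged worked example of dilate: going from dilation 1 to 4 on a signal of
# length 8 folds the time axis into the batch axis; no padding is needed since
# 8 is divisible by 4 (pad1d is assumed to be a zero-padding helper defined
# alongside dilate and is not exercised here).
import numpy as np
import torch

sigs = torch.arange(8.).view(1, 1, 8)   # (n=1, c=1, l=8)
out, pad_len = dilate(sigs, 4)
assert out.shape == (4, 1, 2)           # (dilation, c, l*n/dilation)
assert pad_len == 0.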
def forward(self, input):
    x = input
    if len(x.size()) == 3:
        # Temporarily view the 3-D input as 4-D, since F.pad expects 4-D/5-D here.
        x = x.view((1,) + x.size())
        x = F.pad(x, self.padding, 'constant', self.value)
        x = x.view(x.size()[1:])
    return x
def forward(self, input):
    x = input
    ys = []
    target_size = None
    depth_dim = 0
    for seq in self.seq_list:
        # print(seq)
        # print(self.outputSize)
        # print('x_size:', x.size())
        y = seq(x)
        y_size = y.size()
        # print('y_size:', y_size)
        ys.append(y)
        #
        if target_size is None:
            target_size = [0] * len(y_size)
        #
        for i in range(len(target_size)):
            target_size[i] = max(target_size[i], y_size[i])
        depth_dim += y_size[1]

    target_size[1] = depth_dim
    # print('target_size:', target_size)

    for i in range(len(ys)):
        y_size = ys[i].size()
        pad_l = int((target_size[3] - y_size[3]) // 2)
        pad_t = int((target_size[2] - y_size[2]) // 2)
        pad_r = target_size[3] - y_size[3] - pad_l
        pad_b = target_size[2] - y_size[2] - pad_t
        ys[i] = F.pad(ys[i], (pad_l, pad_r, pad_t, pad_b))

    output = torch.cat(ys, 1)
    return output
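# A hedged arithmetic note on the centring above: if the largest branch output
# is 28x28 and one branch produces 26x26, the spatial gap of 2 is split as 1
# on each side before the depth-wise concatenation.
target_h, y_h = 28, 26
pad_t = int((target_h - y_h) // 2)
pad_b = target_h - y_h - pad_t
assert (pad_t, pad_b) == (1, 1)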
def test_conv_tbc(self):
    inp = Variable(torch.randn(9, 4, 5), requires_grad=True)
    weight = Variable(torch.randn(3, 5, 6), requires_grad=True)
    bias = Variable(torch.randn(6), requires_grad=True)

    gradcheck(lambda i, w, b, pad: F.conv_tbc(i, w, b, pad), (inp, weight, bias, 3))
def forward(self, inputs):
    """
    A 1D dilated convolution w/ padding such that the output
    is the same size as the input.

    :param inputs: (batch size, # channels, height)
    :return: (batch size, # channels, height)
    """
    x = F.pad(inputs.unsqueeze(2), (self.left_padding, 0, 0, 0)).squeeze(2)
    return super(CausalConv1d, self).forward(x)
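# A minimal, hedged sketch of how a class like the one above is typically
# assembled: self.left_padding is assumed to be (kernel_size - 1) * dilation,
# so the convolution never sees inputs to the right of the current step.
# Modern F.pad accepts 3-D inputs directly, so the unsqueeze/squeeze trick
# above is not needed here.
import torch
import torch.nn as nn
import torch.nn.functional as F

class CausalConv1dSketch(nn.Conv1d):
    def __init__(self, in_channels, out_channels, kernel_size, dilation=1):
        super(CausalConv1dSketch, self).__init__(
            in_channels, out_channels, kernel_size, dilation=dilation)
        self.left_padding = (kernel_size - 1) * dilation

    def forward(self, inputs):
        # Pad only on the left of the time axis, then run a plain Conv1d.
        x = F.pad(inputs, (self.left_padding, 0))
        return super(CausalConv1dSketch, self).forward(x)

x = torch.randn(2, 8, 50)
y = CausalConv1dSketch(8, 16, kernel_size=3, dilation=2)(x)
assert y.shape == (2, 16, 50)   # same length as the input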
def _test_variable_sequence(self, cuda):
    def pad(var, length):
        if var.size(0) == length:
            return var
        return torch.cat([var, Variable(var.data.new(length - var.size(0), *var.size()[1:]).zero_())])

    lengths = [10, 10, 6, 2, 2, 1, 1]
    max_length = lengths[0]
    x_leaf = Variable(torch.randn(max_length, len(lengths), 3), requires_grad=True)
    lstm = nn.LSTM(3, 4, bidirectional=True, num_layers=2)
    lstm2 = deepcopy(lstm)
    if cuda:
        x = x_leaf.cuda()
        lstm.cuda()
        lstm2.cuda()
    else:
        x = x_leaf

    # Compute sequences separately
    seq_outs = []
    seq_hiddens = []
    for i, l in enumerate(lengths):
        out, hid = lstm2(x[:l, i:i + 1])
        out_pad = pad(out, max_length)
        seq_outs.append(out_pad)
        seq_hiddens.append(hid)
    seq_out = torch.cat(seq_outs, 1)
    seq_hidden = tuple(torch.cat(hids, 1) for hids in zip(*seq_hiddens))

    # Use packed format
    packed = rnn_utils.pack_padded_sequence(x, lengths)
    packed_out, packed_hidden = lstm(packed)
    unpacked, unpacked_len = rnn_utils.pad_packed_sequence(packed_out)

    # Check forward
    self.assertEqual(packed_hidden, seq_hidden)
    self.assertEqual(unpacked, seq_out)
    self.assertEqual(unpacked_len, lengths)

    # Check backward
    seq_out.sum().backward()
    grad_x = x_leaf.grad.data.clone()
    x_leaf.grad.data.zero_()
    unpacked.sum().backward()

    self.assertEqual(x_leaf.grad.data, grad_x)
    for p1, p2 in zip(lstm.parameters(), lstm2.parameters()):
        self.assertEqual(p1.grad, p2.grad)
def user_representation(self, item_sequences):
    """
    Compute user representation from a given sequence.

    Returns
    -------
    tuple (all_representations, final_representation)
        The first element contains all representations from step
        -1 (no items seen) to t - 1 (all but the last items seen).
        The second element contains the final representation
        at step t (all items seen). This final state can be used
        for prediction or evaluation.
    """
    # Make the embedding dimension the channel dimension
    sequence_embeddings = (self.item_embeddings(item_sequences)
                           .permute(0, 2, 1))
    # Add a trailing dimension of 1
    sequence_embeddings = (sequence_embeddings
                           .unsqueeze(3))

    # Pad so that the CNN doesn't have the future
    # of the sequence in its receptive field.
    receptive_field_width = (self.kernel_width[0] +
                             (self.kernel_width[0] - 1) *
                             (self.dilation[0] - 1))

    x = F.pad(sequence_embeddings,
              (0, 0, receptive_field_width, 0))
    x = self.nonlinearity(self.cnn_layers[0](x))

    if self.residual_connections:
        residual = F.pad(sequence_embeddings,
                         (0, 0, 1, 0))
        x = x + residual

    for (cnn_layer, kernel_width, dilation) in zip(self.cnn_layers[1:],
                                                   self.kernel_width[1:],
                                                   self.dilation[1:]):
        receptive_field_width = (kernel_width +
                                 (kernel_width - 1) *
                                 (dilation - 1))
        residual = x
        x = F.pad(x, (0, 0, receptive_field_width - 1, 0))
        x = self.nonlinearity(cnn_layer(x))

        if self.residual_connections:
            x = x + residual

    x = x.squeeze(3)

    return x[:, :, :-1], x[:, :, -1]
def __init__(self, inputSize, kernelSize, kernelStride, outputSize, reduceSize, pool, useBatchNorm, reduceStride=None, padding=True):
    super(Inception, self).__init__()
    #
    self.seq_list = []
    self.outputSize = outputSize

    #
    # 1x1 conv (reduce) -> 3x3 conv
    # 1x1 conv (reduce) -> 5x5 conv
    # ...
    for i in range(len(kernelSize)):
        od = OrderedDict()
        # 1x1 conv
        od['1_conv'] = Conv2d(inputSize, reduceSize[i], (1, 1),
                              reduceStride[i] if reduceStride is not None else 1, (0, 0))
        if useBatchNorm:
            od['2_bn'] = BatchNorm(reduceSize[i])
        od['3_relu'] = nn.ReLU()
        # nxn conv
        pad = int(numpy.floor(kernelSize[i] / 2)) if padding else 0
        od['4_conv'] = Conv2d(
            reduceSize[i], outputSize[i], kernelSize[i], kernelStride[i], pad)
        if useBatchNorm:
            od['5_bn'] = BatchNorm(outputSize[i])
        od['6_relu'] = nn.ReLU()
        #
        self.seq_list.append(nn.Sequential(od))

    ii = len(kernelSize)
    # pool -> 1x1 conv
    od = OrderedDict()
    od['1_pool'] = pool
    if ii < len(reduceSize) and reduceSize[ii] is not None:
        i = ii
        od['2_conv'] = Conv2d(inputSize, reduceSize[i], (1, 1),
                              reduceStride[i] if reduceStride is not None else 1, (0, 0))
        if useBatchNorm:
            od['3_bn'] = BatchNorm(reduceSize[i])
        od['4_relu'] = nn.ReLU()
    #
    self.seq_list.append(nn.Sequential(od))
    ii += 1

    # reduce: 1x1 conv (channel-wise pooling)
    if ii < len(reduceSize) and reduceSize[ii] is not None:
        i = ii
        od = OrderedDict()
        od['1_conv'] = Conv2d(inputSize, reduceSize[i], (1, 1),
                              reduceStride[i] if reduceStride is not None else 1, (0, 0))
        if useBatchNorm:
            od['2_bn'] = BatchNorm(reduceSize[i])
        od['3_relu'] = nn.ReLU()
        self.seq_list.append(nn.Sequential(od))

    self.seq_list = nn.ModuleList(self.seq_list)