def forward(self, x):
"""
A bidirectional RNN encoder. Has support for global max/average pooling.
:param x: A tuple of Variable's representing padded sentence tensor batch
[seq. length, batch size, embed. size] and sentence lengths.
:return: Global max/average pooled embedding from bidirectional RNN encoder of [batch_size, hidden_size]
"""
sentences, sentence_lengths = x
# Sort sentences by descending length.
sorted_sentence_lengths, sort_indices = torch.sort(sentence_lengths, dim=0, descending=True)
_, unsort_indices = torch.sort(sort_indices, dim=0)
sorted_sentence_lengths = sorted_sentence_lengths.data
sorted_sentences = sentences.index_select(1, sort_indices)
# Handle padding for RNN's.
packed_sentences = nn.utils.rnn.pack_padded_sequence(sorted_sentences, sorted_sentence_lengths.clone().cpu().numpy())
# [seq. length, sentence_batch size, 2 * num. layers * num. hidden]
encoder_outputs = self.encoder(packed_sentences)[0]
encoder_outputs = nn.utils.rnn.pad_packed_sequence(encoder_outputs)[0]
# Unsort outputs.
encoder_outputs = encoder_outputs.index_select(1, unsort_indices)
# Apply global max/average pooling 1D.
encoder_outputs = encoder_outputs.transpose(0, 2).transpose(0, 1)
if self.pooling_mode == "max":
encoder_outputs = F.max_pool1d(encoder_outputs, kernel_size=encoder_outputs.size(2))
elif self.pooling_mode == "avg":
encoder_outputs = F.avg_pool1d(encoder_outputs, kernel_size=encoder_outputs.size(2))
encoder_outputs = encoder_outputs.squeeze()
return encoder_outputs
python类max_pool1d()的实例源码
def forward(self, x):
"""
Args:
x: (batch_size * seq_len)
"""
emb = self.emb(x).unsqueeze(1) # batch_size * 1 * seq_len * emb_dim
convs = [F.relu(conv(emb)).squeeze(3) for conv in self.convs] # [batch_size * num_filter * length]
pools = [F.max_pool1d(conv, conv.size(2)).squeeze(2) for conv in convs] # [batch_size * num_filter]
pred = torch.cat(pools, 1) # batch_size * num_filters_sum
highway = self.highway(pred)
pred = F.sigmoid(highway) * F.relu(highway) + (1. - F.sigmoid(highway)) * pred
pred = self.softmax(self.lin(self.dropout(pred)))
return pred
def conv_and_pool(self, x, conv):
x = F.relu(conv(x)).squeeze(3) #(N,Co,W)
x = F.max_pool1d(x, x.size(2)).squeeze(2)
return x
def base_test():
fc1 = nn.Linear(10,20)
fc1.weight.data.normal_(0.0,1.0)
fc1.bias.data.normal_(0.0,1.0)
fc2 = nn.Linear(20,2)
fc2.weight.data.normal_(0.0,1.0)
fc2.bias.data.normal_(0.0,1.0)
fc3 = nn.Linear(10,2)
fc3.weight.data.normal_(0.0,1.0)
fc3.bias.data.normal_(0.0,1.0)
fc4 = nn.Linear(10,2)
fc4.weight.data.normal_(0.0,1.0)
fc4.bias.data.normal_(0.0,1.0)
softmax = nn.Softmax()
model0 = lambda x: F.log_softmax(fc2(F.relu(fc1(x))))
model1 = lambda x: F.softmax(F.elu(fc3(x)))
model2 = lambda x: F.softmax(F.tanh(fc3(x)))
model3 = lambda x: F.softmax(F.sigmoid(fc3(x)))
model4 = lambda x: softmax(F.leaky_relu(fc4(x))).clone()
model5 = lambda x: softmax(F.logsigmoid(fc4(x.transpose(0,1))))
model6 = lambda x: fc3(F.max_pool2d(x.unsqueeze(dim=0),2).squeeze())
model7 = lambda x: fc3(F.max_pool2d(x.unsqueeze(dim=0),2).squeeze(dim=0))
model8 = lambda x: fc3(F.max_pool3d(x.unsqueeze(0),2).squeeze())
model9 = lambda x: fc3(F.max_pool1d(x.abs().view(1,1,-1),4).squeeze().view(10,10))
#model10 = lambda x: fc3(x.double())
#model10 = lambda x: fc3(x.view(1,10,10).select(0,0))
model10 = lambda x, y: F.softmax(F.tanh(fc3(torch.cat((x,y),1))))
data = Variable(torch.rand(10,10))
data2 = Variable(torch.rand(20,20))
data1a = Variable(torch.rand(10,5))
data1b = Variable(torch.rand(10,5))
data3 = Variable(torch.rand(2,20,20))
out = model0(data) + \
model1(data) * model2(data) / model3(data) / 2.0 + \
2.0 * model4(data) + model5(data) + 1 - 2.0 + \
model6(data2) + model7(data2) + model8(data3) + model9(data2) + model10(data1a,data1b)
out_path = 'out'
if not os.path.isdir(out_path):
os.mkdir(out_path)
uid = str(uuid.uuid4())
torch2c.compile(out,'base',os.path.join(out_path,uid),compile_test=True)
def forward(self, x):
# x = (sequence length, batch_size, dimension of embedding)
text = x.text
batch_size = text.size()[1]
x = self.embed(text)
if self.config.relation_prediction_mode.upper() == "LSTM":
# h0 / c0 = (layer*direction, batch_size, hidden_dim)
if self.config.cuda:
h0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size).cuda())
c0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size).cuda())
else:
h0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size))
c0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size))
# output = (sentence length, batch_size, hidden_size * num_direction)
# ht = (layer*direction, batch, hidden_dim)
# ct = (layer*direction, batch, hidden_dim)
outputs, (ht, ct) = self.lstm(x, (h0, c0))
tags = self.hidden2tag(ht[-2:].transpose(0, 1).contiguous().view(batch_size, -1))
scores = F.log_softmax(tags)
return scores
elif self.config.relation_prediction_mode.upper() == "GRU":
if self.config.cuda:
h0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size).cuda())
else:
h0 = Variable(torch.zeros(self.config.num_layer * 2, batch_size,
self.config.hidden_size))
outputs, ht = self.gru(x, h0)
tags = self.hidden2tag(ht[-2:].transpose(0, 1).contiguous().view(batch_size, -1))
scores = F.log_softmax(tags)
return scores
elif self.config.relation_prediction_mode.upper() == "CNN":
x = x.transpose(0, 1).contiguous().unsqueeze(1) # (batch, channel_input, sent_len, embed_dim)
x = [F.relu(self.conv1(x)).squeeze(3), F.relu(self.conv2(x)).squeeze(3), F.relu(self.conv3(x)).squeeze(3)]
# (batch, channel_output, ~=sent_len) * Ks
x = [F.max_pool1d(i, i.size(2)).squeeze(2) for i in x] # max-over-time pooling
# (batch, channel_output) * Ks
x = torch.cat(x, 1) # (batch, channel_output * Ks)
x = self.dropout(x)
logit = self.fc1(x) # (batch, target_size)
scores = F.log_softmax(logit)
return scores
else:
print("Unknown Mode")
exit(1)