def forward_lm(self, inpt, lang_h, ctx_h):
"""Run forward pass for language modeling."""
# embed words
inpt_emb = self.word_encoder(inpt)
# append the context embedding to every input word embedding
ctx_h_rep = ctx_h.narrow(0, ctx_h.size(0) - 1, 1).expand(
inpt.size(0), ctx_h.size(1), ctx_h.size(2))
inpt_emb = torch.cat([inpt_emb, ctx_h_rep], 2)
inpt_emb = self.dropout(inpt_emb)
out, _ = self.reader(inpt_emb, lang_h)
decoded = self.decoder(out.view(-1, out.size(2)))
# tie weights between word embedding/decoding
decoded = F.linear(decoded, self.word_encoder.weight)
return decoded.view(out.size(0), out.size(1), decoded.size(1)), out
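# A minimal added sketch (not from the original repo; assumes PyTorch >= 0.4): the
# tied-weight decoding above relies on F.linear(x, W) computing x @ W.t(), so the
# (vocab_size x emb_dim) embedding matrix doubles as the output projection that maps
# emb_dim features back to vocabulary logits.
import torch
import torch.nn as nn
import torch.nn.functional as F

vocab_size, emb_dim, batch = 10, 4, 3
word_encoder = nn.Embedding(vocab_size, emb_dim)
hidden = torch.randn(batch, emb_dim)              # stand-in for the decoded features
logits = F.linear(hidden, word_encoder.weight)    # shape (batch, vocab_size)
assert logits.size() == (batch, vocab_size)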
def KrauseLSTMCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None):
# Terminology matchup:
# - This implementation uses the trick of having all gates concatenated
# together into a single tensor, so you can do one matrix multiply to
# compute all the gates.
# - Thus, w_ih holds W_hx, W_ix, W_ox, W_fx
# and w_hh holds W_hh, W_ih, W_oh, W_fh
# - Notice that the indices are swapped, because F.linear has swapped
# arguments. "Cancelling" indices are always next to each other.
hx, cx = hidden
gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh)
ingate, forgetgate, hiddengate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate)
outgate = F.sigmoid(outgate)
forgetgate = F.sigmoid(forgetgate)
cy = (forgetgate * cx) + (ingate * hiddengate)
hy = F.tanh(cy * outgate)
return hy, cy
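# Added usage sketch for the cell above (illustrative sizes, not from the source
# repo; assumes the KrauseLSTMCell definition and the imports below are in scope).
# F.linear expects weight of shape (out_features, in_features), so the fused gate
# weights are (4 * hidden_size, input_size) and (4 * hidden_size, hidden_size).
import torch
import torch.nn.functional as F

batch, input_size, hidden_size = 2, 8, 16
x = torch.randn(batch, input_size)
hx = torch.randn(batch, hidden_size)
cx = torch.randn(batch, hidden_size)
w_ih = torch.randn(4 * hidden_size, input_size)
w_hh = torch.randn(4 * hidden_size, hidden_size)
hy, cy = KrauseLSTMCell(x, (hx, cx), w_ih, w_hh)
assert hy.size() == (batch, hidden_size) and cy.size() == (batch, hidden_size)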
def MultiplicativeLSTMCell(input, hidden, w_xm, w_hm, w_ih, w_mh, b_xm=None, b_hm=None, b_ih=None, b_mh=None):
# w_ih holds W_hx, W_ix, W_ox, W_fx
# w_mh holds W_hm, W_im, W_om, W_fm
hx, cx = hidden
# Key difference:
m = F.linear(input, w_xm, b_xm) * F.linear(hx, w_hm, b_hm)
gates = F.linear(input, w_ih, b_ih) + F.linear(m, w_mh, b_mh)
ingate, forgetgate, hiddengate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate)
outgate = F.sigmoid(outgate)
forgetgate = F.sigmoid(forgetgate)
cy = (forgetgate * cx) + (ingate * hiddengate)
hy = F.tanh(cy * outgate)
return hy, cy
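# Added shape sketch (illustrative, not from the source): the multiplicative state
# m = (x W_xm^T) * (h W_hm^T) has hidden_size features, so w_xm is
# (hidden_size, input_size), w_hm is (hidden_size, hidden_size), and the fused gate
# weights keep the usual (4 * hidden_size, ...) layout.
import torch
import torch.nn.functional as F

batch, input_size, hidden_size = 2, 8, 16
w_xm = torch.randn(hidden_size, input_size)
w_hm = torch.randn(hidden_size, hidden_size)
w_ih = torch.randn(4 * hidden_size, input_size)
w_mh = torch.randn(4 * hidden_size, hidden_size)
hy, cy = MultiplicativeLSTMCell(torch.randn(batch, input_size),
                                (torch.randn(batch, hidden_size),
                                 torch.randn(batch, hidden_size)),
                                w_xm, w_hm, w_ih, w_mh)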
def SkipConnectFastGRUCell(input, hidden, hidden_skip, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
hx = torch.cat([hidden, hidden_skip], dim=1)
if noise_hidden is not None:
hx = hx * noise_hidden
if input.is_cuda:
gi = F.linear(input, w_ih)
gh = F.linear(hx, w_hh)
state = fusedBackend.GRUFused()
return state(gi, gh, hidden) if b_ih is None else state(gi, gh, hidden, b_ih, b_hh)
gi = F.linear(input, w_ih, b_ih)
gh = F.linear(hx, w_hh, b_hh)
i_r, i_i, i_n = gi.chunk(3, 1)
h_r, h_i, h_n = gh.chunk(3, 1)
resetgate = F.sigmoid(i_r + h_r)
inputgate = F.sigmoid(i_i + h_i)
newgate = F.tanh(i_n + resetgate * h_n)
hy = newgate + inputgate * (hidden - newgate)
return hy
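# Added shape sketch (illustrative sizes, not from the source): because the cell
# concatenates `hidden` with `hidden_skip`, the hidden-to-hidden weight needs
# in_features = 2 * hidden_size, while both gate tensors still chunk into 3 GRU gates.
import torch
import torch.nn.functional as F

batch, input_size, hidden_size = 2, 5, 7
w_ih = torch.randn(3 * hidden_size, input_size)
w_hh = torch.randn(3 * hidden_size, 2 * hidden_size)
hy = SkipConnectFastGRUCell(torch.randn(batch, input_size),
                            torch.randn(batch, hidden_size),   # hidden
                            torch.randn(batch, hidden_size),   # hidden_skip
                            w_ih, w_hh)
assert hy.size() == (batch, hidden_size)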
def VarFastGRUCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
hx = hidden if noise_hidden is None else hidden * noise_hidden
if input.is_cuda:
gi = F.linear(input, w_ih)
gh = F.linear(hx, w_hh)
state = fusedBackend.GRUFused()
return state(gi, gh, hidden) if b_ih is None else state(gi, gh, hidden, b_ih, b_hh)
gi = F.linear(input, w_ih, b_ih)
gh = F.linear(hx, w_hh, b_hh)
i_r, i_i, i_n = gi.chunk(3, 1)
h_r, h_i, h_n = gh.chunk(3, 1)
resetgate = F.sigmoid(i_r + h_r)
inputgate = F.sigmoid(i_i + h_i)
newgate = F.tanh(i_n + resetgate * h_n)
hy = newgate + inputgate * (hidden - newgate)
return hy
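# Added sketch (an assumption about intended use, not code from this repo): the
# noise_in / noise_hidden arguments are dropout masks sampled once per sequence and
# reused at every time step (variational dropout), e.g.:
import torch
import torch.nn.functional as F

batch, input_size, hidden_size, p = 2, 8, 16, 0.25
noise_in = torch.bernoulli(torch.full((batch, input_size), 1 - p)) / (1 - p)
noise_hidden = torch.bernoulli(torch.full((batch, hidden_size), 1 - p)) / (1 - p)
w_ih = torch.randn(3 * hidden_size, input_size)
w_hh = torch.randn(3 * hidden_size, hidden_size)
hy = VarFastGRUCell(torch.randn(batch, input_size), torch.randn(batch, hidden_size),
                    w_ih, w_hh, noise_in=noise_in, noise_hidden=noise_hidden)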
def test_reuse_function(self):
@torch.jit.compile(nderivs=0)
def clinear(*args):
return F.linear(*args)
def cast(x):
return x
input = Variable(cast(torch.randn(1, 1)))
weights = Variable(cast(torch.randn(1, 1)))
bias = Variable(cast(torch.randn(1, 1)))
# linear AKA addmm without bias is of particular interest
# because we allocate a zero-filled new variable when we execute,
# and then *fill* it with the result
r1_ = clinear(input, weights)
with self.assertCompiled(clinear):
r1 = clinear(r1_, weights)
r2 = F.linear(F.linear(input, weights), weights)
self.assertEqual(r1, r2)
def forward(self, input, hx):
h, c = hx
pre = F.linear(input, self.weight_ih, self.bias) \
+ F.linear(h, self.weight_hh)
pre = sparsify_grad(pre, self.k, self.simplified)
if self.grad_clip:
pre = clip_grad(pre, -self.grad_clip, self.grad_clip)
i = F.sigmoid(pre[:, :self.hidden_size])
f = F.sigmoid(pre[:, self.hidden_size: self.hidden_size * 2])
g = F.tanh(pre[:, self.hidden_size * 2: self.hidden_size * 3])
o = F.sigmoid(pre[:, self.hidden_size * 3:])
c = f * c + i * g
h = o * F.tanh(c)
return h, c
def forward(self, input, h):
ih = F.linear(input, self.weight_ih, self.bias)
hh_rz = F.linear(h, self.weight_hh_rz)
if self.grad_clip:
ih = clip_grad(ih, -self.grad_clip, self.grad_clip)
hh_rz = clip_grad(hh_rz, -self.grad_clip, self.grad_clip)
r = F.sigmoid(ih[:, :self.hidden_size] + hh_rz[:, :self.hidden_size])
i = F.sigmoid(ih[:, self.hidden_size: self.hidden_size * 2] + hh_rz[:, self.hidden_size:])
hhr = F.linear(h * r, self.weight_hh)
if self.grad_clip:
hhr = clip_grad(hhr, -self.grad_clip, self.grad_clip)
n = F.relu(ih[:, self.hidden_size * 2:] + hhr)
h = (1 - i) * n + i * h
return h
def forward(self, input, hx):
h, c = hx
pre = F.linear(input, self.weight_ih, self.bias) \
+ F.linear(h, self.weight_hh)
if self.grad_clip:
pre = clip_grad(pre, -self.grad_clip, self.grad_clip)
i = F.sigmoid(pre[:, :self.hidden_size])
f = F.sigmoid(pre[:, self.hidden_size: self.hidden_size * 2])
g = F.tanh(pre[:, self.hidden_size * 2: self.hidden_size * 3])
o = F.sigmoid(pre[:, self.hidden_size * 3:])
c = f * c + i * g
h = o * F.tanh(c)
h = F.linear(h, self.weight_rec)
return h, c
def f(params, inputs, mode):
    # functional forward pass for a small conv net: every weight and bias is looked
    # up in the params dict rather than stored on a module
    o = inputs.view(inputs.size(0), 1, 28, 28)
o = F.conv2d(o, params['conv0.weight'], params['conv0.bias'], stride=2)
o = F.relu(o)
o = F.conv2d(o, params['conv1.weight'], params['conv1.bias'], stride=2)
o = F.relu(o)
o = o.view(o.size(0), -1)
o = F.linear(o, params['linear2.weight'], params['linear2.bias'])
o = F.relu(o)
o = F.linear(o, params['linear3.weight'], params['linear3.bias'])
return o
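# Added sketch for the functional net above (the original kernel sizes are not shown
# in the snippet, so 5x5 kernels with stride 2 are assumed here; all other names are
# illustrative): a dict of plain tensors is enough to drive f().
import torch
import torch.nn.functional as F

c0, c1, hidden, classes = 16, 32, 128, 10
params = {
    'conv0.weight': torch.randn(c0, 1, 5, 5),
    'conv0.bias': torch.zeros(c0),
    'conv1.weight': torch.randn(c1, c0, 5, 5),
    'conv1.bias': torch.zeros(c1),
    # 28 -> 12 -> 4 spatially with 5x5 kernels and stride 2, so the flatten size is c1*4*4
    'linear2.weight': torch.randn(hidden, c1 * 4 * 4),
    'linear2.bias': torch.zeros(hidden),
    'linear3.weight': torch.randn(classes, hidden),
    'linear3.bias': torch.zeros(classes),
}
logits = f(params, torch.randn(4, 1, 28, 28), None)   # (4, classes)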
def forward(self, input, hidden):
hx, cx = hidden
gates = F.linear(input, self.w_ih, self.b_ih) + F.linear(hx, self.w_hh, self.b_hh) # [bsz, 4*hidden_size]
in_gate, forget_gate, cell_gate, out_gate = gates.chunk(4, 1)
in_gate, forget_gate, out_gate = map(F.sigmoid, [in_gate, forget_gate, out_gate])
cell_gate = F.tanh(cell_gate)
cy = forget_gate*cx + in_gate*cell_gate
hy = out_gate*F.tanh(cy)
return hy, cy
def __init__(self, num_features, padding_idx=0, rnn_class='lstm',
emb_size=128, hidden_size=128, num_layers=2, dropout=0.1,
bidir_input=False, share_output=True,
attn_type='none', attn_length=-1):
super().__init__()
if padding_idx != 0:
raise RuntimeError('This module\'s output layer needs to be fixed '
'if you want a padding_idx other than zero.')
self.dropout = dropout
self.layers = num_layers
self.hsz = hidden_size
self.lt = nn.Embedding(num_features, emb_size, padding_idx=padding_idx)
self.rnn = rnn_class(emb_size, hidden_size, num_layers,
dropout=dropout, batch_first=True)
# rnn output to embedding
self.o2e = nn.Linear(hidden_size, emb_size)
# embedding to scores, use custom linear to possibly share weights
shared_weight = self.lt.weight if share_output else None
self.e2s = Linear(emb_size, num_features, bias=False,
shared_weight=shared_weight)
self.shared = shared_weight is not None
self.attn_type = attn_type
self.attention = AttentionLayer(attn_type=attn_type,
hidden_size=hidden_size,
emb_size=emb_size,
bidirectional=bidir_input,
attn_length=attn_length)
def forward(self, input):
# detach weight to prevent gradients from changing weight when shared
weight = self.weight
if self.shared:
weight = weight.detach()
return F.linear(input, weight, self.bias)
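# Added sketch (an assumed reconstruction of the custom Linear referenced above, not
# the project's actual class): the output layer either owns its weight or borrows the
# embedding matrix, and forward() detaches a borrowed weight so the output layer does
# not update it, exactly as in the forward shown above.
import torch
import torch.nn as nn
import torch.nn.functional as F

class SharedLinear(nn.Module):
    def __init__(self, in_features, out_features, bias=True, shared_weight=None):
        super().__init__()
        self.shared = shared_weight is not None
        if self.shared:
            # borrow the (out_features x in_features) embedding matrix
            self.weight = shared_weight
        else:
            self.weight = nn.Parameter(torch.randn(out_features, in_features) * 0.02)
        self.bias = nn.Parameter(torch.zeros(out_features)) if bias else None

    def forward(self, input):
        weight = self.weight.detach() if self.shared else self.weight
        return F.linear(input, weight, self.bias)

# e.g. SharedLinear(emb_size, num_features, bias=False, shared_weight=lt.weight)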
def incremental_forward(self, input):
"""Forward convolution one time step at a time.
This function maintains an internal state to buffer signal and accepts
a single frame as input. If the input order changes between time steps,
call reorder_incremental_state. To apply to fresh inputs, call
clear_incremental_state.
"""
# reshape weight
weight = self._get_linearized_weight()
kw = self.kernel_size[0]
bsz = input.size(0) # input: bsz x len x dim
if kw > 1:
input = input.data
if self.input_buffer is None:
self.input_buffer = input.new(bsz, kw, input.size(2))
self.input_buffer.zero_()
else:
# shift buffer
self.input_buffer[:, :-1, :] = self.input_buffer[:, 1:, :].clone()
# append next input
self.input_buffer[:, -1, :] = input[:, -1, :]
input = torch.autograd.Variable(self.input_buffer, volatile=True)
output = F.linear(input.view(bsz, -1), weight, self.bias)
return output.view(bsz, 1, -1)
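# Added check (not the repo's _get_linearized_weight; just a verification of the
# equivalence the incremental path relies on): flattening the last kw input frames
# and the conv weight lets F.linear reproduce the convolution at the final position.
import torch
import torch.nn as nn
import torch.nn.functional as F

bsz, dim, out_dim, kw = 2, 4, 6, 3
conv = nn.Conv1d(dim, out_dim, kw)
x = torch.randn(bsz, dim, 10)
ref = conv(x)[:, :, -1]                           # conv output at the last time step
buf = x[:, :, -kw:].transpose(1, 2)               # (bsz, kw, dim), like input_buffer
w_lin = conv.weight.permute(0, 2, 1).reshape(out_dim, kw * dim)
out = F.linear(buf.reshape(bsz, -1), w_lin, conv.bias)
assert torch.allclose(ref, out, atol=1e-5)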
def forward(self, _input):
"""
the forward method that does the masked linear computation and returns the result
"""
masked_weight = self.weight * torch.autograd.Variable(self.mask)
return F.linear(_input, masked_weight, self.bias)
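# Added sketch (an assumption about how such a mask is typically set up; not from
# this repo): registering a 0/1 buffer with the same shape as the weight lets the
# forward above zero out individual connections, e.g. to enforce an autoregressive
# (lower-triangular) structure.
import torch
import torch.nn as nn
import torch.nn.functional as F

class MaskedLinearSketch(nn.Module):
    def __init__(self, in_features, out_features):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(out_features, in_features) * 0.02)
        self.bias = nn.Parameter(torch.zeros(out_features))
        # example mask: keep only the lower-triangular connections
        self.register_buffer('mask', torch.tril(torch.ones(out_features, in_features)))

    def forward(self, x):
        return F.linear(x, self.weight * self.mask, self.bias)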
def score_sent(self, sent, lang_h, ctx_h, temperature):
"""Computes likelihood of a given sentence."""
score = 0
# remove batch dimension from the language and context hidden states
lang_h = lang_h.squeeze(1)
ctx_h = ctx_h.squeeze(1)
inpt = Variable(torch.LongTensor(1))
inpt.data.fill_(self.word_dict.get_idx('YOU:'))
inpt = self.to_device(inpt)
lang_hs = []
for word in sent:
# add the context to the word embedding
inpt_emb = torch.cat([self.word_encoder(inpt), ctx_h], 1)
# update RNN state with last word
lang_h = self.writer(inpt_emb, lang_h)
lang_hs.append(lang_h)
# decode words using the inverse of the word embedding matrix
out = self.decoder(lang_h)
scores = F.linear(out, self.word_encoder.weight).div(temperature)
# subtract constant to avoid overflows in exponentiation
scores = scores.add(-scores.max().data[0]).squeeze(0)
mask = Variable(self.special_token_mask)
scores = scores.add(mask)
logprob = F.log_softmax(scores)
score += logprob[word[0]].data[0]
inpt = Variable(word)
# update the hidden state with the <eos> token
inpt_emb = torch.cat([self.word_encoder(inpt), ctx_h], 1)
lang_h = self.writer(inpt_emb, lang_h)
lang_hs.append(lang_h)
# add batch dimension back
lang_h = lang_h.unsqueeze(1)
return score, lang_h, torch.cat(lang_hs)
def forward(self, input):
    # resample Gaussian noise for every weight (and bias) entry on each forward pass
    torch.randn(self.epsilon_weight.size(), out=self.epsilon_weight)
    bias = self.bias
    if bias is not None:
        torch.randn(self.epsilon_bias.size(), out=self.epsilon_bias)
        bias = bias + self.sigma_bias * Variable(self.epsilon_bias)
    return F.linear(input, self.weight + self.sigma_weight * Variable(self.epsilon_weight), bias)
def forward(self, input):
    # factorized noise: sample one noise vector per input feature and one per output
    # feature instead of a full (out_features x in_features) matrix
    torch.randn(self.epsilon_input.size(), out=self.epsilon_input)
    torch.randn(self.epsilon_output.size(), out=self.epsilon_output)
    # signed-sqrt transform applied to both noise vectors
    func = lambda x: torch.sign(x) * torch.sqrt(torch.abs(x))
    eps_in = func(self.epsilon_input)
    eps_out = func(self.epsilon_output)
    bias = self.bias
    if bias is not None:
        bias = bias + self.sigma_bias * Variable(eps_out.t())
    # broadcasting the per-input and per-output vectors yields a weight-shaped noise matrix
    noise_v = Variable(torch.mul(eps_in, eps_out))
    return F.linear(input, self.weight + self.sigma_weight * noise_v, bias)
def forward(self, input, sigma=None):
res = F.linear(input, self.weight, self.bias)
if sigma is None:
return res
if self.rand_buf is None or self.rand_buf.size() != res.size():
self.rand_buf = torch.FloatTensor(res.size())
if input.is_cuda:
self.rand_buf = self.rand_buf.cuda()
torch.randn(self.rand_buf.size(), out=self.rand_buf)
return res + torch.mul(sigma, Variable(self.rand_buf))
def test_lstm_fusion(self):
input = Variable(torch.randn(3, 10).cuda())
hx = Variable(torch.randn(3, 20).cuda())
cx = Variable(torch.randn(3, 20).cuda())
module = nn.LSTMCell(10, 20).cuda() # Just to allocate weights with correct sizes
def LSTMCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None):
hx, cx = hidden
gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh)
ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate)
forgetgate = F.sigmoid(forgetgate)
cellgate = F.tanh(cellgate)
outgate = F.sigmoid(outgate)
cy = (forgetgate * cx) + (ingate * cellgate)
hy = outgate * F.tanh(cy)
return hy, cy
trace, _ = torch.jit.trace(LSTMCell, (input, (hx, cx)) + tuple(module.parameters()))
torch._C._jit_pass_lint(trace)
torch._C._jit_pass_onnx(trace)
torch._C._jit_pass_lint(trace)
torch._C._jit_pass_fuse(trace)
torch._C._jit_pass_lint(trace)
self.assertExpected(str(trace))
def forward(self, input):
if isinstance(input, Variable):
return F.linear(input, self.weight, self.bias)
elif isinstance(input, tuple) or isinstance(input, list):
return my_data_parallel(self, input)
else:
raise RuntimeError('unknown input type')
def forward(self, input):
return self.norm_scale_bias(F.linear(input, self.weight))
def forward(self, input_left, input_right):
'''
Args:
input_left: Tensor
the left input tensor with shape = [batch1, batch2, ..., left_features]
input_right: Tensor
the right input tensor with shape = [batch1, batch2, ..., right_features]
Returns: Tensor
    the output tensor with shape = [batch1, batch2, ..., out_features]
'''
left_size = input_left.size()
right_size = input_right.size()
assert left_size[:-1] == right_size[:-1], \
"batch size of left and right inputs mis-match: (%s, %s)" % (left_size[:-1], right_size[:-1])
batch = int(np.prod(left_size[:-1]))
# convert left and right input to matrices [batch, left_features], [batch, right_features]
input_left = input_left.view(batch, self.left_features)
input_right = input_right.view(batch, self.right_features)
# output [batch, out_features]
output = F.bilinear(input_left, input_right, self.U, self.bias)
output = output + F.linear(input_left, self.W_l, None) + F.linear(input_right, self.W_r, None)
# convert back to [batch1, batch2, ..., out_features]
return output.view(left_size[:-1] + (self.out_features, ))
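# Added shape sketch (illustrative sizes, not from the source): F.bilinear expects a
# weight of shape (out_features, left_features, right_features), so the forward above
# computes x_l U x_r + x_l W_l^T + x_r W_r^T + b over the flattened batch dimensions.
import torch
import torch.nn.functional as F

batch, left_features, right_features, out_features = 6, 4, 5, 3
x_l = torch.randn(batch, left_features)
x_r = torch.randn(batch, right_features)
U = torch.randn(out_features, left_features, right_features)
W_l = torch.randn(out_features, left_features)
W_r = torch.randn(out_features, right_features)
bias = torch.zeros(out_features)
out = F.bilinear(x_l, x_r, U, bias) + F.linear(x_l, W_l) + F.linear(x_r, W_r)
assert out.size() == (batch, out_features)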
def SkipConnectRNNReLUCell(input, hidden, hidden_skip, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None, noise_skip=None):
if noise_in is not None:
input = input * noise_in
hidden = torch.cat([hidden, hidden_skip], dim=1)
if noise_hidden is not None:
hidden = hidden * noise_hidden
hy = F.relu(F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh, b_hh))
return hy
def SkipConnectRNNTanhCell(input, hidden, hidden_skip, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
hidden = torch.cat([hidden, hidden_skip], dim=1)
if noise_hidden is not None:
hidden = hidden * noise_hidden
hy = F.tanh(F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh, b_hh))
return hy
def SkipConnectFastLSTMCell(input, hidden, hidden_skip, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
hx, cx = hidden
hx = torch.cat([hx, hidden_skip], dim=1)
if noise_hidden is not None:
hx = hx * noise_hidden
if input.is_cuda:
igates = F.linear(input, w_ih)
hgates = F.linear(hx, w_hh)
state = fusedBackend.LSTMFused()
return state(igates, hgates, cx) if b_ih is None else state(igates, hgates, cx, b_ih, b_hh)
gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh)
ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate)
forgetgate = F.sigmoid(forgetgate)
cellgate = F.tanh(cellgate)
outgate = F.sigmoid(outgate)
cy = (forgetgate * cx) + (ingate * cellgate)
hy = outgate * F.tanh(cy)
return hy, cy
def VarRNNTanhCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
if noise_hidden is not None:
hidden = hidden * noise_hidden
hy = F.tanh(F.linear(input, w_ih, b_ih) + F.linear(hidden, w_hh, b_hh))
return hy
def VarFastLSTMCell(input, hidden, w_ih, w_hh, b_ih=None, b_hh=None, noise_in=None, noise_hidden=None):
if noise_in is not None:
input = input * noise_in
if input.is_cuda:
igates = F.linear(input, w_ih)
hgates = F.linear(hidden[0], w_hh) if noise_hidden is None else F.linear(hidden[0] * noise_hidden, w_hh)
state = fusedBackend.LSTMFused()
return state(igates, hgates, hidden[1]) if b_ih is None else state(igates, hgates, hidden[1], b_ih, b_hh)
hx, cx = hidden
if noise_hidden is not None:
hx = hx * noise_hidden
gates = F.linear(input, w_ih, b_ih) + F.linear(hx, w_hh, b_hh)
ingate, forgetgate, cellgate, outgate = gates.chunk(4, 1)
ingate = F.sigmoid(ingate)
forgetgate = F.sigmoid(forgetgate)
cellgate = F.tanh(cellgate)
outgate = F.sigmoid(outgate)
cy = (forgetgate * cx) + (ingate * cellgate)
hy = outgate * F.tanh(cy)
return hy, cy