def build_encoder(self):
    """Inference Network. q(h|X)"""
    with tf.variable_scope("encoder"):
        self.l1_lin = linear(tf.expand_dims(self.x, 0), self.embed_dim, bias=True, scope="l1")
        self.l1 = tf.nn.relu(self.l1_lin)
        self.l2_lin = linear(self.l1, self.embed_dim, bias=True, scope="l2")
        self.l2 = tf.nn.relu(self.l2_lin)
        self.mu = linear(self.l2, self.h_dim, bias=True, scope="mu")
        self.log_sigma_sq = linear(self.l2, self.h_dim, bias=True, scope="log_sigma_sq")
        self.eps = tf.random_normal((1, self.h_dim), 0, 1, dtype=tf.float32)
        self.sigma = tf.sqrt(tf.exp(self.log_sigma_sq))
        # Reparameterized sample: h = mu + sigma * eps, with eps ~ N(0, I).
        self.h = tf.add(self.mu, tf.mul(self.sigma, self.eps))
        _ = tf.histogram_summary("mu", self.mu)
        _ = tf.histogram_summary("sigma", self.sigma)
        _ = tf.histogram_summary("h", self.h)
        _ = tf.histogram_summary("mu + sigma", self.mu + self.sigma)
Python examples using _linear()
def linear(args, output_size, bias, bias_start=0.0, scope=None, squeeze=False, wd=0.0, input_keep_prob=1.0,
           is_train=None):
    if args is None or (nest.is_sequence(args) and not args):
        raise ValueError("`args` must be specified")
    if not nest.is_sequence(args):
        args = [args]
    # Collapse all leading dimensions so `_linear` sees rank-2 inputs.
    flat_args = [flatten(arg, 1) for arg in args]
    if input_keep_prob < 1.0:
        assert is_train is not None
        flat_args = [tf.cond(is_train, lambda: tf.nn.dropout(arg, input_keep_prob), lambda: arg)
                     for arg in flat_args]
    flat_out = _linear(flat_args, output_size, bias, bias_start=bias_start, scope=scope)
    # Restore the original leading dimensions.
    out = reconstruct(flat_out, args[0], 1)
    if squeeze:
        out = tf.squeeze(out, [len(args[0].get_shape().as_list()) - 1])
    if wd:
        add_wd(wd)
    return out
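This wrapper depends on the helpers flatten, reconstruct, and add_wd, which are not part of this excerpt. A plausible sketch of the two reshape helpers, assuming flatten(arg, keep) collapses all but the last keep dimensions into one and reconstruct undoes that against a reference tensor:

from functools import reduce
from operator import mul
import tensorflow as tf

def flatten(tensor, keep):
    # Collapse all leading dims into one, keeping the last `keep` dims.
    fixed_shape = tensor.get_shape().as_list()
    start = len(fixed_shape) - keep
    left = reduce(mul, [fixed_shape[i] or tf.shape(tensor)[i] for i in range(start)])
    out_shape = [left] + [fixed_shape[i] or tf.shape(tensor)[i]
                          for i in range(start, len(fixed_shape))]
    return tf.reshape(tensor, out_shape)

def reconstruct(tensor, ref, keep):
    # Undo flatten: restore the leading dims of `ref`, keep the last
    # `keep` dims of `tensor` (e.g. the new output_size).
    ref_shape = ref.get_shape().as_list()
    tensor_shape = tensor.get_shape().as_list()
    ref_stop = len(ref_shape) - keep
    tensor_start = len(tensor_shape) - keep
    pre_shape = [ref_shape[i] or tf.shape(ref)[i] for i in range(ref_stop)]
    keep_shape = [tensor_shape[i] or tf.shape(tensor)[i]
                  for i in range(tensor_start, len(tensor_shape))]
    return tf.reshape(tensor, pre_shape + keep_shape)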
def _attention(self, query, attn_states):
    conv2d = nn_ops.conv2d
    reduce_sum = math_ops.reduce_sum
    softmax = nn_ops.softmax
    tanh = math_ops.tanh
    with vs.variable_scope("Attention"):
        k = vs.get_variable("AttnW", [1, 1, self._attn_size, self._attn_vec_size])
        v = vs.get_variable("AttnV", [self._attn_vec_size])
        hidden = array_ops.reshape(attn_states,
                                   [-1, self._attn_length, 1, self._attn_size])
        # Project the window of past states with a 1x1 convolution.
        hidden_features = conv2d(hidden, k, [1, 1, 1, 1], "SAME")
        y = _linear(query, self._attn_vec_size, True)
        y = array_ops.reshape(y, [-1, 1, 1, self._attn_vec_size])
        # Additive attention scores, then the weighted sum of states.
        s = reduce_sum(v * tanh(hidden_features + y), [2, 3])
        a = softmax(s)
        d = reduce_sum(
            array_ops.reshape(a, [-1, self._attn_length, 1, 1]) * hidden, [1, 2])
        new_attns = array_ops.reshape(d, [-1, self._attn_size])
        # Drop the oldest state from the attention window.
        new_attn_states = array_ops.slice(attn_states, [0, 1, 0], [-1, -1, -1])
        return new_attns, new_attn_states
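For reference, the block above is additive (Bahdanau-style) attention over the window of past states $h_1, \dots, h_L$ stored in attn_states:

$$s_i = v^\top \tanh(W h_i + W' q), \qquad a = \mathrm{softmax}(s), \qquad d = \sum_{i=1}^{L} a_i h_i$$

Here $W$ is applied through the 1x1 convolution with kernel AttnW, $W'$ through the _linear projection of the query, and $d$ comes back as new_attns.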
def __call__(self, inputs, state, scope=None):
    """Variational recurrent neural network cell (VRNN)."""
    with tf.variable_scope(scope or type(self).__name__):
        # Update the hidden state.
        z_t, z_mean_t, z_log_sigma_sq_t = state
        h_t_1 = self._activation(_linear(
            [inputs, z_t, z_mean_t, z_log_sigma_sq_t],
            2 * self._num_units,
            True))
        z_mean_t_1, z_log_sigma_sq_t_1 = tf.split(1, 2, h_t_1)
        # Sample.
        eps = tf.random_normal((tf.shape(inputs)[0], self._num_units), 0.0, 1.0,
                               dtype=tf.float32)
        z_t_1 = tf.add(z_mean_t_1, tf.mul(tf.sqrt(tf.exp(z_log_sigma_sq_t_1)),
                                          eps))
        return z_t_1, VRNNStateTuple(z_t_1, z_mean_t_1, z_log_sigma_sq_t_1)
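VRNNStateTuple is not defined in this excerpt; given how the cell unpacks the state into three tensors and repacks them, a namedtuple along these lines is a reasonable assumption:

import collections

# Carries the latest sample z together with the Gaussian parameters it was
# drawn from, so the next step (and a KL loss term) can reuse them.
VRNNStateTuple = collections.namedtuple("VRNNStateTuple",
                                        ("z", "z_mean", "z_log_sigma_sq"))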
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell with attention (LSTMA)."""
    with vs.variable_scope(scope or type(self).__name__):
        if self._state_is_tuple:
            state, attns, attn_states = state
        else:
            states = state
            state = array_ops.slice(states, [0, 0], [-1, self._cell.state_size])
            attns = array_ops.slice(
                states, [0, self._cell.state_size], [-1, self._attn_size])
            attn_states = array_ops.slice(
                states, [0, self._cell.state_size + self._attn_size],
                [-1, self._attn_size * self._attn_length])
        attn_states = array_ops.reshape(attn_states,
                                        [-1, self._attn_length, self._attn_size])
        input_size = self._input_size
        if input_size is None:
            input_size = inputs.get_shape().as_list()[1]
        inputs = _linear([inputs, attns], input_size, True)
        lstm_output, new_state = self._cell(inputs, state)
        if self._state_is_tuple:
            new_state_cat = array_ops.concat(1, nest.flatten(new_state))
        else:
            new_state_cat = new_state
        new_attns, new_attn_states = self._attention(new_state_cat, attn_states)
        with vs.variable_scope("AttnOutputProjection"):
            output = _linear([lstm_output, new_attns], self._attn_size, True)
        new_attn_states = array_ops.concat(1, [new_attn_states,
                                               array_ops.expand_dims(output, 1)])
        new_attn_states = array_ops.reshape(
            new_attn_states, [-1, self._attn_length * self._attn_size])
        new_state = (new_state, new_attns, new_attn_states)
        if not self._state_is_tuple:
            new_state = array_ops.concat(1, list(new_state))
        return output, new_state
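A note on the non-tuple path above: the flat state is laid out along axis 1 as [cell_state | attns | attn_states], for a total width of self._cell.state_size + self._attn_size + self._attn_size * self._attn_length. For example, with cell.state_size = 128, attn_size = 64, and attn_length = 10, the state is 128 + 64 + 640 = 832 columns wide; the three slice calls recover the pieces, and the final concat repacks them in the same order.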
def _linear(self, args, scope="linear"):
out_size = 4 * self._num_units
proj_size = args.get_shape()[-1]
with vs.variable_scope(scope) as scope:
weights = vs.get_variable("weights", [proj_size, out_size])
out = math_ops.matmul(args, weights)
if not self._layer_norm:
bias = vs.get_variable("b", [out_size])
out += bias
return out
def __call__(self, inputs, state, scope=None):
    """LSTM cell with layer normalization and recurrent dropout."""
    with vs.variable_scope(scope or type(self).__name__):  # LayerNormBasicLSTMCell
        c, h = state
        args = array_ops.concat(1, [inputs, h])
        concat = self._linear(args)
        i, j, f, o = array_ops.split(1, 4, concat)
        if self._layer_norm:
            i = self._norm(i, "input")
            j = self._norm(j, "transform")
            f = self._norm(f, "forget")
            o = self._norm(o, "output")
        g = self._activation(j)
        # Recurrent dropout is applied to the candidate activations only.
        if (not isinstance(self._keep_prob, float)) or self._keep_prob < 1:
            g = nn_ops.dropout(g, self._keep_prob, seed=self._seed)
        new_c = (c * math_ops.sigmoid(f + self._forget_bias)
                 + math_ops.sigmoid(i) * g)
        if self._layer_norm:
            new_c = self._norm(new_c, "state")
        new_h = self._activation(new_c) * math_ops.sigmoid(o)
        new_state = rnn_cell.LSTMStateTuple(new_c, new_h)
        return new_h, new_state
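self._norm is not included in this excerpt; in TF's LayerNormBasicLSTMCell it layer-normalizes its input under the given scope with a learned gain and shift. A minimal sketch consistent with the calls above (the variable names and epsilon are assumptions):

def _norm(self, inp, scope, epsilon=1e-5):
    # Standardize over the feature axis, then apply a learned gain/shift.
    shape = [inp.get_shape().as_list()[-1]]
    with vs.variable_scope(scope):
        gamma = vs.get_variable("gamma", shape, initializer=tf.constant_initializer(1.0))
        beta = vs.get_variable("beta", shape, initializer=tf.constant_initializer(0.0))
    mean, variance = tf.nn.moments(inp, [1], keep_dims=True)
    return (inp - mean) / tf.sqrt(variance + epsilon) * gamma + beta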
def hyper_norm(self, layer, dimensions, scope="hyper"):
with tf.variable_scope(scope):
zw = rnn_cell._linear(self.hyper_output,
self.hyper_embedding_size, False, scope=scope+ "z")
alpha = rnn_cell._linear(zw, dimensions, False, scope=scope+ "alpha")
result = tf.mul(alpha, layer)
return result
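In equation form this is the weight-scaling variant of HyperNetworks (Ha et al., 2016):

$$z = W_z h^{\text{hyper}}, \qquad \alpha = W_\alpha z, \qquad \text{result} = \alpha \odot \text{layer}$$

so the hypernetwork modulates the main LSTM's pre-activations through a low-dimensional embedding rather than generating full weight matrices.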
def cross_attention_rnn(config, cell, inputs, padding_mask, xvector):
    """Input a list of tensors and get back the embedded vector for this list.

    NOTE: the difference between this function and the one above is that this
    one also takes a vector from another source into account when computing
    the attention weights. See Tan et al., 2015, "LSTM-based deep learning
    models for non-factoid answer selection" for details.
    """
    num_steps = len(inputs)
    hidden_size = cell.output_size * 2
    batch_size = inputs[0].get_shape()[0].value
    embed_size = inputs[0].get_shape()[1].value
    assert cell.output_size == config.rnn_hidden_size
    assert batch_size == config.batch_size
    assert embed_size == config.word_embed_size
    with tf.variable_scope("attention_RNN"):
        input_length = tf.reduce_sum(tf.pack(padding_mask, axis=1), 1)
        outputs, state_fw, state_bw = \
            tf.nn.bidirectional_rnn(cell, cell, inputs, dtype=config.data_type,
                                    sequence_length=input_length)
        # Reshape the outputs, in case of None dimensions.
        outputs = [tf.reshape(o, [batch_size, hidden_size]) for o in outputs]
        # Append the external vector to every step's output for scoring.
        outputs_for_attention = [tf.concat(1, [o, xvector])  # [batch_size, 2 * hidden_size]
                                 for o in outputs]
        # Overall sequence representation.
        hidden_outputs = []
        outputs_concat = tf.pack(outputs, axis=1)  # [batch_size, num_steps, hidden_size]
        with tf.variable_scope("attention_computation"):
            context_vector = tf.get_variable("context_vector", [2 * hidden_size, 1])
            # Calculate the attention weight of each step.
            attention_weights = []
            for i in xrange(len(outputs)):
                if i > 0:
                    tf.get_variable_scope().reuse_variables()
                hidden_output = tf.tanh(rnn_cell._linear(outputs_for_attention[i],
                                                         2 * hidden_size,
                                                         True))  # True adds a bias
                hidden_outputs.append(hidden_output)
                attention_weights.append(tf.matmul(hidden_output, context_vector))  # [batch_size, 1]
            attention_weights = tf.concat(1, attention_weights)
            # Zero out padded positions, then renormalize.
            attention_weights = tf.nn.softmax(attention_weights) * \
                tf.pack(padding_mask, axis=1)  # [batch_size, num_steps]
            attention_weights = tf.div(attention_weights,
                                       1e-12 + tf.reduce_sum(attention_weights, 1, keep_dims=True))
            # Attention-weighted sum over time.
            weighted_sum = tf.reduce_sum(outputs_concat * tf.expand_dims(attention_weights, 2),
                                         1)  # [batch_size, hidden_size]
    return weighted_sum, outputs_concat, hidden_outputs, attention_weights
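For reference, with BiRNN outputs $h_i$ and the external vector $x$, the scoring above computes

$$m_i = \tanh(W[h_i; x] + b), \qquad \tilde{s}_i = m_i^\top c, \qquad a = \mathrm{softmax}(\tilde{s})$$

where $c$ is the learned context_vector; $a$ is then masked by padding_mask and renormalized, and the function returns $\sum_i a_i h_i$ as the sequence representation.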
def __call__(self, inputs, state, scope=None):
    """Gated recurrent unit (GRU) cell with layer normalization."""
    dim = self._num_units
    with vs.variable_scope(scope or type(self).__name__):  # "GRUCell"
        with vs.variable_scope("Gates"):  # Reset gate and update gate.
            with vs.variable_scope("Layer_Parameters"):
                # Layer-norm gains (s*) and shifts (b*), one pair per linear map.
                s1 = vs.get_variable("s1", initializer=tf.ones([2 * dim]), dtype=tf.float32)
                s2 = vs.get_variable("s2", initializer=tf.ones([2 * dim]), dtype=tf.float32)
                s3 = vs.get_variable("s3", initializer=tf.ones([dim]), dtype=tf.float32)
                s4 = vs.get_variable("s4", initializer=tf.ones([dim]), dtype=tf.float32)
                b1 = vs.get_variable("b1", initializer=tf.zeros([2 * dim]), dtype=tf.float32)
                b2 = vs.get_variable("b2", initializer=tf.zeros([2 * dim]), dtype=tf.float32)
                b3 = vs.get_variable("b3", initializer=tf.zeros([dim]), dtype=tf.float32)
                b4 = vs.get_variable("b4", initializer=tf.zeros([dim]), dtype=tf.float32)
            # Normalize the input and recurrent contributions separately,
            # then add them to form the reset (r) and update (u) gates.
            input_below_ = rnn_cell._linear([inputs],
                                            2 * self._num_units, False, scope="out_1")
            input_below_ = ln(input_below_, s1, b1)
            state_below_ = rnn_cell._linear([state],
                                            2 * self._num_units, False, scope="out_2")
            state_below_ = ln(state_below_, s2, b2)
            out = tf.add(input_below_, state_below_)
            r, u = array_ops.split(1, 2, out)
            r, u = sigmoid(r), sigmoid(u)
        with vs.variable_scope("Candidate"):
            input_below_x = rnn_cell._linear([inputs],
                                             self._num_units, False, scope="out_3")
            input_below_x = ln(input_below_x, s3, b3)
            state_below_x = rnn_cell._linear([state],
                                             self._num_units, False, scope="out_4")
            state_below_x = ln(state_below_x, s4, b4)
            c_pre = tf.add(input_below_x, r * state_below_x)
            c = self._activation(c_pre)
        new_h = u * state + (1 - u) * c
        return new_h, new_h
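The ln(x, s, b) helper used above is not part of this excerpt. A minimal version consistent with how it is called, normalizing over the feature axis and then applying gain s and shift b (the epsilon value is an assumption):

def ln(tensor, scale, shift, epsilon=1e-5):
    # Layer normalization: standardize each row over its features,
    # then rescale by `scale` and re-center by `shift`.
    mean, variance = tf.nn.moments(tensor, [1], keep_dims=True)
    normalized = (tensor - mean) / tf.sqrt(variance + epsilon)
    return normalized * scale + shift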
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell (LSTM) with layer normalization."""
    with vs.variable_scope(scope or type(self).__name__):  # "BasicLSTMCell"
        # Parameters of gates are concatenated into one multiply for efficiency.
        if self._state_is_tuple:
            c, h = state
        else:
            c, h = array_ops.split(1, 2, state)
        # Layer-norm gains and shifts for the two pre-activation sums.
        s1 = vs.get_variable("s1", initializer=tf.ones([4 * self._num_units]), dtype=tf.float32)
        s2 = vs.get_variable("s2", initializer=tf.ones([4 * self._num_units]), dtype=tf.float32)
        s3 = vs.get_variable("s3", initializer=tf.ones([self._num_units]), dtype=tf.float32)
        b1 = vs.get_variable("b1", initializer=tf.zeros([4 * self._num_units]), dtype=tf.float32)
        b2 = vs.get_variable("b2", initializer=tf.zeros([4 * self._num_units]), dtype=tf.float32)
        b3 = vs.get_variable("b3", initializer=tf.zeros([self._num_units]), dtype=tf.float32)
        input_below_ = rnn_cell._linear([inputs],
                                        4 * self._num_units, False, scope="out_1")
        input_below_ = ln(input_below_, s1, b1)
        state_below_ = rnn_cell._linear([h],
                                        4 * self._num_units, False, scope="out_2")
        state_below_ = ln(state_below_, s2, b2)
        lstm_matrix = tf.add(input_below_, state_below_)
        i, j, f, o = array_ops.split(1, 4, lstm_matrix)
        new_c = (c * sigmoid(f) + sigmoid(i) * self._activation(j))
        # Normalizing c currently causes a lot of NaNs in the model, so it
        # is disabled for now:
        # new_c_ = ln(new_c, s3, b3)
        new_c_ = new_c
        new_h = self._activation(new_c_) * sigmoid(o)
        if self._state_is_tuple:
            new_state = LSTMStateTuple(new_c, new_h)
        else:
            new_state = array_ops.concat(1, [new_c, new_h])
        return new_h, new_state
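In equation form, the update above is a standard LSTM with layer normalization applied to each pre-activation sum:

$$(i, j, f, o) = \mathrm{split}\big(\mathrm{LN}(W_x x_t; s_1, b_1) + \mathrm{LN}(W_h h_{t-1}; s_2, b_2)\big)$$
$$c_t = \sigma(f) \odot c_{t-1} + \sigma(i) \odot \tanh(j), \qquad h_t = \tanh(c_t) \odot \sigma(o)$$

with $\tanh$ standing in for self._activation.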
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell (LSTM) with hypernetworks and layer normalization."""
    with vs.variable_scope(scope or type(self).__name__):
        # Parameters of gates are concatenated into one multiply for efficiency.
        # The flat state packs the main cell's (h, c) together with the hyper-cell's.
        total_h, total_c = tf.split(1, 2, state)
        h = total_h[:, 0:self._num_units]
        c = total_c[:, 0:self._num_units]
        self.hyper_state = tf.concat(1, [total_h[:, self._num_units:], total_c[:, self._num_units:]])
        hyper_input = tf.concat(1, [inputs, h])
        hyper_output, hyper_new_state = self.hyper_cell(hyper_input, self.hyper_state)
        self.hyper_output = hyper_output
        self.hyper_state = hyper_new_state
        # Main-cell linear maps, rescaled by the hypernetwork (see hyper_norm).
        input_below_ = rnn_cell._linear([inputs],
                                        4 * self._num_units, False, scope="out_1")
        input_below_ = self.hyper_norm(input_below_, 4 * self._num_units, scope="hyper_x")
        state_below_ = rnn_cell._linear([h],
                                        4 * self._num_units, False, scope="out_2")
        state_below_ = self.hyper_norm(state_below_, 4 * self._num_units, scope="hyper_h")
        if self.is_layer_norm:
            s1 = vs.get_variable("s1", initializer=tf.ones([4 * self._num_units]), dtype=tf.float32)
            s2 = vs.get_variable("s2", initializer=tf.ones([4 * self._num_units]), dtype=tf.float32)
            s3 = vs.get_variable("s3", initializer=tf.ones([self._num_units]), dtype=tf.float32)
            b1 = vs.get_variable("b1", initializer=tf.zeros([4 * self._num_units]), dtype=tf.float32)
            b2 = vs.get_variable("b2", initializer=tf.zeros([4 * self._num_units]), dtype=tf.float32)
            b3 = vs.get_variable("b3", initializer=tf.zeros([self._num_units]), dtype=tf.float32)
            input_below_ = ln(input_below_, s1, b1)
            state_below_ = ln(state_below_, s2, b2)
        lstm_matrix = tf.add(input_below_, state_below_)
        i, j, f, o = array_ops.split(1, 4, lstm_matrix)
        new_c = (c * sigmoid(f) + sigmoid(i) * self._activation(j))
        # Normalizing c currently causes a lot of NaNs in the model, so it
        # is disabled for now:
        # new_c_ = ln(new_c, s3, b3)
        new_c_ = new_c
        new_h = self._activation(new_c_) * sigmoid(o)
        # Repack the main and hyper states into a single flat state tensor.
        hyper_h, hyper_c = tf.split(1, 2, hyper_new_state)
        new_total_h = tf.concat(1, [new_h, hyper_h])
        new_total_c = tf.concat(1, [new_c, hyper_c])
        new_total_state = tf.concat(1, [new_total_h, new_total_c])
        return new_h, new_total_state
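Because the wrapper flattens the main and hyper states into one tensor (total_h = [h | hyper_h] and total_c = [c | hyper_c], concatenated along axis 1), an initial state has to follow the same layout. A hedged sketch, with num_units, hyper_num_units, and batch_size as assumed names:

# Zero state matching tf.split(1, 2, state) above:
# state = concat(1, [total_h, total_c]),
# total_h = [h | hyper_h], total_c = [c | hyper_c].
total_width = num_units + hyper_num_units
zero_h = tf.zeros([batch_size, total_width])
zero_c = tf.zeros([batch_size, total_width])
initial_state = tf.concat(1, [zero_h, zero_c])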