def __call__(self, inputs, state, scope=None):
    """Advance the SRU cell by a single time step.

    Args:
        inputs: Input for the current step; it is projected twice through
            ``linear`` (once for x_hat, once for the gates).
        state: Previous internal state, blended with x_hat by the forget gate.
        scope: Optional variable scope; the class name is used when falsy.

    Returns:
        A pair ``(h, c)``: the highway output and the new internal state.
    """
    cell_scope = scope or type(self).__name__  # "SRUCell"
    with tf.variable_scope(cell_scope):
        # Projection of the input to num_units; the trailing False is
        # presumably the "use bias" flag of ``linear`` -- TODO confirm.
        with tf.variable_scope("x_hat"):
            x_hat = linear([inputs], self._num_units, False)

        # A single projection of width 2 * num_units is squashed with a
        # sigmoid and cut in half along axis 1: forget gate, highway gate.
        with tf.variable_scope("gates"):
            gate_block = tf.sigmoid(
                linear([inputs], 2 * self._num_units, True))
            forget_gate, highway_gate = tf.split(gate_block, 2, axis=1)

        with tf.variable_scope("candidates"):
            new_state = self._activation(
                forget_gate * state + (1 - forget_gate) * x_hat)
            # Variational dropout as suggested in the paper is disabled:
            # if self._is_training and Params.dropout is not None:
            #     new_state = tf.nn.dropout(
            #         new_state, keep_prob=1 - Params.dropout)

        # Highway connection.
        # This implementation differs slightly from the paper
        # (https://arxiv.org/abs/1709.02755): the highway path carries x_hat
        # instead of the raw cell inputs. See equation (7) of the SRU paper.
        output = highway_gate * new_state + (1 - highway_gate) * x_hat
    return output, new_state
# Source code examples for the Python class variable_scope()
def call(self, inputs, state):
    """Gated recurrent unit (GRU) with nunits cells.

    Args:
        inputs: Input for the current step.
        state: Previous hidden state; multiplied element-wise with the gates.

    Returns:
        ``(new_h, new_h)``: the new hidden state, returned as both the cell
        output and the next state.
    """
    with vs.variable_scope("gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not update.
        bias_ones = self._bias_initializer
        if bias_ones is None:
            # The dtype of ``inputs`` is what the previous
            # ``[a.dtype for a in [inputs, state]][0]`` evaluated to; read it
            # directly instead of building a throwaway list.
            bias_ones = init_ops.constant_initializer(
                1.0, dtype=inputs.dtype)
        value = math_ops.sigmoid(
            linear([inputs, state], 2 * self._num_units, True, bias_ones,
                   self._kernel_initializer))
        r, u = array_ops.split(value=value, num_or_size_splits=2, axis=1)
    with vs.variable_scope("candidate"):
        c = self._activation(
            linear([inputs, r * state], self._num_units, True,
                   self._bias_initializer, self._kernel_initializer))
        # Recurrent dropout as proposed in
        # https://arxiv.org/pdf/1603.05118.pdf is currently disabled:
        # if self._is_training and Params.dropout is not None:
        #     c = tf.nn.dropout(c, 1 - Params.dropout)
    # Convex blend of the old state and the candidate, weighted by ``u``.
    new_h = u * state + (1 - u) * c
    return new_h, new_h
def basic_rnn_seq2seq(
        encoder_inputs, decoder_inputs, cell, dtype=dtypes.float32,
        scope=None):
    """Encode a sequence with an RNN, then decode from the final state.

    The encoder is run over ``encoder_inputs`` with ``rnn.rnn``; its last
    state seeds ``rnn_decoder``, which consumes ``decoder_inputs``. Encoder
    and decoder use the same RNN cell type, but don't share parameters.

    Args:
        encoder_inputs: A list of 2D Tensors [batch_size x input_size].
        decoder_inputs: A list of 2D Tensors [batch_size x input_size].
        cell: rnn_cell.RNNCell defining the cell function and size.
        dtype: The dtype of the initial state of the RNN cell
            (default: tf.float32).
        scope: VariableScope for the created subgraph;
            default: "basic_rnn_seq2seq".

    Returns:
        A tuple of the form (outputs, state), where:
            outputs: A list of the same length as decoder_inputs of 2D
                Tensors with shape [batch_size x output_size] containing
                the generated outputs.
            state: The state of each decoder cell in the final time-step.
                It is a 2D Tensor of shape [batch_size x cell.state_size].
    """
    subgraph_scope = scope or "basic_rnn_seq2seq"
    with variable_scope.variable_scope(subgraph_scope):
        # Only the final encoder state is needed; per-step outputs are
        # discarded.
        _, final_encoder_state = rnn.rnn(cell, encoder_inputs, dtype=dtype)
        return rnn_decoder(decoder_inputs, final_encoder_state, cell)
def __call__(self, inputs, state, scope=None):
    """Basic RNN step: output = new_state = abs(W * input + U * state + B).

    Args:
        inputs: Input for the current step.
        state: Previous state.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is also forwarded to the input
            transform helpers.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):  # "BasicRNNCell"
        recurrent_term = linearTransformIdentityInit(state, self._num_units)

        # The explicit comparison with True is kept on purpose so that the
        # branch taken is exactly the one the original code takes.
        if self._bottom == True:
            input_term = linearTransformWithBias(
                [inputs], self._num_units, bias=False, scope=scope)
        else:
            input_term = linearTransformIdentityInit(
                inputs, self._num_units, scope=scope)

        # constant_initializer is called without a value, so the library's
        # default constant is used (presumably zero -- TODO confirm).
        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "input_bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.abs(recurrent_term + input_term + bias)
    return activated, activated
def __call__(self, inputs, state, scope=None):
    """Run one step: relu(rotated state + projected input + bias).

    Args:
        inputs: Input for the current step.
        state: Previous state; transposed before ``doRotations`` and the
            result is transposed back.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is forwarded to
            ``linearTransformWithBias``.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        rotated_t = doRotations(tf.transpose(state), self._rotations)
        input_term = linearTransformWithBias(
            [inputs], self._num_units, bias=False, scope=scope)
        recurrent_term = tf.transpose(rotated_t)

        # No value is given to constant_initializer, so the library default
        # applies (presumably zero -- TODO confirm).
        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.nn.relu(recurrent_term + input_term + bias)
    return activated, activated
def __call__(self, inputs, state, scope=None):
    """Run one step: abs(rotated/scaled state + projected input + bias).

    Side effect: the ``sigma`` value returned by ``doRotationsSigmas`` is
    stored on the instance as ``self._sigmas = [sigma]``.

    Args:
        inputs: Input for the current step.
        state: Previous state; transposed before the rotation helper and the
            result is transposed back.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is forwarded to
            ``linearTransformWithBias``.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        rotated_t, sigma = doRotationsSigmas(
            tf.transpose(state), self._rotations, self._num_units)
        self._sigmas = [sigma]

        input_term = linearTransformWithBias(
            [inputs], self._num_units, bias=False, scope=scope)
        recurrent_term = tf.transpose(rotated_t)

        # No value is given to constant_initializer, so the library default
        # applies (presumably zero -- TODO confirm).
        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.abs(recurrent_term + input_term + bias)
    return activated, activated
def __call__(self, inputs, state, scope=None):
    """Run one gated step mixing the old state with a rotated candidate.

    The result is ``state * gate + (gate - 1) * abs(rot(state) + W*x + b)``
    where ``gate`` is a sigmoid over a linear map of ``[inputs, state]``.

    Args:
        inputs: Input for the current step.
        state: Previous state.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is forwarded to the input
            projection.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        rotated_t = doRotations(tf.transpose(state), self._rotations)
        input_term = linearTransformWithBias(
            [inputs], self._num_units, bias=False, scope=scope)
        recurrent_term = tf.transpose(rotated_t)

        # The scope name below (including its spelling) is part of the
        # variable names and is therefore left untouched.
        gate_logits = linearTransformWithBias(
            [inputs, state], self._num_units, True,
            scope='GateLinearTransfrom')
        gate = tf.nn.sigmoid(gate_logits, name='GateSigmoid')

        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        # NOTE(review): this is (gate - 1), i.e. a non-positive weight, not
        # the usual (1 - gate) complement -- confirm this is intended.
        candidate_weight = tf.add(-1.0, gate)
        mixed = state * gate + candidate_weight * tf.abs(
            recurrent_term + input_term + bias)
    return mixed, mixed
def __call__(self, inputs, state, scope=None):
    """Run one step: abs(rotated state + projected input + bias).

    Args:
        inputs: Input for the current step.
        state: Previous state; transposed before ``doRotations`` and the
            result is transposed back.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is forwarded to
            ``linearTransformWithBias``.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        rotated_t = doRotations(tf.transpose(state), self._rotations)
        input_term = linearTransformWithBias(
            [inputs], self._num_units, bias=False, scope=scope)
        recurrent_term = tf.transpose(rotated_t)

        # No value is given to constant_initializer, so the library default
        # applies (presumably zero -- TODO confirm).
        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.abs(recurrent_term + input_term + bias)
    return activated, activated
def __call__(self, inputs, state, scope=None):
    """Run one step using ``rotationTransform`` for the recurrent path.

    When ``self._bottom == True`` only the state goes through
    ``rotationTransform`` and the input is projected with
    ``linearTransformWithBias``; otherwise state and input are both handed
    to ``rotationTransform`` in a single call.

    Args:
        inputs: Input for the current step.
        state: Previous state.
        scope: Optional variable scope; the class name is used when falsy.
            The raw value (possibly None) is forwarded to the helpers.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        state_t = tf.transpose(state)
        inputs_t = tf.transpose(inputs)

        # The explicit comparison with True is kept on purpose so that the
        # branch taken is exactly the one the original code takes.
        if self._bottom == True:
            (recurrent_term,) = rotationTransform(
                [("StateL", state_t)],
                self._num_units, scope, self._num_rots)
            input_term = linearTransformWithBias(
                [inputs], self._num_units, bias=False, scope=scope)
        else:
            recurrent_term, input_term = rotationTransform(
                [("StateL", state_t), ("InputL", inputs_t)],
                self._num_units, scope)
            # The rotated input came from the transposed tensor, so it is
            # flipped back here; the linear projection above needs no flip.
            input_term = tf.transpose(input_term)
        recurrent_term = tf.transpose(recurrent_term)

        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.abs(recurrent_term + input_term + bias)
    return activated, activated
def __call__(self, inputs, state, scope=None):
    """Run one step: rotate, scale diagonally, rotate again, then abs.

    The transposed state is passed through ``rotationTransform``, then
    ``diagonalTransform``, then ``rotationTransform`` a second time with the
    same parameters. Side effect: the ``sigma`` returned by
    ``diagonalTransform`` is stored as ``self.sigma``.

    Args:
        inputs: Input for the current step.
        state: Previous state.
        scope: Optional variable scope; the class name is used when falsy.

    Returns:
        The same tensor twice, as ``(output, new_state)``.
    """
    with vs.variable_scope(scope or type(self).__name__):
        # Both rotation calls receive exactly the same trailing arguments.
        rotation_args = (
            self._num_units, self._num_params,
            self._cos_list, self._sin_list, self._nsin_list,
            self._cos_idxs, self._sin_idxs, self._nsin_idxs,
        )

        first_rotation = rotationTransform(
            tf.transpose(state), *rotation_args)
        scaled, sigma = diagonalTransform(first_rotation, self._num_units)
        self.sigma = sigma
        second_rotation = rotationTransform(scaled, *rotation_args)
        recurrent_term = tf.transpose(second_rotation)

        input_term = linearTransformWithBias(
            [inputs], self._num_units, bias=False)

        bias_init = init_ops.constant_initializer(dtype=tf.float32)
        bias = vs.get_variable(
            "Bias", [self._num_units],
            dtype=tf.float32,
            initializer=bias_init)

        activated = tf.abs(recurrent_term + input_term + bias)
    return activated, activated
def dice_coef(labels, logits, class_dice=1):
    """Dice coefficient -- works ONLY for binary classification.

    Args:
        labels: GT index class (0 or 1).
        logits: softmax output in one-hot notation; only the channel
            ``class_dice`` of the last axis is used.
        class_dice: Index on the last axis of ``logits`` to score
            (default 1).

    Returns:
        ``(2 * sum(labels * logits) + smooth) /
        (sum(labels) + sum(logits) + smooth)`` computed over the flattened
        tensors.
    """
    cfg = gflags.cfg
    with tf.variable_scope('dice_coef'):
        # Flatten both tensors so the sums run over every element.
        labels_f = tf.cast(tf.reshape(labels, [-1]), cfg._FLOATX)
        logits_f = tf.reshape(logits[..., class_dice], [-1])
        intersection = tf.reduce_sum(labels_f * logits_f)
        # NOTE(review): ``smooth`` is not defined inside this function; it
        # is expected to exist at module level -- confirm it does.
        dice = (2. * intersection + smooth) / (
            tf.reduce_sum(labels_f) + tf.reduce_sum(logits_f) + smooth)
    return dice
def basic_rnn_seq2seq(encoder_inputs,
                      decoder_inputs,
                      cell,
                      dtype=dtypes.float32,
                      scope=None):
    """Encode a sequence with an RNN, then decode from the final state.

    A deep copy of ``cell`` runs the encoder over ``encoder_inputs`` via
    ``static_rnn``; the resulting last state seeds ``rnn_decoder``, which
    uses the original ``cell`` on ``decoder_inputs``. Encoder and decoder
    use the same RNN cell type, but don't share parameters.

    Args:
        encoder_inputs: A list of 2D Tensors [batch_size x input_size].
        decoder_inputs: A list of 2D Tensors [batch_size x input_size].
        cell: core_rnn_cell.RNNCell defining the cell function and size.
        dtype: The dtype of the initial state of the RNN cell
            (default: tf.float32).
        scope: VariableScope for the created subgraph;
            default: "basic_rnn_seq2seq".

    Returns:
        A tuple of the form (outputs, state), where:
            outputs: A list of the same length as decoder_inputs of 2D
                Tensors with shape [batch_size x output_size] containing
                the generated outputs.
            state: The state of each decoder cell in the final time-step.
                It is a 2D Tensor of shape [batch_size x cell.state_size].
    """
    subgraph_scope = scope or "basic_rnn_seq2seq"
    with variable_scope.variable_scope(subgraph_scope):
        # The encoder gets its own copy of the cell object; the decoder
        # keeps the caller's instance.
        encoder_cell = copy.deepcopy(cell)
        _, final_encoder_state = core_rnn.static_rnn(
            encoder_cell, encoder_inputs, dtype=dtype)
        return rnn_decoder(decoder_inputs, final_encoder_state, cell)
def tied_rnn_seq2seq(encoder_inputs,
                     decoder_inputs,
                     cell,
                     loop_function=None,
                     dtype=dtypes.float32,
                     scope=None):
    """Sequence-to-sequence RNN whose encoder and decoder share parameters.

    The encoder runs over ``encoder_inputs`` with ``static_rnn``; variable
    reuse is then switched on for the enclosing scope and ``rnn_decoder`` is
    run with the same cell and the same scope name, starting from the last
    encoder state.

    Args:
        encoder_inputs: A list of 2D Tensors [batch_size x input_size].
        decoder_inputs: A list of 2D Tensors [batch_size x input_size].
        cell: core_rnn_cell.RNNCell defining the cell function and size.
        loop_function: If not None, this function will be applied to i-th
            output in order to generate i+1-th input, and decoder_inputs
            will be ignored, except for the first element ("GO" symbol),
            see rnn_decoder for details.
        dtype: The dtype of the initial state of the rnn cell
            (default: tf.float32).
        scope: VariableScope for the created subgraph;
            default: "tied_rnn_seq2seq".

    Returns:
        A tuple of the form (outputs, state), where:
            outputs: A list of the same length as decoder_inputs of 2D
                Tensors with shape [batch_size x output_size] containing
                the generated outputs.
            state: The state of each decoder cell in each time-step. This
                is a list with length len(decoder_inputs) -- one item for
                each time-step. It is a 2D Tensor of shape
                [batch_size x cell.state_size].
    """
    with variable_scope.variable_scope("combined_tied_rnn_seq2seq"):
        shared_scope = scope or "tied_rnn_seq2seq"
        _, final_encoder_state = core_rnn.static_rnn(
            cell, encoder_inputs, dtype=dtype, scope=shared_scope)

        # From here on, variables in the current scope are reused, so the
        # decoder picks up the variables created by the encoder.
        variable_scope.get_variable_scope().reuse_variables()

        return rnn_decoder(
            decoder_inputs,
            final_encoder_state,
            cell,
            loop_function=loop_function,
            scope=shared_scope)
def embedding_attention_decoder(initial_state,
                                attention_states,
                                cell,
                                num_symbols,
                                time_steps,
                                batch_size,
                                embedding_size,
                                output_size=None,
                                output_projection=None,
                                feed_previous=False,
                                update_embedding_for_previous=True,
                                dtype=None,
                                scope=None):
    """Create an embedding variable and delegate to ``attention_decoder``.

    Args:
        initial_state: Forwarded to ``attention_decoder``.
        attention_states: Forwarded to ``attention_decoder``.
        cell: RNN cell; also supplies ``output_size`` when none is given.
        num_symbols: First dimension of the embedding variable; also
            forwarded to ``attention_decoder``.
        time_steps: Forwarded to ``attention_decoder``.
        batch_size: Forwarded to ``attention_decoder``.
        embedding_size: Second dimension of the embedding variable.
        output_size: Defaults to ``cell.output_size`` when None.
        output_projection: Optional pair whose second item (the biases) must
            be shape-compatible with ``[num_symbols]``.
        feed_previous: When truthy, a loop function built by
            ``_extract_argmax_and_embed`` is passed to the decoder;
            otherwise the loop function is None.
        update_embedding_for_previous: Forwarded to
            ``_extract_argmax_and_embed``.
        dtype: dtype for the bias conversion and the variable scope.
        scope: Optional variable scope; default
            "embedding_attention_decoder".

    Returns:
        Whatever ``attention_decoder`` returns.
    """
    if output_size is None:
        output_size = cell.output_size

    # Validate the projection biases up front; the tensor itself is not
    # used afterwards.
    if output_projection is not None:
        proj_biases = ops.convert_to_tensor(output_projection[1], dtype=dtype)
        proj_biases.get_shape().assert_is_compatible_with([num_symbols])

    decoder_scope = scope or "embedding_attention_decoder"
    with variable_scope.variable_scope(decoder_scope, dtype=dtype):
        embedding = variable_scope.get_variable(
            "embedding", [num_symbols, embedding_size])

        if feed_previous:
            loop_function = tf.nn.seq2seq._extract_argmax_and_embed(
                embedding, output_projection,
                update_embedding_for_previous)
        else:
            loop_function = None

        return attention_decoder(
            initial_state,
            attention_states,
            cell,
            num_symbols,
            time_steps,
            batch_size,
            output_size=output_size,
            loop_function=loop_function)
def conv_linear(self, input, kernel_width, nin, nout, bias_start, prefix):
    """Apply a 1-D convolution plus bias under the variable scope ``prefix``.

    Args:
        input: Tensor handed to ``tf.nn.conv1d``.
        kernel_width: Width of the convolution kernel.
        nin: Number of input channels of the kernel.
        nout: Number of output channels; also the bias length.
        bias_start: Constant added on top of the learned bias.
        prefix: Name of the variable scope holding "CvK" and "CvB".

    Returns:
        ``conv1d(input, CvK) + CvB + bias_start`` with stride 1 and "SAME"
        padding.
    """
    with tf.variable_scope(prefix):
        kernel = tf.get_variable("CvK", [kernel_width, nin, nout])
        convolved = tf.nn.conv1d(input, kernel, 1, "SAME")
        learned_bias = tf.get_variable(
            "CvB", [nout], initializer=tf.constant_initializer(0.0))
        return convolved + learned_bias + bias_start
def createLoss(self, x_in_indices, y_in, length):
    """Build the forward pass and the loss for one bin.

    Args:
        x_in_indices: Integer input symbols; one-hot encoded to
            ``self.n_input`` classes.
        y_in: Integer target labels.
        length: Number of DCGRU steps to unroll; also passed to
            ``hard_tanh``.

    Returns:
        A tuple ``(cost, accuracy, allMem_tensor, prediction, perItemCost,
        result)``: mean loss, mean accuracy, the stacked per-step states,
        the output logits, the loss averaged over axis 1, and the argmax of
        the logits over axis 2.
    """
    # Mask: zero only where both the input and the target are 0, repeated
    # ``num_units`` times along a new axis 2.
    x_is_zero = tf.cast(tf.equal(x_in_indices, 0), tf.float32)
    y_is_zero = tf.cast(tf.equal(y_in, 0), tf.float32)
    mask = tf.stack([1.0 - x_is_zero * y_is_zero] * self.num_units, axis=2)

    # Input layer.
    one_hot_x = tf.one_hot(x_in_indices, self.n_input, dtype=tf.float32)
    cur = self.conv_linear(
        one_hot_x, 1, self.n_input, self.num_units, 0.0, "input")
    cur = self.hard_tanh(cur, length)
    cur = self.dropout(cur)
    cur *= mask

    trace = [cur]  # execution trace

    # Computation steps: the same "dcgru" variables are used for every
    # step, so reuse is switched on right after they are first created.
    with vs.variable_scope("steps") as step_scope:
        for _ in range(length):
            cur = self.DCGRU(cur, 3, "dcgru")
            cur *= mask
            trace.append(cur)
            step_scope.reuse_variables()

    # Output layer and loss.
    trace_tensor = tf.stack(trace)
    prediction = self.conv_linear(
        cur, 1, self.num_units, self.n_classes, 0.0, "output")
    # Softmax loss
    cost_vector = tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits=prediction, labels=y_in)

    result = tf.argmax(prediction, 2)
    is_correct = tf.equal(result, y_in)
    per_item_cost = tf.reduce_mean(cost_vector, 1)
    cost = tf.reduce_mean(per_item_cost)

    is_correct = tf.cast(is_correct, tf.float32)
    accuracy = tf.reduce_mean(is_correct)

    return cost, accuracy, trace_tensor, prediction, per_item_cost, result
def createTestGraph(self, test_length):
    """Create the graph used for accuracy evaluation.

    Creates the ``test_x`` / ``test_y`` placeholders of shape
    ``[self.count_list[0], test_length]`` and stores the accuracy, the
    execution trace and the predictions from ``createLoss`` on the instance.

    Args:
        test_length: Sequence length of the evaluation batch.
    """
    with vs.variable_scope("var_lengths"):
        n_items = self.count_list[0]
        self.test_x = tf.placeholder("int32", [n_items, test_length])
        self.test_y = tf.placeholder("int64", [n_items, test_length])

        loss_outputs = self.createLoss(
            self.test_x, self.test_y, test_length)
        # Cost, logits and per-item cost are not needed for evaluation.
        (_, self.test_accuracy, self.allMem,
         _, _, self.result) = loss_outputs
def _norm(self, inp, scope=None):
    """Apply ``layer_norm`` to ``inp`` under ``scope`` (default "Norm").

    The ``reuse`` flag is read from the variable scope that is current
    *before* the normalisation scope is opened and passed on to
    ``layer_norm``.
    """
    outer_reuse = tf.get_variable_scope().reuse
    with vs.variable_scope(scope or "Norm") as norm_scope:
        return layer_norm(inp, reuse=outer_reuse, scope=norm_scope)
def __call__(self, inputs, state, scope=None):
    """Run one step of the fast-weights cell.

    Args:
        inputs: Input for the current step.
        state: Pair ``(hidden_state, fast_weights)``.
        scope: Optional variable scope; the class name is used when falsy.

    Returns:
        ``(h, (h, new_fast_weights))``: the new hidden vector as output,
        plus the next state pair.
    """
    state, fast_weights = state
    with vs.variable_scope(scope or type(self).__name__):
        # Compute Wh(t) + Cx(t)
        pre_activation = self._fwlinear(
            [state, inputs], self._num_units, False)

        # Compute h_0(t+1) = f(Wh(t) + Cx(t))
        if self._reuse_norm:
            normed = self._norm(pre_activation)
        else:
            normed = self._norm(pre_activation, scope="Norm0")
        h = self._activation(normed)

        h = self._vector2matrix(h)
        pre_activation = self._vector2matrix(pre_activation)

        # Compute h_{s+1}(t+1) = f([Wh(t) + Cx(t)] + A(t) h_s(t+1)), S times.
        # See Eqn (2) in the paper.
        for step in range(self._S):
            inner = pre_activation + math_ops.batch_matmul(fast_weights, h)
            if self._reuse_norm:
                normed = self._norm(inner)
            else:
                # Each inner step gets its own normalisation scope.
                normed = self._norm(inner, scope="Norm%d" % (step + 1))
            h = self._activation(normed)

        # Compute A(t+1) according to Eqn (4); the outer product is taken
        # over the incoming hidden state, not the freshly computed ``h``.
        state = self._vector2matrix(state)
        new_fast_weights = (
            self._lambda * fast_weights
            + self._eta * math_ops.batch_matmul(state, state, adj_y=True))

        h = self._matrix2vector(h)
        return h, (h, new_fast_weights)
# Source file: ln_lstm2.py
# Project: Multi-channel-speech-extraction-using-DNN
# Author: zhr1201
# Project source
# File source
# Views: 26
# Favorites: 0
# Likes: 0
# Comments: 0
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell (LSTM) with layer normalization.

    Args:
        inputs: Input for the current step.
        state: Pair ``(c, h)`` of previous cell state and hidden state.
        scope: Optional variable scope; the class name is used when falsy.

    Returns:
        ``(new_h, LSTMStateTuple(new_c, new_h))``.
    """
    with tf.variable_scope(scope or type(self).__name__):
        prev_c, prev_h = state

        # The bias argument is False since LN will add bias via shift.
        projected = tf.nn.rnn_cell._linear(
            [inputs, prev_h], 4 * self._num_units, False)

        # Positional order (1, 4, tensor) as in the original call -- this
        # looks like the older tf.split signature; TODO confirm TF version.
        in_gate, candidate, forget_gate, out_gate = tf.split(1, 4, projected)

        # Layer-normalize each gate pre-activation in its own scope.
        in_gate = ln(in_gate, scope='i/')
        candidate = ln(candidate, scope='j/')
        forget_gate = ln(forget_gate, scope='f/')
        out_gate = ln(out_gate, scope='o/')

        new_c = (prev_c * tf.nn.sigmoid(forget_gate + self._forget_bias) +
                 tf.nn.sigmoid(in_gate) * self._activation(candidate))

        # The new cell state is normalized too before the output activation.
        new_h = self._activation(
            ln(new_c, scope='new_h/')) * tf.nn.sigmoid(out_gate)
        next_state = tf.nn.rnn_cell.LSTMStateTuple(new_c, new_h)

        return new_h, next_state