def __call__(self, inputs, state, scope=None):
    """Gated recurrent unit (GRU) with num_units cells."""
    with _checked_scope(self, scope or "gru_cell"):
        with vs.variable_scope("gates"):  # Reset gate and update gate.
            # We start with bias of 1.0 to not reset and not update.
            value = sigmoid(_linear(
                [inputs, state], 2 * self._num_units, True, 1.0))
            r, u = array_ops.split(
                value=value,
                num_or_size_splits=2,
                axis=1)
        with vs.variable_scope("candidate"):
            res = self._activation(_linear([inputs, r * state],
                                           self._num_units, True))
            if self._batch_norm:
                c = batch_norm(res,
                               center=True, scale=True,
                               is_training=self._is_training,
                               scope='bn1')
            else:
                c = res
        new_h = u * state + (1 - u) * c
    return new_h, new_h
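A minimal sketch of driving such a cell with tf.nn.dynamic_rnn (TensorFlow 1.x graph mode assumed). Only __call__ is shown above, so the class name BNGRUCell and its constructor arguments are hypothetical:

import tensorflow as tf

# Hypothetical constructor; num_units/batch_norm/is_training mirror the attributes used in __call__.
cell = BNGRUCell(num_units=128, batch_norm=True, is_training=True)
inputs = tf.placeholder(tf.float32, [None, 20, 64])   # [batch, time, features]
outputs, final_state = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32)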
Python variable_scope() examples (source code)
def _fwlinear(self, args, output_size, scope=None):
    if args is None or (nest.is_sequence(args) and not args):
        raise ValueError("`args` must be specified")
    if not nest.is_sequence(args):
        args = [args]
    assert len(args) == 2
    assert args[0].get_shape().as_list()[1] == output_size
    dtype = [a.dtype for a in args][0]
    with vs.variable_scope(scope or "Linear"):
        matrixW = vs.get_variable(
            "MatrixW", dtype=dtype,
            initializer=tf.convert_to_tensor(np.eye(output_size, dtype=np.float32) * .05))
        matrixC = vs.get_variable(
            "MatrixC", [args[1].get_shape().as_list()[1], output_size], dtype=dtype)
        res = tf.matmul(args[0], matrixW) + tf.matmul(args[1], matrixC)
        return res
ln_lstm2.py (project: Multi-channel-speech-extraction-using-DNN, author: zhr1201)
def ln(tensor, scope=None, epsilon=1e-5):
    """Layer normalizes a 2D tensor along its second axis."""
    assert len(tensor.get_shape()) == 2
    m, v = tf.nn.moments(tensor, [1], keep_dims=True)
    if not isinstance(scope, str):
        scope = ''
    with tf.variable_scope(scope + 'layer_norm'):
        scale = tf.get_variable('scale',
                                shape=[tensor.get_shape()[1]],
                                initializer=tf.constant_initializer(1))
        shift = tf.get_variable('shift',
                                shape=[tensor.get_shape()[1]],
                                initializer=tf.constant_initializer(0))
    LN_initial = (tensor - m) / tf.sqrt(v + epsilon)
    return LN_initial * scale + shift
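A minimal usage sketch for ln(), assuming TensorFlow 1.x graph mode and the helper defined above:

import numpy as np
import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 256])
y = ln(x, scope='ffn_')          # scale/shift variables created under scope 'ffn_layer_norm'

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    out = sess.run(y, feed_dict={x: np.random.randn(8, 256)})
    print(out.shape)             # (8, 256); each row is normalized to ~zero mean, unit variance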
def encoder(self, inputs, inputs_sequence_length):
    with tf.variable_scope("encoder"):
        basic_cell = []
        for i in xrange(len(self.hidden_layer_size)):
            if self.hidden_layer_type[i] == "tanh":
                basic_cell.append(tf.contrib.rnn.BasicRNNCell(num_units=self.encoder_layer_size[i]))
            if self.hidden_layer_type[i] == "lstm":
                basic_cell.append(tf.contrib.rnn.BasicLSTMCell(num_units=self.encoder_layer_size[i]))
            if self.hidden_layer_type[i] == "gru":
                basic_cell.append(GRUCell(num_units=self.encoder_layer_size[i]))
        multicell = MultiRNNCell(basic_cell)
        enc_output, enc_state = tf.nn.bidirectional_dynamic_rnn(
            cell_fw=multicell, cell_bw=multicell, inputs=inputs,
            sequence_length=inputs_sequence_length, dtype=tf.float32)
        enc_output = tf.concat(enc_output, 2)
        # enc_state=(tf.concat(enc_state[0])
        return enc_output, enc_state
def __call__(self, inputs, state):
    """Gated recurrent unit (GRU) with num_units cells."""
    with vs.variable_scope("gates"):  # Reset gate and update gate.
        # We start with bias of 1.0 to not reset and not update.
        bias_ones = self._bias_initializer
        if self._bias_initializer is None:
            dtype = [a.dtype for a in [inputs, state]][0]
            bias_ones = init_ops.constant_initializer(1.0, dtype=dtype)
        value = rnn_cell_impl._linear([inputs, state], 2 * self._num_units, True,
                                      bias_ones, self._kernel_initializer)
        r, u = array_ops.split(value=value, num_or_size_splits=2, axis=1)
        r, u = layer_normalization(r, scope="r/"), layer_normalization(u, scope="u/")
        r, u = math_ops.sigmoid(r), math_ops.sigmoid(u)
    with vs.variable_scope("candidate"):
        c = self._activation(rnn_cell_impl._linear(
            [inputs, r * state], self._num_units, True,
            self._bias_initializer, self._kernel_initializer))
    new_h = u * state + (1 - u) * c
    return new_h, new_h
def conv_layer(input_, filter_shape, stride_shape=[1, 1, 1, 1], padding='SAME', name=None):
    """
    Args:
      input_: a 4D Tensor of size [batch_size x height x width x channel]
      filter_shape: desired filter size [height, width, in_ch, out_ch]
      stride_shape: desired stride size [1, stride_h, stride_w, 1]
      padding: "SAME" or "VALID"
      name: optional variable scope name (defaults to "conv")
    """
    input_shape = input_.get_shape()
    with vs.variable_scope(name or "conv"):
        initializer = tf.contrib.layers.xavier_initializer_conv2d(uniform=True, seed=None, dtype=tf.float32)
        W = _weight_variable(shape=filter_shape, initializer=initializer)
        b = _bias_variable(shape=[filter_shape[3]])
        y = tf.nn.conv2d(input_, filter=W, strides=stride_shape, padding=padding)
        y = tf.nn.bias_add(y, b)
        return y
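A usage sketch (TensorFlow 1.x assumed); it relies on conv_layer() above and on the _weight_variable/_bias_variable helpers from the same file:

import tensorflow as tf

images = tf.placeholder(tf.float32, [None, 32, 32, 3])             # NHWC batch of RGB images
conv1 = conv_layer(images, filter_shape=[3, 3, 3, 16], name='conv1')
conv2 = conv_layer(tf.nn.relu(conv1), filter_shape=[3, 3, 16, 32],
                   stride_shape=[1, 2, 2, 1], name='conv2')        # stride-2 downsampling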
def __call__(self, inputs, state, scope=None):
    current_state = state[0]
    noise_i = state[1]
    noise_h = state[2]
    for i in range(self.depth):
        with tf.variable_scope('h_' + str(i)):
            if i == 0:
                h = tf.tanh(linear([inputs * noise_i, current_state * noise_h], self._num_units, True))
            else:
                h = tf.tanh(linear([current_state * noise_h], self._num_units, True))
        with tf.variable_scope('t_' + str(i)):
            if i == 0:
                t = tf.sigmoid(linear([inputs * noise_i, current_state * noise_h], self._num_units, True, self.forget_bias))
            else:
                t = tf.sigmoid(linear([current_state * noise_h], self._num_units, True, self.forget_bias))
        current_state = (h - current_state) * t + current_state
    return current_state, [current_state, noise_i, noise_h]
def multilayer_perceptron(_X, input_size, n_hidden, n_class, forward_only=False):
    with variable_scope.variable_scope("DNN"):
        bias_start = 0.0
        weight_hidden = variable_scope.get_variable("Weight_Hidden", [input_size, n_hidden])
        bias_hidden = variable_scope.get_variable("Bias_Hidden", [n_hidden],
                                                  initializer=init_ops.constant_initializer(bias_start))
        # Hidden layer with ReLU activation
        layer_1 = tf.nn.relu(tf.add(tf.matmul(_X, weight_hidden), bias_hidden))
        if not forward_only:
            layer_1 = tf.nn.dropout(layer_1, 0.5)
        weight_out = variable_scope.get_variable("Weight_Out", [n_hidden, n_class])
        bias_out = variable_scope.get_variable("Bias_Out", [n_class],
                                               initializer=init_ops.constant_initializer(bias_start))
        output = tf.matmul(layer_1, weight_out) + bias_out
        # regularizers = tf.nn.l2_loss(weight_hidden) + tf.nn.l2_loss(bias_hidden) + tf.nn.l2_loss(weight_out) + tf.nn.l2_loss(bias_out)
        return output
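A minimal training sketch for the MLP above (TensorFlow 1.x assumed); the 784/256/10 sizes are placeholders chosen for illustration:

import tensorflow as tf

x = tf.placeholder(tf.float32, [None, 784])
labels = tf.placeholder(tf.int64, [None])
logits = multilayer_perceptron(x, input_size=784, n_hidden=256, n_class=10)
loss = tf.reduce_mean(
    tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=logits))
train_op = tf.train.AdamOptimizer(1e-3).minimize(loss)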
def basic_rnn_seq2seq(
        encoder_inputs, decoder_inputs, cell, dtype=dtypes.float32, scope=None):
    """Basic RNN sequence-to-sequence model.
    This model first runs an RNN to encode encoder_inputs into a state vector,
    then runs a decoder, initialized with the last encoder state, on decoder_inputs.
    Encoder and decoder use the same RNN cell type, but don't share parameters.
    Args:
      encoder_inputs: A list of 2D Tensors [batch_size x input_size].
      decoder_inputs: A list of 2D Tensors [batch_size x input_size].
      cell: rnn_cell.RNNCell defining the cell function and size.
      dtype: The dtype of the initial state of the RNN cell (default: tf.float32).
      scope: VariableScope for the created subgraph; default: "basic_rnn_seq2seq".
    Returns:
      A tuple of the form (outputs, state), where:
        outputs: A list of the same length as decoder_inputs of 2D Tensors with
          shape [batch_size x output_size] containing the generated outputs.
        state: The state of each decoder cell in the final time-step.
          It is a 2D Tensor of shape [batch_size x cell.state_size].
    """
    with variable_scope.variable_scope(scope or "basic_rnn_seq2seq"):
        _, enc_state = rnn.rnn(cell, encoder_inputs, dtype=dtype)
        return rnn_decoder(decoder_inputs, enc_state, cell)
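A usage sketch under the legacy list-of-tensors seq2seq interface; it assumes TensorFlow 1.x with contrib available, plus the rnn.rnn and rnn_decoder helpers imported by the module the function above comes from:

import tensorflow as tf

batch_size, input_size, steps = 32, 50, 10
encoder_inputs = [tf.placeholder(tf.float32, [batch_size, input_size])
                  for _ in range(steps)]
decoder_inputs = [tf.placeholder(tf.float32, [batch_size, input_size])
                  for _ in range(steps)]
cell = tf.contrib.rnn.GRUCell(64)
outputs, state = basic_rnn_seq2seq(encoder_inputs, decoder_inputs, cell)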
def rnn_seq2seq(encoder_inputs,
                decoder_inputs,
                encoder_cell,
                decoder_cell=None,
                dtype=dtypes.float32,
                scope=None):
    """RNN Sequence to Sequence model.
    Args:
      encoder_inputs: List of tensors, inputs for encoder.
      decoder_inputs: List of tensors, inputs for decoder.
      encoder_cell: RNN cell to use for encoder.
      decoder_cell: RNN cell to use for decoder; if None, encoder_cell is used.
      dtype: Type to initialize encoder state with.
      scope: Scope to use; if None, a new one will be produced.
    Returns:
      List of tensors for outputs and states for training and sampling sub-graphs.
    """
    with vs.variable_scope(scope or "rnn_seq2seq"):
        _, last_enc_state = nn.rnn(encoder_cell, encoder_inputs, dtype=dtype)
        return rnn_decoder(decoder_inputs, last_enc_state,
                           decoder_cell or encoder_cell)
def categorical_variable(tensor_in, n_classes, embedding_size, name):
    """Creates an embedding for a categorical variable with a given number of classes.
    Args:
      tensor_in: Input tensor with class identifier (can be batch or
        N-dimensional).
      n_classes: Number of classes.
      embedding_size: Size of embedding vector to represent each class.
      name: Name of this categorical variable.
    Returns:
      Tensor of input shape, with an additional dimension for the embedding.
    Example:
      Calling categorical_variable([1, 2], 5, 10, "my_cat") will return a 2 x 10
      tensor, where each row is the representation of the class.
    """
    with vs.variable_scope(name):
        embeddings = vs.get_variable(name + "_embeddings",
                                     [n_classes, embedding_size])
        return embedding_lookup(embeddings, tensor_in)
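The docstring example written out as a runnable sketch, assuming the module-level aliases (vs, embedding_lookup) used by the function above are in place:

import tensorflow as tf

class_ids = tf.constant([1, 2])                    # two class identifiers
embedded = categorical_variable(class_ids, n_classes=5,
                                embedding_size=10, name='my_cat')
print(embedded.get_shape())                        # (2, 10): one 10-dim embedding row per id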
def softmax(logits, scope=None):
    """Performs softmax on the Nth dimension of an N-dimensional logit tensor.
    For two-dimensional logits this reduces to tf.nn.softmax. The N-th dimension
    needs to have a specified number of elements (number of classes).
    Args:
      logits: N-dimensional `Tensor` with logits, where N > 1.
      scope: Optional scope for variable_scope.
    Returns:
      A `Tensor` with the same shape and type as logits.
    """
    # TODO(jrru): Add axis argument which defaults to last dimension.
    with variable_scope.variable_scope(scope, 'softmax', [logits]):
        num_logits = utils.last_dimension(logits.get_shape(), min_rank=2)
        logits_2d = array_ops.reshape(logits, [-1, num_logits])
        predictions = nn.softmax(logits_2d)
        predictions = array_ops.reshape(predictions, array_ops.shape(logits))
        predictions.set_shape(logits.get_shape())
        return predictions
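A short sketch of the N-dimensional case this helper handles (TensorFlow 1.x assumed, with the module aliases used above in scope):

import tensorflow as tf

logits = tf.placeholder(tf.float32, [None, 20, 5])   # [batch, time, classes]
probs = softmax(logits)                               # same shape; normalized over the last axis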
def testKFeatureTrainingConstruction(self):
    # pylint: disable=W0612
    data = constant_op.constant(
        [[random.uniform(-1, 1) for i in range(self.params.num_features)]
         for _ in range(100)])
    labels = [1 for _ in range(100)]
    with variable_scope.variable_scope(
        "KFeatureDecisionsToDataThenNNTest.testKFeatureTrainingContruction"):
        graph_builder = (
            k_feature_decisions_to_data_then_nn.KFeatureDecisionsToDataThenNN(
                self.params))
        graph = graph_builder.training_graph(data, labels, None)
        self.assertTrue(isinstance(graph, Operation))
def testConstructionPollution(self):
    """Ensure that graph building doesn't modify the params in a bad way."""
    # pylint: disable=W0612
    data = [[random.uniform(-1, 1) for i in range(self.params.num_features)]
            for _ in range(100)]
    self.assertTrue(isinstance(self.params, tensor_forest.ForestHParams))
    self.assertFalse(
        isinstance(self.params.num_trees, tensor_forest.ForestHParams))
    with variable_scope.variable_scope(
        "DecisionsToDataThenNNTest_testContructionPollution"):
        graph_builder = decisions_to_data_then_nn.DecisionsToDataThenNN(
            self.params)
        self.assertTrue(isinstance(self.params, tensor_forest.ForestHParams))
        self.assertFalse(
            isinstance(self.params.num_trees, tensor_forest.ForestHParams))
def _make_auc_histograms(boolean_labels, scores, score_range, nbins):
    """Create histogram tensors from one batch of labels/scores."""
    with variable_scope.variable_scope(
        None, 'make_auc_histograms', [boolean_labels, scores, nbins]):
        # Histogram of scores for records in this batch with True label.
        hist_true = histogram_ops.histogram_fixed_width(
            array_ops.boolean_mask(scores, boolean_labels),
            score_range,
            nbins=nbins,
            dtype=dtypes.int64,
            name='hist_true')
        # Histogram of scores for records in this batch with False label.
        hist_false = histogram_ops.histogram_fixed_width(
            array_ops.boolean_mask(scores, math_ops.logical_not(boolean_labels)),
            score_range,
            nbins=nbins,
            dtype=dtypes.int64,
            name='hist_false')
        return hist_true, hist_false
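A hedged sketch of building these per-batch histograms as inputs to a streaming AUC estimate; it assumes the module aliases used above (variable_scope, histogram_ops, array_ops, math_ops, dtypes) and scores in [0, 1]:

import tensorflow as tf

labels = tf.placeholder(tf.bool, [None])       # True for positive records
scores = tf.placeholder(tf.float32, [None])    # model scores, assumed to lie in [0, 1]
score_range = tf.constant([0.0, 1.0])
hist_true, hist_false = _make_auc_histograms(labels, scores, score_range, nbins=100)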
def _attention(self, query, attn_states):
    conv2d = nn_ops.conv2d
    reduce_sum = math_ops.reduce_sum
    softmax = nn_ops.softmax
    tanh = math_ops.tanh
    with vs.variable_scope("Attention"):
        k = vs.get_variable("AttnW", [1, 1, self._attn_size, self._attn_vec_size])
        v = vs.get_variable("AttnV", [self._attn_vec_size])
        hidden = array_ops.reshape(attn_states,
                                   [-1, self._attn_length, 1, self._attn_size])
        hidden_features = conv2d(hidden, k, [1, 1, 1, 1], "SAME")
        y = _linear(query, self._attn_vec_size, True)
        y = array_ops.reshape(y, [-1, 1, 1, self._attn_vec_size])
        s = reduce_sum(v * tanh(hidden_features + y), [2, 3])
        a = softmax(s)
        d = reduce_sum(
            array_ops.reshape(a, [-1, self._attn_length, 1, 1]) * hidden, [1, 2])
        new_attns = array_ops.reshape(d, [-1, self._attn_size])
        new_attn_states = array_ops.slice(attn_states, [0, 1, 0], [-1, -1, -1])
        return new_attns, new_attn_states
def build_model(self, features, feature_columns, is_training):
    """See base class."""
    self._feature_columns = feature_columns
    partitioner = partitioned_variables.min_max_variable_partitioner(
        max_partitions=self._num_ps_replicas,
        min_slice_size=64 << 20)
    with variable_scope.variable_scope(
        self._scope,
        values=features.values(),
        partitioner=partitioner) as scope:
        if self._joint_weights:
            logits, _, _ = layers.joint_weighted_sum_from_feature_columns(
                columns_to_tensors=features,
                feature_columns=self._get_feature_columns(),
                num_outputs=self._num_label_columns,
                weight_collections=[self._scope],
                scope=scope)
        else:
            logits, _, _ = layers.weighted_sum_from_feature_columns(
                columns_to_tensors=features,
                feature_columns=self._get_feature_columns(),
                num_outputs=self._num_label_columns,
                weight_collections=[self._scope],
                scope=scope)
        return logits
def _adaptive_max_norm(norm, std_factor, decay, global_step, epsilon, name):
    """Find max_norm given norm and previous average."""
    with vs.variable_scope(name, "AdaptiveMaxNorm", [norm]):
        log_norm = math_ops.log(norm + epsilon)

        def moving_average(name, value, decay):
            moving_average_variable = vs.get_variable(
                name, shape=value.get_shape(), dtype=value.dtype,
                initializer=init_ops.zeros_initializer, trainable=False)
            return moving_averages.assign_moving_average(
                moving_average_variable, value, decay, zero_debias=False)

        # quicker adaptation at the beginning
        if global_step is not None:
            n = math_ops.to_float(global_step)
            decay = math_ops.minimum(decay, n / (n + 1.))
        # update averages
        mean = moving_average("mean", log_norm, decay)
        sq_mean = moving_average("sq_mean", math_ops.square(log_norm), decay)
        variance = sq_mean - math_ops.square(mean)
        std = math_ops.sqrt(math_ops.maximum(epsilon, variance))
        max_norms = math_ops.exp(mean + std_factor * std)
        return max_norms, mean
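For intuition, a rough NumPy sketch of the statistic computed above, ignoring the moving averages (the real code tracks mean and sq_mean with exponential decay rather than batch statistics):

import numpy as np

log_norms = np.log(np.array([0.8, 1.1, 0.9, 3.0, 1.0]) + 1e-8)   # log gradient norms
mean, std = log_norms.mean(), log_norms.std()
max_norm = np.exp(mean + 2.0 * std)                               # std_factor = 2.0
print(max_norm)   # norms far above the running average (e.g. 3.0) would be clipped toward this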
def __call__(self, inputs, state, scope=None):
    with vs.variable_scope(scope or "eunn_cell"):
        state = _eunn_loop(state, self._capacity, self.diag_vec, self.off_vec, self.diag, self._fft)
        input_matrix_init = init_ops.random_uniform_initializer(-0.01, 0.01)
        if self._comp:
            input_matrix_re = vs.get_variable("U_re", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
            input_matrix_im = vs.get_variable("U_im", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
            inputs_re = math_ops.matmul(inputs, input_matrix_re)
            inputs_im = math_ops.matmul(inputs, input_matrix_im)
            inputs = math_ops.complex(inputs_re, inputs_im)
        else:
            input_matrix = vs.get_variable("U", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
            inputs = math_ops.matmul(inputs, input_matrix)
        bias = vs.get_variable("modReLUBias", [self._hidden_size], initializer=init_ops.constant_initializer())
        output = self._activation((inputs + state), bias, self._comp)
        return output, output
def __call__(self, inputs, state, scope=None):
    """Run this multi-layer cell on inputs, starting from state."""
    with vs.variable_scope(scope or "MultiRNNC2DCell"):
        cur_state_pos = 0
        cur_inp = inputs
        new_states = []
        for i, cell in enumerate(self._cells):
            with vs.variable_scope("Cell%d" % i):
                if not nest.is_sequence(state):
                    raise ValueError(
                        "Expected state to be a tuple of length %d, but received: %s"
                        % (len(self.state_size), state))
                cur_state = state[i]
                cur_inp, new_state = cell(cur_inp, cur_state)
                new_states.append(new_state)
        new_states = (tuple(new_states) if self._state_is_tuple
                      else tf.concat(1, new_states))
    return cur_inp, new_states
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell (LSTM)."""
    with vs.variable_scope(scope or type(self).__name__):  # "BasicLSTMCell"
        # Parameters of gates are concatenated into one multiply for efficiency.
        if self._state_is_tuple:
            c, h = state
        else:
            c, h = tf.split(1, 2, state)
        concat = _linear([inputs, h], 4 * self._num_units, True, device=self._device)
        # i = input_gate, j = new_input, f = forget_gate, o = output_gate
        i, j, f, o = tf.split(1, 4, concat)
        new_c = (c * tf.sigmoid(f + self._forget_bias) + tf.sigmoid(i) *
                 self._activation(j))
        new_h = self._activation(new_c) * tf.sigmoid(o)
        if self._state_is_tuple:
            new_state = tf.nn.rnn_cell.LSTMStateTuple(new_c, new_h)
        else:
            new_state = tf.concat(1, [new_c, new_h])
        return new_h, new_state
def __call__(self, inputs, state, mask, scope=None):
    """Long short-term memory cell (LSTM)."""
    with vs.variable_scope(scope or type(self).__name__):  # "BasicLSTMCell"
        # Parameters of gates are concatenated into one multiply for efficiency.
        c, h = array_ops.split(1, 2, state)
        concat = linear([inputs, h], 4 * self._num_units, True)
        # i = input_gate, j = new_input, f = forget_gate, o = output_gate
        i, j, f, o = array_ops.split(1, 4, concat)
        new_c = c * sigmoid(f + self._forget_bias) + sigmoid(i) * tanh(j)
        mask = array_ops.expand_dims(mask, 1)
        new_c = mask * new_c + (1. - mask) * c
        new_h = tanh(new_c) * sigmoid(o)
        new_h = mask * new_h + (1. - mask) * h
        return new_h, array_ops.concat(1, [new_c, new_h])
def __call__(self, inputs, state, scope=None):
    """Run the cell on embedded inputs."""
    with vs.variable_scope(scope or type(self).__name__):  # "EmbeddingWrapper"
        with ops.device("/cpu:0"):
            if self._embedding:
                embedding = self._embedding
            else:
                if self._initializer:
                    initializer = self._initializer
                elif vs.get_variable_scope().initializer:
                    initializer = vs.get_variable_scope().initializer
                else:
                    # Default initializer for embeddings should have variance=1.
                    sqrt3 = math.sqrt(3)  # Uniform(-sqrt(3), sqrt(3)) has variance=1.
                    initializer = init_ops.random_uniform_initializer(-sqrt3, sqrt3)
                embedding = vs.get_variable("embedding", [self._embedding_classes,
                                                          self._cell.input_size],
                                            initializer=initializer)
            embedded = embedding_ops.embedding_lookup(
                embedding, array_ops.reshape(inputs, [-1]))
    return self._cell(embedded, state)
def __call__(self, inputs, state, scope=None):
    """Convolutional Long short-term memory cell (ConvLSTM)."""
    with vs.variable_scope(scope or type(self).__name__):  # "ConvLSTMCell"
        if self._state_is_tuple:
            c, h = state
        else:
            c, h = array_ops.split(3, 2, state)
        # batch_size * height * width * channel
        concat = _conv([inputs, h], 4 * self._num_units, self._k_size, True, initializer=self._initializer)
        # i = input_gate, j = new_input, f = forget_gate, o = output_gate
        i, j, f, o = array_ops.split(3, 4, concat)
        new_c = (c * sigmoid(f + self._forget_bias) + sigmoid(i) *
                 self._activation(j))
        new_h = self._activation(new_c) * sigmoid(o)
        if self._state_is_tuple:
            new_state = LSTMStateTuple(new_c, new_h)
        else:
            new_state = array_ops.concat(3, [new_c, new_h])
        return new_h, new_state