# Common TensorFlow 1.x imports assumed by the snippets collected below.
import math
import numpy as np
import tensorflow as tf
from tensorflow.python.framework import dtypes, ops
from tensorflow.python.ops import array_ops, embedding_ops, init_ops, math_ops, nn_ops
from tensorflow.python.ops import variable_scope
from tensorflow.python.ops import variable_scope as vs
from tensorflow.python.ops.embedding_ops import embedding_lookup
from tensorflow.python.util import nest

def _fwlinear(self, args, output_size, scope=None):
if args is None or (nest.is_sequence(args) and not args):
raise ValueError("`args` must be specified")
if not nest.is_sequence(args):
args = [args]
assert len(args) == 2
assert args[0].get_shape().as_list()[1] == output_size
dtype = [a.dtype for a in args][0]
with vs.variable_scope(scope or "Linear"):
matrixW = vs.get_variable(
"MatrixW", dtype=dtype, initializer=tf.convert_to_tensor(np.eye(output_size, dtype=np.float32) * .05))
matrixC = vs.get_variable(
"MatrixC", [args[1].get_shape().as_list()[1], output_size], dtype=dtype)
res = tf.matmul(args[0], matrixW) + tf.matmul(args[1], matrixC)
return res
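For reference, a minimal standalone sketch of the same combination (reusing the imports above, with hypothetical sizes: 4 recurrent units, 3 input features) outside the cell class; the identity-scaled initializer simply starts MatrixW at 0.05 * I:
with tf.variable_scope("fwlinear_sketch"):
    h = tf.placeholder(tf.float32, [None, 4])   # plays the role of args[0] (previous state)
    x = tf.placeholder(tf.float32, [None, 3])   # plays the role of args[1] (input)
    matrixW = tf.get_variable(
        "MatrixW", initializer=tf.convert_to_tensor(np.eye(4, dtype=np.float32) * 0.05))
    matrixC = tf.get_variable("MatrixC", [3, 4], dtype=tf.float32)
    res = tf.matmul(h, matrixW) + tf.matmul(x, matrixC)  # shape [None, 4]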
From ln_lstm2.py (project: Multi-channel-speech-extraction-using-DNN, author: zhr1201)
def ln(tensor, scope=None, epsilon=1e-5):
""" Layer normalizes a 2D tensor along its second axis """
assert(len(tensor.get_shape()) == 2)
m, v = tf.nn.moments(tensor, [1], keep_dims=True)
if not isinstance(scope, str):
scope = ''
with tf.variable_scope(scope + 'layer_norm'):
scale = tf.get_variable('scale',
shape=[tensor.get_shape()[1]],
initializer=tf.constant_initializer(1))
shift = tf.get_variable('shift',
shape=[tensor.get_shape()[1]],
initializer=tf.constant_initializer(0))
LN_initial = (tensor - m) / tf.sqrt(v + epsilon)
return LN_initial * scale + shift
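A usage sketch for ln (hypothetical 128-dimensional activations; TF 1.x session API, reusing the imports above):
x = tf.placeholder(tf.float32, [None, 128])
y = ln(x, scope='encoder_')   # creates variables under encoder_layer_norm/

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    out = sess.run(y, feed_dict={x: np.random.randn(8, 128).astype(np.float32)})
    # each row of `out` is normalized to roughly zero mean and unit variance,
    # then rescaled by the learned scale and shift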
def multilayer_perceptron(_X, input_size, n_hidden, n_class, forward_only=False):
with variable_scope.variable_scope("DNN"):
bias_start = 0.0
weight_hidden = variable_scope.get_variable("Weight_Hidden", [input_size, n_hidden])
bias_hidden = variable_scope.get_variable("Bias_Hidden", [n_hidden],
initializer=init_ops.constant_initializer(bias_start))
#Hidden layer with RELU activation
layer_1 = tf.nn.relu(tf.add(tf.matmul(_X, weight_hidden), bias_hidden))
if not forward_only:
layer_1 = tf.nn.dropout(layer_1, 0.5)
weight_out = variable_scope.get_variable("Weight_Out", [n_hidden, n_class])
bias_out = variable_scope.get_variable("Bias_Out", [n_class],
initializer=init_ops.constant_initializer(bias_start))
output = tf.matmul(layer_1, weight_out) + bias_out
#regularizers = tf.nn.l2_loss(weight_hidden) + tf.nn.l2_loss(bias_hidden) + tf.nn.l2_loss(weight_out) + tf.nn.l2_loss(bias_out)
return output
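A usage sketch with hypothetical sizes (40 input features, 256 hidden units, 10 classes); the second call reuses the same "DNN" variables for a dropout-free inference pass:
x = tf.placeholder(tf.float32, [None, 40])
logits = multilayer_perceptron(x, input_size=40, n_hidden=256, n_class=10)

with tf.variable_scope(tf.get_variable_scope(), reuse=True):
    eval_logits = multilayer_perceptron(x, 40, 256, 10, forward_only=True)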
def categorical_variable(tensor_in, n_classes, embedding_size, name):
"""Creates an embedding for categorical variable with given number of classes.
Args:
tensor_in: Input tensor with class identifier (can be batch or
N-dimensional).
n_classes: Number of classes.
embedding_size: Size of embedding vector to represent each class.
name: Name of this categorical variable.
Returns:
Tensor of input shape, with additional dimension for embedding.
Example:
Calling categorical_variable([1, 2], 5, 10, "my_cat") will return a 2 x 10
tensor, where each row is the representation of the corresponding class.
"""
with vs.variable_scope(name):
embeddings = vs.get_variable(name + "_embeddings",
[n_classes, embedding_size])
return embedding_lookup(embeddings, tensor_in)
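For example, with a hypothetical vocabulary of 5 classes and 10-dimensional embeddings:
class_ids = tf.placeholder(tf.int32, [None])                     # e.g. fed with [1, 2]
embedded = categorical_variable(class_ids, n_classes=5,
                                embedding_size=10, name="my_cat")
# embedded has shape [batch_size, 10]; each row is the learned vector for its class id.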
def to_weighted_sum(self,
input_tensor,
num_outputs=1,
weight_collections=None,
trainable=True):
def _weight(name):
return variable_scope.get_variable(
name,
shape=[self.dimension, num_outputs],
initializer=init_ops.zeros_initializer,
collections=_add_variable_collection(weight_collections))
if self.name:
weight = _weight("weight")
else:
# Old behavior to support a subset of old checkpoints.
weight = _weight("_weight")
# The _RealValuedColumn has the shape of [batch_size, column.dimension].
log_odds_by_dim = math_ops.matmul(input_tensor, weight, name="matmul")
return log_odds_by_dim, [weight]
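The weighted sum is just a dense projection of the column values onto the output logits; a standalone sketch of the same arithmetic for a hypothetical 3-dimensional real-valued column and a single output:
column_values = tf.constant([[0.5, 1.0, -2.0],
                             [1.5, 0.0,  3.0]])                  # [batch_size, dimension]
weight = tf.get_variable("weighted_sum_sketch_weight", shape=[3, 1],
                         initializer=tf.zeros_initializer())
log_odds_by_dim = tf.matmul(column_values, weight)               # [batch_size, num_outputs]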
def _get_sharded_variable(name, shape, dtype, num_shards):
"""Get a list of sharded variables with the given dtype."""
if num_shards > shape[0]:
raise ValueError("Too many shards: shape=%s, num_shards=%d" %
(shape, num_shards))
unit_shard_size = int(math.floor(shape[0] / num_shards))
remaining_rows = shape[0] - unit_shard_size * num_shards
shards = []
for i in range(num_shards):
current_size = unit_shard_size
if i < remaining_rows:
current_size += 1
shards.append(vs.get_variable(name + "_%d" % i, [current_size] + shape[1:],
dtype=dtype))
return shards
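For example, sharding a hypothetical [10, 4] weight matrix three ways splits its rows 4 + 3 + 3, and the pieces can be re-joined with a concat:
shards = _get_sharded_variable("kernel", [10, 4], tf.float32, num_shards=3)
# shards[0] has shape [4, 4]; shards[1] and shards[2] have shape [3, 4]
full_kernel = tf.concat(shards, axis=0)                          # back to a single [10, 4] tensor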
def _attention(self, query, attn_states):
conv2d = nn_ops.conv2d
reduce_sum = math_ops.reduce_sum
softmax = nn_ops.softmax
tanh = math_ops.tanh
with vs.variable_scope("Attention"):
k = vs.get_variable("AttnW", [1, 1, self._attn_size, self._attn_vec_size])
v = vs.get_variable("AttnV", [self._attn_vec_size])
hidden = array_ops.reshape(attn_states,
[-1, self._attn_length, 1, self._attn_size])
hidden_features = conv2d(hidden, k, [1, 1, 1, 1], "SAME")
y = _linear(query, self._attn_vec_size, True)
y = array_ops.reshape(y, [-1, 1, 1, self._attn_vec_size])
s = reduce_sum(v * tanh(hidden_features + y), [2, 3])
a = softmax(s)
d = reduce_sum(
array_ops.reshape(a, [-1, self._attn_length, 1, 1]) * hidden, [1, 2])
new_attns = array_ops.reshape(d, [-1, self._attn_size])
new_attn_states = array_ops.slice(attn_states, [0, 1, 0], [-1, -1, -1])
return new_attns, new_attn_states
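A shape-level sketch of the same attention computation with hypothetical sizes (attn_length=20, attn_size=64, attn_vec_size=32); tf.layers.dense stands in for the _linear helper used above:
attn_length, attn_size, attn_vec_size = 20, 64, 32
attn_states = tf.placeholder(tf.float32, [None, attn_length, attn_size])
query = tf.placeholder(tf.float32, [None, 128])

hidden = tf.reshape(attn_states, [-1, attn_length, 1, attn_size])            # [B, L, 1, S]
k = tf.get_variable("AttnW_sketch", [1, 1, attn_size, attn_vec_size])
hidden_features = tf.nn.conv2d(hidden, k, [1, 1, 1, 1], "SAME")              # [B, L, 1, V]
y = tf.reshape(tf.layers.dense(query, attn_vec_size), [-1, 1, 1, attn_vec_size])
v = tf.get_variable("AttnV_sketch", [attn_vec_size])
s = tf.reduce_sum(v * tf.tanh(hidden_features + y), [2, 3])                  # [B, L] scores
a = tf.nn.softmax(s)                                                         # attention weights over the window
d = tf.reduce_sum(tf.reshape(a, [-1, attn_length, 1, 1]) * hidden, [1, 2])   # [B, S] context vector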
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or "eunn_cell"):
state = _eunn_loop(state, self._capacity, self.diag_vec, self.off_vec, self.diag, self._fft)
input_matrix_init = init_ops.random_uniform_initializer(-0.01, 0.01)
if self._comp:
input_matrix_re = vs.get_variable("U_re", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
input_matrix_im = vs.get_variable("U_im", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
inputs_re = math_ops.matmul(inputs, input_matrix_re)
inputs_im = math_ops.matmul(inputs, input_matrix_im)
inputs = math_ops.complex(inputs_re, inputs_im)
else:
input_matrix = vs.get_variable("U", [inputs.get_shape()[-1], self._hidden_size], initializer=input_matrix_init)
inputs = math_ops.matmul(inputs, input_matrix)
bias = vs.get_variable("modReLUBias", [self._hidden_size], initializer=init_ops.constant_initializer())
output = self._activation((inputs + state), bias, self._comp)
return output, output
def _get_sharded_variable(name, shape, dtype, num_shards):
"""Get a list of sharded variables with the given dtype."""
if num_shards > shape[0]:
raise ValueError("Too many shards: shape=%s, num_shards=%d" %
(shape, num_shards))
unit_shard_size = int(math.floor(shape[0] / num_shards))
remaining_rows = shape[0] - unit_shard_size * num_shards
shards = []
for i in range(num_shards):
current_size = unit_shard_size
if i < remaining_rows:
current_size += 1
shards.append(vs.get_variable(name + "_%d" % i, [current_size, shape[1]],
dtype=dtype))
return shards
def __call__(self, inputs, state, scope=None):
"""Run the cell on embedded inputs."""
with vs.variable_scope(scope or type(self).__name__): # "EmbeddingWrapper"
with ops.device("/cpu:0"):
if self._embedding:
embedding = self._embedding
else:
if self._initializer:
initializer = self._initializer
elif vs.get_variable_scope().initializer:
initializer = vs.get_variable_scope().initializer
else:
# Default initializer for embeddings should have variance=1.
sqrt3 = math.sqrt(3) # Uniform(-sqrt(3), sqrt(3)) has variance=1.
initializer = init_ops.random_uniform_initializer(-sqrt3, sqrt3)
embedding = vs.get_variable("embedding", [self._embedding_classes,
self._cell.input_size],
initializer=initializer)
embedded = embedding_ops.embedding_lookup(
embedding, array_ops.reshape(inputs, [-1]))
return self._cell(embedded, state)
def __call__(self, inputs, state, scope=None):
"""Most basic RNN: output = new_state = activation(W * input + U * state + B)."""
with vs.variable_scope(scope or type(self).__name__): # "BasicRNNCell"
state_out = linearTransformIdentityInit(state, self._num_units)
if self._bottom == True:
input_out = linearTransformWithBias([inputs], self._num_units, bias=False, scope=scope)
else:
input_out = linearTransformIdentityInit(inputs, self._num_units, scope=scope)
bias = vs.get_variable(
"input_bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.abs(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or type(self).__name__):
t_state = tf.transpose(state)
state_out = doRotations(t_state, self._rotations)
input_out = linearTransformWithBias([inputs],
self._num_units, bias=False, scope=scope)
state_out = tf.transpose(state_out)
bias = vs.get_variable(
"Bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.nn.relu(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
"""Most basic RNN: output = new_state = activation(W * input + U * state + B)."""
with vs.variable_scope(scope or type(self).__name__): # "BasicRNNCell"
state_out = linearTransformIdentityInit(state, self._num_units)
if self._bottom == True:
input_out = linearTransformWithBias([inputs], self._num_units, bias=False, scope=scope)
else:
input_out = linearTransformIdentityInit(inputs, self._num_units, scope=scope)
bias = vs.get_variable(
"input_bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.nn.relu(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or type(self).__name__):
t_state = tf.transpose(state)
state_out = doRotations(t_state, self._rotations)
input_out = linearTransformWithBias([inputs],
self._num_units, bias=False, scope=scope)
state_out = tf.transpose(state_out)
gate = linearTransformWithBias([inputs, state], self._num_units, True, scope='GateLinearTransfrom')
gate = tf.nn.sigmoid(gate, name='GateSigmoid')
bias = vs.get_variable(
"Bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
input_gate = tf.add(-1.0, gate)
# print(input_gate)
output = state * gate + input_gate * tf.abs(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or type(self).__name__):
t_state = tf.transpose(state)
state_out = doRotations(t_state, self._rotations)
input_out = linearTransformWithBias([inputs],
self._num_units, bias=False, scope=scope)
state_out = tf.transpose(state_out)
bias = vs.get_variable(
"Bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.abs(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or type(self).__name__):
t_state = tf.transpose(state)
t_inputs = tf.transpose(inputs)
if self._bottom == True:
[state_out] = rotationTransform([("StateL", t_state)], self._num_units , scope, self._num_rots)
input_out = linearTransformWithBias([inputs],
self._num_units, bias=False, scope=scope)
else:
[state_out, input_out] = \
rotationTransform([("StateL", t_state), ("InputL", t_inputs)],
self._num_units, scope)
input_out = tf.transpose(input_out)
state_out = tf.transpose(state_out)
bias = vs.get_variable(
"Bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.abs(state_out + input_out + bias)
return output, output
def __call__(self, inputs, state, scope=None):
with vs.variable_scope(scope or type(self).__name__):
state_rot = rotationTransform(tf.transpose(state), self._num_units, self._num_params,
self._cos_list, self._sin_list, self._nsin_list,
self._cos_idxs, self._sin_idxs, self._nsin_idxs)
state_scale, sigma = diagonalTransform(state_rot, self._num_units)
self.sigma = sigma
state_out = rotationTransform(state_scale, self._num_units, self._num_params,
self._cos_list, self._sin_list, self._nsin_list,
self._cos_idxs, self._sin_idxs, self._nsin_idxs)
state_out = tf.transpose(state_out)
input_out = linearTransformWithBias([inputs], self._num_units, bias=False)
bias = vs.get_variable(
"Bias", [self._num_units],
dtype=tf.float32,
initializer=init_ops.constant_initializer(dtype=tf.float32))
output = tf.abs(state_out + input_out + bias)
return output, output
def embedding_rnn_decoder(decoder_inputs, initial_state, cell, num_symbols,
embedding_size, output_projection=None,
feed_previous=False,
update_embedding_for_previous=True, scope=None):
"""RNN decoder with embedding and a pure-decoding option.
"""
if output_projection is not None:
proj_weights = ops.convert_to_tensor(output_projection[0],
dtype=dtypes.float32)
proj_weights.get_shape().assert_is_compatible_with([None, num_symbols])
proj_biases = ops.convert_to_tensor(
output_projection[1], dtype=dtypes.float32)
proj_biases.get_shape().assert_is_compatible_with([num_symbols])
with variable_scope.variable_scope(scope or "embedding_rnn_decoder"):
embedding = variable_scope.get_variable("embedding",
[num_symbols, embedding_size])
loop_function = _extract_argmax_and_embed(
embedding, output_projection,
update_embedding_for_previous) if feed_previous else None
emb_inp = (
embedding_ops.embedding_lookup(embedding, i) for i in decoder_inputs)
return rnn_decoder(emb_inp, initial_state, cell,
loop_function=loop_function)
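A usage sketch with hypothetical sizes (vocabulary of 5000 symbols, 64-dimensional embeddings, 10 decoding steps); it assumes the surrounding legacy seq2seq module, which provides rnn_decoder and _extract_argmax_and_embed:
cell = tf.nn.rnn_cell.GRUCell(128)
initial_state = cell.zero_state(batch_size=32, dtype=tf.float32)
decoder_inputs = [tf.placeholder(tf.int32, [32]) for _ in range(10)]   # token ids, one per step

outputs, state = embedding_rnn_decoder(
    decoder_inputs, initial_state, cell,
    num_symbols=5000, embedding_size=64,
    feed_previous=False)          # during training, feed the ground-truth tokens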
def _fwlinear(self, args, output_size, scope=None):
if args is None or (nest.is_sequence(args) and not args):
raise ValueError("`args` must be specified")
if not nest.is_sequence(args):
args = [args]
assert len(args) == 2
assert args[0].get_shape().as_list()[1] == output_size
dtype = [a.dtype for a in args][0]
with vs.variable_scope(scope or "Linear"):
# matrixW = vs.get_variable(
# "MatrixW", dtype=dtype, initializer=tf.convert_to_tensor(np.eye(output_size, dtype=np.float32) * .05))
matrixW = vs.get_variable("MatrixW", [output_size, output_size], dtype=dtype)
matrixC = vs.get_variable(
"MatrixC", [args[1].get_shape().as_list()[1], output_size], dtype=dtype)
res = tf.matmul(args[0], matrixW) + tf.matmul(args[1], matrixC)
return res
def __call__(self, inputs, state, scope=None):
"""Run the cell on embedded inputs."""
with vs.variable_scope(scope or type(self).__name__): # "EmbeddingWrapper"
with ops.device("/cpu:0"):
if self._initializer:
initializer = self._initializer
elif vs.get_variable_scope().initializer:
initializer = vs.get_variable_scope().initializer
else:
# Default initializer for embeddings should have variance=1.
sqrt3 = math.sqrt(3) # Uniform(-sqrt(3), sqrt(3)) has variance=1.
initializer = init_ops.random_uniform_initializer(-sqrt3, sqrt3)
if type(state) is tuple:
data_type = state[0].dtype
else:
data_type = state.dtype
embedding = vs.get_variable(
"embedding", [self._embedding_classes, self._embedding_size],
initializer=initializer,
dtype=data_type)
embedded = embedding_ops.embedding_lookup(
embedding, array_ops.reshape(inputs, [-1]))
return self._cell(embedded, state)
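A usage sketch, assuming the usual constructor signature EmbeddingWrapper(cell, embedding_classes, embedding_size, initializer=None) and hypothetical sizes:
base_cell = tf.nn.rnn_cell.GRUCell(256)
cell = EmbeddingWrapper(base_cell, embedding_classes=10000, embedding_size=128)

token_ids = tf.placeholder(tf.int32, [32, 1])        # one token id per batch element
state = base_cell.zero_state(32, tf.float32)
output, new_state = cell(token_ids, state)           # embedding lookup on CPU, then the wrapped GRU step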