# TensorFlow 1.x imports used by the bn() snippet below.
# (_get_variable, BN_DECAY, BN_EPSILON and UPDATE_OPS_COLLECTION are module-level
# helpers/constants defined elsewhere in the original source.)
import tensorflow as tf
from tensorflow.python.ops import control_flow_ops
from tensorflow.python.training import moving_averages


def bn(x, is_training):
    x_shape = x.get_shape()
    params_shape = x_shape[-1:]
    axis = list(range(len(x_shape) - 1))

    beta = _get_variable('beta', params_shape, initializer=tf.zeros_initializer())
    gamma = _get_variable('gamma', params_shape, initializer=tf.ones_initializer())

    moving_mean = _get_variable('moving_mean', params_shape, initializer=tf.zeros_initializer(), trainable=False)
    moving_variance = _get_variable('moving_variance', params_shape, initializer=tf.ones_initializer(), trainable=False)

    # These ops will only be performed when training.
    mean, variance = tf.nn.moments(x, axis)
    update_moving_mean = moving_averages.assign_moving_average(moving_mean, mean, BN_DECAY)
    update_moving_variance = moving_averages.assign_moving_average(moving_variance, variance, BN_DECAY)
    tf.add_to_collection(UPDATE_OPS_COLLECTION, update_moving_mean)
    tf.add_to_collection(UPDATE_OPS_COLLECTION, update_moving_variance)

    mean, variance = control_flow_ops.cond(
        is_training, lambda: (mean, variance),
        lambda: (moving_mean, moving_variance))

    return tf.nn.batch_normalization(x, mean, variance, beta, gamma, BN_EPSILON)
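
# Usage sketch (illustrative assumption, not from the original source): the update
# ops that bn() adds to UPDATE_OPS_COLLECTION are not run automatically, so a
# training step is typically wrapped in a control dependency on them.
def train_step_with_bn_updates(loss, learning_rate=0.1):
    update_ops = tf.get_collection(UPDATE_OPS_COLLECTION)
    with tf.control_dependencies(update_ops):
        return tf.train.GradientDescentOptimizer(learning_rate).minimize(loss)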
def layer_norm(x, filters=None, epsilon=1e-6, name=None, reuse=None):
    """Layer normalize the tensor x, averaging over the last dimension."""
    if filters is None:
        filters = x.get_shape()[-1]
    with tf.variable_scope(
            name, default_name="layer_norm", values=[x], reuse=reuse):
        scale = tf.get_variable(
            "layer_norm_scale", [filters], initializer=tf.ones_initializer())
        bias = tf.get_variable(
            "layer_norm_bias", [filters], initializer=tf.zeros_initializer())
        if allow_defun:
            result = layer_norm_compute(x, tf.constant(epsilon), scale, bias)
            result.set_shape(x.get_shape())
        else:
            result = layer_norm_compute_python(x, epsilon, scale, bias)
        return result
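
# Illustrative usage (assumption, not part of the original source): layer_norm()
# normalizes over the last dimension, so it can be applied directly to a
# [batch, length, hidden] tensor, provided the module-level helpers it calls
# (allow_defun, layer_norm_compute, layer_norm_compute_python) are defined.
example_inputs = tf.random_normal([8, 20, 512])
example_outputs = layer_norm(example_inputs, name="layer_norm_example")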
def testWhileLoopProblem(self):
    """Tests L2L applied to problem with while loop."""
    def while_loop_problem():
        x = tf.get_variable("x", shape=[], initializer=tf.ones_initializer())

        # Strange way of squaring the variable.
        _, x_squared = tf.while_loop(
            cond=lambda t, _: t < 1,
            body=lambda t, x: (t + 1, x * x),
            loop_vars=(0, x),
            name="loop")
        return x_squared

    optimizer = meta.MetaOptimizer(net=dict(
        net="CoordinateWiseDeepLSTM",
        net_options={"layers": ()}))
    minimize_ops = optimizer.meta_minimize(while_loop_problem, 3)

    with self.test_session() as sess:
        sess.run(tf.global_variables_initializer())
        train(sess, minimize_ops, 1, 2)
def add_param(self, spec, shape, name, **kwargs):
    param = self.add_param_plain(spec, shape, name, **kwargs)
    if name is not None and name.startswith("W") and self.weight_normalization:
        # Hacky: check if the parameter is a weight matrix. If so, apply weight normalization
        if len(param.get_shape()) == 2:
            v = param
            g = self.add_param_plain(tf.ones_initializer(), (shape[1],), name=name + "_wn/g")
            param = v * (tf.reshape(g, (1, -1)) / tf.sqrt(tf.reduce_sum(tf.square(v), 0, keep_dims=True)))
        elif len(param.get_shape()) == 4:
            v = param
            g = self.add_param_plain(tf.ones_initializer(), (shape[3],), name=name + "_wn/g")
            param = v * (tf.reshape(g, (1, 1, 1, -1)) / tf.sqrt(tf.reduce_sum(tf.square(v), [0, 1, 2],
                                                                              keep_dims=True)))
        else:
            raise NotImplementedError
    return param
def apply_ln(layer):
    def _normalize(x, prefix):
        EPS = 1e-5
        dim = x.get_shape()[-1].value

        bias_name = prefix + "_ln/bias"
        scale_name = prefix + "_ln/scale"

        if bias_name not in layer.norm_params:
            layer.norm_params[bias_name] = layer.add_param(
                tf.zeros_initializer(), (dim,), name=bias_name, regularizable=False)
        if scale_name not in layer.norm_params:
            layer.norm_params[scale_name] = layer.add_param(
                tf.ones_initializer(), (dim,), name=scale_name)

        bias = layer.norm_params[bias_name]
        scale = layer.norm_params[scale_name]
        mean, var = tf.nn.moments(x, axes=[1], keep_dims=True)
        x_normed = (x - mean) / tf.sqrt(var + EPS)
        return x_normed * scale + bias

    return _normalize
def add_param(self, spec, shape, name, **kwargs):
    param = self.add_param_plain(spec, shape, name, **kwargs)
    if name is not None and name.startswith("W") and self.weight_normalization:
        # Hacky: check if the parameter is a weight matrix. If so, apply weight normalization
        if len(param.get_shape()) == 2:
            v = param
            g = self.add_param_plain(tf.ones_initializer, (shape[1],), name=name + "_wn/g")
            param = v * (tf.reshape(g, (1, -1)) / tf.sqrt(tf.reduce_sum(tf.square(v), 0, keep_dims=True)))
        elif len(param.get_shape()) == 4:
            v = param
            g = self.add_param_plain(tf.ones_initializer, (shape[3],), name=name + "_wn/g")
            param = v * (tf.reshape(g, (1, 1, 1, -1)) / tf.sqrt(tf.reduce_sum(tf.square(v), [0, 1, 2],
                                                                              keep_dims=True)))
        else:
            raise NotImplementedError
    return param
def apply_ln(layer):
    def _normalize(x, prefix):
        EPS = 1e-5
        dim = x.get_shape()[-1].value

        bias_name = prefix + "_ln/bias"
        scale_name = prefix + "_ln/scale"

        if bias_name not in layer.norm_params:
            layer.norm_params[bias_name] = layer.add_param(
                tf.zeros_initializer, (dim,), name=bias_name, regularizable=False)
        if scale_name not in layer.norm_params:
            layer.norm_params[scale_name] = layer.add_param(
                tf.ones_initializer, (dim,), name=scale_name)

        bias = layer.norm_params[bias_name]
        scale = layer.norm_params[scale_name]
        mean, var = tf.nn.moments(x, axes=[1], keep_dims=True)
        x_normed = (x - mean) / tf.sqrt(var + EPS)
        return x_normed * scale + bias

    return _normalize
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the MultiSkipLSTMCell
    :param batch_size: number of samples per batch
    :return: list of SkipLSTMStateTuple
    """
    initial_states = []
    for idx in range(self._num_layers - 1):
        with tf.variable_scope('layer_%d' % (idx + 1)):
            with tf.variable_scope('initial_c'):
                initial_c = rnn_ops.create_initial_state(batch_size, self._num_units[idx])
            with tf.variable_scope('initial_h'):
                initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[idx])
            initial_states.append(LSTMStateTuple(initial_c, initial_h))
    with tf.variable_scope('layer_%d' % self._num_layers):
        with tf.variable_scope('initial_c'):
            initial_c = rnn_ops.create_initial_state(batch_size, self._num_units[-1])
        with tf.variable_scope('initial_h'):
            initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[-1])
        with tf.variable_scope('initial_update_prob'):
            initial_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                               initializer=tf.ones_initializer())
        with tf.variable_scope('initial_cum_update_prob'):
            initial_cum_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                                   initializer=tf.zeros_initializer())
        initial_states.append(SkipLSTMStateTuple(initial_c, initial_h,
                                                 initial_update_prob, initial_cum_update_prob))
    return initial_states
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the SkipGRUCell
    :param batch_size: number of samples per batch
    :return: SkipGRUStateTuple
    """
    with tf.variable_scope('initial_h'):
        initial_h = rnn_ops.create_initial_state(batch_size, self._num_units)
    with tf.variable_scope('initial_update_prob'):
        initial_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                           initializer=tf.ones_initializer())
    with tf.variable_scope('initial_cum_update_prob'):
        initial_cum_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                               initializer=tf.zeros_initializer())
    return SkipGRUStateTuple(initial_h, initial_update_prob, initial_cum_update_prob)
def _batch_norm_without_layers(self, input_layer, decay, use_scale, epsilon):
    """Batch normalization on `input_layer` without tf.layers."""
    # We make this function as similar as possible to the
    # tf.contrib.layers.batch_norm, to minimize the differences between using
    # layers and not using layers.
    shape = input_layer.shape
    num_channels = shape[3] if self.data_format == 'NHWC' else shape[1]
    beta = self.get_variable('beta', [num_channels], tf.float32, tf.float32,
                             initializer=tf.zeros_initializer())
    if use_scale:
        gamma = self.get_variable('gamma', [num_channels], tf.float32,
                                  tf.float32, initializer=tf.ones_initializer())
    else:
        gamma = tf.constant(1.0, tf.float32, [num_channels])
    # For moving variables, we use tf.get_variable instead of self.get_variable,
    # since self.get_variable returns the result of tf.cast which we cannot
    # assign to.
    moving_mean = tf.get_variable('moving_mean', [num_channels],
                                  tf.float32,
                                  initializer=tf.zeros_initializer(),
                                  trainable=False)
    moving_variance = tf.get_variable('moving_variance', [num_channels],
                                      tf.float32,
                                      initializer=tf.ones_initializer(),
                                      trainable=False)
    if self.phase_train:
        bn, batch_mean, batch_variance = tf.nn.fused_batch_norm(
            input_layer, gamma, beta, epsilon=epsilon,
            data_format=self.data_format, is_training=True)
        mean_update = moving_averages.assign_moving_average(
            moving_mean, batch_mean, decay=decay, zero_debias=False)
        variance_update = moving_averages.assign_moving_average(
            moving_variance, batch_variance, decay=decay, zero_debias=False)
        tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, mean_update)
        tf.add_to_collection(tf.GraphKeys.UPDATE_OPS, variance_update)
    else:
        bn, _, _ = tf.nn.fused_batch_norm(
            input_layer, gamma, beta, mean=moving_mean,
            variance=moving_variance, epsilon=epsilon,
            data_format=self.data_format, is_training=False)
    return bn
def Layernorm(x, axis, name):
    '''
    Layer normalization (Ba, 2016)
    J: Z-normalization using all nodes of the layer on a per-sample basis.

    Input:
        `x`: channel_first/NCHW format! (or fully-connected)
        `axis`: list
        `name`: must be assigned

    Example:
        ```python
        axis = [1, 2, 3]
        x = tf.random_normal([64, 3, 10, 10])
        name = 'D_layernorm'
        ```

    Return:
        (x - u)/s * scale + offset

    Source:
        https://github.com/igul222/improved_wgan_training/blob/master/tflib/ops/layernorm.py
    '''
    mean, var = tf.nn.moments(x, axis, keep_dims=True)
    n_neurons = x.get_shape().as_list()[axis[0]]
    offset = tf.get_variable(
        name + '.offset',
        shape=[n_neurons] + [1 for _ in range(len(axis) - 1)],
        initializer=tf.zeros_initializer
    )
    scale = tf.get_variable(
        name + '.scale',
        shape=[n_neurons] + [1 for _ in range(len(axis) - 1)],
        initializer=tf.ones_initializer
    )
    return tf.nn.batch_normalization(x, mean, var, offset, scale, 1e-5)
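
# Illustrative usage following the docstring example above (not part of the
# original source): normalize an NCHW activation over its channel and spatial axes.
example_x = tf.random_normal([64, 3, 10, 10])
example_out = Layernorm(example_x, axis=[1, 2, 3], name='D_layernorm')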
def diagonal_bilinear_attention(seq1, seq2, len2, scaled=True, with_sentinel=True):
    v = tf.get_variable('attn_weight', [1, 1, seq1.get_shape()[-1].value], tf.float32,
                        initializer=tf.ones_initializer())
    attn_scores = tf.einsum('abc,adc->abd', v * seq1, seq2)
    attn_scores += tf.layers.dense(seq1, 1, use_bias=False)
    attn_scores += tf.transpose(tf.layers.dense(seq2, 1, use_bias=False), [0, 2, 1])
    if scaled:
        attn_scores /= math.sqrt(float(seq1.get_shape()[-1].value))
    return apply_attention(attn_scores, seq2, len2, seq1 is seq2, with_sentinel)
def __init__(self, incoming, **kwargs):
    super().__init__(incoming, **kwargs)
    # self.temp = self.add_param(tf.ones_initializer, shape=(), name="temperature")
def __init__(self, incoming, center=True, scale=False, epsilon=0.001, decay=0.9,
             beta=tf.zeros_initializer, gamma=tf.ones_initializer, moving_mean=tf.zeros_initializer,
             moving_variance=tf.ones_initializer, **kwargs):
    super(BatchNormLayer, self).__init__(incoming, **kwargs)

    self.center = center
    self.scale = scale
    self.epsilon = epsilon
    self.decay = decay

    input_shape = incoming.output_shape
    axis = list(range(len(input_shape) - 1))
    params_shape = input_shape[-1:]

    if center:
        self.beta = self.add_param(beta, shape=params_shape, name='beta', trainable=True, regularizable=False)
    else:
        self.beta = None
    if scale:
        self.gamma = self.add_param(gamma, shape=params_shape, name='gamma', trainable=True, regularizable=True)
    else:
        self.gamma = None

    self.moving_mean = self.add_param(moving_mean, shape=params_shape, name='moving_mean', trainable=False,
                                      regularizable=False)
    self.moving_variance = self.add_param(moving_variance, shape=params_shape, name='moving_variance',
                                          trainable=False, regularizable=False)
    self.axis = axis
def batch_norm(x, name_scope, training, epsilon=1e-3, decay=0.999):
    """Assume 2d [batch, values] tensor"""

    with tf.variable_scope(name_scope):
        size = x.get_shape().as_list()[1]

        scale = tf.get_variable('scale', [size],
                                initializer=tf.constant_initializer(0.1))
        offset = tf.get_variable('offset', [size])

        pop_mean = tf.get_variable('pop_mean', [size],
                                   initializer=tf.zeros_initializer(),
                                   trainable=False)
        pop_var = tf.get_variable('pop_var', [size],
                                  initializer=tf.ones_initializer(),
                                  trainable=False)
        batch_mean, batch_var = tf.nn.moments(x, [0])

        train_mean_op = tf.assign(
            pop_mean,
            pop_mean * decay + batch_mean * (1 - decay))
        train_var_op = tf.assign(
            pop_var,
            pop_var * decay + batch_var * (1 - decay))

        def batch_statistics():
            with tf.control_dependencies([train_mean_op, train_var_op]):
                return tf.nn.batch_normalization(x, batch_mean, batch_var, offset, scale, epsilon)

        def population_statistics():
            return tf.nn.batch_normalization(x, pop_mean, pop_var, offset, scale, epsilon)

        return tf.cond(training, batch_statistics, population_statistics)
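
# Illustrative usage (assumption, not from the original source): `training` is a
# boolean tensor, so a placeholder can switch between batch statistics (training)
# and population statistics (inference) at run time.
example_x = tf.placeholder(tf.float32, [None, 128])
example_is_training = tf.placeholder(tf.bool, [])
example_bn = batch_norm(example_x, 'bn_example', example_is_training)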
def layernorm(x, axis, name):
    '''
    Layer normalization (Ba, 2016)
    J: Z-normalization using all nodes of the layer on a per-sample basis.

    Input:
        `x`: channel_first/NCHW format! (or fully-connected)
        `axis`: list
        `name`: must be assigned

    Example:
        # axis = [1, 2, 3]
        # x = tf.random_normal([64, 3, 10, 10])
        # name = 'D_layernorm'

    Return:
        (x - u)/s * scale + offset

    Source:
        https://github.com/igul222/improved_wgan_training/blob/master/tflib/ops/layernorm.py
    '''
    mean, var = tf.nn.moments(x, axis, keep_dims=True)
    n_neurons = x.get_shape().as_list()[axis[0]]
    offset = tf.get_variable(
        name + '.offset',
        shape=[n_neurons] + [1 for _ in range(len(axis) - 1)],
        initializer=tf.zeros_initializer
    )
    scale = tf.get_variable(
        name + '.scale',
        shape=[n_neurons] + [1 for _ in range(len(axis) - 1)],
        initializer=tf.ones_initializer
    )
    return tf.nn.batch_normalization(x, mean, var, offset, scale, 1e-5)
def testInitializers(self):
    inputs = tf.placeholder(tf.float32, shape=[self.batch_size, self.in_size])
    prev_state = tf.placeholder(tf.float32,
                                shape=[self.batch_size, self.hidden_size])

    with self.assertRaisesRegexp(KeyError, "Invalid initializer keys.*"):
        snt.VanillaRNN(name="rnn",
                       hidden_size=self.hidden_size,
                       initializers={"invalid": None})

    err = "Initializer for 'w' is not a callable function"
    with self.assertRaisesRegexp(TypeError, err):
        snt.VanillaRNN(name="rnn",
                       hidden_size=self.hidden_size,
                       initializers={"in_to_hidden": {"w": tf.zeros([10, 10])}})

    # Nested initializer.
    valid_initializers = {
        "in_to_hidden": {
            "w": tf.ones_initializer(),
        },
        "hidden_to_hidden": {
            "b": tf.ones_initializer(),
        }
    }

    vanilla_rnn = snt.VanillaRNN(name="rnn",
                                 hidden_size=self.hidden_size,
                                 initializers=valid_initializers)
    vanilla_rnn(inputs, prev_state)
    init = tf.global_variables_initializer()

    with self.test_session() as sess:
        sess.run(init)
        w_v, b_v = sess.run([
            vanilla_rnn.in_to_hidden_linear.w,
            vanilla_rnn.hidden_to_hidden_linear.b,
        ])
        self.assertAllClose(w_v, np.ones([self.in_size, self.hidden_size]))
        self.assertAllClose(b_v, np.ones([self.hidden_size]))
def create_gamma_initializer():
    """Returns a default initializer for the `gamma` in layer norm."""
    return tf.ones_initializer()


def create_gamma_initializer():
    """Returns a default initializer for the `gamma` in batch norm."""
    return tf.ones_initializer()


def create_variance_initializer():
    """Returns a default initializer for the `moving_variance` in batch norm."""
    return tf.ones_initializer()
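
# Illustrative usage (assumption, not from the original source): these helpers
# simply return a ones initializer, e.g. for creating a scale variable by hand.
example_gamma = tf.get_variable('example_gamma', [256],
                                initializer=create_gamma_initializer())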