def _conv_layer(self, bottom, filter_size, filter_num, scope_name, bottom_channel=None, padding='SAME'):
if not bottom_channel:
_, _, _, bottom_channel = bottom.get_shape().as_list()
with tf.variable_scope(scope_name):
kernel = tf.Variable(
tf.truncated_normal([*filter_size, bottom_channel, filter_num], dtype=tf.float32, stddev=1e-1),
trainable=False,
name='weights'
)
conv = tf.nn.conv2d(bottom, kernel, [1, 1, 1, 1], padding=padding)
biases = tf.Variable(
tf.constant(0.0, shape=[filter_num], dtype=tf.float32),
trainable=True,
name='bias'
)
out = tf.nn.bias_add(conv, biases)
return out
def _variable_on_device(name, shape, initializer, trainable=True):
"""Helper to create a Variable.
Args:
name: name of the variable
shape: list of ints
initializer: initializer for Variable
Returns:
Variable Tensor
"""
# TODO(bichen): fix the hard-coded data type below
dtype = tf.float32
if not callable(initializer):
var = tf.get_variable(name, initializer=initializer, trainable=trainable)
else:
var = tf.get_variable(
name, shape, initializer=initializer, dtype=dtype, trainable=trainable)
return var
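# A brief usage sketch for _variable_on_device (hypothetical shapes, TF 1.x graph
# mode): the callable branch needs an explicit shape, while the non-callable
# branch infers the shape from the initial value itself.
import tensorflow as tf
w = _variable_on_device('w_sketch', [3, 3, 64, 128],
                        tf.truncated_normal_initializer(stddev=0.01))
b = _variable_on_device('b_sketch', None,
                        tf.constant(0.0, shape=[128], dtype=tf.float32),
                        trainable=True)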
def _variable_with_weight_decay(name, shape, wd, initializer, trainable=True):
"""Helper to create an initialized Variable with weight decay.
Note that the Variable is initialized with a truncated normal distribution.
A weight decay is added only if one is specified.
Args:
name: name of the variable
shape: list of ints
wd: add L2Loss weight decay multiplied by this float. If None, weight
decay is not added for this Variable.
initializer: initializer for the Variable.
trainable: whether the Variable is trainable; weight decay is only added for trainable Variables.
Returns:
Variable Tensor
"""
var = _variable_on_device(name, shape, initializer, trainable)
if wd is not None and trainable:
weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
tf.add_to_collection('losses', weight_decay)
return var
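# Usage sketch for _variable_with_weight_decay (hypothetical shapes): create a
# conv kernel with L2 weight decay and later fold every collected decay term
# into the total loss.
import tensorflow as tf
kernel = _variable_with_weight_decay(
    'kernel_sketch', shape=[3, 3, 64, 128], wd=1e-4,
    initializer=tf.truncated_normal_initializer(stddev=0.01))
weight_decay_loss = tf.add_n(tf.get_collection('losses'))  # sum of all decay terms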
def highway(self, input_1, input_2, size_1, size_2, l2_penalty=1e-8, layer_size=1):
output = input_2
for idx in range(layer_size):
with tf.name_scope('output_lin_%d' % idx):
W = tf.Variable(tf.truncated_normal([size_2,size_1], stddev=0.1), name="W")
b = tf.Variable(tf.constant(0.1, shape=[size_1]), name="b")
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(W))
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(b))
output = tf.nn.relu(tf.nn.xw_plus_b(output,W,b))
with tf.name_scope('transform_lin_%d' % idx):
W = tf.Variable(tf.truncated_normal([size_1,size_1], stddev=0.1), name="W")
b = tf.Variable(tf.constant(0.1, shape=[size_1]), name="b")
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(W))
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(b))
transform_gate = tf.sigmoid(tf.nn.xw_plus_b(input_1,W,b))
carry_gate = tf.constant(1.0) - transform_gate
output = transform_gate * output + carry_gate * input_1
return output
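# A minimal single highway layer for reference (illustrative names; assumes the
# input and output share the same size, unlike the two-size variant above):
# y = T(x) * H(x) + (1 - T(x)) * x, where T is the transform gate.
import tensorflow as tf

def highway_layer_sketch(x, size):
    W_h = tf.Variable(tf.truncated_normal([size, size], stddev=0.1), name="W_h")
    b_h = tf.Variable(tf.constant(0.1, shape=[size]), name="b_h")
    W_t = tf.Variable(tf.truncated_normal([size, size], stddev=0.1), name="W_t")
    # a negative gate bias makes the layer start close to the identity mapping
    b_t = tf.Variable(tf.constant(-1.0, shape=[size]), name="b_t")
    h = tf.nn.relu(tf.nn.xw_plus_b(x, W_h, b_h))
    t = tf.sigmoid(tf.nn.xw_plus_b(x, W_t, b_t))
    return t * h + (1.0 - t) * x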
def conv_block(self, input, out_size, layer, kernalsize=3, l2_penalty=1e-8, shortcut=False):
in_shape = input.get_shape().as_list()
if layer>0:
filter_shape = [kernalsize, 1, in_shape[3], out_size]
else:
filter_shape = [kernalsize, in_shape[2], 1, out_size]
W = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1), name="W-%s" % layer)
b = tf.Variable(tf.constant(0.1, shape=[out_size]), name="b-%s" % layer)
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(W))
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(b))
if layer>0:
conv = tf.nn.conv2d(input, W, strides=[1, 1, 1, 1], padding="SAME", name="conv-%s" % layer)
else:
conv = tf.nn.conv2d(input, W, strides=[1, 1, 1, 1], padding="VALID", name="conv-%s" % layer)
if shortcut:
shortshape = [1,1,in_shape[3], out_size]
Ws = tf.Variable(tf.truncated_normal(shortshape, stddev=0.05), name="Ws-%s" % layer)
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(Ws))
conv = conv + tf.nn.conv2d(input, Ws, strides=[1, 1, 1, 1], padding="SAME", name="conv-shortcut-%s" % layer)
h = tf.nn.bias_add(conv, b)
h2 = tf.nn.relu(tf.contrib.layers.batch_norm(h, center=True, scale=True, epsilon=1e-5, decay=0.9), name="relu-%s" % layer)
return h2
def create_model(self, model_input, vocab_size, l2_penalty=1e-8, **unused_params):
"""Creates a logistic model.
Args:
model_input: 'batch' x 'num_features' matrix of input features.
vocab_size: The number of classes in the dataset.
Returns:
A dictionary with a tensor containing the probability predictions of the
model in the 'predictions' key. The dimensions of the tensor are
batch_size x num_classes."""
input_size = vocab_size
output_size = FLAGS.hidden_size
with tf.name_scope("rbm"):
self.weights = tf.Variable(tf.truncated_normal([input_size, output_size],
stddev=1.0 / math.sqrt(float(input_size))), name="weights")
self.v_bias = tf.Variable(tf.zeros([input_size]), name="v_bias")
self.h_bias = tf.Variable(tf.zeros([output_size]), name="h_bias")
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(self.weights))
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(self.v_bias))
tf.add_to_collection(name=tf.GraphKeys.REGULARIZATION_LOSSES, value=l2_penalty*tf.nn.l2_loss(self.h_bias))
def scope_vars(scope, trainable_only=False):
"""
Get variables inside a scope
The scope can be specified as a string
Parameters
----------
scope: str or VariableScope
scope in which the variables reside.
trainable_only: bool
whether or not to return only the variables that were marked as trainable.
Returns
-------
vars: [tf.Variable]
list of variables in `scope`.
"""
return tf.get_collection(
tf.GraphKeys.TRAINABLE_VARIABLES if trainable_only else tf.GraphKeys.GLOBAL_VARIABLES,
scope=scope if isinstance(scope, str) else scope.name
)
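# Usage sketch for scope_vars (illustrative scope names): mirror the trainable
# variables of one scope onto another, e.g. to refresh a target network.
# Assumes both scopes create their variables in the same order.
import tensorflow as tf
q_vars = scope_vars('q_network', trainable_only=True)
target_vars = scope_vars('target_network', trainable_only=True)
update_target_op = tf.group(*(t.assign(q) for q, t in zip(q_vars, target_vars)))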
def baseline_forward(self, X, size, n_class):
shape = X.get_shape()
_X = tf.transpose(X, [1, 0, 2]) # batch_size x sentence_length x word_length -> sentence_length x batch_size x word_length
_X = tf.reshape(_X, [-1, int(shape[2])]) # (batch_size x sentence_length) x word_length
seq = tf.split(0, int(shape[1]), _X) # sentence_length x (batch_size x word_length)
with tf.name_scope("LSTM"):
lstm_cell = rnn_cell.BasicLSTMCell(size, forget_bias=1.0)
outputs, states = rnn.rnn(lstm_cell, seq, dtype=tf.float32)
with tf.name_scope("LSTM-Classifier"):
W = tf.Variable(tf.random_normal([size, n_class]), name="W")
b = tf.Variable(tf.random_normal([n_class]), name="b")
output = tf.matmul(outputs[-1], W) + b
return output
def get_dis_variables(self):
weights = {
'wc1': tf.Variable(tf.random_normal([4, 4, 1 + self.y_dim, 64], stddev=0.02), name='dis_w1'),
'wc2': tf.Variable(tf.random_normal([4, 4, 64 + self.y_dim, 128], stddev=0.02), name='dis_w2'),
'wc3': tf.Variable(tf.random_normal([128 * 7 * 7 + self.y_dim, 1024], stddev=0.02), name='dis_w3'),
'wd': tf.Variable(tf.random_normal([1024 + self.y_dim, 1], stddev=0.02), name='dis_w4')
}
biases = {
'bc1': tf.Variable(tf.zeros([64]), name='dis_b1'),
'bc2': tf.Variable(tf.zeros([128]), name='dis_b2'),
'bc3': tf.Variable(tf.zeros([1024]), name='dis_b3'),
'bd': tf.Variable(tf.zeros([1]), name='dis_b4')
}
return weights, biases
def get_en_z_variables(self):
weights = {
'e1': tf.Variable(tf.random_normal([4, 4, 1, 64], stddev=0.02), name='enz_w1'),
'e2': tf.Variable(tf.random_normal([4, 4, 64, 128], stddev=0.02), name='enz_w2'),
##z
'e3': tf.Variable(tf.random_normal([128 * 7 * 7, 64], stddev=0.02), name='enz_w3')
}
biases = {
'eb1': tf.Variable(tf.zeros([64]), name='enz_b1'),
'eb2': tf.Variable(tf.zeros([128]), name='enz_b2'),
##z
'eb3': tf.Variable(tf.zeros([64]), name='enz_b3')
}
return weights, biases
def get_en_y_variables(self):
weights = {
'e1': tf.Variable(tf.random_normal([4, 4, 1, 64], stddev=0.02), name='eny_w1'),
'e2': tf.Variable(tf.random_normal([4, 4, 64, 128], stddev=0.02), name='eny_w2'),
'e3': tf.Variable(tf.random_normal([128 * 7 * 7, 10], stddev=0.02), name='eny_w4')
}
biases = {
'eb1': tf.Variable(tf.zeros([64]), name='eny_b1'),
'eb2': tf.Variable(tf.zeros([128]), name='eny_b2'),
'eb3': tf.Variable(tf.zeros([10]), name='eny_b4')
}
return weights, biases
def get_gen_variables(self):
weights = {
'wd': tf.Variable(tf.random_normal([self.sample_size+self.y_dim , 1024], stddev=0.02), name='gen_w1'),
'wc1': tf.Variable(tf.random_normal([1024 + self.y_dim , 7 * 7 * 128], stddev=0.02), name='gen_w2'),
'wc2': tf.Variable(tf.random_normal([4, 4, 64, 128 + self.y_dim], stddev=0.02), name='gen_w3'),
'wc3': tf.Variable(tf.random_normal([4, 4, 1, 64 + self.y_dim], stddev=0.02), name='gen_w4'),
}
biases = {
'bd': tf.Variable(tf.zeros([1024]), name='gen_b1'),
'bc1': tf.Variable(tf.zeros([7 * 7 * 128]), name='gen_b2'),
'bc2': tf.Variable(tf.zeros([64]), name='gen_b3'),
'bc3': tf.Variable(tf.zeros([1]), name='gen_b4')
}
return weights, biases
def accumulate_strings(values, name="strings"):
"""Accumulates strings into a vector.
Args:
values: A 1-d string tensor that contains values to add to the accumulator.
name: name of the accumulator Variable.
Returns:
A tuple (value_tensor, update_op).
"""
tf.assert_type(values, tf.string)
strings = tf.Variable(
name=name,
initial_value=[],
dtype=tf.string,
trainable=False,
collections=[],
validate_shape=True)
value_tensor = tf.identity(strings)
update_op = tf.assign(
ref=strings, value=tf.concat([strings, values], 0), validate_shape=False)
return value_tensor, update_op
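# Usage sketch for accumulate_strings: accumulate per-batch strings and read the
# full vector later. `batch_strings` is a stand-in for a real 1-D string tensor.
import tensorflow as tf
batch_strings = tf.constant(["hello", "world"])
value_tensor, update_op = accumulate_strings(batch_strings, name="predictions")
# run `update_op` once per batch (after initializing the backing Variable),
# then fetch `value_tensor` to obtain everything accumulated so far.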
def makeDNN(hidden_layer):
# input from X
prevLayer = X
# make layers
for i in range(hidden_layer):
if i==0:
newWeight = tf.get_variable("W0%d" % i, shape=[features, wide], initializer=tf.contrib.layers.xavier_initializer())
else:
newWeight = tf.get_variable("W0%d" % i, shape=[wide, wide], initializer=tf.contrib.layers.xavier_initializer())
newBias = tf.Variable(tf.random_normal([wide]))
newLayer = tf.nn.relu(tf.matmul(prevLayer, newWeight) + newBias)
newDropLayer = tf.nn.dropout(newLayer, dropout_rate)
prevLayer = newDropLayer
# make output layers
Wo = tf.get_variable("Wo", shape=[wide, labels], initializer=tf.contrib.layers.xavier_initializer())
bo = tf.Variable(tf.random_normal([labels]))
return tf.matmul(prevLayer, Wo) + bo
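# Usage sketch for makeDNN (illustrative globals; the original script derives
# these from its dataset): a 3-hidden-layer network over 20 input features.
import tensorflow as tf
features, wide, labels, dropout_rate = 20, 64, 3, 0.8
X = tf.placeholder(tf.float32, [None, features])
logits = makeDNN(3)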
# tf Graph Input
def _variable_with_weight_decay(name, shape, stddev, wd):
"""Helper to create an initialized Variable with weight decay.
Note that the Variable is initialized with a truncated normal distribution.
A weight decay is added only if one is specified.
Args:
name: name of the variable
shape: list of ints
stddev: standard deviation of a truncated Gaussian
wd: add L2Loss weight decay multiplied by this float. If None, weight
decay is not added for this Variable.
Returns:
Variable Tensor
"""
var = _variable_on_cpu(name, shape,
tf.truncated_normal_initializer(stddev=stddev))
if wd is not None:
# weight_decay = tf.mul(tf.nn.l2_loss(var), wd, name='weight_loss')
weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
tf.add_to_collection('losses', weight_decay)
return var
def _variable_with_weight_decay(name, shape, stddev, wd):
"""Helper to create an initialized Variable with weight decay.
Note that the Variable is initialized with a truncated normal distribution.
A weight decay is added only if one is specified.
Args:
name: name of the variable
shape: list of ints
stddev: standard deviation of a truncated Gaussian
wd: add L2Loss weight decay multiplied by this float. If None, weight
decay is not added for this Variable.
Returns:
Variable Tensor
"""
dtype = tf.float16 if FLAGS.use_fp16 else tf.float32
var = _variable_on_cpu(
name,
shape,
tf.truncated_normal_initializer(stddev=stddev, dtype=dtype))
if wd is not None and not tf.get_variable_scope().reuse:
weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
tf.add_to_collection('losses', weight_decay)
return var
def max_pool2d(inputs,
kernel_size,
scope,
stride=[2, 2],
padding='VALID'):
""" 2D max pooling.
Args:
inputs: 4-D tensor BxHxWxC
kernel_size: a list of 2 ints
stride: a list of 2 ints
Returns:
4-D max-pooled tensor
"""
with tf.variable_scope(scope) as sc:
kernel_h, kernel_w = kernel_size
stride_h, stride_w = stride
outputs = tf.nn.max_pool(inputs,
ksize=[1, kernel_h, kernel_w, 1],
strides=[1, stride_h, stride_w, 1],
padding=padding,
name=sc.name)
return outputs
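# Usage sketch for max_pool2d above (the other pooling wrappers below follow the
# same pattern; sizes are hypothetical): halve the spatial resolution of a batch
# of 32x32 feature maps with 64 channels.
import tensorflow as tf
feature_map = tf.zeros([8, 32, 32, 64])                   # B x H x W x C
pooled = max_pool2d(feature_map, [2, 2], scope='pool1')   # -> 8 x 16 x 16 x 64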
def avg_pool2d(inputs,
kernel_size,
scope,
stride=[2, 2],
padding='VALID'):
""" 2D avg pooling.
Args:
inputs: 4-D tensor BxHxWxC
kernel_size: a list of 2 ints
stride: a list of 2 ints
Returns:
4-D average-pooled tensor
"""
with tf.variable_scope(scope) as sc:
kernel_h, kernel_w = kernel_size
stride_h, stride_w = stride
outputs = tf.nn.avg_pool(inputs,
ksize=[1, kernel_h, kernel_w, 1],
strides=[1, stride_h, stride_w, 1],
padding=padding,
name=sc.name)
return outputs
def max_pool3d(inputs,
kernel_size,
scope,
stride=[2, 2, 2],
padding='VALID'):
""" 3D max pooling.
Args:
inputs: 5-D tensor BxDxHxWxC
kernel_size: a list of 3 ints
stride: a list of 3 ints
Returns:
5-D max-pooled tensor
"""
with tf.variable_scope(scope) as sc:
kernel_d, kernel_h, kernel_w = kernel_size
stride_d, stride_h, stride_w = stride
outputs = tf.nn.max_pool3d(inputs,
ksize=[1, kernel_d, kernel_h, kernel_w, 1],
strides=[1, stride_d, stride_h, stride_w, 1],
padding=padding,
name=sc.name)
return outputs
def avg_pool3d(inputs,
kernel_size,
scope,
stride=[2, 2, 2],
padding='VALID'):
""" 3D avg pooling.
Args:
inputs: 5-D tensor BxDxHxWxC
kernel_size: a list of 3 ints
stride: a list of 3 ints
Returns:
5-D average-pooled tensor
"""
with tf.variable_scope(scope) as sc:
kernel_d, kernel_h, kernel_w = kernel_size
stride_d, stride_h, stride_w = stride
outputs = tf.nn.avg_pool3d(inputs,
ksize=[1, kernel_d, kernel_h, kernel_w, 1],
strides=[1, stride_d, stride_h, stride_w, 1],
padding=padding,
name=sc.name)
return outputs
def _optimize(self):
'''
NOTE: The author said that there was no need for 100 d_iter per 100 iters.
https://github.com/igul222/improved_wgan_training/issues/3
'''
global_step = tf.Variable(0, name='global_step', trainable=False)  # keep the step counter out of tf.trainable_variables()
lr = self.arch['training']['lr']
b1 = self.arch['training']['beta1']
b2 = self.arch['training']['beta2']
optimizer = tf.train.AdamOptimizer(lr, b1, b2)
g_vars = tf.trainable_variables()
with tf.name_scope('Update'):
opt_g = optimizer.minimize(self.loss['G'], var_list=g_vars, global_step=global_step)
return {
'g': opt_g,
'global_step': global_step
}
def _validate(self, machine, n=10):
N = n * n
# same row same z
z = tf.random_normal(shape=[n, self.arch['z_dim']])
z = tf.tile(z, [1, n])
z = tf.reshape(z, [N, -1])
z = tf.Variable(z, trainable=False, dtype=tf.float32)
# same column same y
y = tf.range(0, 10, 1, dtype=tf.int64)
y = tf.reshape(y, [-1, 1])
y = tf.tile(y, [n, 1])
Xh = machine.generate(z, y) # 100, 64, 64, 3
# Xh = gray2jet(Xh)
# Xh = make_png_thumbnail(Xh, n)
Xh = make_png_jet_thumbnail(Xh, n)
return Xh
def _optimize(self):
'''
NOTE: The author said that there was no need for 100 d_iter per 100 iters.
https://github.com/igul222/improved_wgan_training/issues/3
'''
global_step = tf.Variable(0, name='global_step', trainable=False)  # keep the step counter out of tf.trainable_variables()
lr = self.arch['training']['lr']
b1 = self.arch['training']['beta1']
b2 = self.arch['training']['beta2']
optimizer = tf.train.AdamOptimizer(lr, b1, b2)
trainables = tf.trainable_variables()
g_vars = trainables
# g_vars = [v for v in trainables if 'Generator' in v.name or 'y_emb' in v.name]
with tf.name_scope('Update'):
opt_g = optimizer.minimize(self.loss['G'], var_list=g_vars, global_step=global_step)
return {
'g': opt_g,
'global_step': global_step
}
def _validate(self, machine, n=10):
N = n * n
# same row same z
z = tf.random_normal(shape=[n, self.arch['z_dim']])
z = tf.tile(z, [1, n])
z = tf.reshape(z, [N, -1])
z = tf.Variable(z, trainable=False, dtype=tf.float32)
# same column same y
y = tf.range(0, 10, 1, dtype=tf.int64)
y = tf.reshape(y, [-1,])
y = tf.tile(y, [n,])
Xh = machine.generate(z, y) # 100, 64, 64, 3
Xh = make_png_thumbnail(Xh, n)
return Xh
def batch_norm_layer(self, to_be_normalized, is_training):
if is_training:
train_phase = tf.constant(1)
else:
train_phase = tf.constant(-1)
beta = tf.Variable(tf.constant(0.0, shape=[to_be_normalized.shape[-1]]), name='beta', trainable=True)
gamma = tf.Variable(tf.constant(1.0, shape=[to_be_normalized.shape[-1]]), name='gamma', trainable=True)
# axises = np.arange(len(to_be_normalized.shape) - 1) # change to apply tensorflow 1.3
axises = [0,1,2]
print("start nn.moments")
print("axises : " + str(axises))
batch_mean, batch_var = tf.nn.moments(to_be_normalized, axises, name='moments')
print("nn.moments successful")
ema = tf.train.ExponentialMovingAverage(decay=0.5)
def mean_var_with_update():
ema_apply_op = ema.apply([batch_mean, batch_var])
with tf.control_dependencies([ema_apply_op]):
return tf.identity(batch_mean), tf.identity(batch_var)
mean, var = tf.cond(train_phase > 0, mean_var_with_update, lambda: (ema.average(batch_mean), ema.average(batch_var))) # if is training --> update
normed = tf.nn.batch_normalization(to_be_normalized, mean, var, beta, gamma, 1e-3)
return normed
def input_norm(xs):
fc_mean, fc_var = tf.nn.moments(
xs,
axes=[0],
)
scale = tf.Variable(tf.ones([1]))
shift = tf.Variable(tf.zeros([1]))
epsilon = 0.001
# apply moving average for mean and var when train on batch
ema = tf.train.ExponentialMovingAverage(decay=0.5)
def mean_var_with_update():
ema_apply_op = ema.apply([fc_mean, fc_var])
with tf.control_dependencies([ema_apply_op]):
return tf.identity(fc_mean), tf.identity(fc_var)
mean, var = mean_var_with_update()
xs = tf.nn.batch_normalization(xs, mean, var, shift, scale, epsilon)
return xs
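# Usage sketch for input_norm: normalize raw input features before the first
# layer (the feature dimension of 10 is illustrative).
import tensorflow as tf
xs_raw = tf.placeholder(tf.float32, [None, 10])
xs_normed = input_norm(xs_raw)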
def __init__(self, tag, x, summary_fn=tf.summary.scalar, summary_args=(), scope=None):
"""
Initializes an Average.
Arguments
x: Tensor to be averaged over multiple runs.
tag: Tag for the summary.
summary_fn: Function used for creating a summary.
summary_args: Arguments passed to the summary function.
"""
with tf.variable_scope(scope or type(self).__name__):
counter = tf.Variable(name="counter", initial_value=tf.constant(0),
dtype=tf.int32, trainable=False)
running_sum = tf.Variable(name="running_sum", initial_value=tf.constant(0.),
dtype=tf.float32, trainable=False)
self._running_average = running_sum / tf.cast(counter, tf.float32)
self._summary = summary_fn(tag or x.name + '_avg', self._running_average, **summary_args)
self._update_op = tf.group(counter.assign_add(1), running_sum.assign_add(x))
self._reset_op = tf.group(counter.assign(0), running_sum.assign(0.))
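# Usage sketch, assuming the enclosing class is exposed as `Average` (its real
# name is not shown in this snippet): keep a running mean of the per-step loss
# as a scalar summary. `step_loss` is a stand-in tensor.
import tensorflow as tf
step_loss = tf.reduce_mean(tf.square(tf.random_normal([32])))
avg = Average("loss_avg", step_loss)
# run the update op once per step; the summary then reports the running mean.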
def compile(self, in_x, train_feed, eval_feed):
n = np.product(self.in_d)
m, param_init_fn = [dom[i] for (dom, i) in zip(self.domains, self.chosen)]
#sc = np.sqrt(6.0) / np.sqrt(m + n)
#W = tf.Variable(tf.random_uniform([n, m], -sc, sc))
W = tf.Variable( param_init_fn( [n, m] ) )
b = tf.Variable(tf.zeros([m]))
# if the number of input dimensions is larger than one, flatten the
# input and apply the affine transformation.
if len(self.in_d) > 1:
in_x_flat = tf.reshape(in_x, shape=[-1, n])
out_y = tf.add(tf.matmul(in_x_flat, W), b)
else:
out_y = tf.add(tf.matmul(in_x, W), b)
return out_y
# computes the output dimension based on the padding scheme used.
# this comes from the tensorflow documentation
def compile(self, in_x, train_feed, eval_feed):
in_height, in_width, in_nchannels = self.in_d
nfilters, filter_len, stride, padding, param_init_fn = [dom[i]
for (dom, i) in zip(self.domains, self.chosen)]
# Creation and initialization of the parameters. Should take size of
# the filter into account.
W = tf.Variable(
param_init_fn( [filter_len, filter_len, in_nchannels, nfilters]) )
b = tf.Variable(tf.zeros([nfilters]))
# create the output and add the bias.
out_yaux = tf.nn.conv2d(in_x, W, strides=[1, stride, stride, 1], padding=padding)
out_y = tf.nn.bias_add(out_yaux, b)
#print(in_x.get_shape(), self.get_outdim(), out_y.get_shape())
return out_y