def discriminator_labeler(image, output_dim, config, reuse=None):
batch_size=tf.shape(image)[0]
with tf.variable_scope("disc_labeler",reuse=reuse) as vs:
dl_bn1 = batch_norm(name='dl_bn1')
dl_bn2 = batch_norm(name='dl_bn2')
dl_bn3 = batch_norm(name='dl_bn3')
h0 = lrelu(conv2d(image, config.df_dim, name='dl_h0_conv'))#16,32,32,64
h1 = lrelu(dl_bn1(conv2d(h0, config.df_dim*2, name='dl_h1_conv')))#16,16,16,128
h2 = lrelu(dl_bn2(conv2d(h1, config.df_dim*4, name='dl_h2_conv')))#16,16,16,248
h3 = lrelu(dl_bn3(conv2d(h2, config.df_dim*8, name='dl_h3_conv')))
dim3=np.prod(h3.get_shape().as_list()[1:])
h3_flat=tf.reshape(h3, [-1,dim3])
D_labels_logits = linear(h3_flat, output_dim, 'dl_h3_Label')
D_labels = tf.nn.sigmoid(D_labels_logits)
variables = tf.contrib.framework.get_variables(vs)
return D_labels, D_labels_logits, variables
python类linear()的实例源码
def discriminator_gen_labeler(image, output_dim, config, reuse=None):
batch_size=tf.shape(image)[0]
with tf.variable_scope("disc_gen_labeler",reuse=reuse) as vs:
dl_bn1 = batch_norm(name='dl_bn1')
dl_bn2 = batch_norm(name='dl_bn2')
dl_bn3 = batch_norm(name='dl_bn3')
h0 = lrelu(conv2d(image, config.df_dim, name='dgl_h0_conv'))#16,32,32,64
h1 = lrelu(dl_bn1(conv2d(h0, config.df_dim*2, name='dgl_h1_conv')))#16,16,16,128
h2 = lrelu(dl_bn2(conv2d(h1, config.df_dim*4, name='dgl_h2_conv')))#16,16,16,248
h3 = lrelu(dl_bn3(conv2d(h2, config.df_dim*8, name='dgl_h3_conv')))
dim3=np.prod(h3.get_shape().as_list()[1:])
h3_flat=tf.reshape(h3, [-1,dim3])
D_labels_logits = linear(h3_flat, output_dim, 'dgl_h3_Label')
D_labels = tf.nn.sigmoid(D_labels_logits)
variables = tf.contrib.framework.get_variables(vs)
return D_labels, D_labels_logits,variables
def discriminator_on_z(image, config, reuse=None):
batch_size=tf.shape(image)[0]
with tf.variable_scope("disc_z_labeler",reuse=reuse) as vs:
dl_bn1 = batch_norm(name='dl_bn1')
dl_bn2 = batch_norm(name='dl_bn2')
dl_bn3 = batch_norm(name='dl_bn3')
h0 = lrelu(conv2d(image, config.df_dim, name='dzl_h0_conv'))#16,32,32,64
h1 = lrelu(dl_bn1(conv2d(h0, config.df_dim*2, name='dzl_h1_conv')))#16,16,16,128
h2 = lrelu(dl_bn2(conv2d(h1, config.df_dim*4, name='dzl_h2_conv')))#16,16,16,248
h3 = lrelu(dl_bn3(conv2d(h2, config.df_dim*8, name='dzl_h3_conv')))
dim3=np.prod(h3.get_shape().as_list()[1:])
h3_flat=tf.reshape(h3, [-1,dim3])
D_labels_logits = linear(h3_flat, config.z_dim, 'dzl_h3_Label')
D_labels = tf.nn.tanh(D_labels_logits)
variables = tf.contrib.framework.get_variables(vs)
return D_labels,variables
def discriminator(self, opts, input_, is_training,
prefix='DISCRIMINATOR', reuse=False):
"""Encoder function, suitable for simple toy experiments.
"""
num_filters = opts['d_num_filters']
with tf.variable_scope(prefix, reuse=reuse):
h0 = ops.conv2d(opts, input_, num_filters / 8, scope='h0_conv')
h0 = ops.batch_norm(opts, h0, is_training, reuse, scope='bn_layer1')
h0 = tf.nn.relu(h0)
h1 = ops.conv2d(opts, h0, num_filters / 4, scope='h1_conv')
h1 = ops.batch_norm(opts, h1, is_training, reuse, scope='bn_layer2')
h1 = tf.nn.relu(h1)
h2 = ops.conv2d(opts, h1, num_filters / 2, scope='h2_conv')
h2 = ops.batch_norm(opts, h2, is_training, reuse, scope='bn_layer3')
h2 = tf.nn.relu(h2)
h3 = ops.conv2d(opts, h2, num_filters, scope='h3_conv')
h3 = ops.batch_norm(opts, h3, is_training, reuse, scope='bn_layer4')
h3 = tf.nn.relu(h3)
# Already has NaNs!!
latent_mean = ops.linear(opts, h3, opts['latent_space_dim'], scope='h3_lin')
log_latent_sigmas = ops.linear(opts, h3, opts['latent_space_dim'], scope='h3_lin_sigma')
return latent_mean, log_latent_sigmas
def generator(self, opts, noise, reuse=False):
"""Generator function, suitable for simple toy experiments.
Args:
noise: [num_points, dim] array, where dim is dimensionality of the
latent noise space.
Returns:
[num_points, dim1, dim2, dim3] array, where the first coordinate
indexes the points, which all are of the shape (dim1, dim2, dim3).
"""
output_shape = self._data.data_shape
num_filters = opts['g_num_filters']
with tf.variable_scope("GENERATOR", reuse=reuse):
h0 = ops.linear(opts, noise, num_filters, 'h0_lin')
h0 = tf.nn.relu(h0)
h1 = ops.linear(opts, h0, num_filters, 'h1_lin')
h1 = tf.nn.relu(h1)
h2 = ops.linear(opts, h1, np.prod(output_shape), 'h2_lin')
h2 = tf.reshape(h2, [-1] + list(output_shape))
if opts['input_normalize_sym']:
return tf.nn.tanh(h2)
else:
return h2
def discriminator(self, opts, input_,
prefix='DISCRIMINATOR', reuse=False):
"""Discriminator function, suitable for simple toy experiments.
"""
shape = input_.get_shape().as_list()
num_filters = opts['d_num_filters']
assert len(shape) > 0, 'No inputs to discriminate.'
with tf.variable_scope(prefix, reuse=reuse):
h0 = ops.linear(opts, input_, num_filters, 'h0_lin')
h0 = tf.nn.relu(h0)
h1 = ops.linear(opts, h0, num_filters, 'h1_lin')
h1 = tf.nn.relu(h1)
h2 = ops.linear(opts, h1, 1, 'h2_lin')
return h2
def generator(self, opts, noise, reuse=False):
"""Generator function, suitable for simple toy experiments.
Args:
noise: [num_points, dim] array, where dim is dimensionality of the
latent noise space.
Returns:
[num_points, dim1, dim2, dim3] array, where the first coordinate
indexes the points, which all are of the shape (dim1, dim2, dim3).
"""
output_shape = self._data.data_shape
num_filters = opts['g_num_filters']
with tf.variable_scope("GENERATOR", reuse=reuse):
h0 = ops.linear(opts, noise, num_filters, 'h0_lin')
h0 = tf.nn.tanh(h0)
h1 = ops.linear(opts, h0, num_filters, 'h1_lin')
h1 = tf.nn.tanh(h1)
h2 = ops.linear(opts, h1, np.prod(output_shape), 'h2_lin')
h2 = tf.reshape(h2, [-1] + list(output_shape))
if opts['input_normalize_sym']:
return tf.nn.tanh(h2)
else:
return h2
def discriminator(self, opts, input_,
prefix='DISCRIMINATOR', reuse=False):
"""Discriminator function, suitable for simple toy experiments.
"""
shape = input_.get_shape().as_list()
num_filters = opts['d_num_filters']
assert len(shape) > 0, 'No inputs to discriminate.'
with tf.variable_scope(prefix, reuse=reuse):
h0 = ops.linear(opts, input_, num_filters, 'h0_lin')
h0 = tf.nn.tanh(h0)
h1 = ops.linear(opts, h0, num_filters, 'h1_lin')
h1 = tf.nn.tanh(h1)
h2 = ops.linear(opts, h1, 1, 'h2_lin')
return h2
def discriminator(self, opts, input_, is_training,
prefix='DISCRIMINATOR', reuse=False):
"""Discriminator function, suitable for simple toy experiments.
"""
num_filters = opts['d_num_filters']
with tf.variable_scope(prefix, reuse=reuse):
h0 = ops.conv2d(opts, input_, num_filters, scope='h0_conv')
h0 = ops.batch_norm(opts, h0, is_training, reuse, scope='bn_layer1')
h0 = ops.lrelu(h0)
h1 = ops.conv2d(opts, h0, num_filters * 2, scope='h1_conv')
h1 = ops.batch_norm(opts, h1, is_training, reuse, scope='bn_layer2')
h1 = ops.lrelu(h1)
h2 = ops.conv2d(opts, h1, num_filters * 4, scope='h2_conv')
h2 = ops.batch_norm(opts, h2, is_training, reuse, scope='bn_layer3')
h2 = ops.lrelu(h2)
h3 = ops.linear(opts, h2, 1, scope='h3_lin')
return h3
def generator(self, opts, noise, is_training, reuse=False):
with tf.variable_scope("GENERATOR", reuse=reuse):
h0 = ops.linear(opts, noise, 100, scope='h0_lin')
h0 = ops.batch_norm(opts, h0, is_training, reuse, scope='bn_layer1', scale=False)
h0 = tf.nn.softplus(h0)
h1 = ops.linear(opts, h0, 100, scope='h1_lin')
h1 = ops.batch_norm(opts, h1, is_training, reuse, scope='bn_layer2', scale=False)
h1 = tf.nn.softplus(h1)
h2 = ops.linear(opts, h1, 28 * 28, scope='h2_lin')
# h2 = ops.batch_norm(opts, h2, is_training, reuse, scope='bn_layer3')
h2 = tf.reshape(h2, [-1, 28, 28, 1])
if opts['input_normalize_sym']:
return tf.nn.tanh(h2)
else:
return tf.nn.sigmoid(h2)
def discriminator(self, image, y=None, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
s = self.output_size
if np.mod(s, 16) == 0:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = lrelu(self.d_bn2(conv2d(h1, self.df_dim*4, name='d_h2_conv')))
h3 = lrelu(self.d_bn3(conv2d(h2, self.df_dim*8, name='d_h3_conv')))
h4 = linear(tf.reshape(h3, [self.batch_size, -1]), 1, 'd_h3_lin')
return tf.nn.sigmoid(h4), h4
else:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = linear(tf.reshape(h1, [self.batch_size, -1]), 1, 'd_h2_lin')
if not self.config.use_kernel:
return tf.nn.sigmoid(h2), h2
else:
return tf.nn.sigmoid(h2), h2, h1, h0
def discriminator_k(self, image, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
#1024, 512, 128
h0 = tf.nn.sigmoid(linear(image, 512, 'dk_h0_lin', stddev=self.config.init))
h1 = tf.nn.sigmoid(linear(h0, 256, 'dk_h1_lin', stddev=self.config.init))
h2 = tf.nn.sigmoid(linear(h1, 256, 'dk_h2_lin', stddev=self.config.init))
h3 = tf.nn.sigmoid(linear(h2, 128, 'dk_h3_lin', stddev=self.config.init))
h4 = tf.nn.relu(linear(h3, 64, 'dk_h4_lin', stddev=self.config.init))
if self.config.use_gan:
h5 = linear(h4, 1, 'dk_h5_lin', stddev=self.config.init)
return image, h0, h1, h2, h3, h4, h5
elif self.config.use_layer_kernel:
return image, h0, h1, h2, h3, h4
elif self.config.use_scale_kernel:
return tf.concat(1, [image, (28.0 * 28.0/512.0) * h0, (28 * 28.0/256.0) * h1, (28.0 * 28.0/256.0) * h2, (28.0 * 28.0/128.0) * h3,
(28.0 * 28.0/64.0) * h4])
else:
return tf.concat(1, [image, h0, h1, h2, h3, h4])
def discriminator(self, image, y=None, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
s = self.output_size
if np.mod(s, 16) == 0:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = lrelu(self.d_bn2(conv2d(h1, self.df_dim*4, name='d_h2_conv')))
h3 = lrelu(self.d_bn3(conv2d(h2, self.df_dim*8, name='d_h3_conv')))
h4 = linear(tf.reshape(h3, [self.batch_size, -1]), 1, 'd_h3_lin')
return tf.nn.sigmoid(h4), h4
else:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = linear(tf.reshape(h1, [self.batch_size, -1]), 1, 'd_h2_lin')
if not self.config.use_kernel:
return tf.nn.sigmoid(h2), h2
else:
return tf.nn.sigmoid(h2), h2, h1, h0
def discriminator(self, image, y=None, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
s = self.output_size
if np.mod(s, 16) == 0:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = lrelu(self.d_bn2(conv2d(h1, self.df_dim*4, name='d_h2_conv')))
h3 = lrelu(self.d_bn3(conv2d(h2, self.df_dim*8, name='d_h3_conv')))
h4 = linear(tf.reshape(h3, [self.batch_size, -1]), 1, 'd_h3_lin')
return tf.nn.sigmoid(h4), h4
else:
h0 = lrelu(conv2d(image, self.df_dim, name='d_h0_conv'))
h1 = lrelu(self.d_bn1(conv2d(h0, self.df_dim*2, name='d_h1_conv')))
h2 = linear(tf.reshape(h1, [self.batch_size, -1]), 1, 'd_h2_lin')
if not self.config.use_kernel:
return tf.nn.sigmoid(h2), h2
else:
return tf.nn.sigmoid(h2), h2, h1, h0
def __call__(self, inputs, is_train=True, is_debug=False):
self.is_train = is_train
self.is_debug = is_debug
outputs = tf.convert_to_tensor(inputs) # Check if necessary
# assert input shape
with tf.variable_scope(self.name, reuse=self.reuse) as scope:
print_message(scope.name)
with tf.variable_scope('conv1') as vscope:
outputs = conv3d(outputs, [self.batch_size] + self.configs.conv_info.l1,
is_train=self.is_train, with_w=True)
if is_debug and not self.reuse:
print(vscope.name, outputs)
outputs = tf.layers.dropout(outputs, rate=self.configs.dropout, training=self.is_train, name='outputs')
self.net['conv1_outputs'] = outputs
with tf.variable_scope('conv2') as vscope:
outputs = conv3d(outputs, [self.batch_size] + self.configs.conv_info.l2,
is_train=self.is_train, with_w=True)
if is_debug and not self.reuse:
print(vscope.name, outputs)
outputs = tf.layers.dropout(outputs, rate=self.configs.dropout, training=self.is_train, name='outputs')
self.net['conv2_outputs'] = outputs
with tf.variable_scope('conv3') as vscope:
outputs = conv3d(outputs, [self.batch_size] + self.configs.conv_info.l3,
is_train=self.is_train, with_w=True)
if is_debug and not self.reuse:
print(vscope.name, outputs)
outputs = tf.layers.dropout(outputs, rate=self.configs.dropout, training=self.is_train, name='outputs')
self.net['conv3_outputs'] = outputs
with tf.variable_scope('fc') as vscope:
fc_dim = reduce(mul, self.configs.conv_info.l3, 1)
outputs = tf.reshape(outputs, [self.batch_size] + [fc_dim], name='reshape')
outputs = linear(outputs, 1)
if is_debug and not self.reuse:
print(vscope.name, outputs)
self.net['fc_outputs'] = outputs
self.reuse = True
self.variables = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope=self.name)
return tf.nn.sigmoid(outputs), outputs
def discriminator(self, opts, input_, is_training,
prefix='DISCRIMINATOR', reuse=False):
shape = tf.shape(input_)
num = shape[0]
with tf.variable_scope(prefix, reuse=reuse):
h0 = input_
h0 = tf.add(h0, tf.random_normal(shape, stddev=0.3))
h0 = ops.linear(opts, h0, 1000, scope='h0_linear')
# h0 = ops.batch_norm(opts, h0, is_training, reuse, scope='bn_layer1')
h0 = tf.nn.relu(h0)
h1 = tf.add(h0, tf.random_normal([num, 1000], stddev=0.5))
h1 = ops.linear(opts, h1, 500, scope='h1_linear')
# h1 = ops.batch_norm(opts, h1, is_training, reuse, scope='bn_layer2')
h1 = tf.nn.relu(h1)
h2 = tf.add(h1, tf.random_normal([num, 500], stddev=0.5))
h2 = ops.linear(opts, h2, 250, scope='h2_linear')
# h2 = ops.batch_norm(opts, h2, is_training, reuse, scope='bn_layer3')
h2 = tf.nn.relu(h2)
h3 = tf.add(h2, tf.random_normal([num, 250], stddev=0.5))
h3 = ops.linear(opts, h3, 250, scope='h3_linear')
# h3 = ops.batch_norm(opts, h3, is_training, reuse, scope='bn_layer4')
h3 = tf.nn.relu(h3)
h4 = tf.add(h3, tf.random_normal([num, 250], stddev=0.5))
h4 = ops.linear(opts, h4, 250, scope='h4_linear')
# h4 = ops.batch_norm(opts, h4, is_training, reuse, scope='bn_layer5')
h4 = tf.nn.relu(h4)
h5 = ops.linear(opts, h4, 10, scope='h5_linear')
return h5, h3
def generator(self, opts, noise, is_training=False, reuse=False, keep_prob=1.):
""" Decoder actually.
"""
output_shape = self._data.data_shape
num_units = opts['g_num_filters']
with tf.variable_scope("GENERATOR", reuse=reuse):
# if not opts['convolutions']:
if opts['g_arch'] == 'mlp':
layer_x = noise
for i in range(opts['g_num_layers']):
layer_x = ops.linear(opts, layer_x, num_units, 'h%d_lin' % i)
layer_x = tf.nn.relu(layer_x)
if opts['batch_norm']:
layer_x = ops.batch_norm(
opts, layer_x, is_training, reuse, scope='bn%d' % i)
out = ops.linear(opts, layer_x, np.prod(output_shape), 'h%d_lin' % (i + 1))
out = tf.reshape(out, [-1] + list(output_shape))
if opts['input_normalize_sym']:
return tf.nn.tanh(out)
else:
return tf.nn.sigmoid(out)
elif opts['g_arch'] in ['dcgan', 'dcgan_mod']:
return self.dcgan_like_arch(opts, noise, is_training, reuse, keep_prob)
elif opts['g_arch'] == 'conv_up_res':
return self.conv_up_res(opts, noise, is_training, reuse, keep_prob)
elif opts['g_arch'] == 'ali':
return self.ali_deconv(opts, noise, is_training, reuse, keep_prob)
elif opts['g_arch'] == 'began':
return self.began_dec(opts, noise, is_training, reuse, keep_prob)
else:
raise ValueError('%s unknown' % opts['g_arch'])
def discriminator(self, opts, input_, prefix='DISCRIMINATOR', reuse=False):
"""Discriminator for the GAN objective
"""
num_units = opts['d_num_filters']
num_layers = opts['d_num_layers']
nowozin_trick = opts['gan_p_trick']
# No convolutions as GAN happens in the latent space
with tf.variable_scope(prefix, reuse=reuse):
hi = input_
for i in range(num_layers):
hi = ops.linear(opts, hi, num_units, scope='h%d_lin' % (i+1))
hi = tf.nn.relu(hi)
hi = ops.linear(opts, hi, 1, scope='final_lin')
if nowozin_trick:
# We are doing GAN between our model Qz and the true Pz.
# We know analytical form of the true Pz.
# The optimal discriminator for D_JS(Pz, Qz) is given by:
# Dopt(x) = log dPz(x) - log dQz(x)
# And we know exactly dPz(x). So add log dPz(x) explicitly
# to the discriminator and let it learn only the remaining
# dQz(x) term. This appeared in the AVB paper.
assert opts['latent_space_distr'] == 'normal'
sigma2_p = float(opts['pot_pz_std']) ** 2
normsq = tf.reduce_sum(tf.square(input_), 1)
hi = hi - normsq / 2. / sigma2_p \
- 0.5 * tf.log(2. * np.pi) \
- 0.5 * opts['latent_space_dim'] * np.log(sigma2_p)
return hi
def correlation_loss(self, opts, input_):
"""
Independence test based on Pearson's correlation.
Keep in mind that this captures only linear dependancies.
However, for multivariate Gaussian independence is equivalent
to zero correlation.
"""
batch_size = self.get_batch_size(opts, input_)
dim = int(input_.get_shape()[1])
transposed = tf.transpose(input_, perm=[1, 0])
mean = tf.reshape(tf.reduce_mean(transposed, axis=1), [-1, 1])
centered_transposed = transposed - mean # Broadcasting mean
cov = tf.matmul(centered_transposed, centered_transposed, transpose_b=True)
cov = cov / (batch_size - 1)
#cov = tf.Print(cov, [cov], "cov")
sigmas = tf.sqrt(tf.diag_part(cov) + 1e-5)
#sigmas = tf.Print(sigmas, [sigmas], "sigmas")
sigmas = tf.reshape(sigmas, [1, -1])
sigmas = tf.matmul(sigmas, sigmas, transpose_a=True)
#sigmas = tf.Print(sigmas, [sigmas], "sigmas")
# Pearson's correlation
corr = cov / sigmas
triangle = tf.matrix_set_diag(tf.matrix_band_part(corr, 0, -1), tf.zeros(dim))
#triangle = tf.Print(triangle, [triangle], "triangle")
loss = tf.reduce_sum(tf.square(triangle)) / ((dim * dim - dim) / 2.0)
#loss = tf.Print(loss, [loss], "Correlation loss")
return loss
def encoder(self, opts, input_, is_training=False, reuse=False, keep_prob=1.):
if opts['e_add_noise']:
def add_noise(x):
shape = tf.shape(x)
return x + tf.truncated_normal(shape, 0.0, 0.01)
def do_nothing(x):
return x
input_ = tf.cond(is_training, lambda: add_noise(input_), lambda: do_nothing(input_))
num_units = opts['e_num_filters']
num_layers = opts['e_num_layers']
with tf.variable_scope("ENCODER", reuse=reuse):
if not opts['convolutions']:
hi = input_
for i in range(num_layers):
hi = ops.linear(opts, hi, num_units, scope='h%d_lin' % i)
if opts['batch_norm']:
hi = ops.batch_norm(opts, hi, is_training, reuse, scope='bn%d' % i)
hi = tf.nn.relu(hi)
if opts['e_is_random']:
latent_mean = ops.linear(
opts, hi, opts['latent_space_dim'], 'h%d_lin' % (i + 1))
log_latent_sigmas = ops.linear(
opts, hi, opts['latent_space_dim'], 'h%d_lin_sigma' % (i + 1))
return latent_mean, log_latent_sigmas
else:
return ops.linear(opts, hi, opts['latent_space_dim'], 'h%d_lin' % (i + 1))
elif opts['e_arch'] == 'dcgan':
return self.dcgan_encoder(opts, input_, is_training, reuse, keep_prob)
elif opts['e_arch'] == 'ali':
return self.ali_encoder(opts, input_, is_training, reuse, keep_prob)
elif opts['e_arch'] == 'began':
return self.began_encoder(opts, input_, is_training, reuse, keep_prob)
else:
raise ValueError('%s Unknown' % opts['e_arch'])
def began_encoder(self, opts, input_, is_training=False, reuse=False, keep_prob=1.):
num_units = opts['e_num_filters']
assert num_units == opts['g_num_filters'], 'BEGAN requires same number of filters in encoder and decoder'
num_layers = opts['e_num_layers']
layer_x = ops.conv2d(opts, input_, num_units, scope='h_first_conv')
for i in xrange(num_layers):
if i % 3 < 2:
if i != num_layers - 2:
ii = i - (i / 3)
scale = (ii + 1 - ii / 2)
else:
ii = i - (i / 3)
scale = (ii - (ii - 1) / 2)
layer_x = ops.conv2d(opts, layer_x, num_units * scale, d_h=1, d_w=1, scope='h%d_conv' % i)
layer_x = tf.nn.elu(layer_x)
else:
if i != num_layers - 1:
layer_x = ops.downsample(layer_x, scope='h%d_maxpool' % i, reuse=reuse)
# Tensor should be [N, 8, 8, filters] right now
if opts['e_is_random']:
latent_mean = ops.linear(
opts, layer_x, opts['latent_space_dim'], scope='hlast_lin')
log_latent_sigmas = ops.linear(
opts, layer_x, opts['latent_space_dim'], scope='hlast_lin_sigma')
return latent_mean, log_latent_sigmas
else:
return ops.linear(opts, layer_x, opts['latent_space_dim'], scope='hlast_lin')
def _recon_loss_using_disc_encoder(
self, opts, reconstructed_training, encoded_training,
real_points, is_training_ph, keep_prob_ph):
"""Build an additional loss using the encoder as discriminator."""
reconstructed_reencoded_sg = self.encoder(
opts, tf.stop_gradient(reconstructed_training),
is_training=is_training_ph, keep_prob=keep_prob_ph, reuse=True)
if opts['e_is_random']:
reconstructed_reencoded_sg = reconstructed_reencoded_sg[0]
reconstructed_reencoded = self.encoder(
opts, reconstructed_training, is_training=is_training_ph,
keep_prob=keep_prob_ph, reuse=True)
if opts['e_is_random']:
reconstructed_reencoded = reconstructed_reencoded[0]
# Below line enforces the forward to be reconstructed_reencoded and backwards to NOT change the encoder....
crazy_hack = reconstructed_reencoded - reconstructed_reencoded_sg +\
tf.stop_gradient(reconstructed_reencoded_sg)
encoded_training_sg = self.encoder(
opts, tf.stop_gradient(real_points),
is_training=is_training_ph, keep_prob=keep_prob_ph, reuse=True)
if opts['e_is_random']:
encoded_training_sg = encoded_training_sg[0]
adv_fake_layer = ops.linear(opts, reconstructed_reencoded_sg, 1, scope='adv_layer')
adv_true_layer = ops.linear(opts, encoded_training_sg, 1, scope='adv_layer', reuse=True)
adv_fake = tf.nn.sigmoid_cross_entropy_with_logits(
logits=adv_fake_layer, labels=tf.zeros_like(adv_fake_layer))
adv_true = tf.nn.sigmoid_cross_entropy_with_logits(
logits=adv_true_layer, labels=tf.ones_like(adv_true_layer))
adv_fake = tf.reduce_mean(adv_fake)
adv_true = tf.reduce_mean(adv_true)
adv_c_loss = adv_fake + adv_true
emb_c = tf.reduce_sum(tf.square(crazy_hack - tf.stop_gradient(encoded_training)), 1)
emb_c_loss = tf.reduce_mean(tf.sqrt(emb_c + 1e-5))
# Normalize the loss, so that it does not depend on how good the
# discriminator is.
emb_c_loss = emb_c_loss / tf.stop_gradient(emb_c_loss)
return adv_c_loss, emb_c_loss
def generator_mnist(self, z, is_train=True, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
h0 = linear(z, 64, 'g_h0_lin', stddev=self.config.init)
h1 = linear(tf.nn.relu(h0), 256, 'g_h1_lin', stddev=self.config.init)
h2 = linear(tf.nn.relu(h1), 256, 'g_h2_lin', stddev=self.config.init)
h3 = linear(tf.nn.relu(h2), 1024, 'g_h3_lin', stddev=self.config.init)
h4 = linear(tf.nn.relu(h3), 28 * 28 * 1, 'g_h4_lin', stddev=self.config.init)
return tf.reshape(tf.nn.sigmoid(h4), [self.batch_size, 28, 28, 1])
def generator_mnist(self, z, is_train=True, reuse=False):
if reuse:
tf.get_variable_scope().reuse_variables()
h0 = linear(z, 64, 'g_h0_lin', stddev=self.config.init)
h1 = linear(tf.nn.relu(h0), 256, 'g_h1_lin', stddev=self.config.init)
h2 = linear(tf.nn.relu(h1), 256, 'g_h2_lin', stddev=self.config.init)
h3 = linear(tf.nn.relu(h2), 1024, 'g_h3_lin', stddev=self.config.init)
h4 = linear(tf.nn.relu(h3), 28 * 28 * 1, 'g_h4_lin', stddev=self.config.init)
return tf.reshape(tf.nn.sigmoid(h4), [self.batch_size, 28, 28, 1])
def _vgg_fully_connected(self, x, n_in, n_out, scope):
with tf.variable_scope(scope):
fc = ops.linear(x, n_in, n_out)
return fc
def generator(hparams, z, scope_name, train, reuse):
with tf.variable_scope(scope_name) as scope:
if reuse:
scope.reuse_variables()
output_size = 64
s = output_size
s2, s4, s8, s16 = int(s/2), int(s/4), int(s/8), int(s/16)
g_bn0 = ops.batch_norm(name='g_bn0')
g_bn1 = ops.batch_norm(name='g_bn1')
g_bn2 = ops.batch_norm(name='g_bn2')
g_bn3 = ops.batch_norm(name='g_bn3')
# project `z` and reshape
h0 = tf.reshape(ops.linear(z, hparams.gf_dim*8*s16*s16, 'g_h0_lin'), [-1, s16, s16, hparams.gf_dim * 8])
h0 = tf.nn.relu(g_bn0(h0, train=train))
h1 = ops.deconv2d(h0, [hparams.batch_size, s8, s8, hparams.gf_dim*4], name='g_h1')
h1 = tf.nn.relu(g_bn1(h1, train=train))
h2 = ops.deconv2d(h1, [hparams.batch_size, s4, s4, hparams.gf_dim*2], name='g_h2')
h2 = tf.nn.relu(g_bn2(h2, train=train))
h3 = ops.deconv2d(h2, [hparams.batch_size, s2, s2, hparams.gf_dim*1], name='g_h3')
h3 = tf.nn.relu(g_bn3(h3, train=train))
h4 = ops.deconv2d(h3, [hparams.batch_size, s, s, hparams.c_dim], name='g_h4')
x_gen = tf.nn.tanh(h4)
return x_gen
def discriminator(hparams, x, scope_name, train, reuse):
with tf.variable_scope(scope_name) as scope:
if reuse:
scope.reuse_variables()
d_bn1 = ops.batch_norm(name='d_bn1')
d_bn2 = ops.batch_norm(name='d_bn2')
d_bn3 = ops.batch_norm(name='d_bn3')
h0 = ops.lrelu(ops.conv2d(x, hparams.df_dim, name='d_h0_conv'))
h1 = ops.conv2d(h0, hparams.df_dim*2, name='d_h1_conv')
h1 = ops.lrelu(d_bn1(h1, train=train))
h2 = ops.conv2d(h1, hparams.df_dim*4, name='d_h2_conv')
h2 = ops.lrelu(d_bn2(h2, train=train))
h3 = ops.conv2d(h2, hparams.df_dim*8, name='d_h3_conv')
h3 = ops.lrelu(d_bn3(h3, train=train))
h4 = ops.linear(tf.reshape(h3, [hparams.batch_size, -1]), 1, 'd_h3_lin')
d_logit = h4
d = tf.nn.sigmoid(d_logit)
return d, d_logit
def generator(hparams, z, train, reuse):
if reuse:
tf.get_variable_scope().reuse_variables()
output_size = 64
s = output_size
s2, s4, s8, s16 = int(s/2), int(s/4), int(s/8), int(s/16)
g_bn0 = ops.batch_norm(name='g_bn0')
g_bn1 = ops.batch_norm(name='g_bn1')
g_bn2 = ops.batch_norm(name='g_bn2')
g_bn3 = ops.batch_norm(name='g_bn3')
# project `z` and reshape
h0 = tf.reshape(ops.linear(z, hparams.gf_dim*8*s16*s16, 'g_h0_lin'), [-1, s16, s16, hparams.gf_dim * 8])
h0 = tf.nn.relu(g_bn0(h0, train=train))
h1 = ops.deconv2d(h0, [hparams.batch_size, s8, s8, hparams.gf_dim*4], name='g_h1')
h1 = tf.nn.relu(g_bn1(h1, train=train))
h2 = ops.deconv2d(h1, [hparams.batch_size, s4, s4, hparams.gf_dim*2], name='g_h2')
h2 = tf.nn.relu(g_bn2(h2, train=train))
h3 = ops.deconv2d(h2, [hparams.batch_size, s2, s2, hparams.gf_dim*1], name='g_h3')
h3 = tf.nn.relu(g_bn3(h3, train=train))
h4 = ops.deconv2d(h3, [hparams.batch_size, s, s, hparams.c_dim], name='g_h4')
x_gen = tf.nn.tanh(h4)
return x_gen
def discriminator(hparams, x, train, reuse):
if reuse:
tf.get_variable_scope().reuse_variables()
d_bn1 = ops.batch_norm(name='d_bn1')
d_bn2 = ops.batch_norm(name='d_bn2')
d_bn3 = ops.batch_norm(name='d_bn3')
h0 = ops.lrelu(ops.conv2d(x, hparams.df_dim, name='d_h0_conv'))
h1 = ops.conv2d(h0, hparams.df_dim*2, name='d_h1_conv')
h1 = ops.lrelu(d_bn1(h1, train=train))
h2 = ops.conv2d(h1, hparams.df_dim*4, name='d_h2_conv')
h2 = ops.lrelu(d_bn2(h2, train=train))
h3 = ops.conv2d(h2, hparams.df_dim*8, name='d_h3_conv')
h3 = ops.lrelu(d_bn3(h3, train=train))
h4 = ops.linear(tf.reshape(h3, [hparams.batch_size, -1]), 1, 'd_h3_lin')
d_logit = h4
d = tf.nn.sigmoid(d_logit)
return d, d_logit
def _build_train(self):
activation_fn = tf.nn.relu
with tf.variable_scope('train'):
# batched s_t to batched q and q_action
self.s_t = tf.placeholder('float32', [None, self.state_size], name='s_t')
# MLP Feature Extraction (s_t -> l3)
l1, self.w['train']['l1_w'], self.w['train']['l1_b'] = linear(self.s_t, 96, activation_fn=activation_fn, name='l1')
#l2, self.w['train']['l2_w'], self.w['train']['l2_b'] = linear(l1, 16, activation_fn=activation_fn, name='l2')
#l3, self.w['train']['l3_w'], self.w['train']['l3_b'] = linear(l2, 16, activation_fn=activation_fn, name='l3')
l3 = l1
if self.dueling:
# Value Net : V(s) is scalar (l3 -> value)
value_hid, self.w['train']['l4_val_w'], self.w['train']['l4_val_b'] = linear(l3, 32, activation_fn=activation_fn, name='value_hid')
value, self.w['train']['val_w_out'], self.w['train']['val_w_b'] = linear(value_hid, 1, name='value_out')
# Advantage Net : A(s) is vector with advantage given each action (l3 -> advantage)
adv_hid, self.w['train']['l4_adv_w'], self.w['train']['l4_adv_b'] = linear(l3, 32, activation_fn=activation_fn, name='adv_hid')
advantage, self.w['train']['adv_w_out'], self.w['train']['adv_w_b'] = linear(adv_hid, self.action_size, name='adv_out')
# Average Dueling (Subtract mean advantage) Q=V+A-mean(A)
q_train = value + (advantage - tf.reduce_mean(advantage, reduction_indices=1, keep_dims=True))
else:
l4, self.w['train']['l4_w'], self.w['train']['l4_b'] = linear(l3, 16, activation_fn=activation_fn, name='l4')
q_train, self.w['train']['q_w'], self.w['train']['q_b'] = linear(l4, self.action_size, name='q')
# Greedy policy
q_action = tf.argmax(q_train, dimension=1)
return q_train, q_action