def BN_ReLU(self, net):
    """Batch Normalization and ReLU."""
    # 'gamma' is not used as the next layer is ReLU
    net = batch_norm(net,
                     center=True,
                     scale=False,
                     activation_fn=tf.nn.relu)
    self._activation_summary(net)
    return net
def conv2d(self, net, num_ker, ker_size, stride):
    # 1D-convolution
    net = convolution2d(
        net,
        num_outputs=num_ker,
        kernel_size=[ker_size, 1],
        stride=[stride, 1],
        padding='SAME',
        activation_fn=None,
        normalizer_fn=None,
        weights_initializer=variance_scaling_initializer(),
        weights_regularizer=l2_regularizer(self.weight_decay),
        biases_initializer=tf.zeros_initializer())
    return net
Example source code for Python's zeros_initializer() class
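All of the snippets collected below share one basic pattern: tf.zeros_initializer() is passed as the initializer of a bias (or other variable) that should start at zero. As a minimal, hedged sketch of that pattern in TensorFlow 1.x (the scope name, variable name, and shape here are made up for illustration):

import tensorflow as tf

# Minimal sketch: a zero-initialized bias created through get_variable.
# 'example', 'b', and the shape [128] are illustrative assumptions only.
with tf.variable_scope('example'):
    b = tf.get_variable('b', shape=[128],
                        initializer=tf.zeros_initializer(),
                        trainable=True)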
def create_model(self, model_input, vocab_size, num_frames, l2_penalty=1e-8, **unused_params):
    """
    A super model that combines one or more sub-models.
    """
    models = FLAGS.wide_and_deep_models
    outputs = []
    for model_name in map(lambda x: x.strip(), models.split(",")):
        model = getattr(frame_level_models, model_name, None)()
        output = model.create_model(model_input, vocab_size, num_frames, l2_penalty=l2_penalty, **unused_params)["predictions"]
        outputs.append(tf.expand_dims(output, axis=2))
    num_models = len(outputs)
    model_outputs = tf.concat(outputs, axis=2)
    # linear_combination = tf.get_variable("combine", shape=[vocab_size, num_models],
    #                                      dtype=tf.float32, initializer=tf.zeros_initializer(),
    #                                      regularizer=slim.l2_regularizer(l2_penalty))
    # combination = tf.nn.softmax(linear_combination)
    combination = tf.fill(dims=[vocab_size, num_models], value=1.0 / num_models)
    output_sum = tf.einsum("ijk,jk->ij", model_outputs, combination)
    return {"predictions": output_sum}
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the SkipLSTMCell
    :param batch_size: number of samples per batch
    :return: SkipLSTMStateTuple
    """
    with tf.variable_scope('initial_c'):
        initial_c = rnn_ops.create_initial_state(batch_size, self._num_units)
    with tf.variable_scope('initial_h'):
        initial_h = rnn_ops.create_initial_state(batch_size, self._num_units)
    with tf.variable_scope('initial_update_prob'):
        initial_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                           initializer=tf.ones_initializer())
    with tf.variable_scope('initial_cum_update_prob'):
        initial_cum_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                               initializer=tf.zeros_initializer())
    return SkipLSTMStateTuple(initial_c, initial_h, initial_update_prob, initial_cum_update_prob)
def trainable_initial_state(self, batch_size):
    """
    Create a trainable initial state for the MultiSkipGRUCell
    :param batch_size: number of samples per batch
    :return: list of tensors and SkipGRUStateTuple
    """
    initial_states = []
    for idx in range(self._num_layers - 1):
        with tf.variable_scope('layer_%d' % (idx + 1)):
            with tf.variable_scope('initial_h'):
                initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[idx])
            initial_states.append(initial_h)
    with tf.variable_scope('layer_%d' % self._num_layers):
        with tf.variable_scope('initial_h'):
            initial_h = rnn_ops.create_initial_state(batch_size, self._num_units[-1])
        with tf.variable_scope('initial_update_prob'):
            initial_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                               initializer=tf.ones_initializer())
        with tf.variable_scope('initial_cum_update_prob'):
            initial_cum_update_prob = rnn_ops.create_initial_state(batch_size, 1, trainable=False,
                                                                   initializer=tf.zeros_initializer())
        initial_states.append(SkipGRUStateTuple(initial_h, initial_update_prob, initial_cum_update_prob))
    return initial_states
def global_step(device=''):
    """Returns the global step variable.
    Args:
      device: Optional device to place the variable. It can be a string or a
        function that is called to get the device for the variable.
    Returns:
      the tensor representing the global step variable.
    """
    global_step_ref = tf.get_collection(tf.GraphKeys.GLOBAL_STEP)
    if global_step_ref:
        return global_step_ref[0]
    else:
        collections = [
            VARIABLES_TO_RESTORE,
            tf.GraphKeys.VARIABLES,
            tf.GraphKeys.GLOBAL_STEP,
        ]
        # Get the device for the variable.
        with tf.device(variable_device(device, 'global_step')):
            return tf.get_variable('global_step', shape=[], dtype=tf.int64,
                                   initializer=tf.zeros_initializer(),
                                   trainable=False, collections=collections)
def linear(input_,
           output_size,
           weights_initializer=initializers.xavier_initializer(),
           biases_initializer=tf.zeros_initializer(),
           activation_fn=None,
           trainable=True,
           name='linear'):
    shape = input_.get_shape().as_list()
    if len(shape) > 2:
        input_ = tf.reshape(input_, [-1, reduce(lambda x, y: x * y, shape[1:])])
        shape = input_.get_shape().as_list()
    with tf.variable_scope(name):
        w = tf.get_variable('w', [shape[1], output_size], tf.float32,
                            initializer=weights_initializer, trainable=trainable)
        b = tf.get_variable('b', [output_size],
                            initializer=biases_initializer, trainable=trainable)
        out = tf.nn.bias_add(tf.matmul(input_, w), b)
        if activation_fn is not None:
            return activation_fn(out), w, b
        else:
            return out, w, b
def make_fc_layer(
        self, inp_lyr, name_fc_lyr,
        name_w, shp_w, name_b=None, shp_b=None,
        initializer=xavier_init(uniform=False)
):
    """ TODO - regularize batch norm params? """
    W = self.make_wbkernels(name_w, shp_w, initializer=initializer)
    b = self.make_wbkernels(
        name_b, shp_b, initializer=tf.zeros_initializer()
    )
    fc_lyr = tf.nn.bias_add(
        tf.matmul(inp_lyr, W, name=name_fc_lyr + '_matmul'), b,
        data_format=self.data_format, name=name_fc_lyr,
    )
    if self.use_batch_norm:
        fc_lyr = tf.contrib.layers.batch_norm(
            fc_lyr, decay=self.batch_norm_decay, center=True, scale=True,
            data_format=self.data_format, is_training=self.is_training
        )
    return fc_lyr
def vgg_arg_scope(weight_decay=0.0005):
    """Defines the VGG arg scope.
    Args:
      weight_decay: The l2 regularization coefficient.
    Returns:
      An arg_scope.
    """
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.relu,
                        weights_regularizer=slim.l2_regularizer(weight_decay),
                        biases_initializer=tf.zeros_initializer()):
        with slim.arg_scope([slim.conv2d], padding='SAME') as arg_sc:
            with slim.arg_scope([slim.max_pool2d], padding='SAME') as arg_sc:
                return arg_sc
def apply_ln(layer):
    def _normalize(x, prefix):
        EPS = 1e-5
        dim = x.get_shape()[-1].value
        bias_name = prefix + "_ln/bias"
        scale_name = prefix + "_ln/scale"
        if bias_name not in layer.norm_params:
            layer.norm_params[bias_name] = layer.add_param(
                tf.zeros_initializer, (dim,), name=bias_name, regularizable=False)
        if scale_name not in layer.norm_params:
            layer.norm_params[scale_name] = layer.add_param(
                tf.ones_initializer, (dim,), name=scale_name)
        bias = layer.norm_params[bias_name]
        scale = layer.norm_params[scale_name]
        mean, var = tf.nn.moments(x, axes=[1], keep_dims=True)
        x_normed = (x - mean) / tf.sqrt(var + EPS)
        return x_normed * scale + bias
    return _normalize
def create_segmentation_head(self, num_classes):
    """Segmentation of the map with stride 8, or 4 if the --x4 flag is active."""
    with tf.variable_scope(DEFAULT_SSD_SCOPE) as sc:
        with slim.arg_scope([slim.conv2d],
                            kernel_size=args.seg_filter_size,
                            weights_regularizer=slim.l2_regularizer(self.weight_decay),
                            biases_initializer=tf.zeros_initializer()):
            seg_materials = []
            seg_size = self.config['fm_sizes'][0]
            for i in range(len(self.layers)):
                target_layer = self.outputs[self.layers[i]]
                seg = slim.conv2d(target_layer, args.n_base_channels)
                seg = tf.image.resize_nearest_neighbor(seg, [seg_size, seg_size])
                seg_materials.append(seg)
            seg_materials = tf.concat(seg_materials, -1)
            seg_logits = slim.conv2d(seg_materials, num_classes,
                                     kernel_size=3, activation_fn=None)
            self.outputs['segmentation'] = seg_logits
            return self.outputs['segmentation']
def call(self, step_inputs, state, scope=None, initialization='gaussian'):
    """
    Make one step of ISAN transition.
    Args:
        step_inputs: one-hot encoded inputs, shape bs x n
        state: previous hidden state, shape bs x d
        scope: current scope
        initialization: how to initialize the transition matrices:
            orthogonal: usually speeds up training, orthogonalize Gaussian matrices
            gaussian: sample Gaussian matrices with a sensible scale
    """
    d = self._num_units
    n = step_inputs.shape[1].value
    if initialization == 'orthogonal':
        wx_ndd_init = np.zeros((n, d * d), dtype=np.float32)
        for i in range(n):
            wx_ndd_init[i, :] = orth(np.random.randn(d, d)).astype(np.float32).ravel()
        wx_ndd_initializer = tf.constant_initializer(wx_ndd_init)
    elif initialization == 'gaussian':
        wx_ndd_initializer = tf.random_normal_initializer(stddev=1.0 / np.sqrt(d))
    else:
        raise Exception('Unknown init type: %s' % initialization)
    wx_ndd = tf.get_variable('Wx', shape=[n, d * d],
                             initializer=wx_ndd_initializer)
    bx_nd = tf.get_variable('bx', shape=[n, d],
                            initializer=tf.zeros_initializer())
    # Multiplication with a 1-hot is just row selection.
    # As of Jan '17 this is faster than doing gather.
    Wx_bdd = tf.reshape(tf.matmul(step_inputs, wx_ndd), [-1, d, d])
    bx_bd = tf.reshape(tf.matmul(step_inputs, bx_nd), [-1, 1, d])
    # Reshape the state so that matmul multiplies different matrices
    # for each batch element.
    single_state = tf.reshape(state, [-1, 1, d])
    new_state = tf.reshape(tf.matmul(single_state, Wx_bdd) + bx_bd, [-1, d])
    return new_state, new_state
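The "row selection" comment above relies on a simple identity: multiplying a one-hot row vector by a matrix picks out the corresponding row, exactly like a gather. A quick hedged sketch of that equivalence, with made-up shapes for illustration:

import numpy as np
import tensorflow as tf

# Sketch: a one-hot [batch, n] matrix times an [n, d] table selects rows,
# i.e. it matches tf.gather on the underlying indices.
n, d, batch = 5, 3, 4
indices = np.random.randint(0, n, size=batch)
one_hot = tf.one_hot(indices, depth=n)          # [batch, n]
table = tf.random_normal([n, d])                # [n, d]

selected_by_matmul = tf.matmul(one_hot, table)  # [batch, d]
selected_by_gather = tf.gather(table, indices)  # [batch, d]

with tf.Session() as sess:
    a, b = sess.run([selected_by_matmul, selected_by_gather])
    print(np.abs(a - b).max())  # ~0.0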
def conv2d(x, num_filters, name, filter_size=(3, 3), stride=(1, 1), pad="SAME", dtype=tf.float32, collections=None):
    with tf.variable_scope(name):
        stride_shape = [1, stride[0], stride[1], 1]
        filter_shape = [filter_size[0], filter_size[1], int(x.get_shape()[3]), num_filters]
        # there are "num input feature maps * filter height * filter width"
        # inputs to each hidden unit
        fan_in = np.prod(filter_shape[:3])
        # each unit in the lower layer receives a gradient from:
        # "num output feature maps * filter height * filter width" / pooling size
        fan_out = np.prod(filter_shape[:2]) * num_filters
        # initialize weights with random weights
        w_bound = np.sqrt(6. / (fan_in + fan_out))
        w = tf.get_variable("W", filter_shape, dtype, tf.random_uniform_initializer(-w_bound, w_bound),
                            collections=collections)
        b = tf.get_variable("b", [1, 1, 1, num_filters], initializer=tf.zeros_initializer(),
                            collections=collections)
        return tf.nn.conv2d(x, w, stride_shape, pad) + b
def bn(x, c):
    x_shape = x.get_shape()
    params_shape = x_shape[-1:]
    if c['use_bias']:
        bias = _get_variable('bias', params_shape,
                             initializer=tf.zeros_initializer())
        return x + bias
    batch_norm_config = {'decay': 0.9, 'epsilon': 1e-5, 'scale': True,
                         'center': True}
    x = tf.contrib.layers.batch_norm(x,
                                     is_training=c['is_training'],
                                     fused=True,
                                     data_format=DATA_FORMAT,
                                     **batch_norm_config)
    return x
def linear_mapping_stupid(inputs, out_dim, in_dim=None, dropout=1.0, var_scope_name="linear_mapping"):
    with tf.variable_scope(var_scope_name):
        print('name', tf.get_variable_scope().name)
        input_shape_tensor = tf.shape(inputs)        # dynamic shape, no None
        input_shape = inputs.get_shape().as_list()   # static shape, may contain None
        print('input_shape', input_shape)
        assert len(input_shape) == 3
        inputs = tf.reshape(inputs, [-1, input_shape_tensor[-1]])
        linear_mapping_w = tf.get_variable("linear_mapping_w", [input_shape[-1], out_dim],
                                           initializer=tf.random_normal_initializer(mean=0, stddev=tf.sqrt(dropout * 1.0 / input_shape[-1])))
        linear_mapping_b = tf.get_variable("linear_mapping_b", [out_dim],
                                           initializer=tf.zeros_initializer())
        output = tf.matmul(inputs, linear_mapping_w) + linear_mapping_b
        print('xxxxx_params', input_shape, out_dim)
        # output = tf.reshape(output, [input_shape[0], -1, out_dim])
        output = tf.reshape(output, [input_shape_tensor[0], -1, out_dim])
        return output
def linear_mapping_weightnorm(inputs, out_dim, in_dim=None, dropout=1.0, var_scope_name="linear_mapping"):
    with tf.variable_scope(var_scope_name):
        input_shape = inputs.get_shape().as_list()   # static shape, may contain None
        input_shape_tensor = tf.shape(inputs)
        # Use weight normalization (Salimans & Kingma, 2016): w = g * v / 2-norm(v)
        V = tf.get_variable('V', shape=[int(input_shape[-1]), out_dim], dtype=tf.float32,
                            initializer=tf.random_normal_initializer(mean=0, stddev=tf.sqrt(dropout * 1.0 / int(input_shape[-1]))),
                            trainable=True)
        V_norm = tf.norm(V.initialized_value(), axis=0)  # V shape is M*N, V_norm shape is N
        g = tf.get_variable('g', dtype=tf.float32, initializer=V_norm, trainable=True)
        b = tf.get_variable('b', shape=[out_dim], dtype=tf.float32,
                            initializer=tf.zeros_initializer(), trainable=True)  # weight-norm bias is initialized to zero
        assert len(input_shape) == 3
        inputs = tf.reshape(inputs, [-1, input_shape[-1]])
        inputs = tf.matmul(inputs, V)
        inputs = tf.reshape(inputs, [input_shape_tensor[0], -1, out_dim])
        # inputs = tf.matmul(inputs, V)  # x*v
        scaler = tf.div(g, tf.norm(V, axis=0))  # g / 2-norm(v)
        inputs = tf.reshape(scaler, [1, out_dim]) * inputs + tf.reshape(b, [1, out_dim])  # x*v * g/2-norm(v) + b
        return inputs
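The function above applies the weight-norm reparameterization w = g * V / ||V|| by scaling after the matmul; folding the column-wise scale into V first gives the same result. A small hedged sketch of that equivalence, with illustrative shapes and demo variable names that are not part of the original project:

import numpy as np
import tensorflow as tf

# Sketch: scaling x @ V by g/||V|| per output column equals using the
# effective weight matrix W = V * (g / ||V||) directly.
in_dim, out_dim, batch = 6, 4, 2
x = tf.random_normal([batch, in_dim])
V = tf.get_variable('V_demo', shape=[in_dim, out_dim],
                    initializer=tf.random_normal_initializer())
g = tf.get_variable('g_demo', shape=[out_dim],
                    initializer=tf.ones_initializer())

scaler = g / tf.norm(V, axis=0)             # [out_dim]
out_scale_after = tf.matmul(x, V) * scaler  # scale after the matmul
out_scale_first = tf.matmul(x, V * scaler)  # fold the scale into the weights

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    a, b = sess.run([out_scale_after, out_scale_first])
    print(np.abs(a - b).max())  # ~0.0 up to float error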
def _norm(input, is_train, reuse=True, norm=None):
    assert norm in ['instance', 'batch', None]
    if norm == 'instance':
        with tf.variable_scope('instance_norm', reuse=reuse):
            eps = 1e-5
            mean, sigma = tf.nn.moments(input, [1, 2], keep_dims=True)
            normalized = (input - mean) / (tf.sqrt(sigma) + eps)
            out = normalized
            # Apply a learned shift and scale (not mandatory)
            # c = input.get_shape()[-1]
            # shift = tf.get_variable('shift', shape=[c],
            #                         initializer=tf.zeros_initializer())
            # scale = tf.get_variable('scale', shape=[c],
            #                         initializer=tf.random_normal_initializer(1.0, 0.02))
            # out = scale * normalized + shift
    elif norm == 'batch':
        with tf.variable_scope('batch_norm', reuse=reuse):
            out = tf.contrib.layers.batch_norm(input,
                                               decay=0.99, center=True,
                                               scale=True, is_training=is_train,
                                               updates_collections=None)
    else:
        out = input
    return out
def global_step(device=''):
    """Returns the global step variable.
    Args:
      device: Optional device to place the variable. It can be a string or a
        function that is called to get the device for the variable.
    Returns:
      the tensor representing the global step variable.
    """
    global_step_ref = tf.get_collection(tf.GraphKeys.GLOBAL_STEP)
    if global_step_ref:
        return global_step_ref[0]
    else:
        collections = [
            VARIABLES_TO_RESTORE,
            tf.GraphKeys.GLOBAL_VARIABLES,
            tf.GraphKeys.GLOBAL_STEP,
        ]
        # Get the device for the variable.
        with tf.device(variable_device(device, 'global_step')):
            return tf.get_variable('global_step', shape=[], dtype=tf.int64,
                                   initializer=tf.zeros_initializer(),
                                   trainable=False, collections=collections)
def _vgg_arg_scope(weight_decay,
                   is_training):
    """Defines the VGG arg scope.
    Args:
      weight_decay: The l2 regularization coefficient.
    Returns:
      An arg_scope.
    """
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.relu,
                        weights_regularizer=slim.l2_regularizer(weight_decay),
                        weights_initializer=tf.contrib.layers.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer()):
        with slim.arg_scope([slim.batch_norm], is_training=is_training):
            with slim.arg_scope([slim.conv2d], padding='SAME', normalizer_fn=slim.batch_norm) as arg_sc:
                return arg_sc
def gcn_block(inputs,
              num_class,
              kernel_size,
              scope=None):
    with tf.variable_scope(scope, 'gcn_block', [inputs]):
        with slim.arg_scope([slim.conv2d],
                            padding='SAME',
                            activation_fn=None,
                            normalizer_fn=None,
                            normalizer_params=None,
                            weights_initializer=tf.contrib.layers.xavier_initializer(),
                            weights_regularizer=tf.contrib.layers.l2_regularizer(0.0001),
                            biases_initializer=tf.zeros_initializer(),
                            biases_regularizer=tf.contrib.layers.l2_regularizer(0.0002)):
            left_conv1 = slim.conv2d(inputs, num_class, [kernel_size, 1])
            left_conv2 = slim.conv2d(left_conv1, num_class, [1, kernel_size])
            right_conv1 = slim.conv2d(inputs, num_class, [1, kernel_size])
            right_conv2 = slim.conv2d(right_conv1, num_class, [kernel_size, 1])
            result_sum = tf.add(left_conv2, right_conv2, name='gcn_module')
            return result_sum
def gcn_br(inputs, scope):
    with tf.variable_scope(scope, 'gcn_br', [inputs]):
        with slim.arg_scope([slim.conv2d],
                            padding='SAME',
                            activation_fn=tf.nn.relu,
                            normalizer_fn=None,
                            normalizer_params=None,
                            weights_initializer=tf.contrib.layers.xavier_initializer(),
                            weights_regularizer=tf.contrib.layers.l2_regularizer(0.0001),
                            biases_initializer=tf.zeros_initializer(),
                            biases_regularizer=tf.contrib.layers.l2_regularizer(0.0002)):
            num_class = inputs.get_shape()[3]
            conv = slim.conv2d(inputs, num_class, [3, 3])
            conv = slim.conv2d(conv, num_class, [3, 3], activation_fn=None)
            result_sum = tf.add(inputs, conv, name='fcn_br')
            return result_sum
def batch_norm(inputs, cts, ldc, epsilon=0.001, bOffset=True, bScale=True, reuse=None, decay=0.999, is_training=True):
    name = get_name('bn', cts)
    with tf.variable_scope(name, reuse=reuse):
        inputs_shape = inputs.get_shape()
        params_shape = inputs_shape[-1:]
        axis = list(range(len(inputs_shape) - 1))
        offset, scale = None, None
        if bOffset:
            offset = tf.get_variable('offset', shape=params_shape, trainable=True, initializer=tf.zeros_initializer())
        if bScale:
            scale = tf.get_variable('scale', shape=params_shape, trainable=True, initializer=tf.ones_initializer())
        batch_mean, batch_variance = tf.nn.moments(inputs, axis)
        outputs = tf.nn.batch_normalization(inputs, batch_mean, batch_variance, offset, scale, epsilon)
        # Note: for fast training we do not keep moving averages for test time here, which we usually do not use anyway.
        ldc.append(name + ' offset:' + str(bOffset) + ' scale:' + str(bScale))
        return outputs
def batch_norm(inputs, cts, ldc, bOffset=True, bScale=True, epsilon=0.001, reuse=None, decay=0.999, is_training=True):
    name = get_name('bn', cts)
    with tf.variable_scope(name, reuse=reuse):
        inputs_shape = inputs.get_shape()
        params_shape = inputs_shape[-1:]
        axis = list(range(len(inputs_shape) - 1))
        offset, scale = None, None
        if bOffset:
            offset = tf.get_variable('offset', shape=params_shape, trainable=True, initializer=tf.zeros_initializer())
        if bScale:
            scale = tf.get_variable('scale', shape=params_shape, trainable=True, initializer=tf.ones_initializer())
        batch_mean, batch_variance = tf.nn.moments(inputs, axis)
        outputs = tf.nn.batch_normalization(inputs, batch_mean, batch_variance, offset, scale, epsilon)
        # Note: for fast training we do not keep moving averages (for testing) here, which we usually do not use anyway.
        ldc.append(name + ' offset:' + str(bOffset) + ' scale:' + str(bScale))
        return outputs
bridge.py source file
Project: tensorflow_end2end_speech_recognition
Author: hirofumi0810
def _create(self):
    # Concat bridge inputs on the depth dimension
    bridge_input = nest.map_structure(
        lambda x: tf.reshape(x, [self.batch_size, _total_tensor_depth(x)]),
        self._bridge_input)
    bridge_input_flat = nest.flatten([bridge_input])
    bridge_input_concat = tf.concat(bridge_input_flat, axis=1)
    state_size_splits = nest.flatten(self.decoder_state_size)
    total_decoder_state_size = sum(state_size_splits)
    # Pass bridge inputs through a fully connected layer
    initial_state_flat = tf.contrib.layers.fully_connected(
        bridge_input_concat,
        num_outputs=total_decoder_state_size,
        activation_fn=self._activation_fn,
        weights_initializer=tf.truncated_normal_initializer(
            stddev=self.parameter_init),
        biases_initializer=tf.zeros_initializer(),
        scope=None)
    # Shape back into required state size
    initial_state = tf.split(initial_state_flat, state_size_splits, axis=1)
    return nest.pack_sequence_as(self.decoder_state_size, initial_state)
def mu_sigma_layer(inputs, n_outputs):
    """
    Create a layer that makes a mu and sigma,
    e.g. to use in continuous action spaces.
    """
    mu = tf.contrib.layers.fully_connected(
        inputs=inputs,
        num_outputs=n_outputs,
        activation_fn=None,
        weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.02),
        biases_initializer=tf.zeros_initializer(),
        scope="mu")
    mu = tf.squeeze(mu, name="mu")
    sigma = tf.contrib.layers.fully_connected(
        inputs=inputs,
        num_outputs=n_outputs,
        activation_fn=None,
        weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.02),
        biases_initializer=tf.zeros_initializer(),
        scope="sigma")
    sigma = tf.squeeze(sigma)
    sigma = tf.nn.softplus(sigma) + 1e-5
    return mu, sigma
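A typical way such a mu/sigma pair is consumed in a continuous-action policy is to parameterize a Normal distribution and sample from it. The sketch below is only illustrative, not the surrounding project's actual policy code; the feature placeholder and action dimension are assumptions, and tf.distributions.Normal requires a reasonably recent TF 1.x release:

import tensorflow as tf

# Sketch: feed mu_sigma_layer's outputs into a Normal policy head.
# 'features' and n_outputs=2 are illustrative placeholders.
features = tf.placeholder(tf.float32, [None, 64], name="features")
mu, sigma = mu_sigma_layer(features, n_outputs=2)

dist = tf.distributions.Normal(loc=mu, scale=sigma)
sampled_action = dist.sample()            # stochastic continuous action
log_prob = dist.log_prob(sampled_action)  # useful for policy-gradient losses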
def __init__(self, state_shape, n_hidden, summary=True):
    super(CriticNetwork, self).__init__()
    self.state_shape = state_shape
    self.n_hidden = n_hidden
    with tf.variable_scope("critic"):
        self.states = tf.placeholder("float", [None] + self.state_shape, name="states")
        self.r = tf.placeholder(tf.float32, [None], name="r")
        L1 = tf.contrib.layers.fully_connected(
            inputs=self.states,
            num_outputs=self.n_hidden,
            activation_fn=tf.tanh,
            weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.02),
            biases_initializer=tf.zeros_initializer(),
            scope="L1")
        self.value = tf.reshape(linear(L1, 1, "value", normalized_columns_initializer(1.0)), [-1])
        self.loss = tf.reduce_sum(tf.square(self.value - self.r))
        self.summary_loss = self.loss
        self.vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, tf.get_variable_scope().name)
def build_networks(self):
    with tf.variable_scope("shared"):
        self.states = tf.placeholder(tf.float32, [None] + list(self.envs[0].observation_space.shape), name="states")
        self.action_taken = tf.placeholder(tf.float32, name="action_taken")
        self.advantage = tf.placeholder(tf.float32, name="advantage")
        if self.config["feature_extraction"]:
            self.L1 = tf.contrib.layers.fully_connected(
                inputs=self.states,
                num_outputs=self.config["n_hidden_units"],
                activation_fn=tf.tanh,
                weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.02),
                biases_initializer=tf.zeros_initializer(),
                scope="L1")
        else:
            self.L1 = self.states
        self.knowledge_base = tf.Variable(tf.truncated_normal([self.L1.get_shape()[-1].value, self.config["n_sparse_units"]], mean=0.0, stddev=0.02), name="knowledge_base")
        self.shared_vars = tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, tf.get_variable_scope().name)
def build_network(self):
    # Symbolic variables for observation, action, and advantage
    self.states = tf.placeholder(tf.float32, [None, self.env_runner.nO], name="states")  # Observation
    self.a_n = tf.placeholder(tf.float32, name="a_n")      # Discrete action
    self.adv_n = tf.placeholder(tf.float32, name="adv_n")  # Advantage
    L1 = tf.contrib.layers.fully_connected(
        inputs=self.states,
        num_outputs=self.config["n_hidden_units"],
        activation_fn=tf.tanh,
        weights_initializer=tf.random_normal_initializer(),
        biases_initializer=tf.zeros_initializer())
    self.probs = tf.contrib.layers.fully_connected(
        inputs=L1,
        num_outputs=self.env_runner.nA,
        activation_fn=tf.nn.softmax,
        weights_initializer=tf.random_normal_initializer(),
        biases_initializer=tf.zeros_initializer())
    self.action = tf.squeeze(tf.multinomial(tf.log(self.probs), 1), name="action")
def build_network(self):
    self.rnn_state = None
    self.states = tf.placeholder(tf.float32, [None] + list(self.env.observation_space.shape), name="states")  # Observation
    # self.n_states = tf.placeholder(tf.float32, shape=[None], name="n_states")  # Observation
    self.a_n = tf.placeholder(tf.float32, name="a_n")      # Discrete action
    self.adv_n = tf.placeholder(tf.float32, name="adv_n")  # Advantage
    n_states = tf.shape(self.states)[:1]
    states = tf.expand_dims(flatten(self.states), [0])
    enc_cell = tf.contrib.rnn.GRUCell(self.config["n_hidden_units"])
    self.rnn_state_in = enc_cell.zero_state(1, tf.float32)
    L1, self.rnn_state_out = tf.nn.dynamic_rnn(cell=enc_cell,
                                               inputs=states,
                                               sequence_length=n_states,
                                               initial_state=self.rnn_state_in,
                                               dtype=tf.float32)
    self.probs = tf.contrib.layers.fully_connected(
        inputs=L1[0],
        num_outputs=self.env_runner.nA,
        activation_fn=tf.nn.softmax,
        weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.02),
        biases_initializer=tf.zeros_initializer())
    self.action = tf.squeeze(tf.multinomial(tf.log(self.probs), 1), name="action")
def __arg_scope(self, weight_decay=0.0005, data_format='NHWC'):
    """Defines the VGG arg scope.
    Args:
      weight_decay: The l2 regularization coefficient.
    Returns:
      An arg_scope.
    """
    with slim.arg_scope([slim.conv2d, slim.fully_connected],
                        activation_fn=tf.nn.relu,
                        weights_regularizer=slim.l2_regularizer(weight_decay),
                        weights_initializer=tf.contrib.layers.xavier_initializer(),
                        biases_initializer=tf.zeros_initializer()):
        with slim.arg_scope([slim.conv2d, slim.max_pool2d],
                            padding='SAME',
                            data_format=data_format):
            with slim.arg_scope([custom_layers.pad2d,
                                 custom_layers.l2_normalization,
                                 custom_layers.channel_to_last],
                                data_format=data_format) as sc:
                return sc