def _initialize_variables():
    if hasattr(tf, 'global_variables'):
        variables = tf.global_variables()
    else:
        variables = tf.all_variables()
    uninitialized_variables = []
    for v in variables:
        if not hasattr(v, '_keras_initialized') or not v._keras_initialized:
            uninitialized_variables.append(v)
            v._keras_initialized = True
    if uninitialized_variables:
        sess = get_session()
        if hasattr(tf, 'variables_initializer'):
            sess.run(tf.variables_initializer(uninitialized_variables))
        else:
            sess.run(tf.initialize_variables(uninitialized_variables))
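# The snippet above is a Keras-backend-style helper. A minimal standalone
# sketch of the same tf.variables_initializer() pattern, assuming a plain TF1
# graph and session (the variable names here are illustrative):
import tensorflow as tf

def demo_partial_init():
    a = tf.Variable(1.0, name='a')
    b = tf.Variable(2.0, name='b')
    with tf.Session() as sess:
        # Initialize a chosen subset instead of running
        # tf.global_variables_initializer() over everything.
        sess.run(tf.variables_initializer([a]))
        sess.run(tf.variables_initializer([b]))
        print(sess.run([a, b]))  # [1.0, 2.0]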
def load(cls, dirname, session, training=False):
    """
    Load a previously saved model.

    :param dirname: directory with model files
    :param session: tensorflow session
    :param training: whether to create training tensors
    :return: an instance of MultiFeedForwardClassifier
    :rtype: MultiFeedForwardClassifier
    """
    params = utils.load_parameters(dirname)
    model = cls._init_from_load(params, training)
    tensorflow_file = os.path.join(dirname, 'model')
    saver = tf.train.Saver(tf.trainable_variables())
    saver.restore(session, tensorflow_file)
    # If training, optimizer variables still have to be initialized.
    if training:
        train_vars = [v for v in tf.global_variables()
                      if v.name.startswith('training')]
        init_op = tf.variables_initializer(train_vars)
        session.run(init_op)
    return model
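# The restore above assumes a checkpoint written by a Saver over the same
# trainable variables. A minimal saving-side sketch under that assumption
# (save_model is a hypothetical helper, not part of the class above):
import os
import tensorflow as tf

def save_model(session, dirname):
    # Save only trainable variables so the checkpoint matches
    # tf.train.Saver(tf.trainable_variables()) as used in load().
    saver = tf.train.Saver(tf.trainable_variables())
    saver.save(session, os.path.join(dirname, 'model'))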
def yolo_eval(yolo_outputs, image_shape, max_boxes=10, score_threshold=.6, iou_threshold=.5):
    """Evaluate YOLO model on given input batch and return filtered boxes."""
    box_xy, box_wh, box_confidence, box_class_probs = yolo_outputs
    boxes = yolo_boxes_to_corners(box_xy, box_wh)
    boxes, scores, classes = yolo_filter_boxes(
        boxes, box_confidence, box_class_probs, threshold=score_threshold)
    # Scale boxes back to original image shape.
    height = image_shape[0]
    width = image_shape[1]
    image_dims = K.stack([height, width, height, width])
    image_dims = K.reshape(image_dims, [1, 4])
    boxes = boxes * image_dims
    max_boxes_tensor = K.variable(max_boxes, dtype='int32')
    K.get_session().run(tf.variables_initializer([max_boxes_tensor]))
    nms_index = tf.image.non_max_suppression(
        boxes, scores, max_boxes_tensor, iou_threshold=iou_threshold)
    boxes = K.gather(boxes, nms_index)
    scores = K.gather(scores, nms_index)
    classes = K.gather(classes, nms_index)
    return boxes, scores, classes
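# tf.image.non_max_suppression() takes boxes as [y1, x1, y2, x2] plus scores
# and returns the indices of the boxes to keep. A toy sketch of the call used
# above, with two heavily overlapping boxes (IoU = 0.81 > 0.5):
import tensorflow as tf

boxes = tf.constant([[0.0, 0.0, 1.0, 1.0],
                     [0.0, 0.0, 0.9, 0.9]])
scores = tf.constant([0.9, 0.8])
keep = tf.image.non_max_suppression(boxes, scores, max_output_size=2,
                                    iou_threshold=0.5)
with tf.Session() as sess:
    print(sess.run(keep))  # [0]: the lower-scoring overlapping box is suppressed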
def initialize():
    new_variables = set(tf.global_variables()) - ALREADY_INITIALIZED
    get_session().run(tf.variables_initializer(new_variables))
    ALREADY_INITIALIZED.update(new_variables)
def initialize(self, sess):
    # Initial file lists are empty
    np_paths = []
    ss_paths = []
    # Fresh train directly from ImageNet weights
    print('Loading initial model weights from {:s}'.format(self.pretrained_model))
    variables = tf.global_variables()
    # Initialize all variables first
    sess.run(tf.variables_initializer(variables, name='init'))
    var_keep_dic = self.get_variables_in_checkpoint_file(self.pretrained_model)
    # Get the variables to restore, ignoring the variables to fix
    variables_to_restore = self.net.get_variables_to_restore(variables, var_keep_dic)
    restorer = tf.train.Saver(variables_to_restore)
    restorer.restore(sess, self.pretrained_model)
    print('Loaded.')
    # Need to fix the variables before loading, so that the RGB weights are changed to BGR
    # For VGG16 it also changes the convolutional weights fc6 and fc7 to
    # fully connected weights
    self.net.fix_variables(sess, self.pretrained_model)
    print('Fixed.')
    last_snapshot_iter = 0
    rate = cfg.TRAIN.LEARNING_RATE
    stepsizes = list(cfg.TRAIN.STEPSIZE)
    return rate, last_snapshot_iter, stepsizes, np_paths, ss_paths
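# get_variables_in_checkpoint_file() is not shown above; a common
# implementation reads the checkpoint's variable-to-shape map with the TF1
# checkpoint reader (a sketch under that assumption; error handling omitted):
from tensorflow.python import pywrap_tensorflow

def get_variables_in_checkpoint_file(file_name):
    reader = pywrap_tensorflow.NewCheckpointReader(file_name)
    return reader.get_variable_to_shape_map()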
def init(self):
    if self.load_path:
        print('Attempting to load directly from path:', self.load_path)
        self.saver.restore(self.sess, self.load_path)
    else:
        print('New ENCODE Model..init new Z parameters')
        init = tf.variables_initializer(var_list=self.var)
        print('Initializing following variables:')
        for v in self.var:
            print(v.name, v.get_shape().as_list())
        self.model.sess.run(init)
def initialize_uninitialized(sess):
    global_vars = tf.global_variables()
    # tf.is_variable_initialized() returns True for variables that ARE initialized.
    is_initialized = sess.run([tf.is_variable_initialized(var) for var in global_vars])
    not_initialized_vars = [v for (v, f) in zip(global_vars, is_initialized) if not f]
    print([str(i.name) for i in not_initialized_vars])  # only for testing
    if len(not_initialized_vars):
        sess.run(tf.variables_initializer(not_initialized_vars))
def _init_uninitialized(sess):
    """Initializes all uninitialized variables and returns them as a list."""
    variables = tf.global_variables()
    if not variables:
        return []  # sess.run() barfs on an empty list
    is_initialized = sess.run([tf.is_variable_initialized(v) for v in variables])
    needs_init = [v for v, i in zip(variables, is_initialized) if not i]
    if not needs_init:
        return []
    sess.run(tf.variables_initializer(needs_init))
    return needs_init
def initialize_variables(self, session):
    with tf.device(self.device):
        if len(self.variables()) == 0:
            return
        # tf.variables_initializer() only takes a variable list (and an
        # optional name); it has no `reuse` argument.
        init = tf.variables_initializer(self.variables())
        session.run(init)
        self.initialized = True
def resetGlobal(self):
    self.global_acc = 0.0
    self.global_loss = 0.0
# def initialize_uninit_variables(session, list_of_variables=None):
#     if list_of_variables is None:
#         list_of_variables = tf.global_variables()
#     uninitialized_variables = list(tf.get_variable(name) for name in
#                                    session.run(tf.report_uninitialized_variables(list_of_variables)))
#     session.run(tf.variables_initializer(uninitialized_variables))
#     return uninitialized_variables
def testShapesNotKnown(self, use_bias):
    """The generated shapes are correct when input shape not known."""
    batch_size = 5
    in_height = in_width = 32
    in_channels = out_channels = 5
    kernel_shape_h = kernel_shape_w = 3
    inputs = tf.placeholder(
        tf.float32,
        shape=[None, None, None, in_channels],
        name="inputs")
    conv1 = snt.Conv2D(
        name="conv1",
        output_channels=out_channels,
        kernel_shape=[kernel_shape_h, kernel_shape_w],
        padding=snt.SAME,
        stride=1,
        use_bias=use_bias)
    output = conv1(inputs)
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        output_eval = output.eval({
            inputs: np.zeros([batch_size, in_height, in_width, in_channels])})
        self.assertEqual(
            output_eval.shape,
            (batch_size, in_height, in_width, out_channels))
def testInitializers(self, use_bias):
    """Test initializers work as expected."""
    w = random.random()
    b = random.random()
    conv1 = snt.Conv2D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        name="conv1",
        use_bias=use_bias,
        initializers=create_constant_initializers(w, b, use_bias))
    conv1(tf.placeholder(tf.float32, [1, 10, 10, 2]))
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            conv1.w.eval(),
            np.full([3, 3, 2, 1], w, dtype=np.float32))
        if use_bias:
            self.assertAllClose(
                conv1.b.eval(),
                [b])
    err = "Initializer for 'w' is not a callable function or dictionary"
    with self.assertRaisesRegexp(TypeError, err):
        snt.Conv2D(output_channels=10, kernel_shape=3, stride=1, name="conv1",
                   initializers={"w": tf.ones([])})
def testSharing(self, use_bias):
    """Sharing is working."""
    conv1 = snt.Conv2D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        padding=snt.SAME,
        use_bias=use_bias,
        name="conv1")
    x = np.random.randn(1, 5, 5, 1)
    x1 = tf.constant(x, dtype=np.float32)
    x2 = tf.constant(x, dtype=np.float32)
    out1 = conv1(x1)
    out2 = conv1(x2)
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
        # Now change the weights
        w = np.random.randn(3, 3, 1, 1)
        conv1.w.assign(w).eval()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
def testAtrousConvSame(self, use_bias):
    """The atrous conv 2D is constructed and applied correctly with SAME."""
    conv1 = snt.Conv2D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        rate=2,
        padding=snt.SAME,
        name="conv1",
        use_bias=use_bias,
        initializers=create_constant_initializers(1.0, 1.0, use_bias))
    out = conv1(tf.constant(np.ones([1, 5, 5, 1], dtype=np.float32)))
    # With rate=2 the 3x3 kernel of ones samples taps 2 pixels apart, so on a
    # 5x5 input of ones each output counts the in-bounds taps plus the bias of
    # 1: 9 taps at the centre (10), 6 at edge centres (7), 4 in corners (5).
    expected_out = np.array([[5, 5, 7, 5, 5],
                             [5, 5, 7, 5, 5],
                             [7, 7, 10, 7, 7],
                             [5, 5, 7, 5, 5],
                             [5, 5, 7, 5, 5]])
    if not use_bias:
        expected_out -= 1
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(np.reshape(out.eval(), [5, 5]), expected_out)
def testShapesNotKnown(self, use_bias):
    """The generated shapes are correct when input shape not known."""
    batch_size = 5
    in_length = 32
    in_channels = out_channels = 5
    kernel_shape = 3
    inputs = tf.placeholder(
        tf.float32,
        shape=[None, None, in_channels],
        name="inputs")
    conv1 = snt.Conv1D(
        name="conv1",
        output_channels=out_channels,
        kernel_shape=kernel_shape,
        padding=snt.SAME,
        stride=1,
        use_bias=use_bias)
    output = conv1(inputs)
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        output_eval = output.eval({
            inputs: np.zeros([batch_size, in_length, in_channels])})
        self.assertEqual(
            output_eval.shape,
            (batch_size, in_length, out_channels))
def testInitializers(self, use_bias):
    """Test initializers work as expected."""
    w = random.random()
    b = random.random()
    conv1 = snt.Conv1D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        padding=snt.SAME,
        use_bias=use_bias,
        name="conv1",
        initializers=create_constant_initializers(w, b, use_bias))
    conv1(tf.placeholder(tf.float32, [1, 10, 2]))
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            conv1.w.eval(),
            np.full([3, 2, 1], w, dtype=np.float32))
        if use_bias:
            self.assertAllClose(
                conv1.b.eval(),
                [b])
    err = "Initializer for 'w' is not a callable function or dictionary"
    with self.assertRaisesRegexp(TypeError, err):
        snt.Conv1D(output_channels=10,
                   kernel_shape=3,
                   stride=1,
                   padding=snt.SAME,
                   use_bias=use_bias,
                   name="conv1",
                   initializers={"w": tf.ones([])})
def testSharing(self, use_bias):
    """Sharing is working."""
    conv1 = snt.Conv1D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        padding=snt.SAME,
        use_bias=use_bias,
        name="conv1")
    x = np.random.randn(1, 5, 1)
    x1 = tf.constant(x, dtype=np.float32)
    x2 = tf.constant(x, dtype=np.float32)
    out1 = conv1(x1)
    out2 = conv1(x2)
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
        # Now change the weights
        w = np.random.randn(3, 1, 1)
        conv1.w.assign(w).eval()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
def testSharing(self, batch_size, in_length, in_channels, out_channels,
                kernel_shape, padding, use_bias, out_shape, stride_shape):
    """Sharing is working."""
    conv1 = snt.Conv1DTranspose(
        output_channels=out_channels,
        output_shape=out_shape,
        kernel_shape=kernel_shape,
        padding=padding,
        stride=stride_shape,
        name="conv1",
        use_bias=use_bias)
    x = np.random.randn(batch_size, in_length, in_channels)
    x1 = tf.constant(x, dtype=np.float32)
    x2 = tf.constant(x, dtype=np.float32)
    out1 = conv1(x1)
    out2 = conv1(x2)
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
        # Now change the weights
        w = np.random.randn(1, kernel_shape, out_channels, in_channels)
        conv1.w.assign(w).eval()
        self.assertAllClose(
            out1.eval(),
            out2.eval())
def testSharing(self, use_bias):
    """Sharing is working."""
    conv1 = snt.CausalConv1D(
        output_channels=1,
        kernel_shape=3,
        stride=1,
        use_bias=use_bias,
        name="conv1")
    x = np.random.randn(1, 5, 1)
    x1 = tf.constant(x, dtype=np.float32)
    x2 = tf.constant(x, dtype=np.float32)
    out1 = conv1(x1)
    out2 = conv1(x2)
    w = np.random.randn(3, 1, 1)
    weight_change_op = conv1.w.assign(w)
    init_op = tf.variables_initializer(
        [conv1.w, conv1.b] if use_bias else [conv1.w])
    with self.test_session() as sess:
        sess.run(init_op)
        first_replica_out = sess.run(out1)
        second_replica_out = sess.run(out2)
        # Now change the weights
        sess.run(weight_change_op)
        first_replica_out_changed = sess.run(out1)
        second_replica_out_changed = sess.run(out2)
    self.assertAllClose(first_replica_out, second_replica_out)
    self.assertAllClose(first_replica_out_changed, second_replica_out_changed)
def testInitializers(self, use_bias):
    """Test that initializers work as expected."""
    w = random.random()
    # channel_multiplier is 3 and the input has 2 channels, so the bias
    # has 2 * 3 = 6 entries.
    b = np.random.randn(6)
    conv1 = snt.DepthwiseConv2D(
        channel_multiplier=3,
        kernel_shape=3,
        stride=1,
        use_bias=use_bias,
        initializers=create_constant_initializers(w, b, use_bias))
    conv1(tf.placeholder(tf.float32, [1, 10, 10, 2]))
    with self.test_session():
        tf.variables_initializer(
            [conv1.w, conv1.b] if use_bias else [conv1.w]).run()
        self.assertAllClose(
            conv1.w.eval(), np.full(
                [3, 3, 2, 3], w, dtype=np.float32))
        if use_bias:
            self.assertAllClose(conv1.b.eval(), b)
    error_msg = "Initializer for 'w' is not a callable function"
    with self.assertRaisesRegexp(TypeError, error_msg):
        snt.DepthwiseConv2D(
            channel_multiplier=3,
            kernel_shape=3,
            stride=1,
            use_bias=use_bias,
            initializers={"w": tf.ones([])})