def calculate_loss_mix2(self, predictions, predictions_class, predictions_encoder, labels, **unused_params):
    """Combine the label loss with an MSE term against encoder-transformed labels.

    The labels are pushed through ``FLAGS.encoder_layers`` fixed affine layers.
    Each layer is read with ``np.loadtxt`` from
    ``FLAGS.autoencoder_dir + 'autoencoder_layer%d.model'``; the last row of the
    loaded matrix is used as the bias and the remaining rows as the weights.
    Every layer but the last is followed by a ReLU; the output of the last
    layer is standardised along axis 1.

    Args:
      predictions: passed to ``self.calculate_loss`` together with ``labels``.
      predictions_class: not used by this method.
      predictions_encoder: compared against the encoded labels through
        ``self.calculate_mseloss``.
      labels: ground-truth labels; cast to float32 before encoding.

    Returns:
      A pair ``(loss, encoded_labels)`` where ``loss`` is
      ``0.1 * mse_loss + self.calculate_loss(predictions, labels)``.
    """
    with tf.name_scope("loss_mix2"):
        encoded = tf.cast(labels, tf.float32)
        for layer in range(FLAGS.encoder_layers):
            params = np.loadtxt(FLAGS.autoencoder_dir + 'autoencoder_layer%d.model' % layer)
            # All rows except the last hold the weights; the last row is the bias.
            kernel = tf.constant(params[:-1, :], dtype=tf.float32)
            offset = tf.reshape(tf.constant(params[-1, :], dtype=tf.float32), [-1])
            encoded = tf.nn.xw_plus_b(encoded, kernel, offset)
            if layer < FLAGS.encoder_layers - 1:
                encoded = tf.nn.relu(encoded)
                continue
            # Final layer: zero-mean / unit-std per row; 1e-6 guards the division.
            row_mean = tf.reduce_mean(encoded, axis=1, keep_dims=True)
            row_std = tf.sqrt(
                tf.reduce_mean(tf.square(encoded - row_mean), axis=1, keep_dims=True))
            encoded = (encoded - row_mean) / (row_std + 1e-6)
        encoder_term = 0.1 * self.calculate_mseloss(predictions_encoder, encoded)
        label_term = self.calculate_loss(predictions, labels)
        return encoder_term + label_term, encoded
# Example source code using the Python cast() function
def one_hot_encoding(labels, num_classes, scope=None):
    """Convert integer class labels into a dense one-hot matrix.

    Args:
      labels: [batch_size] target labels.
      num_classes: total number of classes.
      scope: optional scope passed to ``tf.op_scope``.

    Returns:
      A [batch_size, num_classes] tensor holding 1.0 at each (row, label)
      position and 0.0 elsewhere.
    """
    with tf.op_scope([labels], scope, 'OneHotEncoding'):
        batch_size = labels.get_shape()[0]
        # Build (row, column) coordinates of the entries that must be set to 1.
        row_ids = tf.expand_dims(tf.range(0, batch_size), 1)
        col_ids = tf.cast(tf.expand_dims(labels, 1), row_ids.dtype)
        coords = tf.concat(1, [row_ids, col_ids])
        dense_shape = tf.pack([batch_size, num_classes])
        onehot_labels = tf.sparse_to_dense(coords, dense_shape, 1.0, 0.0)
        onehot_labels.set_shape([batch_size, num_classes])
        return onehot_labels
def SampleRandomFrames(model_input, num_frames, num_samples):
    """Draw `num_samples` random frames from every example in the batch.

    Args:
      model_input: A tensor of size batch_size x max_frames x feature_size
      num_frames: A tensor of size batch_size x 1
      num_samples: A scalar

    Returns:
      `model_input`: A tensor of size batch_size x num_samples x feature_size
    """
    batch_size = tf.shape(model_input)[0]

    # Scale uniform draws in [0, 1) by each example's frame count and truncate
    # to get integer frame positions.
    unit_draws = tf.random_uniform([batch_size, num_samples])
    frame_counts = tf.tile(tf.cast(num_frames, tf.float32), [1, num_samples])
    frame_index = tf.cast(tf.multiply(unit_draws, frame_counts), tf.int32)

    # Pair every sampled frame position with the row it belongs to.
    row_ids = tf.expand_dims(tf.range(batch_size), 1)
    batch_index = tf.tile(row_ids, [1, num_samples])
    gather_index = tf.stack([batch_index, frame_index], 2)
    return tf.gather_nd(model_input, gather_index)
def parse_example(serialized_example):
    """Decode one serialized example into image, ground truth and name tensors.

    The record must carry the string features 'shape', 'img_raw', 'gt_raw'
    and 'example_name'.

    Args:
      serialized_example: a serialized example, as accepted by
        ``tf.parse_single_example``.

    Returns:
      A tuple ``(image, ground_truth, example_name)``: the image reshaped to the
      stored shape with an extra leading dimension, the ground truth reshaped
      to the stored shape without its last entry and cast to float32, and the
      raw 'example_name' feature.
    """
    # No defaults are given: every key is required.
    feature_spec = {
        'shape': tf.FixedLenFeature([], tf.string),
        'img_raw': tf.FixedLenFeature([], tf.string),
        'gt_raw': tf.FixedLenFeature([], tf.string),
        'example_name': tf.FixedLenFeature([], tf.string)
    }
    features = tf.parse_single_example(serialized_example, features=feature_spec)

    # NOTE(review): the nesting of the three scopes is not recoverable from the
    # de-indented source; sibling scopes are assumed here.
    with tf.variable_scope('decoder'):
        shape = tf.decode_raw(features['shape'], tf.int32)
        image = tf.decode_raw(features['img_raw'], tf.float32)
        ground_truth = tf.decode_raw(features['gt_raw'], tf.uint8)
        example_name = features['example_name']

    with tf.variable_scope('image'):
        # Restore the stored shape, then prepend a dimension of size 1.
        shaped_image = tf.reshape(image, shape)
        image = tf.expand_dims(shaped_image, 0)

    with tf.variable_scope('ground_truth'):
        # The ground truth uses the stored shape minus its last entry.
        shaped_truth = tf.reshape(ground_truth, shape[:-1])
        ground_truth = tf.cast(shaped_truth, tf.float32)

    return image, ground_truth, example_name
def omniglot():
    # Create an interactive TensorFlow session and keep a handle to it in `sess`.
    sess = tf.InteractiveSession()
    # The triple-quoted string below is disabled debugging code (a tf.Print
    # experiment written with Python 2 print syntax). As a bare string
    # expression it is evaluated and discarded, so it has no runtime effect.
    """ def wrapper(v):
return tf.Print(v, [v], message="Printing v")
v = tf.Variable(initial_value=np.arange(0, 36).reshape((6, 6)), dtype=tf.float32, name='Matrix')
sess.run(tf.global_variables_initializer())
sess.run(tf.local_variables_initializer())
temp = tf.Variable(initial_value=np.arange(0, 36).reshape((6, 6)), dtype=tf.float32, name='temp')
temp = wrapper(v)
#with tf.control_dependencies([temp]):
temp.eval()
print 'Hello'"""
def update_tensor(V, dim2, val):
    """Return `V` with one entry per row replaced: ``V[i, dim2[i]] = val[i]``.

    The update is expressed as a ``tf.scan`` over the rows of `V`: each row is
    rebuilt from the part before the index, the new value, and the part after
    the index, then sliced back to the row's static length.

    Args:
      V: tensor whose rows are 1-D (the per-row slice below uses a single
        begin/size entry); its static shape must be known.
      dim2: per-row index of the element to overwrite; cast to int32.
      val: per-row replacement values; cast to ``V.dtype``.

    Returns:
      The tensor produced by ``tf.scan`` holding the updated rows.
    """
    val = tf.cast(val, V.dtype)

    def body(_, elems):
        # Tuple parameters in a signature (``def body(_, (v, d2, chg))``) are a
        # SyntaxError on Python 3 (PEP 3113); unpack explicitly instead.
        v, d2, chg = elems
        d2_int = tf.cast(d2, tf.int32)
        row_length = v.get_shape().as_list()[0]
        rebuilt = tf.concat_v2([v[:d2_int], [chg], v[d2_int + 1:]], axis=0)
        return tf.slice(rebuilt, [0], [row_length])

    # The accumulator is ignored by `body`; the initializer only fixes the
    # shape and dtype of the per-row output.
    Z = tf.scan(
        body,
        elems=(V, dim2, val),
        initializer=tf.constant(1, shape=V.get_shape().as_list()[1:], dtype=tf.float32),
        name="Scan_Update")
    return Z
def switch(condition, then_tensor, else_tensor):
    """
    Element-wise switch that works on both Keras backends.

    Keras' implementation of switch for tensorflow uses tf.switch which accepts only scalar conditions.
    This version uses tf.select instead; on any other backend it defers to theano's T.switch.
    """
    if K.backend() != 'tensorflow':
        import theano.tensor as T
        return T.switch(condition, then_tensor, else_tensor)

    import tensorflow as tf
    condition_shape = condition.get_shape()
    input_shape = then_tensor.get_shape()
    last_dims_differ = condition_shape[-1] != input_shape[-1]
    if last_dims_differ and condition_shape[-1] == 1:
        # The last dim is an embedding dim, which Keras does not mask, but tf
        # wants the condition and both branches to share one shape: broadcast
        # the condition across that dimension with a (1, dim) matrix of ones.
        broadcaster = tf.ones((1, input_shape[-1]))
        condition = K.dot(tf.cast(condition, tf.float32), broadcaster)
    bool_condition = tf.cast(condition, dtype=tf.bool)
    return tf.select(bool_condition, then_tensor, else_tensor)
def calculate_loss_distill_boost(self, predictions, labels_distill, labels, **unused_params):
    """Cross-entropy in which each example is re-weighted by the distilled model's error.

    The per-example weight is the cross-entropy of `labels_distill` against
    `labels`, normalised to average 1 over the batch, clipped to [0.5, 5] and
    normalised again.

    Args:
      predictions: predicted probabilities.
      labels_distill: probabilities from the distilled model; cast to float32.
      labels: ground-truth labels; cast to float32.

    Returns:
      Scalar tensor: batch mean of the per-example weighted cross-entropy sums.
    """
    with tf.name_scope("loss_distill_boost"):
        print("loss_distill_boost")
        epsilon = 10e-6

        def log_likelihood(targets, probs):
            # Element-wise Bernoulli log-likelihood, stabilised with epsilon.
            return targets * tf.log(probs + epsilon) + (
                1 - targets) * tf.log(1 - probs + epsilon)

        float_labels = tf.cast(labels, tf.float32)
        batch_size = tf.shape(float_labels)[0]
        float_labels_distill = tf.cast(labels_distill, tf.float32)

        # How wrong the distilled model is on every example.
        distill_error = tf.negative(log_likelihood(float_labels, float_labels_distill))
        distill_error = tf.reduce_sum(distill_error, axis=1, keep_dims=True)

        # Normalise -> clip -> normalise so the weights average to 1.
        weights = distill_error / tf.reduce_sum(distill_error) * tf.cast(batch_size, dtype=tf.float32)
        weights = tf.clip_by_value(weights, 0.5, 5)
        weights = weights / tf.reduce_sum(weights) * tf.cast(batch_size, dtype=tf.float32)

        weighted_xent = tf.negative(log_likelihood(float_labels, predictions) * weights)
        return tf.reduce_mean(tf.reduce_sum(weighted_xent, 1))
def calculate_loss_distill_relabel(self, predictions, labels_distill, labels, **unused_params):
    """Cross-entropy against labels extended with the distilled model's top scores.

    With k equal to the total number of positive labels in the batch, every
    entry of `labels_distill` that is at least as large as the k-th largest
    distilled score is turned into an additional positive label.

    Args:
      predictions: predicted probabilities.
      labels_distill: scores from the distilled model.
      labels: ground-truth labels; cast to float32.

    Returns:
      Scalar tensor: batch mean of the per-example cross-entropy sums.
    """
    with tf.name_scope("loss_distill_relabel"):
        print("loss_distill_relabel")
        epsilon = 10e-6
        float_labels = tf.cast(labels, tf.float32)

        # k = number of positives in the whole batch.
        positive_count = tf.cast(tf.reduce_sum(float_labels), dtype=tf.int32)
        flat_distill = tf.reshape(labels_distill, [-1])
        top_scores, _ = tf.nn.top_k(flat_distill, k=positive_count)
        threshold = top_scores[-1]

        ones = tf.ones(tf.shape(labels))
        zeros = tf.zeros(tf.shape(labels))
        extra_labels = tf.where(tf.greater_equal(labels_distill, threshold), ones, zeros)
        print(extra_labels.get_shape().as_list())

        # Union of the original positives and the newly added ones.
        float_labels = float_labels + extra_labels * (1.0 - float_labels)

        log_likelihood = float_labels * tf.log(predictions + epsilon) + (
            1 - float_labels) * tf.log(1 - predictions + epsilon)
        per_class_loss = tf.negative(log_likelihood)
        return tf.reduce_mean(tf.reduce_sum(per_class_loss, 1))
def calculate_loss(self, predictions, labels, **unused_params):
    """Cross-entropy that up-weights classes ranked on the wrong side of the boundary.

    A negative class scored at least as high as the lowest-scored positive of
    the same row, or a positive class scored no higher than the highest-scored
    negative, has its loss multiplied by 11 (the base loss plus 10x).

    Args:
      predictions: predicted probabilities, indexed [example, class].
      labels: ground-truth labels; cast to float32.

    Returns:
      Scalar tensor: batch mean of the per-example loss sums.
    """
    with tf.name_scope("loss_xent"):
        epsilon = 10e-6
        vocab_size = predictions.get_shape().as_list()[1]  # not used below
        positives = tf.cast(labels, tf.float32)
        negatives_ll = (1 - positives) * tf.log(1 - predictions + epsilon)
        base_loss = tf.negative(positives * tf.log(predictions + epsilon) + negatives_ll)

        negatives = 1 - positives
        # +10 / -10 push the entries of the other group out of the min / max.
        pos_scores = predictions * positives + 10 * negatives
        lowest_pos = tf.reduce_min(pos_scores, axis=1, keep_dims=True)
        neg_scores = predictions * negatives - 10 * positives
        highest_neg = tf.reduce_max(neg_scores, axis=1, keep_dims=True)

        bad_negatives = tf.cast(tf.greater_equal(neg_scores, lowest_pos), dtype=tf.float32)
        bad_positives = tf.cast(tf.less_equal(pos_scores, highest_neg), dtype=tf.float32)

        boosted_loss = base_loss * (bad_negatives + bad_positives) * 10 + base_loss
        return tf.reduce_mean(tf.reduce_sum(boosted_loss, 1))
def calculate_loss(self, predictions, labels, **unused_params):
    """Loss split at `FLAGS.softmax_bound`: cross-entropy on the head, softmax-style on the tail.

    Columns before the bound go through ``CrossEntropyLoss().calculate_loss``.
    For the remaining columns an extra column is appended on both sides
    (``1 - max(label)`` for labels, ``1 - sum(prediction)`` for predictions),
    the labels are L1-normalised, and a cross-entropy is taken over the result.

    Args:
      predictions: predicted probabilities, indexed [example, class].
      labels: ground-truth labels; cast to float32.

    Returns:
      Scalar tensor: mean tail loss plus the head cross-entropy loss.
    """
    split_at = FLAGS.softmax_bound
    with tf.name_scope("loss_softmax"):
        epsilon = 10e-8
        float_labels = tf.cast(labels, tf.float32)

        head_labels = float_labels[:, :split_at]
        head_predictions = predictions[:, :split_at]
        head_loss = CrossEntropyLoss().calculate_loss(head_predictions, head_labels)

        tail_labels = float_labels[:, split_at:]
        tail_predictions = predictions[:, split_at:]

        # L1 normalisation (labels are no less than 0); epsilon avoids 0/0.
        tail_label_sum = tf.maximum(
            tf.reduce_sum(tail_labels, 1, keep_dims=True), epsilon)
        extra_label = 1.0 - tf.reduce_max(tail_labels, 1, keep_dims=True)
        target_dist = tf.concat(
            (tf.div(tail_labels, tail_label_sum), extra_label), axis=1)

        extra_prediction = 1.0 - tf.reduce_sum(tail_predictions, 1, keep_dims=True)
        output_dist = tf.concat((tail_predictions, extra_prediction), axis=1)

        log_likelihood = target_dist * tf.log(output_dist + epsilon) + (
            1 - target_dist) * tf.log(1 - output_dist + epsilon)
        tail_loss = tf.negative(tf.reduce_sum(log_likelihood, 1))
        return tf.reduce_mean(tail_loss) + head_loss
def calculate_loss(self, predictions, labels, **unused_params):
    """Two-part loss whose split point is `FLAGS.softmax_bound`.

    The first `FLAGS.softmax_bound` columns are scored with
    ``CrossEntropyLoss().calculate_loss``. The remaining columns are extended
    by one complementary column (labels: ``1 - row max``; predictions:
    ``1 - row sum``), the labels are L1-normalised, and a cross-entropy between
    the two extended tensors is summed per row.

    Args:
      predictions: predicted probabilities, indexed [example, class].
      labels: ground-truth labels; cast to float32.

    Returns:
      Scalar tensor: mean of the second-part loss plus the first-part loss.
    """
    boundary = FLAGS.softmax_bound
    with tf.name_scope("loss_softmax"):
        epsilon = 10e-8
        labels_f = tf.cast(labels, tf.float32)

        # Part 1: plain cross-entropy on the leading columns.
        labels_first = labels_f[:, :boundary]
        predictions_first = predictions[:, :boundary]
        xent_first = CrossEntropyLoss().calculate_loss(predictions_first, labels_first)

        # Part 2: normalised targets vs. predictions, each with one extra column.
        labels_rest = labels_f[:, boundary:]
        predictions_rest = predictions[:, boundary:]
        # l1 normalization (labels are no less than 0)
        rest_rowsum = tf.maximum(
            tf.reduce_sum(labels_rest, 1, keep_dims=True),
            epsilon)
        rest_complement = 1.0 - tf.reduce_max(labels_rest, 1, keep_dims=True)
        normalized_rest = tf.div(labels_rest, rest_rowsum)
        targets = tf.concat((normalized_rest, rest_complement), axis=1)
        predictions_complement = 1.0 - tf.reduce_sum(predictions_rest, 1, keep_dims=True)
        outputs = tf.concat((predictions_rest, predictions_complement), axis=1)

        positive_term = targets * tf.log(outputs + epsilon)
        negative_term = (1 - targets) * tf.log(1 - outputs + epsilon)
        loss_rest = tf.negative(tf.reduce_sum(positive_term + negative_term, 1))
        return tf.reduce_mean(loss_rest) + xent_first
def calculate_loss(self, predictions, support_predictions, labels, **unused_params):
    """Cross-entropy of every sub-model minus a divergence-from-the-mean term.

    support_predictions is batch_size x num_models x num_classes; the caller is
    expected to have computed
    predictions = tf.reduce_mean(support_predictions, axis=1).

    Labels and the gradient-stopped mean prediction are tiled across the model
    axis, everything is flattened to [batch, num_models * num_classes], and two
    ``CrossEntropyLoss`` values are combined using `FLAGS.support_loss_percent`.

    Returns:
      ``ce * (1 - p) - divergence * p`` with ``p = FLAGS.support_loss_percent``.
    """
    model_count = tf.shape(support_predictions)[1]
    vocab_size = tf.shape(support_predictions)[2]
    tile_over_models = [1, model_count, 1]

    mean_predictions = tf.reduce_mean(support_predictions, axis=1, keep_dims=True)
    float_labels = tf.cast(labels, dtype=tf.float32)
    support_labels = tf.tile(tf.expand_dims(float_labels, axis=1), multiples=tile_over_models)
    # The mean is a fixed target: no gradient flows through it.
    support_means = tf.stop_gradient(tf.tile(mean_predictions, multiples=tile_over_models))

    flat_shape = [-1, model_count * vocab_size]
    support_predictions = tf.reshape(support_predictions, shape=flat_shape)
    support_labels = tf.reshape(support_labels, shape=flat_shape)
    support_means = tf.reshape(support_means, shape=flat_shape)

    ce_loss_fn = CrossEntropyLoss()
    # Cross entropy between the predictions and the ground truth.
    truth_loss = ce_loss_fn.calculate_loss(support_predictions, support_labels, **unused_params)
    # Cross entropy between the predictions and the mean predictions.
    divergence = ce_loss_fn.calculate_loss(support_predictions, support_means, **unused_params)

    percent = FLAGS.support_loss_percent
    return truth_loss * (1.0 - percent) - divergence * percent
def augment(self, model_input_raw, num_frames, labels_batch, **unused_params):
    """Double the batch by appending a randomly clipped copy of every example.

    The copy drops a random number of leading frames (fewer than a quarter of
    the shortest example in the batch), is zero-padded at the end of the frame
    axis back to the original length, and gets a frame count reduced by the
    offset and then scaled by a random factor in [0.75, 1.0).

    Args:
      model_input_raw: frame-level input of rank 3 (it is sliced as
        ``[:, offset:, :]``), presumably batch x frames x features.
      num_frames: number of valid frames per example.
      labels_batch: labels for the batch; duplicated for the clipped copies.

    Returns:
      A tuple ``(model_input, labels_batch, num_frames)`` in which every tensor
      is the original batch concatenated with the augmented one along axis 0.

    Raises:
      ValueError: if ``FLAGS.frame_feature`` is not set.
    """
    # The original `assert(cond, msg)` asserted a non-empty tuple and therefore
    # could never fail; check the flag for real.
    if not FLAGS.frame_feature:
        raise ValueError("AugmentationTransformer only works with frame feature")

    # Largest allowed clip; at least 1 so that the random range is never empty.
    limit = tf.maximum(tf.cast(tf.reduce_min(num_frames) / 4.0, tf.int32), 1)
    # Integer random_uniform requires an explicit maxval.
    offset = tf.random_uniform(shape=[], minval=0, maxval=limit, dtype=tf.int32)

    # Drop `offset` leading frames and pad the tail of the frame axis so the
    # clipped copy keeps the original shape. tf.pad expects one
    # [before, after] pair per dimension.
    input_trans1 = tf.pad(model_input_raw[:, offset:, :],
                          paddings=[[0, 0], [0, offset], [0, 0]])
    input_trans1.set_shape(model_input_raw.get_shape())

    # Shrink the valid-frame count by the offset, then by a random factor.
    remaining = tf.cast(num_frames - tf.cast(offset, num_frames.dtype), tf.float32)
    shrink = tf.random_uniform(shape=tf.shape(num_frames), minval=0.75,
                               maxval=1.0, dtype=tf.float32)
    num_frames_trans1 = tf.cast(shrink * remaining, num_frames.dtype)

    model_input = tf.concat([model_input_raw, input_trans1], axis=0)
    labels_batch = tf.concat([labels_batch, labels_batch], axis=0)
    num_frames = tf.concat([num_frames, num_frames_trans1], axis=0)
    # The original returned the undefined name `num_frames_new`.
    return model_input, labels_batch, num_frames
def SampleRandomFrames(model_input, num_frames, num_samples):
    """Pick `num_samples` frames uniformly at random for each batch row.

    Args:
      model_input: A tensor of size batch_size x max_frames x feature_size
      num_frames: A tensor of size batch_size x 1
      num_samples: A scalar

    Returns:
      `model_input`: A tensor of size batch_size x num_samples x feature_size
    """
    batch_size = tf.shape(model_input)[0]
    sample_shape = [batch_size, num_samples]

    # uniform[0, 1) * frame count, truncated, gives a frame position per sample.
    fractions = tf.random_uniform(sample_shape)
    counts = tf.tile(tf.cast(num_frames, tf.float32), [1, num_samples])
    frame_index = tf.cast(tf.multiply(fractions, counts), tf.int32)

    # The matching row number for each sampled position.
    batch_index = tf.tile(tf.expand_dims(tf.range(batch_size), 1), [1, num_samples])

    return tf.gather_nd(model_input, tf.stack([batch_index, frame_index], 2))
def calculate_loss(self, predictions, labels, weights=None, **unused_params):
    """Binary cross-entropy with optional label smoothing and per-example weights.

    Args:
      predictions: predicted probabilities, indexed [example, class].
      labels: ground-truth labels; passed through ``smoothing`` when
        ``FLAGS.label_smoothing`` is set, otherwise cast to float32.
      weights: optional rank-1 tensor with one weight per example; each row of
        the loss is scaled by its weight.

    Returns:
      Scalar tensor: batch mean of the per-example (optionally weighted) loss
      sums.
    """
    with tf.name_scope("loss_xent"):
        epsilon = 10e-6
        if FLAGS.label_smoothing:
            float_labels = smoothing(labels)
        else:
            float_labels = tf.cast(labels, tf.float32)
        cross_entropy_loss = float_labels * tf.log(predictions + epsilon) + (
            1 - float_labels) * tf.log(1 - predictions + epsilon)
        cross_entropy_loss = tf.negative(cross_entropy_loss)
        if weights is None:
            return tf.reduce_mean(tf.reduce_sum(cross_entropy_loss, 1))

        # Python 2 `print a, b` statements are a SyntaxError on Python 3; the
        # single-argument calls below print the same text on both versions.
        print("%s %s" % (cross_entropy_loss, weights))
        # Scale row i of the loss by weights[i].
        weighted_loss = tf.einsum("ij,i->ij", cross_entropy_loss, weights)
        print("create weighted_loss %s" % (weighted_loss,))
        return tf.reduce_mean(tf.reduce_sum(weighted_loss, 1))
def resize_axis(tensor, axis, new_size, fill_value=0):
    """Truncate or pad `tensor` along `axis` so that it has length `new_size`.

    Args:
      tensor: value convertible to a tensor.
      axis: index of the axis to resize.
      new_size: target length of that axis.
      fill_value: value used for padding; cast to the tensor's dtype.

    Returns:
      The resized tensor, with its static shape along `axis` set to `new_size`.
    """
    tensor = tf.convert_to_tensor(tensor)
    dims = tf.unstack(tf.shape(tensor))

    # Shape of the padding block: same as the input except along `axis`.
    pad_dims = list(dims)
    pad_dims[axis] = tf.maximum(0, new_size - dims[axis])

    # Shape of the part of the input that is kept.
    dims[axis] = tf.minimum(dims[axis], new_size)
    keep_shape = tf.stack(dims)

    kept = tf.slice(tensor, tf.zeros_like(keep_shape), keep_shape)
    padding = tf.fill(tf.stack(pad_dims), tf.cast(fill_value, tensor.dtype))
    resized = tf.concat([kept, padding], axis)

    # Record the now-known static size (as_list() returns a fresh list).
    static_shape = tensor.get_shape().as_list()
    static_shape[axis] = new_size
    resized.set_shape(static_shape)
    return resized
def SampleRandomFrames(model_input, num_frames, num_samples):
    """Randomly sample `num_samples` frames (with replacement) per example.

    Args:
      model_input: A tensor of size batch_size x max_frames x feature_size
      num_frames: A tensor of size batch_size x 1
      num_samples: A scalar

    Returns:
      `model_input`: A tensor of size batch_size x num_samples x feature_size
    """
    batch_size = tf.shape(model_input)[0]

    # Random frame positions: a uniform draw scaled by the example's frame
    # count, truncated to int32.
    random_unit = tf.random_uniform([batch_size, num_samples])
    tiled_counts = tf.tile(tf.cast(num_frames, tf.float32), [1, num_samples])
    scaled = tf.multiply(random_unit, tiled_counts)
    frame_index = tf.cast(scaled, tf.int32)

    # Row number repeated once per sample.
    rows = tf.expand_dims(tf.range(batch_size), 1)
    batch_index = tf.tile(rows, [1, num_samples])

    index = tf.stack([batch_index, frame_index], 2)
    sampled = tf.gather_nd(model_input, index)
    return sampled
def switch(condition, then_expression, else_expression):
    '''Select one of two expressions based on a scalar condition (int or bool).

    Both `then_expression` and `else_expression` should be symbolic tensors
    of the *same shape*; the static shape of `then_expression` is stamped on
    the result.

    # Arguments
        condition: scalar tensor.
        then_expression: TensorFlow operation.
        else_expression: TensorFlow operation.

    # Returns
        The tensor produced by `tf.cond`.
    '''
    def take_then():
        return then_expression

    def take_else():
        return else_expression

    result_shape = copy.copy(then_expression.get_shape())
    result = tf.cond(tf.cast(condition, 'bool'), take_then, take_else)
    result.set_shape(result_shape)
    return result
# Extras
def __init__(self, shape, name=None):
    """Create a uint8 placeholder whose values are exposed as float32 in [0, 1].

    The input is fed in uint8 format, cast to float32 and divided by 255
    before it is passed to the model. On GPU this ensures lower data transfer
    times.

    Parameters
    ----------
    shape: [int]
        shape of the tensor, without the leading (unspecified) dimension.
    name: str
        name of the underlying placeholder
    """
    placeholder_shape = [None] + list(shape)
    placeholder = tf.placeholder(tf.uint8, placeholder_shape, name=name)
    super().__init__(placeholder)
    self._shape = shape
    as_float = tf.cast(super().get(), tf.float32)
    self._output = as_float / 255.0
def loss(logits, labels):
    """Add the mean cross entropy to the 'losses' collection and sum that collection.

    Args:
      logits: Logits from inference().
      labels: Labels from distorted_inputs or inputs(). 1-D tensor
              of shape [batch_size]

    Returns:
      Loss tensor: the sum of everything in the 'losses' collection, i.e. the
      batch-mean cross entropy plus whatever other terms (such as weight
      decay) were added to that collection elsewhere.
    """
    # Batch average of the per-example sparse softmax cross entropy.
    sparse_labels = tf.cast(labels, tf.int64)
    per_example = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=sparse_labels, logits=logits, name='cross_entropy_per_example')
    batch_mean = tf.reduce_mean(per_example, name='cross_entropy')
    tf.add_to_collection('losses', batch_mean)

    # Total loss = cross entropy + every other term already in the collection.
    all_terms = tf.get_collection('losses')
    return tf.add_n(all_terms, name='total_loss')
def loss(logits, labels):
    """Compute the total loss: mean cross entropy plus all collected loss terms.

    Args:
      logits: Logits from inference().
      labels: Labels from distorted_inputs or inputs(). 1-D tensor
              of shape [batch_size]

    Returns:
      Loss tensor obtained by adding up the 'losses' collection after the
      batch-mean cross entropy has been appended to it.
    """
    int_labels = tf.cast(labels, tf.int64)
    xent = tf.nn.sparse_softmax_cross_entropy_with_logits(
        labels=int_labels,
        logits=logits,
        name='cross_entropy_per_example')
    # Register the batch average so it is included in the total below.
    tf.add_to_collection('losses', tf.reduce_mean(xent, name='cross_entropy'))
    # Cross entropy plus any terms (e.g. weight decay) registered elsewhere.
    return tf.add_n(tf.get_collection('losses'), name='total_loss')
def inputs(eval_data):
    """Build the BBBC006 input tensors through `bbbc006_input.inputs`.

    Args:
      eval_data: bool, indicating if one should use the train or eval data set.

    Returns:
      images: Images, as produced by `bbbc006_input.inputs` (per the original
        notes a 4D tensor of [batch_size, IMAGE_WIDTH, IMAGE_HEIGHT, 1] size).
      labels: Labels, as produced by `bbbc006_input.inputs` (per the original
        notes a 4D tensor of [batch_size, IMAGE_WIDTH, IMAGE_HEIGHT, 2] size).
      Both are cast to float16 when `FLAGS.use_fp16` is set.

    Raises:
      ValueError: If no data_dir
    """
    if not FLAGS.data_dir:
        raise ValueError('Please supply a data_dir')

    images, labels = bbbc006_input.inputs(
        eval_data=eval_data, batch_size=FLAGS.batch_size)

    if not FLAGS.use_fp16:
        return images, labels

    half_images = tf.cast(images, tf.float16)
    half_labels = tf.cast(labels, tf.float16)
    return half_images, half_labels
def _add_cross_entropy(labels, logits, pref):
    """Compute a class-weighted mean cross entropy and add it to the 'losses' collection.

    Args:
      labels: Labels from distorted_inputs() or inputs(); axis 3 is squeezed
        before the cross entropy is computed.
      logits: Output map from inference().
      pref: Either 'c' or 's', for contours or segments, respectively.
    """
    with tf.variable_scope('{}_cross_entropy'.format(pref)) as scope:
        if pref == 'c':
            class_prop = C_CLASS_PROP
        else:
            class_prop = S_CLASS_PROP

        # Label 0 is weighted by class_prop, label 1 by (1 - class_prop).
        is_zero = tf.cast(tf.equal(labels, 0), tf.float32)
        is_one = tf.cast(tf.equal(labels, 1), tf.float32)
        weight_per_label = (tf.scalar_mul(class_prop, is_zero)
                            + tf.scalar_mul(1.0 - class_prop, is_one))

        # NOTE(review): tf.losses.* functions reduce their result and register
        # it in a loss collection by default - confirm that combining it with
        # the weight map below is what is intended.
        squeezed_labels = tf.squeeze(labels, squeeze_dims=[3])
        cross_entropy = tf.losses.sparse_softmax_cross_entropy(
            labels=squeezed_labels, logits=logits)

        weighted = tf.multiply(weight_per_label, cross_entropy)
        weighted_mean = tf.reduce_mean(weighted, name=scope.name)
        tf.add_to_collection('losses', weighted_mean)
def get_show_preds(c_fuse, s_fuse):
    """Turn both fuse layers into foreground probability maps and log them as images.

    Args:
      c_fuse: Contours fuse layer.
      s_fuse: Segments fuse layer.

    Returns:
      c_logits: Second half (along axis 3) of the softmax of the contours
        fuse layer.
      s_logits: Second half (along axis 3) of the softmax of the segments
        fuse layer.
    """
    def foreground(fuse):
        # Index 1 of the fuse layer corresponds to foreground: split axis 3 in
        # two and discard the first part.
        probabilities = tf.cast(tf.nn.softmax(fuse), tf.float32)
        return tf.split(probabilities, 2, 3)[1]

    c_logits = foreground(c_fuse)
    s_logits = foreground(s_fuse)
    tf.summary.image('c_logits', c_logits)
    tf.summary.image('s_logits', s_logits)
    return c_logits, s_logits
def preprocess(self, image_buffer, bbox, batch_position):
    """Preprocess `image_buffer` according to its position in the batch.

    In training mode the buffer goes to `train_image`; otherwise it is decoded
    as a 3-channel JPEG and handed to `eval_image`.
    """
    if not self.train:
        decoded = tf.image.decode_jpeg(
            image_buffer, channels=3, dct_method='INTEGER_FAST')
        return eval_image(
            decoded, self.height, self.width, batch_position,
            self.resize_method,
            summary_verbosity=self.summary_verbosity)

    # Per the original author's note, the result is float32
    # [height, width, 3] with range [0, 255].
    return train_image(
        image_buffer, self.height, self.width, bbox,
        batch_position, self.resize_method, self.distortions,
        None,
        summary_verbosity=self.summary_verbosity,
        distort_color_in_yiq=self.distort_color_in_yiq,
        fuse_decode_and_crop=self.fuse_decode_and_crop)
def read_whole_features(file_pattern, num_epochs=1):
    '''
    Read whole files matching `file_pattern` and slice them into feature columns.

    Each file is decoded as raw float32 and reshaped to rows of `FEAT_DIM`
    values.

    Return
        `feature`: `dict` whose keys are `sp`, `ap`, `f0`, `en`, `speaker`
        and `filename` (the key tensor produced by the reader)
    '''
    matched = tf.gfile.Glob(file_pattern)
    print('{} files found'.format(len(matched)))

    queue = tf.train.string_input_producer(matched, num_epochs=num_epochs)
    filename, raw_bytes = tf.WholeFileReader().read(queue)
    print("Processing {}".format(filename), flush=True)

    frames = tf.reshape(tf.decode_raw(raw_bytes, tf.float32), [-1, FEAT_DIM])

    # Column layout: SP_DIM columns of sp, SP_DIM columns of ap, then one
    # column each for f0, en and speaker.
    features = {}
    features['sp'] = frames[:, :SP_DIM]
    features['ap'] = frames[:, SP_DIM : 2*SP_DIM]
    features['f0'] = frames[:, SP_DIM * 2]
    features['en'] = frames[:, SP_DIM * 2 + 1]
    features['speaker'] = tf.cast(frames[:, SP_DIM * 2 + 2], tf.int64)
    features['filename'] = filename
    return features
def mnist_batcher_in_tanh_vector(
        batch_size,
        capacity=256,
        min_after_dequeue=128,
):
    """Return shuffled batches of flattened MNIST training images and labels.

    Images are cast to float32, flattened and rescaled with ``x / 127.5 - 1``;
    labels are cast to int64.

    Args:
      batch_size: number of examples per batch.
      capacity: passed to ``tf.train.shuffle_batch``.
      min_after_dequeue: passed to ``tf.train.shuffle_batch``.

    Returns:
      The result of ``tf.train.shuffle_batch`` over ``[images, labels]``.
    """
    (train_images, train_labels), (_, _) = keras.datasets.mnist.load_data()

    pixels = tf.cast(tf.constant(train_images), tf.float32)
    flat_pixels = keras.layers.Flatten()(pixels)
    scaled_pixels = flat_pixels / 127.5 - 1.
    int_labels = tf.cast(train_labels, tf.int64)

    # enqueue_many: the leading dimension indexes individual examples.
    return tf.train.shuffle_batch(
        [scaled_pixels, int_labels],
        batch_size=batch_size,
        capacity=capacity,
        min_after_dequeue=min_after_dequeue,
        enqueue_many=True
    )
def repeat(tensor: tf.Tensor, repeats: int, axis: int) -> tf.Tensor:
    """
    Repeat each element of the input tensor ``repeats``-times along ``axis``.

    The axis is moved to the front, the rows are gathered with the index
    pattern ``0, 0, ..., 1, 1, ...`` and the original axis order is restored.

    .. note::
        Chaining of this op may produce TF warnings although the performance seems to be unaffected.

    :param tensor: TF tensor to be repeated
    :param repeats: number of repeats
    :param axis: axis to repeat
    :return: tensor with repeated elements
    """
    static_shape = tensor.get_shape().as_list()
    all_axes = np.arange(len(tensor.shape))

    # Permutation that brings `axis` to position 0, and the one that undoes it.
    to_front = np.concatenate(([axis], np.delete(all_axes, axis)))
    to_original = np.concatenate((all_axes[1:axis+1], [0], all_axes[axis+1:]))

    # floor(i / repeats) for i in [0, size * repeats): every index `repeats` times.
    positions = tf.range(0, static_shape[axis]*repeats)
    source_rows = tf.cast(tf.floor(positions/tf.constant(repeats)), 'int32')

    front_first = tf.transpose(tensor, to_front)
    expanded = tf.gather(front_first, source_rows)
    return tf.transpose(expanded, to_original)
def bin_stats(predictions: tf.Tensor, labels: tf.Tensor) -> Tuple[tf.Tensor, tf.Tensor, tf.Tensor]:
    """
    Compute per-example f1, precision and recall for binary predictions.

    Both inputs are cast to int32 and reduced over axis 1. No guard against
    zero denominators is applied.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: a tuple of batched (f1, precision and recall) values
    """
    predicted = tf.cast(predictions, tf.int32)
    expected = tf.cast(labels, tf.int32)

    def count_where(mask):
        # Number of True entries in every row.
        return tf.reduce_sum(tf.cast(mask, tf.int32), axis=1)

    tp = tf.reduce_sum((predicted * expected), axis=1)
    fp = count_where(tf.greater(predicted, expected))   # predicted 1, expected 0
    fn = count_where(tf.greater(expected, predicted))   # predicted 0, expected 1

    recall = tp / (tp + fn)
    precision = tp / (tp + fp)
    # Harmonic mean of precision and recall.
    f1_score = 2 / (1 / precision + 1 / recall)
    return f1_score, precision, recall
def bin_dice(predictions: tf.Tensor, labels: tf.Tensor) -> tf.Tensor:
    r"""
    Calculate Sørensen–Dice coefficient from the given binary classification expected and predicted values.

    The coefficient is defined as :math:`2*|X \cap Y| / (|X| + |Y|)`, i.e. twice the number of positions
    that are positive in both inputs divided by the total number of positives in the two inputs.
    Both inputs are cast to int32 and reduced over axis 1; no guard is applied for rows without positives.

    :param predictions: 2-d tensor (batch, predictions) of predicted 0/1 classes
    :param labels: 2-d tensor (batch, labels) of expected 0/1 classes
    :return: batched Sørensen–Dice coefficients
    """
    predictions = tf.cast(predictions, tf.int32)
    labels = tf.cast(labels, tf.int32)

    # |X ∩ Y|: positions that are 1 in both the predictions and the labels.
    true_positives = tf.reduce_sum((predictions * labels), axis=1)
    # |X| and |Y|.
    pred_positives = tf.reduce_sum(predictions, axis=1)
    label_positives = tf.reduce_sum(labels, axis=1)

    return 2 * true_positives / (pred_positives + label_positives)