def glorot_normal(shape, gain=1.0, c01b=False):
    orig_shape = shape
    if c01b:
        if len(shape) != 4:
            raise RuntimeError(
                "If c01b is True, only shapes of length 4 are accepted")
        n1, n2 = shape[0], shape[3]
        receptive_field_size = shape[1] * shape[2]
    else:
        if len(shape) < 2:
            shape = (1,) + tuple(shape)
        n1, n2 = shape[:2]
        receptive_field_size = np.prod(shape[2:])
    std = gain * np.sqrt(2.0 / ((n1 + n2) * receptive_field_size))
    return np.cast[floatX](
        get_rng().normal(0.0, std, size=orig_shape))
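A minimal usage sketch for the initializer above. It assumes floatX and get_rng() are module-level helpers; here they are stubbed with theano.config.floatX and a fixed NumPy RandomState.

# Usage sketch (assumption: floatX and get_rng() normally come from the surrounding module).
import numpy as np
import theano

floatX = theano.config.floatX
_rng = np.random.RandomState(42)

def get_rng():
    return _rng

W_dense = glorot_normal((512, 256))                # dense weights: fan_in=512, fan_out=256
W_conv = glorot_normal((64, 32, 3, 3))             # conv filters in bc01 layout (out, in, h, w)
W_c01b = glorot_normal((32, 3, 3, 64), c01b=True)  # cuda-convnet c01b layout
print(W_dense.shape, W_conv.shape, W_c01b.shape)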
def adamax_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    for p, g in zip(params, grads):
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        if mom1 > 0:
            v_t = mom1*v + (1. - mom1)*g
            updates.append((v, v_t))
        else:
            v_t = g
        mg_t = T.maximum(mom2*mg, abs(g))
        g_t = v_t / (mg_t + 1e-6)
        p_t = p - lr * g_t
        updates.append((mg, mg_t))
        updates.append((p, p_t))
    return updates
def adam_updates(params, cost, lr=0.001, mom1=0.9, mom2=0.999):
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, v_t))
        updates.append((mg, mg_t))
        updates.append((p, p_t))
    updates.append((t, t+1))
    return updates
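These update lists plug straight into theano.function. A minimal sketch of a softmax-regression training step using adam_updates, with the same np/th/T aliases as the snippets above; the model variables are illustrative only.

# Minimal training-step sketch using the update rules above.
import numpy as np
import theano as th
import theano.tensor as T

x = T.matrix('x')
y = T.ivector('y')
W = th.shared(np.zeros((784, 10), dtype=th.config.floatX), name='W')
b = th.shared(np.zeros((10,), dtype=th.config.floatX), name='b')

p_y = T.nnet.softmax(T.dot(x, W) + b)
cost = -T.mean(T.log(p_y)[T.arange(y.shape[0]), y])

train_fn = th.function([x, y], cost, updates=adam_updates([W, b], cost, lr=1e-3))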
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input - self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input, axis=self.axes_to_sum).flatten()
        centered_input = input - batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input), axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)

        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1), th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]

    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
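This layer stores its running-average updates on self.bn_updates instead of returning them, so a training function has to gather them by hand. A hedged sketch, assuming the layers are ordinary Lasagne layers reachable from an output layer:

# Hedged sketch: collect the manual batch-norm updates from every layer that defines them.
import lasagne.layers as ll

def collect_bn_updates(output_layer):
    # bn_updates only exists after get_output(..., deterministic=False) has been built
    updates = []
    for layer in ll.get_all_layers(output_layer):
        updates += getattr(layer, 'bn_updates', [])
    return updates

# train_fn = th.function([x], cost, updates=param_updates + collect_bn_updates(net))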
def __call__(self, learning_rate):
    """Update the learning rate according to the exponential decay
    schedule.
    """
    if self._count == 0.:
        self._base_lr = learning_rate.get_value()
    self._count += 1
    if not self._min_reached:
        new_lr = self._base_lr * (self.decay_factor ** (-self._count))
        if new_lr <= self.min_lr:
            self._min_reached = True
            new_lr = self.min_lr
    else:
        new_lr = self.min_lr
    learning_rate.set_value(np.cast[theano.config.floatX](new_lr))
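A hypothetical usage sketch: the schedule object is called once per epoch (or batch) with a shared learning-rate variable, which it decays in place. The schedule class name and constructor shown are assumptions, not taken from the source.

# Hypothetical usage (ExponentialDecaySchedule name and constructor are assumed).
import numpy as np
import theano

learning_rate = theano.shared(np.cast[theano.config.floatX](0.01))
# schedule = ExponentialDecaySchedule(decay_factor=1.0005, min_lr=1e-5)
# for epoch in range(num_epochs):
#     ... run one epoch of training ...
#     schedule(learning_rate)   # shrinks the shared value via set_value()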
def as_floatX(variable):
    """
    This code is taken from pylearn2:
    Casts a given variable into dtype config.floatX
    numpy ndarrays will remain numpy ndarrays
    python floats will become 0-D ndarrays
    all other types will be treated as theano tensors
    """
    if isinstance(variable, float):
        return numpy.cast[theano.config.floatX](variable)
    if isinstance(variable, numpy.ndarray):
        return numpy.cast[theano.config.floatX](variable)
    return theano.tensor.cast(variable, theano.config.floatX)
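A quick check of the three branches (Python float, NumPy array, symbolic tensor), using the plain numpy/theano module names the snippet itself relies on:

# Exercises the three branches of as_floatX above.
import numpy
import theano
import theano.tensor

print(as_floatX(3.14).dtype)                        # 0-d ndarray in config.floatX
print(as_floatX(numpy.arange(4)).dtype)             # ndarray cast to config.floatX
print(as_floatX(theano.tensor.ivector('i')).dtype)  # symbolic cast to config.floatX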
def parameter_prediction(self, test_set_x):  # , batch_size
    """ Predict the output of the network for one test sentence.
    :param test_set_x: input features for a test sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
                               givens={self.x: test_set_x, self.is_train: np.cast['int32'](0)},
                               on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
## the function to output activations at a hidden layer
def generate_hidden_layer(self, test_set_x, bn_layer_index):
    """ Predict the bottleneck features of the network for one test sentence.
    :param test_set_x: input features for a test sentence
    :type test_set_x: python array variable
    :param bn_layer_index: index of the hidden (bottleneck) layer whose activations are returned
    :returns: predicted bottleneck features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.rnn_layers[bn_layer_index].output,
                               givens={self.x: test_set_x, self.is_train: np.cast['int32'](0)},
                               on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction(self, test_set_x):  # , batch_size
    """ Predict the output of the network for one test sentence.
    :param test_set_x: input features for a test sentence
    :type test_set_x: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
                               givens={self.x: test_set_x[0:n_test_set_x], self.is_train: np.cast['int32'](0)},
                               on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def parameter_prediction_S2S(self, test_set_x, test_set_d):
    """ Predict the output of the network for one test sentence.
    :param test_set_x: input features for a test sentence
    :param test_set_d: phone durations for a test sentence
    :type test_set_x: python array variable
    :type test_set_d: python array variable
    :returns: predicted features
    """
    n_test_set_x = test_set_x.shape[0]
    test_out = theano.function([], self.final_layer.output,
                               givens={self.x: test_set_x[0:n_test_set_x], self.d: test_set_d[0:n_test_set_x],
                                       self.is_train: np.cast['int32'](0)},
                               on_unused_input='ignore')
    predict_parameter = test_out()
    return predict_parameter
def get_training_data(num_samples):
    """Generates some training data."""
    # As (x, y) Cartesian coordinates.
    x = np.random.randint(0, 2, size=(num_samples, 2))
    y = x[:, 0] + 2 * x[:, 1]  # 2-digit binary to integer.
    y = np.cast['int32'](y)
    x = np.cast['float32'](x) * 1.6 - 0.8  # Scales to [-1, 1].
    x += np.random.uniform(-0.1, 0.1, size=x.shape)
    y_ohe = np.cast['float32'](np.eye(4)[y])
    y = np.cast['float32'](np.expand_dims(y, -1))
    return x, y, y_ohe
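A quick shape check of the generator above; np is assumed to be numpy, as in the snippet.

# Example call: eight 2-bit inputs mapped to labels 0..3.
import numpy as np

x, y, y_ohe = get_training_data(8)
print(x.shape)      # (8, 2)  float32 inputs with small jitter
print(y.shape)      # (8, 1)  float32 labels as a column vector
print(y_ohe.shape)  # (8, 4)  one-hot encoded labels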
def pcnn_norm(x, colorspace="RGB", reverse=False):
    """Normalize the input from and to [-1, 1].

    Args:
        x: input image array (3D or 4D)
        colorspace (str): Source/target colorspace, depending on the value of `reverse`
        reverse (bool, optional): If False, converts the input from the given colorspace to float in the range [-1, 1].
            Otherwise, converts the input to the valid range for the given colorspace. Defaults to False.

    Returns:
        x_norm: normalized input
    """
    if colorspace == "RGB":
        return np.cast[np.uint8](x * 127.5 + 127.5) if reverse else np.cast[np.float32]((x - 127.5) / 127.5)
    elif colorspace == "lab":
        if x.shape[-1] == 1:
            return (x * 50. + 50.) if reverse else np.cast[np.float32]((x - 50.) / 50.)
        else:
            a = np.array([50., +0.5, -0.5], dtype=np.float32)
            b = np.array([50., 127.5, 127.5], dtype=np.float32)
            return np.cast[np.float64](x * b + a) if reverse else np.cast[np.float32]((x - a) / b)
    else:
        raise ValueError("Unknown colorspace: %s" % colorspace)
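A round-trip sketch for the RGB branch, showing the uint8 to [-1, 1] float conversion and back:

# Round-trip example for the RGB branch (np assumed to be numpy).
import numpy as np

img = np.random.randint(0, 256, size=(1, 32, 32, 3)).astype(np.uint8)
x = pcnn_norm(img)                      # float32 in [-1, 1]
img_back = pcnn_norm(x, reverse=True)   # back to uint8 pixel values
assert img_back.dtype == np.uint8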
def __init__(self, input, n_in, n_out, prob_drop=0.5, verbose=False):
    self.verbose = verbose
    self.prob_drop = prob_drop
    self.prob_keep = 1.0 - prob_drop
    self.flag_on = theano.shared(np.cast[theano.config.floatX](1.0))
    self.flag_off = 1.0 - self.flag_on

    seed_this = DropoutLayer.seed_common.randint(0, 2**31-1)
    mask_rng = theano.tensor.shared_randomstreams.RandomStreams(seed_this)
    self.mask = mask_rng.binomial(n=1, p=self.prob_keep, size=input.shape)

    self.output = \
        self.flag_on * T.cast(self.mask, theano.config.floatX) * input + \
        self.flag_off * self.prob_keep * input

    DropoutLayer.layers.append(self)

    if self.verbose:
        print('dropout layer with P_drop: ' + str(self.prob_drop))
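The flag_on shared variable is what switches this layer between sampling a fresh mask (training) and scaling by prob_keep (testing). A hedged sketch of the usual toggle, assuming DropoutLayer.layers holds every instance as in the __init__ above:

# Hedged sketch of the usual train/test toggle for this dropout implementation.
import numpy as np
import theano

def set_dropout_on(training):
    val = np.cast[theano.config.floatX](1.0 if training else 0.0)
    for layer in DropoutLayer.layers:
        layer.flag_on.set_value(val)

# set_dropout_on(True)   # train: multiply by the sampled binary mask
# set_dropout_on(False)  # test: multiply by prob_keep (expected value of the mask)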
def load_data(dataset):
    if dataset.split('.')[-1] == 'gz':
        f = gzip.open(dataset, 'rb')
    else:
        f = open(dataset, 'rb')
    train_set, valid_set, test_set = pkl.load(f)
    f.close()

    def shared_dataset(data_xy, borrow=True):
        data_x, data_y = data_xy
        shared_x = theano.shared(
            np.asarray(data_x, dtype=theano.config.floatX),
            borrow=borrow)
        shared_y = theano.shared(
            np.asarray(data_y, dtype=theano.config.floatX),
            borrow=borrow)
        return shared_x, T.cast(shared_y, 'int32')

    train_set_x, train_set_y = shared_dataset(train_set)
    valid_set_x, valid_set_y = shared_dataset(valid_set)
    test_set_x, test_set_y = shared_dataset(test_set)

    return [(train_set_x, train_set_y),
            (valid_set_x, valid_set_y),
            (test_set_x, test_set_y)]
from collections import OrderedDict

def adam(loss, params, learning_rate, beta1=0.9, beta2=0.999, epsilon=1e-8):
    grads = T.grad(loss, params)
    updates = OrderedDict()
    t_prev = theano.shared(np.cast[theano.config.floatX](0))
    t = t_prev + 1
    a_t = learning_rate * T.sqrt(1 - beta2**t) / (1 - beta1**t)
    for param, grad in zip(params, grads):
        value = param.get_value(borrow=True)
        m_prev = theano.shared(
            np.zeros(value.shape, dtype=value.dtype),
            broadcastable=param.broadcastable)
        v_prev = theano.shared(
            np.zeros(value.shape, dtype=value.dtype),
            broadcastable=param.broadcastable)
        m_t = beta1 * m_prev + (1 - beta1) * grad
        v_t = beta2 * v_prev + (1 - beta2) * grad ** 2
        step = a_t * m_t / (T.sqrt(v_t) + epsilon)
        updates[m_prev] = m_t
        updates[v_prev] = v_t
        updates[param] = param - step
    updates[t_prev] = t
    return updates
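A minimal sketch of compiling a training step with the adam() above (Lasagne-style ordered updates); the model and variable names are illustrative only.

# Minimal compile sketch for adam() above; variable names are illustrative.
import numpy as np
import theano
import theano.tensor as T

x = T.matrix('x')
targets = T.matrix('targets')
W = theano.shared(np.zeros((20, 1), dtype=theano.config.floatX), name='W')
loss = T.mean((T.dot(x, W) - targets) ** 2)
train_step = theano.function([x, targets], loss,
                             updates=adam(loss, [W], learning_rate=1e-3))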
def one_hot(labels, num_classes, name='one_hot'):
    """Transform numeric labels into onehot_labels.
    Args:
        labels: [batch_size] target labels.
        num_classes: total number of classes.
        name: Optional name for the op scope.
    Returns:
        one hot encoding of the labels.
    """
    with tf.op_scope([labels], name):
        batch_size = labels.get_shape()[0]
        indices = tf.expand_dims(tf.range(0, batch_size), 1)
        labels = tf.cast(tf.expand_dims(labels, 1), indices.dtype)
        concated = tf.concat(1, [indices, labels])
        onehot_labels = tf.sparse_to_dense(
            concated, tf.pack([batch_size, num_classes]), 1.0, 0.0)
        onehot_labels.set_shape([batch_size, num_classes])
        return onehot_labels
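The snippet above targets a pre-1.0 TensorFlow API (tf.op_scope, tf.pack, tf.concat(dim, values), tf.sparse_to_dense). On current TensorFlow the same encoding is a one-liner; a hedged equivalent sketch:

# Hedged modern-TensorFlow equivalent of the snippet above.
import tensorflow as tf

def one_hot_modern(labels, num_classes):
    # tf.one_hot replaces the sparse_to_dense construction.
    return tf.one_hot(labels, depth=num_classes, dtype=tf.float32)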
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input - self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input, axis=self.axes_to_sum).flatten()
        centered_input = input - batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input), axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)

        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1.), th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]

    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    return self.nonlinearity(activation)
def adam_conditional_updates(params, cost, mincost, lr=0.001, mom1=0.9, mom2=0.999):  # if cost is less than mincost, don't do update
    # note: requires "from theano.ifelse import ifelse" at module level
    updates = []
    grads = T.grad(cost, params)
    t = th.shared(np.cast[th.config.floatX](1.))
    for p, g in zip(params, grads):
        v = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        mg = th.shared(np.cast[th.config.floatX](p.get_value() * 0.))
        v_t = mom1*v + (1. - mom1)*g
        mg_t = mom2*mg + (1. - mom2)*T.square(g)
        v_hat = v_t / (1. - mom1 ** t)
        mg_hat = mg_t / (1. - mom2 ** t)
        g_t = v_hat / T.sqrt(mg_hat + 1e-8)
        p_t = p - lr * g_t
        updates.append((v, ifelse(cost < mincost, v, v_t)))
        updates.append((mg, ifelse(cost < mincost, mg, mg_t)))
        updates.append((p, ifelse(cost < mincost, p, p_t)))
    updates.append((t, ifelse(cost < mincost, t, t+1)))
    return updates
def get_output_for(self, input, deterministic=False, **kwargs):
    if deterministic:
        norm_features = (input - self.avg_batch_mean.dimshuffle(*self.dimshuffle_args)) / T.sqrt(1e-6 + self.avg_batch_var).dimshuffle(*self.dimshuffle_args)
    else:
        batch_mean = T.mean(input, axis=self.axes_to_sum).flatten()
        centered_input = input - batch_mean.dimshuffle(*self.dimshuffle_args)
        batch_var = T.mean(T.square(centered_input), axis=self.axes_to_sum).flatten()
        batch_stdv = T.sqrt(1e-6 + batch_var)
        norm_features = centered_input / batch_stdv.dimshuffle(*self.dimshuffle_args)

        # BN updates
        new_m = 0.9*self.avg_batch_mean + 0.1*batch_mean
        new_v = 0.9*self.avg_batch_var + T.cast((0.1*input.shape[0])/(input.shape[0]-1), th.config.floatX)*batch_var
        self.bn_updates = [(self.avg_batch_mean, new_m), (self.avg_batch_var, new_v)]

    if hasattr(self, 'g'):
        activation = norm_features*self.g.dimshuffle(*self.dimshuffle_args)
    else:
        activation = norm_features
    if hasattr(self, 'b'):
        activation += self.b.dimshuffle(*self.dimshuffle_args)
    if self.nonlinearity is not None:
        return self.nonlinearity(activation)
    else:
        return activation
def shared_dataset(data_xy, borrow=True):
    """ Function that loads the dataset into shared variables

    The reason we store our dataset in shared variables is to allow
    Theano to copy it into the GPU memory (when code is run on GPU).
    Since copying data into the GPU is slow, copying a minibatch every
    time it is needed (the default behaviour if the data is not in a
    shared variable) would lead to a large decrease in performance.
    """
    data_x, data_y = data_xy
    shared_x = theano.shared(np.asarray(data_x,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    shared_y = theano.shared(np.asarray(data_y,
                                        dtype=theano.config.floatX),
                             borrow=borrow)
    return shared_x, T.cast(shared_y, 'int32')
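A usage sketch for the helper above: wrap the NumPy arrays once, then index the shared variables in givens to slice minibatches on the device. The x, y, and batch_size names in the commented lines are illustrative.

# Usage sketch: wrap NumPy arrays as shared data and slice minibatches via givens.
import numpy as np

data_x = np.random.randn(100, 20)
data_y = np.random.randint(0, 10, size=100)
train_x, train_y = shared_dataset((data_x, data_y))
# index = T.lscalar('index')
# givens = {x: train_x[index*batch_size:(index+1)*batch_size],
#           y: train_y[index*batch_size:(index+1)*batch_size]}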