Example source code for the Python Input() class
def test_activity_regularization():
    import numpy as np
    from keras.engine import Input, Model
    from keras.layers import core

    layer = core.ActivityRegularization(l1=0.01, l2=0.01)

    # test in functional API
    x = Input(shape=(3,))
    z = core.Dense(2)(x)
    y = layer(z)
    model = Model(input=x, output=y)
    model.compile('rmsprop', 'mse', mode='FAST_COMPILE')  # extra kwargs are forwarded to the backend function
    model.predict(np.random.random((2, 3)))

    # test serialization
    model_config = model.get_config()
    model = Model.from_config(model_config)
    model.compile('rmsprop', 'mse')
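# A complementary sketch, not from the source above: the same regularizer used
# in a Sequential model (Keras 1.x API assumed):
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, ActivityRegularization

model = Sequential()
model.add(Dense(2, input_dim=3))
model.add(ActivityRegularization(l1=0.01, l2=0.01))  # penalizes the activations of the previous layer
model.compile('rmsprop', 'mse')
model.fit(np.random.random((8, 3)), np.random.random((8, 2)), nb_epoch=1, verbose=0)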
def __init__(self, maxlen, d_L, d_C, d_D, V_C):
    """
    maxlen = maximum input/output word size
    d_L    = language model hidden state (= context vector) size
    d_C    = character features (input embedding vector size)
    d_D    = decoder hidden state h size
    V_C    = character vocabulary
    """
    # extend the embeddings to treat zero values as zero vectors (for y_0 = 0),
    # but don't do any masking
    class CharEmb(Embedding):
        def call(self, x, mask=None):
            y = super(CharEmb, self).call(x)
            return y * K.cast(K.expand_dims(x, -1), K.floatx())

    c = Input(shape=(d_L,), name='c')
    y_tm1 = Input(shape=(maxlen,), name='y_tm1', dtype='int32')
    ye_tm1 = CharEmb(V_C.size + 1, d_C)(y_tm1)
    h = DecoderGRU(d_D, return_sequences=True)([ye_tm1, c])
    s = Maxout(d_C)([h, ye_tm1, RepeatVector(maxlen)(c)])
    s = Dropout(.2)(s)
    c_I = ProjectionOverTime(V_C.size)(s)
    super(W2C, self).__init__(input=[c, y_tm1], output=c_I, name='W2C')
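# A hedged, self-contained sketch of the CharEmb trick above (Keras 1.x API
# assumed). It swaps the index-scaling product for an explicit 0/1 mask via
# K.not_equal — a variant for illustration, not the author's exact method:
import numpy as np
from keras import backend as K
from keras.layers import Input, Embedding
from keras.models import Model

class ZeroPaddedEmbedding(Embedding):
    def call(self, x, mask=None):
        y = super(ZeroPaddedEmbedding, self).call(x)
        # zero out the embedding vectors wherever the input index is 0
        return y * K.cast(K.expand_dims(K.not_equal(x, 0), -1), K.floatx())

ids = Input(shape=(5,), dtype='int32')
emb = ZeroPaddedEmbedding(10, 4)(ids)
model = Model(input=ids, output=emb)
out = model.predict(np.array([[0, 1, 2, 0, 3]], dtype='int32'))
assert not out[0][0].any() and not out[0][3].any()  # zero vectors at index 0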
def __init__(self, config):
    self.subject = Input(shape=(config['subject_len'],), dtype='int32', name='subject_base')
    self.subject_bad = Input(shape=(config['subject_len'],), dtype='int32', name='subject_bad_base')
    self.relation = Input(shape=(config['relation_len'],), dtype='int32', name='relation_base')
    self.object_good = Input(shape=(config['object_len'],), dtype='int32', name='object_good_base')
    self.object_bad = Input(shape=(config['object_len'],), dtype='int32', name='object_bad_base')

    self.config = config
    self.model_params = config.get('model_params', dict())
    self.similarity_params = config.get('similarity_params', dict())

    # initialize a bunch of variables that will be set later
    self._models = None
    self._similarities = None
    self._object = None
    self._subject = None
    self._qa_model = None
    self._qa_model_rt = None

    self.training_model = None
    self.training_model_rt = None
    self.prediction_model = None
    self.prediction_model_rt = None
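# An illustrative config for the constructor above; the key names are taken
# from the code, the values are hypothetical:
config = {
    'subject_len': 10,
    'relation_len': 5,
    'object_len': 10,
    'model_params': {},       # optional; defaults to dict()
    'similarity_params': {},  # optional; defaults to dict()
}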
def loss_net(self) -> Model:
    """Returns the network that yields a loss given both input spectrograms and labels. Used for training."""
    input_batch = self._input_batch_input
    label_batch = Input(name=Wav2Letter.InputNames.label_batch, shape=(None,), dtype='int32')
    label_lengths = Input(name=Wav2Letter.InputNames.label_lengths, shape=(1,), dtype='int64')

    asg_transition_probabilities_variable = backend.variable(value=self.asg_transition_probabilities,
                                                             name="asg_transition_probabilities")
    asg_initial_probabilities_variable = backend.variable(value=self.asg_initial_probabilities,
                                                          name="asg_initial_probabilities")

    # Since Keras doesn't currently support loss functions with extra parameters,
    # we define a custom Lambda layer yielding a single real-valued ASG or CTC loss given the grapheme probabilities:
    loss_layer = Lambda(Wav2Letter._asg_lambda if self.use_asg else Wav2Letter._ctc_lambda,
                        name='asg_loss' if self.use_asg else 'ctc_loss',
                        output_shape=(1,),
                        arguments={"transition_probabilities": asg_transition_probabilities_variable,
                                   "initial_probabilities": asg_initial_probabilities_variable} if self.use_asg else None)
    # ([asg_transition_probabilities_variable, asg_initial_probabilities_variable] if self.use_asg else [])

    # This loss layer is placed atop the predictive network and provided with additional arguments,
    # namely the label batch and the prediction/label sequence lengths:
    loss = loss_layer(
        [self.predictive_net(input_batch), label_batch, self._prediction_lengths_input, label_lengths])

    loss_net = Model(inputs=[input_batch, label_batch, self._prediction_lengths_input, label_lengths],
                     outputs=[loss])
    # Since the loss is already calculated in the last layer of the net, we just pass through the results here.
    # Dummy labels have to be given to satisfy the Keras API:
    loss_net.compile(loss=lambda dummy_labels, ctc_loss: ctc_loss, optimizer=self.optimizer)
    return loss_net
# Both of these are accessed as attributes in loss_net above, so they are
# assumed to be properties on the enclosing class:
@property
def _prediction_lengths_input(self):
    return Input(name=Wav2Letter.InputNames.prediction_lengths, shape=(1,), dtype='int64')

@property
def _input_batch_input(self):
    return Input(name=Wav2Letter.InputNames.input_batch, batch_shape=self.predictive_net.input_shape)
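# A minimal sketch of the "loss as a Lambda layer" pattern described above,
# assuming a TensorFlow backend that provides K.ctc_batch_cost; all names and
# dimensions here are illustrative, not taken from Wav2Letter:
from keras import backend as K
from keras.layers import Input, Lambda
from keras.models import Model

def ctc_lambda(args):
    y_pred, labels, prediction_lengths, label_lengths = args
    return K.ctc_batch_cost(labels, y_pred, prediction_lengths, label_lengths)

grapheme_probabilities = Input(shape=(None, 29))  # (time, vocabulary) per sample
labels = Input(shape=(None,), dtype='int32')
prediction_lengths = Input(shape=(1,), dtype='int64')
label_lengths = Input(shape=(1,), dtype='int64')

loss = Lambda(ctc_lambda, output_shape=(1,), name='ctc_loss')(
    [grapheme_probabilities, labels, prediction_lengths, label_lengths])
net = Model(inputs=[grapheme_probabilities, labels, prediction_lengths, label_lengths],
            outputs=loss)
# the network's output already is the loss, so the compile loss just passes it through:
net.compile(loss=lambda dummy_labels, ctc_loss: ctc_loss, optimizer='rmsprop')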
def test_trainable_weights():
    from keras.models import Model, Sequential
    from keras.layers import Input, Dense

    a = Input(shape=(2,))
    b = Dense(1)(a)
    model = Model(a, b)

    weights = model.weights
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    model.trainable = True
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.layers[1].trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    # sequential model
    model = Sequential()
    model.add(Dense(1, input_dim=2))

    weights = model.weights
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights

    model.trainable = True
    assert model.trainable_weights == weights
    assert model.non_trainable_weights == []

    model.layers[0].trainable = False
    assert model.trainable_weights == []
    assert model.non_trainable_weights == weights
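# A short sketch of the pattern the test verifies, as used when fine-tuning:
# freeze a layer, then (re)compile so the change takes effect (Keras 1.x API):
from keras.models import Sequential
from keras.layers import Dense

model = Sequential()
model.add(Dense(4, input_dim=2, name='frozen'))
model.add(Dense(1))
model.layers[0].trainable = False  # its weights move to non_trainable_weights
model.compile('sgd', 'mse')        # compile after toggling trainable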
def test_learning_phase():
    import numpy as np
    from keras import backend as K
    from keras.layers import Input, Dense, Dropout, merge
    from keras.models import Model

    a = Input(shape=(32,), name='input_a')
    b = Input(shape=(32,), name='input_b')

    a_2 = Dense(16, name='dense_1')(a)
    dp = Dropout(0.5, name='dropout')
    b_2 = dp(b)

    assert dp.uses_learning_phase
    assert not a_2._uses_learning_phase
    assert b_2._uses_learning_phase

    # test merge
    m = merge([a_2, b_2], mode='concat')
    assert m._uses_learning_phase

    # test recursion
    model = Model([a, b], [a_2, b_2])
    print(model.input_spec)
    assert model.uses_learning_phase

    c = Input(shape=(32,), name='input_c')
    d = Input(shape=(32,), name='input_d')
    c_2, b_2 = model([c, d])
    assert c_2._uses_learning_phase
    assert b_2._uses_learning_phase

    # try actually running the graph
    fn = K.function(model.inputs + [K.learning_phase()], model.outputs)
    input_a_np = np.random.random((10, 32))
    input_b_np = np.random.random((10, 32))
    fn_outputs_no_dp = fn([input_a_np, input_b_np, 0])
    fn_outputs_dp = fn([input_a_np, input_b_np, 1])
    # output a: nothing changes
    assert fn_outputs_no_dp[0].sum() == fn_outputs_dp[0].sum()
    # output b: dropout applied
    assert fn_outputs_no_dp[1].sum() != fn_outputs_dp[1].sum()
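# A hedged sketch of the mechanism this test exercises: K.in_train_phase
# selects between two tensors based on the learning-phase flag, which is why
# the Dropout output changes between phase 0 and phase 1 (Keras 1.x backend):
import numpy as np
from keras import backend as K

x = K.placeholder(shape=(None, 4))
y = K.in_train_phase(x * 0.5, x)  # halved during training, identity at test time
f = K.function([x, K.learning_phase()], [y])
data = np.ones((2, 4))
assert f([data, 0])[0].sum() == 8.0  # test phase: unchanged
assert f([data, 1])[0].sum() == 4.0  # train phase: halved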
def test_merge_mask_2d():
    import numpy as np
    from keras.layers import Input, merge, Masking
    from keras.models import Model

    rand = lambda *shape: np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # inputs
    input_a = Input(shape=(3,))
    input_b = Input(shape=(3,))

    # masks
    masked_a = Masking(mask_value=0)(input_a)
    masked_b = Masking(mask_value=0)(input_b)

    # three different types of merging
    merged_sum = merge([masked_a, masked_b], mode='sum')
    merged_concat = merge([masked_a, masked_b], mode='concat', concat_axis=1)
    merged_concat_mixed = merge([masked_a, input_b], mode='concat', concat_axis=1)

    # test sum
    model_sum = Model([input_a, input_b], [merged_sum])
    model_sum.compile(loss='mse', optimizer='sgd')
    model_sum.fit([rand(2, 3), rand(2, 3)], [rand(2, 3)], nb_epoch=1)

    # test concatenation
    model_concat = Model([input_a, input_b], [merged_concat])
    model_concat.compile(loss='mse', optimizer='sgd')
    model_concat.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)

    # test concatenation with masked and non-masked inputs
    model_concat = Model([input_a, input_b], [merged_concat_mixed])
    model_concat.compile(loss='mse', optimizer='sgd')
    model_concat.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)
def test_merge_mask_3d():
    import numpy as np
    from keras.layers import Input, merge, Embedding, SimpleRNN
    from keras.models import Model

    rand = lambda *shape: np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # embeddings
    input_a = Input(shape=(3,), dtype='int32')
    input_b = Input(shape=(3,), dtype='int32')
    embedding = Embedding(3, 4, mask_zero=True)
    embedding_a = embedding(input_a)
    embedding_b = embedding(input_b)

    # rnn
    rnn = SimpleRNN(3, return_sequences=True)
    rnn_a = rnn(embedding_a)
    rnn_b = rnn(embedding_b)

    # concatenation
    merged_concat = merge([rnn_a, rnn_b], mode='concat', concat_axis=-1)
    model = Model([input_a, input_b], [merged_concat])
    model.compile(loss='mse', optimizer='sgd')
    model.fit([rand(2, 3), rand(2, 3)], [rand(2, 3, 6)])
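# A hedged sketch of what mask_zero=True buys the RNN above: timesteps whose
# input index is 0 are masked, so the RNN's final output is taken at the last
# real token rather than after consuming the padding (Keras 1.x API assumed):
import numpy as np
from keras.layers import Input, Embedding, SimpleRNN
from keras.models import Model

inp = Input(shape=(3,), dtype='int32')
out = SimpleRNN(3)(Embedding(3, 4, mask_zero=True)(inp))
model = Model(inp, out)
padded = model.predict(np.array([[1, 2, 0]], dtype='int32'))  # trailing padding is skipped
full = model.predict(np.array([[1, 2, 2]], dtype='int32'))    # a third real step generally changes the output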
def test_get_updates_for():
    from keras.layers import Input, Dense

    a = Input(shape=(2,))
    dense_layer = Dense(1)
    dense_layer.add_update(0, inputs=a)
    dense_layer.add_update(1, inputs=None)

    assert dense_layer.get_updates_for(a) == [0]
    assert dense_layer.get_updates_for(None) == [1]

def test_get_losses_for():
    from keras.layers import Input, Dense

    a = Input(shape=(2,))
    dense_layer = Dense(1)
    dense_layer.add_loss(0, inputs=a)
    dense_layer.add_loss(1, inputs=None)

    assert dense_layer.get_losses_for(a) == [0]
    assert dense_layer.get_losses_for(None) == [1]
def __init__(self):
    x = Input([1])
    y = np.array([[2.0]])  # fixed kernel: multiply the input by 2
    b = np.array([0.0])    # fixed bias: zero
    mult = Dense(1, weights=(y, b))
    z = mult(x)
    self.x = x
    self.mult = mult
    self.z = z
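# A hedged usage sketch: with kernel [[2.0]] and bias [0.0], the wrapped Dense
# layer simply doubles its scalar input (Keras 1.x API assumed):
import numpy as np
from keras.layers import Input, Dense
from keras.models import Model

x = Input([1])
mult = Dense(1, weights=(np.array([[2.0]]), np.array([0.0])))
model = Model(x, mult(x))
assert float(model.predict(np.array([[3.0]]))) == 6.0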
def __init__(self, batch_size, d_W, d_L):
    """
    batch_size = batch size used in training/validation (mandatory because of stateful LSTMs)
    d_W        = word features (of output word embeddings from the C2W sub-model)
    d_L        = language model hidden state size
    """
    def masked_ctx(emb, mask):
        # multiply the context embedding by its mask and propagate the mask downstream
        class L(Lambda):
            def __init__(self):
                super(L, self).__init__(lambda x: x[0] * K.expand_dims(x[1], -1),
                                        lambda input_shapes: input_shapes[0])

            def compute_mask(self, x, input_mask=None):
                return K.expand_dims(x[1], -1)

        return L()([Reshape((1, d_W))(emb), mask])

    self._saved_states = None
    self._lstms = []

    ctx_emb = Input(batch_shape=(batch_size, d_W), name='ctx_emb')
    ctx_mask = Input(batch_shape=(batch_size,), name='ctx_mask')

    C = masked_ctx(ctx_emb, ctx_mask)
    for i in range(NUM_LSTMs):
        lstm = LSTM(d_L,
                    return_sequences=(i < NUM_LSTMs - 1),
                    stateful=True,
                    consume_less='gpu')
        self._lstms.append(lstm)
        C = lstm(C)

    super(LanguageModel, self).__init__(input=[ctx_emb, ctx_mask], output=C, name='LanguageModel')
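# A minimal, self-contained sketch (hypothetical dimensions, Keras 1.x API) of
# why batch_size is mandatory here: stateful LSTMs carry per-sample state
# between batches, so the batch dimension must be fixed via batch_shape:
from keras.layers import Input, LSTM
from keras.models import Model

x = Input(batch_shape=(16, 1, 64))  # (batch, timesteps, features), batch fixed
h = LSTM(128, stateful=True)(x)     # hidden state persists across consecutive calls
lm = Model(input=x, output=h)
lm.reset_states()                   # clear the carried state between sequences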
def get_object(self):
    if self._object is None:
        self._object = Input(shape=(self.config['object_len'],), dtype='int32', name='object')
    return self._object

def get_subject(self):
    if self._subject is None:
        self._subject = Input(shape=(self.config['subject_len'],), dtype='int32', name='subject')
    return self._subject
def create_image_model_resnet50(images_shape, repeat_count):
    print('Using ResNet50')
    inputs = Input(shape=images_shape)
    visual_model = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)
    x = visual_model(inputs)
    x = GlobalMaxPooling2D()(x)
    x = RepeatVector(repeat_count)(x)
    return Model(inputs, x, name='image_model')

def create_image_model_squeezenet(images_shape, repeat_count):
    print('Using SqueezeNet')
    inputs = Input(shape=images_shape)
    visual_model = get_squeezenet(1000, dim_ordering='tf', include_top=False)
    # visual_model.load_weights('squeezenet/model/squeezenet_weights_tf_dim_ordering_tf_kernels.h5')
    x = visual_model(inputs)
    x = GlobalMaxPooling2D()(x)
    x = RepeatVector(repeat_count)(x)
    return Model(inputs, x, name='image_model')

def create_image_model_xception(images_shape, repeat_count):
    print('Using Xception')
    inputs = Input(shape=images_shape)
    visual_model = Xception(weights='imagenet', include_top=False, input_tensor=inputs)
    x = visual_model(inputs)
    x = GlobalMaxPooling2D()(x)
    x = RepeatVector(repeat_count)(x)
    return Model(inputs, x, name='image_model')
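# A hedged usage sketch of the factory functions above; the input shape and
# repeat count are illustrative, and the ImageNet weights are downloaded on
# first use:
import numpy as np

image_model = create_image_model_resnet50(images_shape=(224, 224, 3), repeat_count=10)
features = image_model.predict(np.zeros((1, 224, 224, 3)))
print(features.shape)  # expected: (1, 10, 2048) — 2048 pooled ResNet50 features, repeated 10 times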