def test_temporal_regression():
    '''
    Predict float numbers (regression) based on sequences
    of float numbers of length 3 using a single layer of GRU units
    '''
    (X_train, y_train), (X_test, y_test) = get_test_data(nb_train=500,
                                                         nb_test=400,
                                                         input_shape=(3, 5),
                                                         output_shape=(2,),
                                                         classification=False)
    model = Sequential()
    model.add(GRU(y_train.shape[-1],
                  input_shape=(X_train.shape[1], X_train.shape[2])))
    model.compile(loss='hinge', optimizer='adam')
    history = model.fit(X_train, y_train, nb_epoch=5, batch_size=16,
                        validation_data=(X_test, y_test), verbose=0)
    assert(history.history['val_loss'][-1] < 1.)
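This test targets the Keras 1 API (`nb_epoch`, `get_test_data(nb_train=...)`). Below is a minimal self-contained sketch of the same check against the Keras 2 API, generating random data inline instead of calling `get_test_data`, and swapping the unusual 'hinge' loss for the standard 'mse' regression loss:

import numpy as np
from keras.models import Sequential
from keras.layers import GRU

# Random stand-in for get_test_data: 500 train / 400 test sequences,
# each 3 timesteps of 5 features, with 2 regression targets.
X_train = np.random.random((500, 3, 5))
y_train = np.random.random((500, 2))
X_test = np.random.random((400, 3, 5))
y_test = np.random.random((400, 2))

model = Sequential()
model.add(GRU(y_train.shape[-1], input_shape=(3, 5)))
model.compile(loss='mse', optimizer='adam')
# Keras 2 renamed nb_epoch to epochs
history = model.fit(X_train, y_train, epochs=5, batch_size=16,
                    validation_data=(X_test, y_test), verbose=0)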
def __call__(self, inputs):
    x = self._merge_inputs(inputs)
    shape = getattr(x, '_keras_shape')
    replicate_model = self._replicate_model(kl.Input(shape=shape[2:]))
    x = kl.TimeDistributed(replicate_model)(x)
    kernel_regularizer = kr.L1L2(l1=self.l1_decay, l2=self.l2_decay)
    x = kl.Bidirectional(kl.GRU(128, kernel_regularizer=kernel_regularizer,
                                return_sequences=True),
                         merge_mode='concat')(x)
    kernel_regularizer = kr.L1L2(l1=self.l1_decay, l2=self.l2_decay)
    gru = kl.GRU(256, kernel_regularizer=kernel_regularizer)
    x = kl.Bidirectional(gru)(x)
    x = kl.Dropout(self.dropout)(x)
    return self._build(inputs, x)
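The snippet above depends on project-internal helpers (`_merge_inputs`, `_replicate_model`, `_build`). A self-contained sketch of the same stacked bidirectional-GRU trunk in plain Keras 2, with hypothetical input shape and regularization values:

from keras import layers as kl, regularizers as kr, models

l1_decay, l2_decay, dropout = 0.0, 1e-6, 0.25  # hypothetical hyperparameters

inputs = kl.Input(shape=(100, 32))  # (timesteps, features), hypothetical
x = kl.Bidirectional(kl.GRU(128,
                            kernel_regularizer=kr.L1L2(l1=l1_decay, l2=l2_decay),
                            return_sequences=True),
                     merge_mode='concat')(inputs)
# The second bidirectional GRU collapses the sequence to a single vector
x = kl.Bidirectional(kl.GRU(256,
                            kernel_regularizer=kr.L1L2(l1=l1_decay, l2=l2_decay)))(x)
x = kl.Dropout(dropout)(x)
model = models.Model(inputs, x)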
def HAN1(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS, EMBEDDING_DIM, WORDGRU, embedding_matrix, DROPOUTPER):
    wordInputs = Input(shape=(MAX_WORDS,), name='word1', dtype='float32')
    # Assumes all sentences have the same number of words; check input_length again.
    wordEmbedding = Embedding(MAX_NB_WORDS, EMBEDDING_DIM, weights=[embedding_matrix],
                              mask_zero=True, trainable=True, name='emb1')(wordInputs)
    hij = Bidirectional(GRU(WORDGRU, name='gru1', return_sequences=True))(wordEmbedding)
    wordDrop = Dropout(DROPOUTPER, name='drop1')(hij)
    alpha_its, Si = AttentionLayer(name='att1')(wordDrop)
    v6 = Dense(1, activation="sigmoid", name="dense")(Si)
    model = Model(inputs=[wordInputs], outputs=[v6])
    model.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model
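A hedged build sketch for HAN1, assuming the project's custom AttentionLayer is importable and using a random stand-in for a pretrained embedding matrix; all sizes are hypothetical:

import numpy as np

MAX_NB_WORDS, MAX_WORDS, EMBEDDING_DIM = 20000, 50, 100  # hypothetical sizes
embedding_matrix = np.random.random((MAX_NB_WORDS, EMBEDDING_DIM))

model = HAN1(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS=None, EMBEDDING_DIM=EMBEDDING_DIM,
             WORDGRU=64, embedding_matrix=embedding_matrix, DROPOUTPER=0.3)
model.summary()  # inputs: padded word-index rows, one sentence per sample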
def fGRU_avg(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS, EMBEDDING_DIM, WORDGRU, embedding_matrix, DROPOUTPER):
    wordInputs = Input(shape=(MAX_SENTS + 1, MAX_WORDS), name="wordInputs", dtype='float32')
    wordInp = Flatten()(wordInputs)
    wordEmbedding = Embedding(MAX_NB_WORDS, EMBEDDING_DIM, weights=[embedding_matrix],
                              mask_zero=False, trainable=True, name='wordEmbedding')(wordInp)
    hij = Bidirectional(GRU(WORDGRU, return_sequences=True), name='gru1')(wordEmbedding)
    head = GlobalAveragePooling1D()(hij)
    v6 = Dense(1, activation="sigmoid", name="dense")(head)
    model = Model(inputs=[wordInputs], outputs=[v6])
    model.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model
def reactionrnn_model(weights_path, num_classes, maxlen=140):
    '''
    Builds the model architecture for textgenrnn and
    loads the pretrained weights for the model.
    '''
    input = Input(shape=(maxlen,), name='input')
    embedded = Embedding(num_classes, 100, input_length=maxlen,
                         name='embedding')(input)
    rnn = GRU(256, return_sequences=False, name='rnn')(embedded)
    output = Dense(5, name='output',
                   activation=lambda x: K.relu(x) / K.sum(K.relu(x),
                                                          axis=-1))(rnn)
    model = Model(inputs=[input], outputs=[output])
    model.load_weights(weights_path, by_name=True)
    model.compile(loss='mse', optimizer='nadam')
    return model
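The custom output activation rectifies the five raw scores and normalizes them to sum to 1, a probability-like head without softmax. A quick numpy check of the same formula (a sketch, not part of the original source; note it uses keepdims so the division broadcasts):

import numpy as np

def relu_norm(x):
    r = np.maximum(x, 0)
    return r / r.sum(axis=-1, keepdims=True)

print(relu_norm(np.array([1.0, -2.0, 3.0])))  # -> [0.25 0.   0.75], sums to 1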
def create_char_gru_model(self, emb_dim, word_maxlen, vocab_char_size,
                          char_maxlen):
    from keras.layers import GRU
    logger.info('Building character GRU model')
    input_char = Input(shape=(char_maxlen, ), name='input_char')
    char_emb = Embedding(
        vocab_char_size, emb_dim, mask_zero=True)(input_char)
    gru = GRU(
        300,
        return_sequences=True,
        dropout=self.dropout,
        recurrent_dropout=self.recurrent_dropout)(char_emb)
    dropped = Dropout(0.5)(gru)
    mot = MeanOverTime(mask_zero=True)(dropped)
    densed = Dense(self.num_outputs, name='dense')(mot)
    output = Activation('sigmoid')(densed)
    model = Model(inputs=input_char, outputs=output)
    model.get_layer('dense').bias.set_value(self.bias)
    logger.info('  Done')
    return model
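`MeanOverTime` is a project-specific layer, not part of Keras. A minimal mask-aware mean-over-time sketch for Keras 2, assuming the incoming mask flags padded characters; this is a hypothetical replacement whose behavior may differ from the original layer:

from keras import backend as K
from keras.layers import Layer

class MaskedMeanOverTime(Layer):
    """Average over timesteps, ignoring masked (padded) positions."""
    def __init__(self, **kwargs):
        super(MaskedMeanOverTime, self).__init__(**kwargs)
        self.supports_masking = True

    def call(self, x, mask=None):
        if mask is not None:
            mask = K.cast(mask, K.floatx())       # (batch, time)
            x = x * K.expand_dims(mask, -1)       # zero out padded steps
            return K.sum(x, axis=1) / K.maximum(
                K.sum(mask, axis=1, keepdims=True), 1.0)
        return K.mean(x, axis=1)

    def compute_mask(self, inputs, mask=None):
        return None  # output is no longer a sequence

    def compute_output_shape(self, input_shape):
        return (input_shape[0], input_shape[2])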
def recursive_test_helper(layer, channel_index):
    main_input = Input(shape=[32, 10])
    x = layer(main_input)
    x = GRU(4, return_sequences=False)(x)
    main_output = Dense(5)(x)
    model = Model(inputs=main_input, outputs=main_output)
    # Delete channels
    del_layer_index = 1
    next_layer_index = 2
    del_layer = model.layers[del_layer_index]
    new_model = operations.delete_channels(model, del_layer, channel_index)
    new_w = new_model.layers[next_layer_index].get_weights()
    # Calculate next layer's correct weights
    channel_count = getattr(del_layer, utils.get_channels_attr(del_layer))
    channel_index = [i % channel_count for i in channel_index]
    correct_w = model.layers[next_layer_index].get_weights()
    correct_w[0] = np.delete(correct_w[0], channel_index, axis=0)
    assert weights_equal(correct_w, new_w)
def test_initial_state_GRU(self):
    data = np.random.rand(1, 1, 2)

    model = keras.models.Sequential()
    model.add(keras.layers.GRU(5, input_shape=(1, 2),
                               batch_input_shape=[1, 1, 2], stateful=True))
    model.get_layer(index=1).reset_states()

    coreml_model = keras_converter.convert(model=model, input_names='data',
                                           output_names='output')

    keras_output_1 = model.predict(data)
    coreml_full_output_1 = coreml_model.predict({'data': data})
    coreml_output_1 = coreml_full_output_1['output']
    coreml_output_1 = np.expand_dims(coreml_output_1, 1)
    np.testing.assert_array_almost_equal(coreml_output_1.T, keras_output_1)

    hidden_state = np.random.rand(1, 5)
    model.get_layer(index=1).reset_states(states=hidden_state)
    coreml_model = keras_converter.convert(model=model, input_names='data',
                                           output_names='output')
    spec = coreml_model.get_spec()

    keras_output_2 = model.predict(data)
    coreml_full_output_2 = coreml_model.predict(
        {'data': data, spec.description.input[1].name: hidden_state[0]})
    coreml_output_2 = coreml_full_output_2['output']
    coreml_output_2 = np.expand_dims(coreml_output_2, 1)
    np.testing.assert_array_almost_equal(coreml_output_2.T, keras_output_2)
def test_small_no_sequence_gru_random(self):
    np.random.seed(1988)
    input_dim = 10
    input_length = 1
    num_channels = 1

    # Define a model
    model = Sequential()
    model.add(GRU(num_channels, input_shape=(input_length, input_dim),
                  recurrent_activation='sigmoid'))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) * 0.2 - 0.1
                       for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_medium_no_sequence_gru_random(self,
                                       model_precision=_MLMODEL_FULL_PRECISION):
    np.random.seed(1988)
    input_dim = 10
    input_length = 1
    num_channels = 10

    # Define a model
    model = Sequential()
    model.add(GRU(num_channels, input_shape=(input_length, input_dim),
                  recurrent_activation='sigmoid'))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output',
                           model_precision=model_precision)
def bidirectional_gru(len_output):
    # sequence_input is a matrix of glove vectors (one for each input word)
    sequence_input = Input(
        shape=(MAX_SEQUENCE_LENGTH, EMBEDDING_DIM,), dtype='float32')
    l_lstm = Bidirectional(GRU(100))(sequence_input)
    # TODO look at the call(input_at_t, states_at_t) method, which returns
    # (output_at_t, states_at_t_plus_1), and at
    # switch(condition, then_expression, else_expression)
    # for deciding when to feed the previous state
    preds = Dense(len_output, activation='softmax')(l_lstm)

    model = Model(sequence_input, preds)
    model.compile(loss='categorical_crossentropy',
                  optimizer='rmsprop',
                  metrics=[utils.f1_score, 'categorical_accuracy'])
    return model
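A usage sketch, assuming the module defining bidirectional_gru also defines the constants MAX_SEQUENCE_LENGTH and EMBEDDING_DIM (values below are hypothetical) and that utils.f1_score is available:

import numpy as np

MAX_SEQUENCE_LENGTH, EMBEDDING_DIM = 40, 100  # hypothetical values

model = bidirectional_gru(len_output=5)
X = np.random.random((16, MAX_SEQUENCE_LENGTH, EMBEDDING_DIM)).astype('float32')
y = np.eye(5)[np.random.randint(0, 5, size=16)]  # one-hot labels
model.train_on_batch(X, y)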
def BiGRU(X_train, y_train, X_test, y_test, gru_units, dense_units, input_shape,
          batch_size, epochs, drop_out, patience):
    model = Sequential()
    reg = L1L2(l1=0.2, l2=0.2)
    model.add(Bidirectional(GRU(units=gru_units, dropout=drop_out, activation='relu',
                                recurrent_regularizer=reg, return_sequences=True),
                            input_shape=input_shape,
                            merge_mode="concat"))
    model.add(BatchNormalization())
    model.add(TimeDistributed(Dense(dense_units, activation='relu')))
    model.add(BatchNormalization())
    model.add(Bidirectional(GRU(units=gru_units, dropout=drop_out, activation='relu',
                                recurrent_regularizer=reg, return_sequences=True),
                            merge_mode="concat"))
    model.add(BatchNormalization())
    model.add(Dense(units=1))
    model.add(GlobalAveragePooling1D())
    print(model.summary())

    early_stopping = EarlyStopping(monitor="val_loss", patience=patience)
    model.compile(loss='mse', optimizer='adam')
    history_callback = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs,
                                 verbose=2, callbacks=[early_stopping],
                                 validation_data=(X_test, y_test), shuffle=True)
    return model, history_callback
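A hedged call sketch with random data, assuming the imports BiGRU relies on (GRU, Bidirectional, L1L2, EarlyStopping, etc.) are in scope; shapes and hyperparameters are hypothetical. The final Dense(1) plus GlobalAveragePooling1D yields one regression value per sequence:

import numpy as np

n_steps, n_features = 24, 8  # hypothetical sequence shape
X_train = np.random.random((200, n_steps, n_features))
y_train = np.random.random((200, 1))
X_test = np.random.random((50, n_steps, n_features))
y_test = np.random.random((50, 1))

model, history = BiGRU(X_train, y_train, X_test, y_test,
                       gru_units=32, dense_units=16,
                       input_shape=(n_steps, n_features),
                       batch_size=32, epochs=3, drop_out=0.2, patience=2)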
def main():
    print("\n\nLoading data...")
    data_dir = "/data/translate"
    vocab_size = 20000
    en, fr = prepare_date(data_dir, vocab_size)

    print("\n\nBuilding the model...")
    embedding_size = 64
    hidden_size = 32
    model = Sequential()
    model.add(Embedding(en.max_features, embedding_size,
                        input_length=en.max_length, mask_zero=True))
    model.add(Bidirectional(GRU(hidden_size), merge_mode='sum'))
    model.add(RepeatVector(fr.max_length))
    model.add(GRU(embedding_size))
    model.add(Dense(fr.max_length, activation="softmax"))
    model.compile('rmsprop', 'mse')
    print(model.get_config())

    print("\n\nFitting the model...")
    model.fit(en.examples, fr.examples)

    print("\n\nEvaluation...")
    # TODO
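As written, the decoder GRU collapses the repeated encoder vector to a single output, and Dense(fr.max_length, softmax) predicts a distribution over positions rather than target words. A more conventional encoder-decoder sketch (my assumption about the intent, not the original code; all constants hypothetical):

from keras.models import Sequential
from keras.layers import (Embedding, GRU, Bidirectional, RepeatVector,
                          TimeDistributed, Dense)

src_vocab, tgt_vocab = 20000, 20000   # hypothetical vocabulary sizes
src_len, tgt_len = 30, 30             # hypothetical sequence lengths

model = Sequential()
model.add(Embedding(src_vocab, 64, input_length=src_len, mask_zero=True))
model.add(Bidirectional(GRU(32), merge_mode='sum'))  # encoder summary vector
model.add(RepeatVector(tgt_len))                     # feed it to every decoder step
model.add(GRU(64, return_sequences=True))            # one decoder state per step
model.add(TimeDistributed(Dense(tgt_vocab, activation='softmax')))
model.compile(optimizer='rmsprop', loss='categorical_crossentropy')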
def image_caption_model(vocab_size=2500, embedding_matrix=None, lang_dim=100,
                        max_caplen=28, img_dim=2048, clipnorm=1):
    print('generating vocab_history model v5')
    # text: current word
    lang_input = Input(shape=(1,))
    img_input = Input(shape=(img_dim,))
    seq_input = Input(shape=(max_caplen,))
    vhist_input = Input(shape=(vocab_size,))

    if embedding_matrix is not None:
        x = Embedding(output_dim=lang_dim, input_dim=vocab_size, init='glorot_uniform',
                      input_length=1, weights=[embedding_matrix])(lang_input)
    else:
        x = Embedding(output_dim=lang_dim, input_dim=vocab_size, init='glorot_uniform',
                      input_length=1)(lang_input)

    lang_embed = Reshape((lang_dim,))(x)
    lang_embed = merge([lang_embed, seq_input], mode='concat', concat_axis=-1)
    lang_embed = Dense(lang_dim)(lang_embed)
    lang_embed = Dropout(0.25)(lang_embed)

    merge_layer = merge([img_input, lang_embed, vhist_input], mode='concat',
                        concat_axis=-1)
    merge_layer = Reshape((1, lang_dim + img_dim + vocab_size))(merge_layer)

    gru_1 = GRU(img_dim)(merge_layer)
    gru_1 = Dropout(0.25)(gru_1)
    gru_1 = Dense(img_dim)(gru_1)
    gru_1 = BatchNormalization()(gru_1)
    gru_1 = Activation('softmax')(gru_1)

    attention_1 = merge([img_input, gru_1], mode='mul', concat_axis=-1)
    attention_1 = merge([attention_1, lang_embed, vhist_input], mode='concat',
                        concat_axis=-1)
    attention_1 = Reshape((1, lang_dim + img_dim + vocab_size))(attention_1)

    gru_2 = GRU(1024)(attention_1)
    gru_2 = Dropout(0.25)(gru_2)
    gru_2 = Dense(vocab_size)(gru_2)
    gru_2 = BatchNormalization()(gru_2)
    out = Activation('softmax')(gru_2)

    model = Model(input=[img_input, lang_input, seq_input, vhist_input], output=out)
    # Pass the clipnorm argument through instead of hard-coding it
    model.compile(loss='categorical_crossentropy',
                  optimizer=RMSprop(lr=0.0001, clipnorm=clipnorm))
    return model
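This snippet uses the Keras 1 API (`merge`, `init=`, `input=`/`output=`), which Keras 2 removed. A sketch of the equivalences if you need to port it:

# Keras 1                                       ->  Keras 2
# merge([a, b], mode='concat', concat_axis=-1)  ->  concatenate([a, b], axis=-1)
# merge([a, b], mode='mul')                     ->  multiply([a, b])
# Embedding(..., init='glorot_uniform')         ->  Embedding(..., embeddings_initializer='glorot_uniform')
# Model(input=..., output=...)                  ->  Model(inputs=..., outputs=...)
from keras.layers import concatenate, multiply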
def setUp(self):
    super(TestRNNEncoderWithGRUClass, self).setUp()
    self.model = self.create_model(GRU)

def setUp(self):
    super(TestRNNEncoderWithRNNCellGRUClass, self).setUp()
    self.model = self.create_model(GRU)

def setUp(self):
    super(TestRNNCellWithGRUClass, self).setUp()
    self.model = self.create_model(GRU)

def setUp(self):
    super(TestRNNDecoderWithGRUClass, self).setUp()
    self.model = self.create_model(GRU)

def setUp(self):
    super(TestRNNDecoderWithRNNCellGRUClass, self).setUp()
    self.model = self.create_model(GRU)
# test_bidirectional_rnn_encoder.py (project: yoctol-keras-layer-zoo, author: Yoctol)
def setUp(self):
    super(TestBidirectionalRNNEncoderWithRNNCellGRUClass, self).setUp()
    self.model = self.create_model(GRU)
# test_rnn_decoder_with_decoding_size.py (project: yoctol-keras-layer-zoo, author: Yoctol)
def setUp(self):
    super(TestRNNDecoderWithDecodingSizeGRUClass, self).setUp()
    self.model = self.create_model(GRU)
    self.max_length = self.decoding_length
def GRUf(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS, EMBEDDING_DIM, WORDGRU, embedding_matrix, DROPOUTPER):
    wordInputs = Input(shape=(MAX_SENTS + 1, MAX_WORDS), name="wordInputs", dtype='float32')
    wordInp = Flatten()(wordInputs)
    wordEmbedding = Embedding(MAX_NB_WORDS, EMBEDDING_DIM, weights=[embedding_matrix],
                              mask_zero=False, trainable=True, name='wordEmbedding')(wordInp)
    hij = Bidirectional(GRU(WORDGRU, return_sequences=False), name='gru1')(wordEmbedding)
    v6 = Dense(1, activation="sigmoid", name="dense")(hij)
    model = Model(inputs=[wordInputs], outputs=[v6])
    model.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model
def fhan2_avg(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS, EMBEDDING_DIM, WORDGRU, embedding_matrix, DROPOUTPER):
    # Word-level encoder: BiGRU over one sentence, averaged over time
    wordInputs = Input(shape=(MAX_WORDS,), name="wordInputs", dtype='float32')
    wordEmbedding = Embedding(MAX_NB_WORDS, EMBEDDING_DIM, weights=[embedding_matrix],
                              mask_zero=False, trainable=True, name='wordEmbedding')(wordInputs)
    hij = Bidirectional(GRU(WORDGRU, return_sequences=True), name='gru1')(wordEmbedding)
    Si = GlobalAveragePooling1D()(hij)
    wordEncoder = Model(wordInputs, Si)

    # Sentence-level encoder: the word encoder applied to every sentence
    docInputs = Input(shape=(None, MAX_WORDS), name='docInputs', dtype='float32')
    sentEncoding = TimeDistributed(wordEncoder, name='sentEncoding')(docInputs)
    hi = Bidirectional(GRU(WORDGRU, return_sequences=True), merge_mode='concat',
                       name='gru2')(sentEncoding)
    Vb = GlobalAveragePooling1D()(hi)
    v6 = Dense(1, activation="sigmoid", kernel_initializer='glorot_uniform',
               name="dense")(Vb)
    model = Model(inputs=[docInputs], outputs=[v6])

    sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    return model, wordEncoder
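Because the word encoder is wrapped in TimeDistributed, the document input is a 3-D batch of (sentences, words) index matrices. A hedged shape check with random data (all sizes hypothetical):

import numpy as np

MAX_NB_WORDS, MAX_WORDS, EMBEDDING_DIM = 20000, 30, 100  # hypothetical
embedding_matrix = np.random.random((MAX_NB_WORDS, EMBEDDING_DIM))

model, word_encoder = fhan2_avg(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS=None,
                                EMBEDDING_DIM=EMBEDDING_DIM, WORDGRU=64,
                                embedding_matrix=embedding_matrix, DROPOUTPER=0.3)
docs = np.random.randint(1, MAX_NB_WORDS, size=(8, 12, MAX_WORDS))  # 8 docs x 12 sentences
print(model.predict(docs).shape)  # (8, 1): one sigmoid score per document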
def han2(MAX_NB_WORDS, MAX_WORDS, MAX_SENTS, EMBEDDING_DIM, WORDGRU, embedding_matrix, DROPOUTPER):
    # Word-level encoder: BiGRU plus attention over one sentence
    wordInputs = Input(shape=(MAX_WORDS,), name="wordInputs", dtype='float32')
    wordEmbedding = Embedding(MAX_NB_WORDS, EMBEDDING_DIM, weights=[embedding_matrix],
                              mask_zero=True, trainable=True, name='wordEmbedding')(wordInputs)
    hij = Bidirectional(GRU(WORDGRU, return_sequences=True), name='gru1')(wordEmbedding)
    alpha_its, Si = AttentionLayer(name='att1')(hij)
    wordEncoder = Model(wordInputs, Si)

    # Sentence-level encoder: the word encoder applied to every sentence
    docInputs = Input(shape=(None, MAX_WORDS), name='docInputs', dtype='float32')
    sentenceMasking = Masking(mask_value=0.0, name='sentenceMasking')(docInputs)
    sentEncoding = TimeDistributed(wordEncoder, name='sentEncoding')(sentenceMasking)
    hi = Bidirectional(GRU(WORDGRU, return_sequences=True), merge_mode='concat',
                       name='gru2')(sentEncoding)
    alpha_s, Vb = AttentionLayer(name='att2')(hi)
    v6 = Dense(1, activation="sigmoid", kernel_initializer='he_normal', name="dense")(Vb)
    model = Model(inputs=[docInputs], outputs=[v6])

    sgd = optimizers.SGD(lr=0.01, decay=1e-6, momentum=0.9, nesterov=True)
    model.compile(loss='binary_crossentropy', optimizer=sgd, metrics=['accuracy'])
    return model, wordEncoder
def create_temporal_sequential_model():
    model = Sequential()
    model.add(GRU(32, input_shape=(timesteps, input_dim), return_sequences=True))
    model.add(TimeDistributedDense(nb_classes))
    model.add(Activation('softmax'))
    return model
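`TimeDistributedDense` is a Keras 0.x/1.x layer; Keras 2 removed it in favor of wrapping Dense in TimeDistributed. A sketch of the equivalent Keras 2 model, with hypothetical shape constants:

from keras.models import Sequential
from keras.layers import GRU, Dense, Activation, TimeDistributed

timesteps, input_dim, nb_classes = 3, 5, 2  # hypothetical values

model = Sequential()
model.add(GRU(32, input_shape=(timesteps, input_dim), return_sequences=True))
model.add(TimeDistributed(Dense(nb_classes)))  # replaces TimeDistributedDense
model.add(Activation('softmax'))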
def test_temporal_classification():
    '''
    Classify temporal sequences of float numbers
    of length 3 into 2 classes using a single layer
    of GRU units and a softmax applied to the last
    activations of the units
    '''
    (X_train, y_train), (X_test, y_test) = get_test_data(nb_train=500,
                                                         nb_test=500,
                                                         input_shape=(3, 5),
                                                         classification=True,
                                                         nb_class=2)
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)
    model = Sequential()
    model.add(GRU(y_train.shape[-1],
                  input_shape=(X_train.shape[1], X_train.shape[2]),
                  activation='softmax'))
    model.compile(loss='categorical_crossentropy',
                  optimizer='adagrad',
                  metrics=['accuracy'])
    history = model.fit(X_train, y_train, nb_epoch=20, batch_size=32,
                        validation_data=(X_test, y_test),
                        verbose=0)
    assert(history.history['val_acc'][-1] >= 0.8)
def process(self):
    self.model.add(Embedding(len(self.song_hash) + 1, 50, mask_zero=True))
    self.model.add(SpatialDropout1D(rate=0.20))
    self.model.add(GRU(128))
    self.model.add(Dense(len(self.song_hash) + 1, activation=self.activation))
    self.model.compile(optimizer=self.optimizer,
                       loss=self.loss,
                       metrics=self.metrics)
    self.model.fit(self.x_train, self.y_train, epochs=100, batch_size=512,
                   validation_split=0.1, callbacks=self.callbacks)
    return self
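With mask_zero=True, the Embedding layer reserves index 0 for padding, which is why the vocabulary size is len(self.song_hash) + 1: song ids must start at 1. A short padding step consistent with that convention (a sketch, the variable names are hypothetical):

from keras.preprocessing.sequence import pad_sequences

x = pad_sequences([[3, 7, 42], [5, 9]], maxlen=10)  # pads with 0, the masked index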