def create_lstm_autoencoder(input_dim, timesteps, latent_dim):
"""
Creates an LSTM Autoencoder (VAE). Returns Autoencoder, Encoder, Generator.
(All code by fchollet - see reference.)
# Arguments
input_dim: int.
timesteps: int, input timestep dimension.
latent_dim: int, latent z-layer shape.
# References
- [Building Autoencoders in Keras](https://blog.keras.io/building-autoencoders-in-keras.html)
"""
inputs = Input(shape=(timesteps, input_dim,))
encoded = LSTM(latent_dim)(inputs)
decoded = RepeatVector(timesteps)(encoded)
decoded = LSTM(input_dim, return_sequences=True)(decoded)
sequence_autoencoder = Model(inputs, decoded)
encoder = Model(inputs, encoded)
autoencoder = Model(inputs, decoded)
autoencoder.compile(optimizer='adam', loss='mse')
return autoencoder
python类RepeatVector()的实例源码
def create_model(self, ret_model = False):
image_model = Sequential()
image_model.add(Dense(EMBEDDING_DIM, input_dim = 4096, activation='relu'))
image_model.add(RepeatVector(self.max_length))
lang_model = Sequential()
lang_model.add(Embedding(self.vocab_size, 256, input_length=self.max_length))
lang_model.add(LSTM(256,return_sequences=True))
lang_model.add(TimeDistributed(Dense(EMBEDDING_DIM)))
model = Sequential()
model.add(Merge([image_model, lang_model], mode='concat'))
model.add(LSTM(1000,return_sequences=False))
model.add(Dense(self.vocab_size))
model.add(Activation('softmax'))
print ("Model created!")
if(ret_model==True):
return model
model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
return model
def test_repeat_vector(self):
from keras.layers import RepeatVector
model = Sequential()
model.add(RepeatVector(3, input_shape=(5,)))
input_names = ['input']
output_names = ['output']
spec = keras.convert(model, input_names, output_names).get_spec()
self.assertIsNotNone(spec)
# Test the model class
self.assertIsNotNone(spec.description)
self.assertTrue(spec.HasField('neuralNetwork'))
# Test the inputs and outputs
self.assertEquals(len(spec.description.input), len(input_names))
self.assertEqual(sorted(input_names),
sorted(map(lambda x: x.name, spec.description.input)))
self.assertEquals(len(spec.description.output), len(output_names))
self.assertEqual(sorted(output_names),
sorted(map(lambda x: x.name, spec.description.output)))
layers = spec.neuralNetwork.layers
self.assertIsNotNone(layers[0].sequenceRepeat)
def test_one_to_many(self):
params = dict(
input_dims=[1, 10], activation='tanh',
return_sequences=False, output_dim=3
),
number_of_times = 4
model = Sequential()
model.add(RepeatVector(number_of_times, input_shape=(10,)))
model.add(LSTM(output_dim=params[0]['output_dim'],
activation=params[0]['activation'],
inner_activation='sigmoid',
return_sequences=True,
))
relative_error, keras_preds, coreml_preds = simple_model_eval(params, model)
# print relative_error, '\n', keras_preds, '\n', coreml_preds, '\n'
for i in range(len(relative_error)):
self.assertLessEqual(relative_error[i], 0.01)
def test_tiny_babi_rnn(self):
vocab_size = 10
embed_hidden_size = 8
story_maxlen = 5
query_maxlen = 5
input_tensor_1 = Input(shape=(story_maxlen,))
x1 = Embedding(vocab_size, embed_hidden_size)(input_tensor_1)
x1 = Dropout(0.3)(x1)
input_tensor_2 = Input(shape=(query_maxlen,))
x2 = Embedding(vocab_size, embed_hidden_size)(input_tensor_2)
x2 = Dropout(0.3)(x2)
x2 = LSTM(embed_hidden_size, return_sequences=False)(x2)
x2 = RepeatVector(story_maxlen)(x2)
x3 = add([x1, x2])
x3 = LSTM(embed_hidden_size, return_sequences=False)(x3)
x3 = Dropout(0.3)(x3)
x3 = Dense(vocab_size, activation='softmax')(x3)
model = Model(inputs=[input_tensor_1,input_tensor_2], outputs=[x3])
self._test_keras_model(model, one_dim_seq_flags=[True, True])
def __init__(self, maxlen, d_L, d_C, d_D, V_C):
"""
maxlen = maximum input/output word size
d_L = language model hidden state (= context vector) size
d_C = character features (input embedding vector size)
d_D = decoder hidden state h size
V_C = character vocabulary
"""
# extend embeddings to treat zero values as zeros vectors (for y_0 = 0)
# but don't do any masking
class CharEmb(Embedding):
def call(self, x, mask=None):
y = super(CharEmb, self).call(x)
return y * K.cast(K.expand_dims(x, -1), K.floatx())
c = Input(shape=(d_L,), name='c')
y_tm1 = Input(shape=(maxlen,), name='y_tm1', dtype='int32')
ye_tm1 = CharEmb(V_C.size + 1, d_C)(y_tm1)
h = DecoderGRU(d_D, return_sequences=True)([ye_tm1, c])
s = Maxout(d_C)([h, ye_tm1, RepeatVector(maxlen)(c)])
s = Dropout(.2)(s)
c_I = ProjectionOverTime(V_C.size)(s)
super(W2C, self).__init__(input=[c, y_tm1], output=c_I, name='W2C')
def arch_attention(embedding_layer, sequence_length, classes):
tweet_input = Input(shape=(sequence_length,), dtype='int32')
embedded_tweet = embedding_layer(tweet_input)
activations = LSTM(128, return_sequences=True, name='recurrent_layer')(embedded_tweet)
attention = TimeDistributed(Dense(1, activation='tanh'))(activations)
attention = Flatten()(attention)
attention = Activation('softmax')(attention)
attention = RepeatVector(128)(attention)
attention = Permute([2, 1], name='attention_layer')(attention)
sent_representation = merge([activations, attention], mode='mul')
sent_representation = Lambda(lambda xin: K.sum(xin, axis=1), name='merged_layer')(sent_representation)
tweet_output = Dense(classes, activation='softmax', name='predictions')(sent_representation)
tweetnet = Model(tweet_input, tweet_output)
tweetnet.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
return tweetnet
def arch_attention36(embedding_layer, sequence_length, classes):
tweet_input = Input(shape=(sequence_length,), dtype='int32')
embedded_tweet = embedding_layer(tweet_input)
activations = LSTM(36, return_sequences=True, name='recurrent_layer')(embedded_tweet)
attention = TimeDistributed(Dense(1, activation='tanh'))(activations)
attention = Flatten()(attention)
attention = Activation('softmax')(attention)
attention = RepeatVector(36)(attention)
attention = Permute([2, 1], name='attention_layer')(attention)
sent_representation = merge([activations, attention], mode='mul')
sent_representation = Lambda(lambda xin: K.sum(xin, axis=1), name='merged_layer')(sent_representation)
tweet_output = Dense(classes, activation='softmax', name='output_layer')(sent_representation)
tweetnet = Model(tweet_input, tweet_output)
tweetnet.compile(optimizer='adam',
loss='categorical_crossentropy',
metrics=['accuracy'])
return tweetnet
def main():
print("\n\nLoading data...")
data_dir = "/data/translate"
vocab_size = 20000
en, fr = prepare_date(data_dir, vocab_size)
print("\n\nbuilding the model...")
embedding_size = 64
hidden_size = 32
model = Sequential()
model.add(Embedding(en.max_features, embedding_size, input_length=en.max_length, mask_zero=True))
model.add(Bidirectional(GRU(hidden_size), merge_mode='sum'))
model.add(RepeatVector(fr.max_length))
model.add(GRU(embedding_size))
model.add(Dense(fr.max_length, activation="softmax"))
model.compile('rmsprop', 'mse')
print(model.get_config())
print("\n\nFitting the model...")
model.fit(en.examples, fr.examples)
print("\n\nEvaluation...")
#TODO
def main():
print("\n\nLoading data...")
data_dir = "/data/translate"
vocab_size = 20000
en, fr = prepare_date(data_dir, vocab_size)
print("\n\nbuilding the model...")
embedding_size = 64
hidden_size = 32
model = Sequential()
model.add(Embedding(en.max_features, embedding_size, input_length=en.max_length, mask_zero=True))
model.add(Bidirectional(GRU(hidden_size), merge_mode='sum'))
model.add(RepeatVector(fr.max_length))
model.add(GRU(embedding_size))
model.add(Dense(fr.max_length, activation="softmax"))
model.compile('rmsprop', 'mse')
print(model.get_config())
print("\n\nFitting the model...")
model.fit(en.examples, fr.examples)
print("\n\nEvaluation...")
#TODO
def generate_model(output_len, chars=None):
"""Generate the model"""
print('Build model...')
chars = chars or CHARS
model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE
# note: in a situation where your input sequences have a variable length,
# use input_shape=(None, nb_feature).
for layer_number in range(INPUT_LAYERS):
model.add(recurrent.LSTM(HIDDEN_SIZE, input_shape=(None, len(chars)), init=INITIALIZATION,
return_sequences=layer_number + 1 < INPUT_LAYERS))
model.add(Dropout(AMOUNT_OF_DROPOUT))
# For the decoder's input, we repeat the encoded input for each time step
model.add(RepeatVector(output_len))
# The decoder RNN could be multiple layers stacked or a single layer
for _ in range(OUTPUT_LAYERS):
model.add(recurrent.LSTM(HIDDEN_SIZE, return_sequences=True, init=INITIALIZATION))
model.add(Dropout(AMOUNT_OF_DROPOUT))
# For each of step of the output sequence, decide which character should be chosen
model.add(TimeDistributed(Dense(len(chars), init=INITIALIZATION)))
model.add(Activation('softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
return model
def generate_model(args, nb_features, input_length, nb_repeats=1):
"""
Generate the model.
"""
emb_weights = np.eye(nb_features)
model = Sequential()
model.add(Embedding(input_dim=nb_features, output_dim=nb_features, input_length=input_length,
weights=[emb_weights], trainable=False))
for layer_id in range(args.input_layers):
model.add(args.cell_type(args.hidden_layers,
return_sequences=layer_id + 1 < args.input_layers))
model.add(Dropout(args.dropout))
model.add(RepeatVector(nb_repeats))
for _ in range(args.output_layers):
model.add(args.cell_type(args.hidden_layers, return_sequences=True))
model.add(Dropout(args.dropout))
model.add(TimeDistributed(Dense(nb_features)))
model.add(Activation("softmax"))
model.compile(loss="sparse_categorical_crossentropy",
optimizer=args.optimizer,
metrics=["accuracy"])
return model
def test(path_test, input_size, hidden_size, batch_size, save_dir, model_name, maxlen):
db = read_data(path_test)
X = create_sequences(db, maxlen, maxlen)
y = create_sequences(db, maxlen, maxlen)
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
y = np.reshape(y, (y.shape[0], y.shape[1], 1))
# build the model: 1 layer LSTM
print('Build model...')
model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE
# note: in a situation where your input sequences have a variable length,
# use input_shape=(None, nb_feature).
model.add(LSTM(hidden_size, input_shape=(maxlen, input_size)))
# For the decoder's input, we repeat the encoded input for each time step
model.add(RepeatVector(maxlen))
# The decoder RNN could be multiple layers stacked or a single layer
model.add(LSTM(hidden_size, return_sequences=True))
# For each of step of the output sequence, decide which character should be chosen
model.add(TimeDistributed(Dense(1)))
model.load_weights(save_dir + model_name)
model.compile(loss='mae', optimizer='adam')
model.summary()
prediction = model.predict(X, batch_size, verbose=1, )
prediction = prediction.flatten()
# prediction_container = np.array(prediction).flatten()
plt.plot(prediction.flatten()[:4000], label='prediction')
plt.plot(y.flatten()[maxlen:4000 + maxlen], label='true')
plt.legend()
plt.show()
store_prediction_and_ground_truth(model)
def train_normal_model(path_train, input_size, hidden_size, batch_size, early_stopping_patience, val_percentage,
save_dir, model_name, maxlen):
if not os.path.exists(save_dir):
os.mkdir(save_dir)
db = read_data(path_train)
train_x = db[:-maxlen]
train_y = db[maxlen:]
X = create_sequences(train_x, maxlen, maxlen)
y = create_sequences(train_y, maxlen, maxlen)
X = np.reshape(X, (X.shape[0], X.shape[1], 1))
y = np.reshape(y, (y.shape[0], y.shape[1], 1))
#
# preparing the callbacks
check_pointer = callbacks.ModelCheckpoint(filepath=save_dir + model_name, verbose=1, save_best_only=True)
early_stop = callbacks.EarlyStopping(patience=early_stopping_patience, verbose=1)
# build the model: 1 layer LSTM
print('Build model...')
model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE
# note: in a situation where your input sequences have a variable length,
# use input_shape=(None, nb_feature).
model.add(LSTM(hidden_size, input_shape=(maxlen, input_size)))
# For the decoder's input, we repeat the encoded input for each time step
model.add(RepeatVector(maxlen))
# The decoder RNN could be multiple layers stacked or a single layer
model.add(LSTM(hidden_size, return_sequences=True))
# For each of step of the output sequence, decide which character should be chosen
model.add(TimeDistributed(Dense(1)))
model.compile(loss='mae', optimizer='adam')
model.summary()
model.fit(X, y, batch_size=batch_size, nb_epoch=50, validation_split=val_percentage,
callbacks=[check_pointer, early_stop])
return model
def test_sequential_model_saving():
model = Sequential()
model.add(Dense(2, input_dim=3))
model.add(RepeatVector(3))
model.add(TimeDistributed(Dense(3)))
model.compile(loss=objectives.MSE,
optimizer=optimizers.RMSprop(lr=0.0001),
metrics=[metrics.categorical_accuracy],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
_, fname = tempfile.mkstemp('.h5')
save_model(model, fname)
new_model = load_model(fname)
os.remove(fname)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)
# test that new updates are the same with both models
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
new_model.train_on_batch(x, y)
out = model.predict(x)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)
def test_keras_import(self):
model = Sequential()
model.add(RepeatVector(3, input_shape=(10,)))
model.build()
self.keras_type_test(model, 0, 'RepeatVector')
def test_keras_export(self):
tests = open(os.path.join(settings.BASE_DIR, 'tests', 'unit', 'keras_app',
'keras_export_test.json'), 'r')
response = json.load(tests)
tests.close()
net = yaml.safe_load(json.dumps(response['net']))
net = {'l0': net['Input3'], 'l1': net['RepeatVector']}
net['l0']['connection']['output'].append('l1')
inp = data(net['l0'], '', 'l0')['l0']
net = repeat_vector(net['l1'], [inp], 'l1')
model = Model(inp, net['l1'])
self.assertEqual(model.layers[1].__class__.__name__, 'RepeatVector')
def repeat_vector(layer, layer_in, layerId):
out = {layerId: RepeatVector(layer['params']['n'])(*layer_in)}
return out
def create_simpleCnnRnn(image_shape, max_caption_len,vocab_size):
image_model = Sequential()
# image_shape : C,W,H
# input: 100x100 images with 3 channels -> (3, 100, 100) tensors.
# this applies 32 convolution filters of size 3x3 each.
image_model.add(Convolution2D(32, 3, 3, border_mode='valid', input_shape=image_shape))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(Convolution2D(32, 3, 3))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(MaxPooling2D(pool_size=(2, 2)))
image_model.add(Dropout(0.25))
image_model.add(Convolution2D(64, 3, 3, border_mode='valid'))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(Convolution2D(64, 3, 3))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(MaxPooling2D(pool_size=(2, 2)))
image_model.add(Dropout(0.25))
image_model.add(Flatten())
# Note: Keras does automatic shape inference.
image_model.add(Dense(128))
image_model.add(RepeatVector(max_caption_len))
image_model.add(Bidirectional(GRU(output_dim=128, return_sequences=True)))
#image_model.add(GRU(output_dim=128, return_sequences=True))
image_model.add(TimeDistributed(Dense(vocab_size)))
image_model.add(Activation('softmax'))
return image_model
def create_imgText(image_shape, max_caption_len,vocab_size):
image_model = Sequential()
# image_shape : C,W,H
# input: 100x100 images with 3 channels -> (3, 100, 100) tensors.
# this applies 32 convolution filters of size 3x3 each.
image_model.add(Convolution2D(32, 3, 3, border_mode='valid', input_shape=image_shape))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(Convolution2D(32, 3, 3))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(MaxPooling2D(pool_size=(2, 2)))
image_model.add(Dropout(0.25))
image_model.add(Convolution2D(64, 3, 3, border_mode='valid'))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(Convolution2D(64, 3, 3))
image_model.add(BatchNormalization())
image_model.add(Activation('relu'))
image_model.add(MaxPooling2D(pool_size=(2, 2)))
image_model.add(Dropout(0.25))
image_model.add(Flatten())
# Note: Keras does automatic shape inference.
image_model.add(Dense(128))
image_model.add(RepeatVector(1))
#model = AttentionSeq2Seq(input_dim=128, input_length=1, hidden_dim=128, output_length=max_caption_len, output_dim=vocab_size)
model = Seq2Seq(input_dim=128, input_length=1, hidden_dim=128, output_length=max_caption_len,
output_dim=128, peek=True)
image_model.add(model)
image_model.add(TimeDistributed(Dense(vocab_size)))
image_model.add(Activation('softmax'))
return image_model
def _create_layers(self, input_layer):
""" Create the encoding and the decoding layers of the sequence-to-sequence autoencoder.
:return: self
"""
encode_layer = LSTM(name='encoder',
units=self.n_hidden,
activation=self.enc_activation)(input_layer)
n_inputs = K.int_shape(input_layer)[-1]
decoded = RepeatVector(n=self.time_steps)(encode_layer)
self._decode_layer = LSTM(name='decoder',
units=n_inputs,
activation=self.dec_activation,
return_sequences=True)(decoded)
def _create_decoder_model(self):
""" Create the model that maps an encoded input to the original values
:return: self
"""
encoded_input = Input(shape=(self.n_hidden,))
# retrieve the last layer of the autoencoder model
decoder_layer = RepeatVector(n=self.time_steps)(encoded_input)
decoder_layer = self._model.get_layer('decoder')(decoder_layer)
self._decoder = kmodels.Model(inputs=encoded_input, outputs=decoder_layer)
def call(self, inputs, mask=None):
# Import (symbolic) dimensions
max_atoms = K.shape(inputs)[1]
# By [farizrahman4u](https://github.com/fchollet/keras/issues/3995)
ones = layers.Lambda(lambda x: (x * 0 + 1)[:, 0, :], output_shape=lambda s: (s[0], s[2]))(inputs)
dropped = self.dropout_layer(ones)
dropped = layers.RepeatVector(max_atoms)(dropped)
return layers.Lambda(lambda x: x[0] * x[1], output_shape=lambda s: s[0])([inputs, dropped])
def test_repeat_vector(self):
from keras.layers import RepeatVector
model = Sequential()
model.add(RepeatVector(3, input_shape=(5,)))
input_names = ['input']
output_names = ['output']
spec = keras.convert(model, input_names, output_names).get_spec()
self.assertIsNotNone(spec)
# Test the model class
self.assertIsNotNone(spec.description)
self.assertTrue(spec.HasField('neuralNetwork'))
# Test the inputs and outputs
self.assertEquals(len(spec.description.input), len(input_names))
self.assertItemsEqual(input_names,
map(lambda x: x.name, spec.description.input))
self.assertEquals(len(spec.description.output), len(output_names))
self.assertItemsEqual(output_names,
map(lambda x: x.name, spec.description.output))
layers = spec.neuralNetwork.layers
self.assertIsNotNone(layers[0].sequenceRepeat)
def test_sequential_model_saving():
model = Sequential()
model.add(Dense(2, input_dim=3))
model.add(RepeatVector(3))
model.add(TimeDistributed(Dense(3)))
model.compile(loss=objectives.MSE,
optimizer=optimizers.RMSprop(lr=0.0001),
metrics=[metrics.categorical_accuracy],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
_, fname = tempfile.mkstemp('.h5')
save_model(model, fname)
new_model = load_model(fname)
os.remove(fname)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)
# test that new updates are the same with both models
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
new_model.train_on_batch(x, y)
out = model.predict(x)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)
def build_conv_autoencoder(input_dim=(28, 28, 1)):
input_img = Input(shape=input_dim) # adapt this if using `channels_first` image data format
x = Conv2D(64, (3, 3), activation='relu', padding='same')(input_img)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = MaxPooling2D((2, 2), padding='same')(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
encoded = MaxPooling2D((2, 2), padding='same')(x)
# at this point the representation is (4, 4, 8) i.e. 128-dimensional
x = Conv2D(32, (3, 3), activation='relu', padding='same')(encoded)
x = UpSampling2D((2, 2))(x)
x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
if input_dim[0] == 28:
x = Conv2D(64, (3, 3), activation='relu')(x)
else:
x = Conv2D(64, (3, 3), activation='relu', padding='same')(x)
x = UpSampling2D((2, 2))(x)
decoded = Conv2D(input_dim[2], (3, 3), activation='sigmoid', padding='same')(x)
autoencoder = Model(input_img, decoded)
autoencoder.compile(optimizer='adadelta', loss='binary_crossentropy')
return autoencoder
# def build_lstm_autoencoder(timesteps, input_dim)
# inputs = Input(shape=(timesteps, input_dim))
# encoded = LSTM(latent_dim)(inputs)
# decoded = RepeatVector(timesteps)(encoded)
# decoded = LSTM(input_dim, return_sequences=True)(decoded)
# sequence_autoencoder = Model(inputs, decoded)
# encoder = Model(inputs, encoded)
# return encoder, sequence_autoencoder
def r2r(dic_len,input_length,output_length,emb_dim=128,hidden=512,deepth=(1,1)):
model = Sequential()
model.add(Embedding(input_dim=dic_len, mask_zero=True, output_dim=emb_dim, input_length=input_length))
for l in range(deepth[0]):
model.add(LSTM(output_dim=hidden, return_sequences=(False if l==deepth[0]-1 else True)))
model.add(RepeatVector(output_length))
model.add(Dropout(0.5))
for l in range(deepth[0]):
model.add(LSTM(hidden, return_sequences=True))
model.add(TimeDistributed(Dense(units=dic_len, activation='softmax')))
model.compile(optimizer='rmsprop',loss='categorical_crossentropy',metrics=['acc'])
return model
def c2r(dic_len,input_length,output_length,emb_dim=128,hidden=512,nb_filter=64,deepth=(1,1),stride=3):
model = Sequential()
model.add(Embedding(input_dim=dic_len, output_dim=emb_dim, input_length=input_length))
for l in range(deepth[0]):
model.add(Conv1D(nb_filter,3,activation='relu'))
model.add(GlobalMaxPooling1D())
model.add(Dropout(0.5))
model.add(RepeatVector(output_length))
for l in range(deepth[0]):
model.add(LSTM(hidden, return_sequences=True))
model.add(TimeDistributed(Dense(units=dic_len, activation='softmax')))
model.compile(optimizer='rmsprop',loss='categorical_crossentropy',metrics=['acc'])
return model
def buildCharEncDec(hidden, RNN, layers, maxlen, chars, dropout=.3):
print('Build model...')
model = Sequential()
# "Encode" the input sequence using an RNN,
# producing an output of HIDDEN_SIZE.
# Note: In a situation where your input sequences have a variable length,
# use input_shape=(None, nb_feature).
# model.add(RNN(hidden, input_shape=(maxlen, len(chars)),
# name="encoder-rnn"))
model.add(Dropout(dropout, input_shape=(maxlen, len(chars)),
noise_shape=(1, maxlen, 1)))
model.add(RNN(hidden, name="encoder-rnn"))
# As the decoder RNN's input, repeatedly provide with the
# last hidden state of
# RNN for each time step. Repeat 'DIGITS + 1' times as that's the maximum
# length of output, e.g., when DIGITS=3, max output is 999+999=1998.
model.add(RepeatVector(maxlen, name="encoding"))
# The decoder RNN could be multiple layers stacked or a single layer.
for ii in range(layers):
# By setting return_sequences to True, return not only the last output
# but all the outputs so far in the form of (nb_samples, timesteps,
# output_dim). This is necessary as TimeDistributed in the below
# expects the first dimension to be the timesteps.
model.add(RNN(hidden, return_sequences=True,
name="decoder%i" % ii))
# Apply a dense layer to the every temporal slice of an input.
# For each step
# of the output sequence, decide which character should be chosen.
model.add(TimeDistributed(Dense(len(chars), name="dense"), name="td"))
model.add(Activation('softmax', name="softmax"))
model.compile(loss='categorical_crossentropy',
optimizer='adam',
metrics=['accuracy'])
model.summary()
return model
def test_sequential_model_saving():
model = Sequential()
model.add(Dense(2, input_dim=3))
model.add(RepeatVector(3))
model.add(TimeDistributed(Dense(3)))
model.compile(loss=objectives.MSE,
optimizer=optimizers.RMSprop(lr=0.0001),
metrics=[metrics.categorical_accuracy],
sample_weight_mode='temporal')
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
out = model.predict(x)
_, fname = tempfile.mkstemp('.h5')
save_model(model, fname)
new_model = load_model(fname)
os.remove(fname)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)
# test that new updates are the same with both models
x = np.random.random((1, 3))
y = np.random.random((1, 3, 3))
model.train_on_batch(x, y)
new_model.train_on_batch(x, y)
out = model.predict(x)
out2 = new_model.predict(x)
assert_allclose(out, out2, atol=1e-05)