def test_tiny_no_sequence_bidir_random_gpu(self,
                                           model_precision=_MLMODEL_FULL_PRECISION):
    np.random.seed(1988)
    input_dim = 1
    input_length = 1
    num_channels = 1
    num_samples = 1

    # Define a model
    model = Sequential()
    model.add(Bidirectional(LSTM(num_channels,
                                 implementation=2, recurrent_activation='sigmoid'),
                            input_shape=(input_length, input_dim)))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) * 0.2 - 0.1 for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output',
                           model_precision=model_precision)
def test_small_no_sequence_bidir_random(self):
    np.random.seed(1988)
    input_dim = 10
    input_length = 1
    num_channels = 1

    # Define a model
    model = Sequential()
    model.add(Bidirectional(LSTM(num_channels,
                                 implementation=2, recurrent_activation='sigmoid'),
                            input_shape=(input_length, input_dim)))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) * 0.2 - 0.1 for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_medium_bidir_random_return_seq_false(self):
    np.random.seed(1988)
    input_dim = 7
    input_length = 5
    num_channels = 10

    # Define a model
    model = Sequential()
    model.add(Bidirectional(LSTM(num_channels,
                                 return_sequences=False, implementation=2,
                                 recurrent_activation='sigmoid'),
                            input_shape=(input_length, input_dim)))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) * 0.2 - 0.1 for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_medium_bidir_random_return_seq_true(self):
    np.random.seed(1988)
    input_dim = 7
    input_length = 5
    num_channels = 10

    # Define a model
    model = Sequential()
    model.add(Bidirectional(LSTM(num_channels,
                                 return_sequences=True, implementation=2,
                                 recurrent_activation='sigmoid'),
                            input_shape=(input_length, input_dim)))

    # Set some random weights
    model.set_weights([np.random.rand(*w.shape) * 0.2 - 0.1 for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_tiny_sequence_lstm(self, model_precision=_MLMODEL_FULL_PRECISION):
    np.random.seed(1988)
    input_dim = 1
    input_length = 2
    num_channels = 1

    # Define a model
    model = Sequential()
    model.add(LSTM(num_channels, input_shape=(input_length, input_dim),
                   implementation=1, recurrent_activation='sigmoid'))

    # Set some random weights
    model.set_weights([(np.random.rand(*w.shape) - 0.5) * 0.2 for w in model.get_weights()])

    # Test the keras model
    self._test_keras_model(model, input_blob='data', output_blob='output', delta=1e-4,
                           model_precision=model_precision)
def test_tiny_babi_rnn(self):
    vocab_size = 10
    embed_hidden_size = 8
    story_maxlen = 5
    query_maxlen = 5

    input_tensor_1 = Input(shape=(story_maxlen,))
    x1 = Embedding(vocab_size, embed_hidden_size)(input_tensor_1)
    x1 = Dropout(0.3)(x1)

    input_tensor_2 = Input(shape=(query_maxlen,))
    x2 = Embedding(vocab_size, embed_hidden_size)(input_tensor_2)
    x2 = Dropout(0.3)(x2)
    x2 = LSTM(embed_hidden_size, return_sequences=False)(x2)
    x2 = RepeatVector(story_maxlen)(x2)

    x3 = add([x1, x2])
    x3 = LSTM(embed_hidden_size, return_sequences=False)(x3)
    x3 = Dropout(0.3)(x3)
    x3 = Dense(vocab_size, activation='softmax')(x3)

    model = Model(inputs=[input_tensor_1, input_tensor_2], outputs=[x3])
    self._test_keras_model(model, one_dim_seq_flags=[True, True])
def __init__(self, nlp, expressions):
    self.nlp = nlp
    self.all_expressions = expressions
    self.num_features = nlp.vocab.vectors_length
    self.categories = list(set([expression.intent().name for expression in expressions]))

    intents = [expression.intent().name for expression in expressions]
    intents_indices = [self.categories.index(intent) for intent in intents]

    self.x = np.zeros(shape=(len(expressions), DOCUMENT_MAX_NUM_WORDS,
                             self.num_features)).astype('float32')
    self.__init_x(self.x, expressions)
    self.y = np_utils.to_categorical(intents_indices)

    self.model = Sequential()
    self.model.add(Bidirectional(LSTM(int(DOCUMENT_MAX_NUM_WORDS * 1.5)),
                                 input_shape=(DOCUMENT_MAX_NUM_WORDS, self.num_features)))
    self.model.add(Dropout(0.3))
    self.model.add(Dense(len(self.categories), activation='sigmoid'))
    self.model.compile(loss='categorical_crossentropy', optimizer='adam',
                       metrics=['accuracy'])
def bidirectional_lstm(len_output):
    # sequence_input is a matrix of glove vectors (one for each input word)
    sequence_input = Input(
        shape=(MAX_SEQUENCE_LENGTH, EMBEDDING_DIM,), dtype='float32')
    l_lstm = Bidirectional(LSTM(100))(sequence_input)
    preds = Dense(len_output, activation='softmax')(l_lstm)

    model = Model(sequence_input, preds)
    model.compile(loss='categorical_crossentropy',
                  optimizer='rmsprop',
                  metrics=[utils.f1_score, 'categorical_accuracy'])

    """
    model.add(Bidirectional(LSTM(shape['nr_hidden'])))
    # dropout to avoid overfitting
    model.add(Dropout(settings['dropout']))
    model.add(Dense(shape['nr_class'], activation='sigmoid'))
    model.compile(optimizer=Adam(lr=settings['lr']), loss='binary_crossentropy',
                  metrics=['accuracy'])
    """
    return model
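# A hedged usage sketch for bidirectional_lstm above (not from the original
# source). It assumes the module-level constants MAX_SEQUENCE_LENGTH and
# EMBEDDING_DIM and a utils module providing f1_score are defined elsewhere in
# the file, and feeds a batch of random "pre-embedded" vectors purely for
# shape checking.
def demo_bidirectional_lstm(num_classes=3):
    import numpy as np
    model = bidirectional_lstm(len_output=num_classes)
    dummy_batch = np.random.rand(4, MAX_SEQUENCE_LENGTH, EMBEDDING_DIM).astype('float32')
    return model.predict(dummy_batch)  # shape: (4, num_classes)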
def get_model_41(params):
    embedding_weights = pickle.load(open("../data/datasets/train_data/embedding_weights_w2v-google_MSD-AG.pk", "rb"))

    # main sequential model
    model = Sequential()
    model.add(Embedding(len(embedding_weights[0]), params['embedding_dim'],
                        input_length=params['sequence_length'],
                        weights=embedding_weights))
    # model.add(Dropout(params['dropout_prob'][0], input_shape=(params['sequence_length'], params['embedding_dim'])))
    model.add(LSTM(2048))
    # model.add(Dropout(params['dropout_prob'][1]))
    model.add(Dense(output_dim=params["n_out"], init="uniform"))
    model.add(Activation(params['final_activation']))
    logging.debug("Output CNN: %s" % str(model.output_shape))

    if params['final_activation'] == 'linear':
        model.add(Lambda(lambda x: K.l2_normalize(x, axis=1)))
    return model
# CRNN Arch for audio
def gen_model(input_dim=10, output_dim=8, batch_size=100):
    model_LSTM = Sequential()
    model_LSTM.name = "LSTM"
    model_LSTM.batch_size = batch_size
    model_LSTM.input_dim = input_dim
    model_LSTM.output_dim = output_dim

    model_LSTM.add(LSTM(input_shape=(None, input_dim), units=units, return_sequences=True))
    model_LSTM.add(LSTM(units=units, return_sequences=True))
    model_LSTM.add(LSTM(units=output_dim, return_sequences=True))
    model_LSTM.add(Activation('sigmoid'))

    sgd = Adam(lr=lr, clipnorm=clipnorm)
    model_LSTM.compile(loss='binary_crossentropy', optimizer=sgd,
                       metrics=['binary_accuracy'], sample_weight_mode="temporal")
    return model_LSTM
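# gen_model above reads `units`, `lr`, and `clipnorm` from module scope, but
# they are not defined in this snippet. Plausible placeholder values
# (assumptions, not taken from the original source) so the function can run:
units = 64       # hidden width of the first two LSTM layers
lr = 1e-3        # Adam learning rate
clipnorm = 1.0   # gradient-norm clipping threshold
# model = gen_model(input_dim=10, output_dim=8, batch_size=100)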
def subj_run(index_embedding, dataset, num_words=5000, embedding_len=100, max_len=50):
    (x_train, y_train), (x_test, y_test) = ds.load_data(dataset, num_words)
    x_train = sequence.pad_sequences(x_train, maxlen=max_len)
    x_test = sequence.pad_sequences(x_test, maxlen=max_len)

    model = Sequential()
    model.add(Embedding(num_words, embedding_len, input_length=max_len,
                        weights=[index_embedding]))
    model.add(LSTM(max_len, dropout=0.5, recurrent_dropout=0.5))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    print(model.summary())

    model.fit(x_train, y_train, epochs=4, batch_size=50, verbose=2)
    score, acc = model.evaluate(x_test, y_test, verbose=0)
    print('Test score:', score)
    print('Test accuracy:', acc)
def imdb_run(index_embedding, dataset, num_words=5000, embedding_len=100, max_len=500):
    (x_train, y_train), (x_test, y_test) = ds.load_data(dataset, num_words)
    x_train = sequence.pad_sequences(x_train, maxlen=max_len)
    x_test = sequence.pad_sequences(x_test, maxlen=max_len)

    model = Sequential()
    model.add(Embedding(num_words, embedding_len, input_length=max_len,
                        weights=[index_embedding]))
    model.add(LSTM(100, dropout=0.2, recurrent_dropout=0.2))
    model.add(Dense(1, activation='sigmoid'))
    model.compile(loss='binary_crossentropy',
                  optimizer='adam',
                  metrics=['accuracy'])
    print(model.summary())

    model.fit(x_train, y_train, epochs=3, batch_size=64, verbose=2)
    score, acc = model.evaluate(x_test, y_test, verbose=0)
    print('Test score:', score)
    print('Test accuracy:', acc)
def train(X_train, y_train):
    # lstm_neurons, batch_size, and epochs are assumed to be module-level settings.
    model = Sequential()
    model.add(LSTM(
        lstm_neurons,
        batch_input_shape=(batch_size, X_train.shape[1], X_train.shape[2]),
        stateful=True))
    model.add(Dense(1))
    model.compile(loss='mean_squared_error', optimizer='adam')

    for i in range(epochs):
        print('batch', i + 1)
        model.fit(
            X_train,
            y_train,
            epochs=1,
            batch_size=batch_size,
            verbose=2,
            shuffle=False,
            validation_split=0.33)
        model.reset_states()
    return model
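# A hedged follow-up sketch (not from the original source): with a stateful
# LSTM, the prediction batch size must match the batch_size baked into
# batch_input_shape above, and states should be reset between independent
# sequences.
def predict_stateful(model, X_test, batch_size):
    model.reset_states()
    preds = model.predict(X_test, batch_size=batch_size)  # ordered, unshuffled windows
    model.reset_states()
    return preds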
def build(self):
    dim_data = self.size_of_input_data_dim
    nb_time_step = self.size_of_input_timesteps
    financial_time_series_input = Input(shape=(nb_time_step, dim_data))

    lstm_layer_1 = LSTM(output_dim=nb_hidden_units, dropout_U=dropout, dropout_W=dropout,
                        inner_activation='sigmoid',
                        W_regularizer=l2(l2_norm_alpha), b_regularizer=l2(l2_norm_alpha),
                        activation='tanh', return_sequences=True)
    lstm_layer_2 = LSTM(output_dim=nb_hidden_units, dropout_U=dropout, dropout_W=dropout,
                        inner_activation='sigmoid',
                        W_regularizer=l2(l2_norm_alpha), b_regularizer=l2(l2_norm_alpha),
                        activation='tanh', return_sequences=True)

    h1 = lstm_layer_1(financial_time_series_input)
    h2 = lstm_layer_2(h1)
    time_series_predictions = TimeDistributedDense(1)(h2)
    self.model = Model(financial_time_series_input, time_series_predictions,
                       name="deep rnn for financial time series forecasting")
def train(train_generator, train_size, input_num, dims_num):
    print("Start Train Job! ")
    start = time.time()

    inputs = InputLayer(input_shape=(input_num, dims_num), batch_size=batch_size)
    layer1 = LSTM(128)
    output = Dense(2, activation="softmax", name="Output")
    optimizer = Adam()

    model = Sequential()
    model.add(inputs)
    model.add(layer1)
    model.add(Dropout(0.5))
    model.add(output)

    call = TensorBoard(log_dir=log_dir, write_grads=True, histogram_freq=1)
    model.compile(optimizer, loss="categorical_crossentropy", metrics=["accuracy"])
    model.fit_generator(train_generator, steps_per_epoch=train_size // batch_size,
                        epochs=epochs_num, callbacks=[call])
    # model.fit_generator(train_generator, steps_per_epoch=5, epochs=5, callbacks=[call])
    model.save(model_dir)

    end = time.time()
    print("Over train job in %f s" % (end - start))
def model_cnn(net_layers, input_shape):
    inp = Input(shape=input_shape)
    model = inp

    for cl in net_layers['conv_layers']:
        model = Conv2D(filters=cl[0], kernel_size=cl[1], activation='relu')(model)
        if cl[4]:
            model = MaxPooling2D()(model)
        if cl[2]:
            model = BatchNormalization()(model)
        if cl[3]:
            model = Dropout(0.2)(model)

    model = Flatten()(model)

    for dl in net_layers['dense_layers']:
        model = Dense(dl[0])(model)
        model = Activation('relu')(model)
        if dl[1]:
            model = BatchNormalization()(model)
        if dl[2]:
            model = Dropout(0.2)(model)

    model = Dense(1)(model)
    model = Activation('sigmoid')(model)
    model = Model(inp, model)
    return model
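# A hedged example (not from the original source) of the net_layers dict that
# model_cnn above expects: conv layers as (filters, kernel_size, batch_norm,
# dropout, max_pool) tuples and dense layers as (num_neurons, batch_norm,
# dropout) tuples, matching the index order used in the loops.
example_net_layers = {
    'conv_layers': [(32, (3, 3), True, True, True),
                    (64, (3, 3), True, True, False)],
    'dense_layers': [(64, True, True)],
}
# model = model_cnn(example_net_layers, input_shape=(64, 64, 1))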
# %%
# LSTM architecture
# conv_layers -> [(filters, kernel_size, BatchNormalization, Dropout, MaxPooling)]
# dense_layers -> [(num_neurons, BatchNormalization, Dropout)]
def model_lstm(input_shape):
    inp = Input(shape=input_shape)
    model = inp
    if input_shape[0] > 2:
        model = Conv1D(filters=24, kernel_size=3, activation='relu')(model)
    # if input_shape[0] > 0: model = TimeDistributed(Conv1D(filters=24, kernel_size=3, activation='relu'))(model)

    model = LSTM(16)(model)
    model = Activation('relu')(model)
    model = Dropout(0.2)(model)

    model = Dense(16)(model)
    model = Activation('relu')(model)
    model = BatchNormalization()(model)

    model = Dense(1)(model)
    model = Activation('sigmoid')(model)
    model = Model(inp, model)
    return model
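# A hedged usage sketch (not from the original source) for model_lstm above.
# input_shape is (timesteps, features); with more than two timesteps a Conv1D
# front-end is prepended before the LSTM.
def demo_model_lstm():
    model = model_lstm(input_shape=(30, 8))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model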
# %%
# Conv-1D architecture. Just one sample as input
def test(path_test, input_size, hidden_size, batch_size, save_dir, model_name, maxlen):
    db = read_data(path_test)
    X = create_sequences(db, maxlen, maxlen)
    y = create_sequences(db, maxlen, maxlen)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))
    y = np.reshape(y, (y.shape[0], y.shape[1], 1))

    # build the model: 1 layer LSTM
    print('Build model...')
    model = Sequential()
    # "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE
    # note: in a situation where your input sequences have a variable length,
    # use input_shape=(None, nb_feature).
    model.add(LSTM(hidden_size, input_shape=(maxlen, input_size)))
    # For the decoder's input, we repeat the encoded input for each time step
    model.add(RepeatVector(maxlen))
    # The decoder RNN could be multiple layers stacked or a single layer
    model.add(LSTM(hidden_size, return_sequences=True))
    # For each step of the output sequence, decide which character should be chosen
    model.add(TimeDistributed(Dense(1)))

    model.load_weights(save_dir + model_name)
    model.compile(loss='mae', optimizer='adam')
    model.summary()

    prediction = model.predict(X, batch_size, verbose=1)
    prediction = prediction.flatten()
    # prediction_container = np.array(prediction).flatten()
    plt.plot(prediction.flatten()[:4000], label='prediction')
    plt.plot(y.flatten()[maxlen:4000 + maxlen], label='true')
    plt.legend()
    plt.show()

    store_prediction_and_ground_truth(model)
def train_normal_model(path_train, input_size, hidden_size, batch_size, early_stopping_patience,
                       val_percentage, save_dir, model_name, maxlen):
    if not os.path.exists(save_dir):
        os.mkdir(save_dir)

    db = read_data(path_train)
    train_x = db[:-140]
    train_y = db[140:]
    X = create_sequences(train_x, 140, 140)
    y = create_sequences(train_y, 140, 140)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))

    # preparing the callbacks
    check_pointer = callbacks.ModelCheckpoint(filepath=save_dir + model_name, verbose=1,
                                              save_best_only=True)
    early_stop = callbacks.EarlyStopping(patience=early_stopping_patience, verbose=1)

    # build the model: 1 layer LSTM
    print('Build model...')
    model = Sequential()
    model.add(LSTM(hidden_size, return_sequences=False, input_shape=(maxlen, input_size)))
    model.add(Dense(140))
    model.compile(loss='mse', optimizer='adam')
    model.summary()

    model.fit(X, y, batch_size=batch_size, nb_epoch=100, validation_split=val_percentage,
              callbacks=[check_pointer, early_stop])
    return model
def train_normal_model(path_train, input_size, hidden_size, batch_size, early_stopping_patience,
                       val_percentage, save_dir, model_name, maxlen):
    if not os.path.exists(save_dir):
        os.mkdir(save_dir)

    db = read_data(path_train)
    train_x = db[:-maxlen]
    train_y = db[maxlen:]
    X = create_sequences(train_x, maxlen, maxlen)
    y = create_sequences(train_y, maxlen, maxlen)
    X = np.reshape(X, (X.shape[0], X.shape[1], 1))
    y = np.reshape(y, (y.shape[0], y.shape[1], 1))

    # preparing the callbacks
    check_pointer = callbacks.ModelCheckpoint(filepath=save_dir + model_name, verbose=1,
                                              save_best_only=True)
    early_stop = callbacks.EarlyStopping(patience=early_stopping_patience, verbose=1)

    # build the model: 1 layer LSTM
    print('Build model...')
    model = Sequential()
    # "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE
    # note: in a situation where your input sequences have a variable length,
    # use input_shape=(None, nb_feature).
    model.add(LSTM(hidden_size, input_shape=(maxlen, input_size)))
    # For the decoder's input, we repeat the encoded input for each time step
    model.add(RepeatVector(maxlen))
    # The decoder RNN could be multiple layers stacked or a single layer
    model.add(LSTM(hidden_size, return_sequences=True))
    # For each step of the output sequence, decide which character should be chosen
    model.add(TimeDistributed(Dense(1)))

    model.compile(loss='mae', optimizer='adam')
    model.summary()

    model.fit(X, y, batch_size=batch_size, nb_epoch=50, validation_split=val_percentage,
              callbacks=[check_pointer, early_stop])
    return model
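# The training and test helpers above call read_data and create_sequences,
# which are not shown in these snippets. A hedged sketch of what
# create_sequences is assumed to do: cut a 1-D series into windows of length
# win_len, advancing by step samples each time (here both are maxlen, i.e.
# non-overlapping windows).
def create_sequences_sketch(series, win_len, step):
    import numpy as np
    windows = [series[i:i + win_len]
               for i in range(0, len(series) - win_len + 1, step)]
    return np.array(windows)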