def create_char_rnn_model(self, emb_dim, word_maxlen, vocab_char_size,
                          char_maxlen):
    """Build a character-level SimpleRNN model with a sigmoid output.

    Pipeline: character input -> masked Embedding -> SimpleRNN(300,
    full sequence) -> Dropout(0.5) -> MeanOverTime ->
    Dense(self.num_outputs) -> sigmoid.

    Args:
        emb_dim: Output dimension of the character embedding.
        word_maxlen: Not used by this builder.
        vocab_char_size: Input dimension of the embedding layer.
        char_maxlen: Length of the character input sequence.

    Returns:
        The constructed Model (it is not compiled here).
    """
    from keras.layers import SimpleRNN

    logger.info('Building character RNN model')

    char_ids = Input(shape=(char_maxlen, ), name='input_char')
    embedded = Embedding(vocab_char_size, emb_dim, mask_zero=True)(char_ids)
    recurrent = SimpleRNN(
        300,
        return_sequences=True,
        dropout=self.dropout,
        recurrent_dropout=self.recurrent_dropout,
    )
    hidden = recurrent(embedded)
    hidden = Dropout(0.5)(hidden)
    pooled = MeanOverTime(mask_zero=True)(hidden)
    logits = Dense(self.num_outputs, name='dense')(pooled)
    scores = Activation('sigmoid')(logits)

    model = Model(inputs=char_ids, outputs=scores)
    # Overwrite the bias of the output projection with the value held on self.
    model.get_layer('dense').bias.set_value(self.bias)

    logger.info(' Done')
    return model
python类SimpleRNN()的实例源码
def test_initial_state_SimpleRNN(self):
    """Compare Keras and converted Core ML outputs of a stateful SimpleRNN.

    The comparison is done twice: once after a plain state reset, and once
    after resetting the layer to an explicit random hidden state, which is
    also fed to the converted model through its second input.
    """
    def coreml_output(converted, feed):
        # Pull the 'output' blob and add an axis so its transpose lines up
        # with the Keras prediction.
        return np.expand_dims(converted.predict(feed)['output'], 1)

    sample = np.random.rand(1, 1, 2)
    net = keras.models.Sequential()
    net.add(keras.layers.SimpleRNN(5, input_shape=(1, 2), batch_input_shape=[1, 1, 2], stateful=True))

    # Round 1: default (reset) state.
    net.get_layer(index=1).reset_states()
    converted = keras_converter.convert(model=net, input_names='data', output_names='output')
    expected_1 = net.predict(sample)
    actual_1 = coreml_output(converted, {'data': sample})
    np.testing.assert_array_almost_equal(actual_1.T, expected_1)

    # Round 2: explicit initial hidden state.
    initial_state = np.random.rand(1, 5)
    net.get_layer(index=1).reset_states(states=initial_state)
    converted = keras_converter.convert(model=net, input_names='data', output_names='output')
    spec = converted.get_spec()
    expected_2 = net.predict(sample)
    state_input_name = spec.description.input[1].name
    actual_2 = coreml_output(converted, {'data': sample, state_input_name: initial_state[0]})
    np.testing.assert_array_almost_equal(actual_2.T, expected_2)
def test_rnn_seq(self):
    """Check a 20-unit SimpleRNN that returns only its last output."""
    np.random.seed(1988)
    feature_count = 11
    step_count = 5

    # Single-layer model over (step_count, feature_count) inputs.
    model = Sequential()
    rnn_layer = SimpleRNN(20, input_shape=(step_count, feature_count), return_sequences=False)
    model.add(rnn_layer)

    # Replace the weights with random values drawn from [-0.1, 0.1).
    random_weights = [np.random.rand(*w.shape)*0.2 - 0.1 for w in model.get_weights()]
    model.set_weights(random_weights)

    # Delegate the actual comparison to the shared helper.
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_merge_mask_3d():
    """Fit a model that concatenates two masked 3D RNN outputs.

    Both integer inputs go through the same masked Embedding and the same
    SimpleRNN (returning sequences); the two results are concatenated on the
    last axis with the `merge` function and the model is fitted once on
    random binary data.
    """
    from keras.layers import Input, merge, Embedding, SimpleRNN
    from keras.models import Model

    def random_bits(*shape):
        # int32 array of 0/1 values with the requested shape.
        return np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # Inputs and shared embedding.
    first_input = Input(shape=(3,), dtype='int32')
    second_input = Input(shape=(3,), dtype='int32')
    shared_embedding = Embedding(3, 4, mask_zero=True)
    first_embedded = shared_embedding(first_input)
    second_embedded = shared_embedding(second_input)

    # Shared recurrent layer.
    shared_rnn = SimpleRNN(3, return_sequences=True)
    first_sequence = shared_rnn(first_embedded)
    second_sequence = shared_rnn(second_embedded)

    # Concatenate along the feature axis.
    concatenated = merge([first_sequence, second_sequence], mode='concat', concat_axis=-1)

    model = Model([first_input, second_input], [concatenated])
    model.compile(loss='mse', optimizer='sgd')
    features = [random_bits(2, 3), random_bits(2, 3)]
    targets = [random_bits(2, 3, 6)]
    model.fit(features, targets)
def test_merge_mask_3d():
    """Fit a model that concatenates two masked 3D RNN outputs.

    Both integer inputs go through the same masked Embedding and the same
    SimpleRNN (returning sequences); the two results are concatenated on the
    last axis with the `merge` function and the model is fitted once on
    random binary data.
    """
    from keras.layers import Input, merge, Embedding, SimpleRNN
    from keras.models import Model

    def random_bits(*shape):
        # int32 array of 0/1 values with the requested shape.
        return np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # Inputs and shared embedding.
    first_input = Input(shape=(3,), dtype='int32')
    second_input = Input(shape=(3,), dtype='int32')
    shared_embedding = Embedding(3, 4, mask_zero=True)
    first_embedded = shared_embedding(first_input)
    second_embedded = shared_embedding(second_input)

    # Shared recurrent layer.
    shared_rnn = SimpleRNN(3, return_sequences=True)
    first_sequence = shared_rnn(first_embedded)
    second_sequence = shared_rnn(second_embedded)

    # Concatenate along the feature axis.
    concatenated = merge([first_sequence, second_sequence], mode='concat', concat_axis=-1)

    model = Model([first_input, second_input], [concatenated])
    model.compile(loss='mse', optimizer='sgd')
    features = [random_bits(2, 3), random_bits(2, 3)]
    targets = [random_bits(2, 3, 6)]
    model.fit(features, targets)
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestRNNEncoderWithSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestRNNEncoderWithRNNCellSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestRNNCellWithSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestRNNDecoderWithSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestRNNDecoderWithRNNCellSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
test_bidirectional_rnn_encoder.py 文件源码
项目:yoctol-keras-layer-zoo
作者: Yoctol
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def setUp(self):
    """Run the parent setUp, then build the model under test from SimpleRNN."""
    parent = super(TestBidirectionalRNNEncoderWithRNNCellSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
test_rnn_decoder_with_decoding_size.py 文件源码
项目:yoctol-keras-layer-zoo
作者: Yoctol
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def setUp(self):
    """Run the parent setUp, build the SimpleRNN model, and set max_length.

    max_length is copied from self.decoding_length after the model has been
    created.
    """
    parent = super(TestRNNDecoderWithDecodingSizeSimpleRNNClass, self)
    parent.setUp()
    self.model = self.create_model(SimpleRNN)
    self.max_length = self.decoding_length
def test_merge_mask_3d():
    """Fit a model that concatenates two masked 3D RNN outputs.

    Both integer inputs go through the same masked Embedding and the same
    SimpleRNN (returning sequences); the two results are concatenated on the
    last axis with the `merge` function and the model is fitted once on
    random binary data.
    """
    from keras.layers import Input, merge, Embedding, SimpleRNN
    from keras.models import Model

    def random_bits(*shape):
        # int32 array of 0/1 values with the requested shape.
        return np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # Inputs and shared embedding.
    first_input = Input(shape=(3,), dtype='int32')
    second_input = Input(shape=(3,), dtype='int32')
    shared_embedding = Embedding(3, 4, mask_zero=True)
    first_embedded = shared_embedding(first_input)
    second_embedded = shared_embedding(second_input)

    # Shared recurrent layer.
    shared_rnn = SimpleRNN(3, return_sequences=True)
    first_sequence = shared_rnn(first_embedded)
    second_sequence = shared_rnn(second_embedded)

    # Concatenate along the feature axis.
    concatenated = merge([first_sequence, second_sequence], mode='concat', concat_axis=-1)

    model = Model([first_input, second_input], [concatenated])
    model.compile(loss='mse', optimizer='sgd')
    features = [random_bits(2, 3), random_bits(2, 3)]
    targets = [random_bits(2, 3, 6)]
    model.fit(features, targets)
def build_stateful_lstm_model(batch_size,time_step,input_dim,output_dim,dropout=0.2,rnn_layer_num=2,hidden_dim=128,hidden_num=0,rnn_type='LSTM'):
    """Build and compile a stacked recurrent regression model.

    Layout: a first recurrent layer with a fixed batch input shape,
    ``rnn_layer_num - 2`` intermediate recurrent layers (each followed by
    Dropout), a last recurrent layer that returns only its final output,
    ``hidden_num`` Dense(hidden_dim)+Dropout pairs, and a Dense(output_dim)
    output. The first and last recurrent layers are always added, so values
    of ``rnn_layer_num`` below 2 still give two recurrent layers.

    NOTE(review): the source this was recovered from had lost its
    indentation; the Dropout calls are assumed to sit inside their loops.
    Confirm against the original file. Also note that no layer is created
    with ``stateful=True`` here despite the function name.

    Args:
        batch_size: Fixed batch size used in ``batch_input_shape``.
        time_step: Number of time steps per sample.
        input_dim: Number of features per time step.
        output_dim: Number of units of the output Dense layer.
        dropout: Rate passed to every Dropout layer.
        rnn_layer_num: Total number of recurrent layers requested.
        hidden_dim: Units of every recurrent and hidden Dense layer.
        hidden_num: Number of hidden Dense layers before the output.
        rnn_type: One of 'LSTM', 'GRU' or 'SimpleRNN'.

    Returns:
        The compiled Sequential model (mse loss, RMSprop with lr=0.01).

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """
    # Resolve the layer class first so that a bad rnn_type fails before any
    # model object is created.
    if rnn_type == 'LSTM':
        rnn_cell = LSTM
    elif rnn_type == 'GRU':
        rnn_cell = GRU
    elif rnn_type == 'SimpleRNN':
        rnn_cell = SimpleRNN
    else:
        raise ValueError('Option rnn_type could only be configured as LSTM, GRU or SimpleRNN')

    model = Sequential()
    # First recurrent layer carries the fixed batch input shape.
    model.add(rnn_cell(hidden_dim, return_sequences=True,
                       batch_input_shape=(batch_size, time_step, input_dim)))
    for _ in range(rnn_layer_num - 2):
        # Fixed: the keyword is `return_sequences`; the previous
        # `return_sequence` spelling is not a recurrent-layer argument.
        model.add(rnn_cell(hidden_dim, return_sequences=True))
        # prevent over fitting
        model.add(Dropout(dropout))
    model.add(rnn_cell(hidden_dim, return_sequences=False))

    # Optional fully connected hidden layers.
    for _ in range(hidden_num):
        model.add(Dense(hidden_dim))
        model.add(Dropout(dropout))
    model.add(Dense(output_dim))

    rmsprop = RMSprop(lr=0.01)
    model.compile(loss='mse', metrics=['acc'], optimizer=rmsprop)
    return model
def test_keras_import(self):
    """Export an LSTM/SimpleRNN/GRU model to JSON and post it to the import view.

    The model JSON is written to ``media/test.json`` under
    ``settings.BASE_DIR``, uploaded to the 'keras-import' endpoint, and the
    response is checked for a 'success' result and at least 7 params on the
    layers at sorted positions 1, 3 and 6.
    """
    model = Sequential()
    model.add(LSTM(64, return_sequences=True, input_shape=(10, 64)))
    model.add(SimpleRNN(32, return_sequences=True))
    model.add(GRU(10, kernel_regularizer=regularizers.l2(0.01),
                  bias_regularizer=regularizers.l2(0.01), recurrent_regularizer=regularizers.l2(0.01),
                  activity_regularizer=regularizers.l2(0.01), kernel_constraint='max_norm',
                  bias_constraint='max_norm', recurrent_constraint='max_norm'))
    model.build()
    json_string = Model.to_json(model)
    json_path = os.path.join(settings.BASE_DIR, 'media', 'test.json')
    with open(json_path, 'w') as out:
        json.dump(json.loads(json_string), out, indent=4)
    # Use a context manager so the upload handle is closed even when the
    # request raises (it was previously left open).
    with open(json_path, 'r') as sample_file:
        response = self.client.post(reverse('keras-import'), {'file': sample_file})
    response = json.loads(response.content)
    layerId = sorted(response['net'].keys())
    self.assertEqual(response['result'], 'success')
    self.assertGreaterEqual(len(response['net'][layerId[1]]['params']), 7)
    self.assertGreaterEqual(len(response['net'][layerId[3]]['params']), 7)
    self.assertGreaterEqual(len(response['net'][layerId[6]]['params']), 7)
# ********** Embedding Layers **********
def test_keras_export(self):
    """Build an Input -> RNN net from the export fixture and check the layer class.

    Loads ``keras_export_test.json``, keeps its 'Input2' and 'RNN' entries
    as layers 'l0' and 'l1', wires l0's output to l1, builds the Keras model
    through the ``data`` and ``recurrent`` helpers, and asserts that the
    second model layer is a SimpleRNN.
    """
    fixture_path = os.path.join(settings.BASE_DIR, 'tests', 'unit', 'keras_app',
                                'keras_export_test.json')
    # Context manager guarantees the fixture is closed even if parsing fails.
    with open(fixture_path, 'r') as tests:
        response = json.load(tests)
    net = yaml.safe_load(json.dumps(response['net']))
    net = {'l0': net['Input2'], 'l1': net['RNN']}
    net['l0']['connection']['output'].append('l1')
    # # net = get_shapes(net)
    inp = data(net['l0'], '', 'l0')['l0']
    net = recurrent(net['l1'], [inp], 'l1')
    model = Model(inp, net['l1'])
    self.assertEqual(model.layers[1].__class__.__name__, 'SimpleRNN')
def create_word_rnn_model(self, emb_dim, emb_path, vocab_word,
                          vocab_word_size, word_maxlen):
    """Build a word-level SimpleRNN model with a sigmoid output.

    Pipeline: word input -> masked Embedding ('word_emb') -> SimpleRNN(300,
    full sequence) -> Dropout(0.5) -> MeanOverTime ->
    Dense(self.num_outputs) -> sigmoid. When ``emb_path`` is truthy, the
    embedding matrix is replaced with the one produced by EmbReader for
    ``vocab_word``.

    Args:
        emb_dim: Output dimension of the word embedding.
        emb_path: Path handed to EmbReader; falsy to skip the lookup-table
            initialization.
        vocab_word: Vocabulary passed to
            ``EmbReader.get_emb_matrix_given_vocab``.
        vocab_word_size: Input dimension of the embedding layer.
        word_maxlen: Length of the word input sequence.

    Returns:
        The constructed Model (it is not compiled here).
    """
    from keras.layers import SimpleRNN

    logger.info('Building word SimpleRNN model')

    word_ids = Input(shape=(word_maxlen, ), name='input_word')
    embedded = Embedding(
        vocab_word_size, emb_dim, mask_zero=True, name='word_emb')(word_ids)
    recurrent = SimpleRNN(
        300,
        return_sequences=True,
        dropout=self.dropout,
        recurrent_dropout=self.recurrent_dropout,
    )
    hidden = recurrent(embedded)
    hidden = Dropout(0.5)(hidden)
    pooled = MeanOverTime(mask_zero=True)(hidden)
    logits = Dense(self.num_outputs, name='dense')(pooled)
    scores = Activation('sigmoid')(logits)

    model = Model(inputs=word_ids, outputs=scores)
    # Overwrite the bias of the output projection with the value held on self.
    model.get_layer('dense').bias.set_value(self.bias)

    if emb_path:
        from emb_reader import EmbReader as EmbReader
        logger.info('Initializing lookup table')
        emb_reader = EmbReader(emb_path, emb_dim=emb_dim)
        # Start from the current embedding values and let the reader return
        # the matrix to install for the given vocabulary.
        embeddings = model.get_layer('word_emb').embeddings
        updated_matrix = emb_reader.get_emb_matrix_given_vocab(
            vocab_word, embeddings.get_value())
        embeddings.set_value(updated_matrix)

    logger.info(' Done')
    return model
def test_delete_channels_simplernn(channel_index):
    """Run the shared recursive helper on a 9-unit sequence-returning SimpleRNN."""
    recursive_test_helper(SimpleRNN(9, return_sequences=True), channel_index)
def test_simple_rnn(self):
    """
    Test the conversion of a simple RNN layer.

    Converts a one-layer SimpleRNN model and checks the resulting spec: the
    description, the extra input/output added next to the named ones, the
    size-32 array shapes, and the two inputs/outputs of the first layer.
    """
    from keras.layers import SimpleRNN

    # Create a simple Keras model
    model = Sequential()
    model.add(SimpleRNN(32, input_dim=32, input_length=10))

    input_names = ['input']
    output_names = ['output']
    spec = keras.convert(model, input_names, output_names).get_spec()
    self.assertIsNotNone(spec)

    # Test the model class
    self.assertIsNotNone(spec.description)
    self.assertTrue(spec.HasField('neuralNetwork'))

    # Test the inputs and outputs
    # (assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.)
    self.assertEqual(len(spec.description.input), len(input_names) + 1)
    self.assertEqual(input_names[0], spec.description.input[0].name)
    self.assertEqual(32, spec.description.input[1].type.multiArrayType.shape[0])

    self.assertEqual(len(spec.description.output), len(output_names) + 1)
    self.assertEqual(output_names[0], spec.description.output[0].name)
    self.assertEqual(32, spec.description.output[0].type.multiArrayType.shape[0])
    self.assertEqual(32, spec.description.output[1].type.multiArrayType.shape[0])

    # Test the layer parameters.
    layers = spec.neuralNetwork.layers
    layer_0 = layers[0]
    self.assertIsNotNone(layer_0.simpleRecurrent)
    self.assertEqual(len(layer_0.input), 2)
    self.assertEqual(len(layer_0.output), 2)
def test_simple_rnn(self):
    """
    Test the conversion of a simple RNN layer.

    Converts a one-layer SimpleRNN model and checks the resulting spec: the
    description, the extra input/output added next to the named ones, the
    size-32 array shapes, and the two inputs/outputs of the first layer.
    """
    from keras.layers import SimpleRNN

    # Create a simple Keras model
    model = Sequential()
    model.add(SimpleRNN(32, input_shape=(10,32)))

    input_names = ['input']
    output_names = ['output']
    spec = keras.convert(model, input_names, output_names).get_spec()
    self.assertIsNotNone(spec)

    # Test the model class
    self.assertIsNotNone(spec.description)
    self.assertTrue(spec.HasField('neuralNetwork'))

    # Test the inputs and outputs
    # (assertEqual replaces the deprecated assertEquals alias, which was
    # removed from unittest in Python 3.12.)
    self.assertEqual(len(spec.description.input), len(input_names) + 1)
    self.assertEqual(input_names[0], spec.description.input[0].name)
    self.assertEqual(32, spec.description.input[1].type.multiArrayType.shape[0])

    self.assertEqual(len(spec.description.output), len(output_names) + 1)
    self.assertEqual(output_names[0], spec.description.output[0].name)
    self.assertEqual(32, spec.description.output[0].type.multiArrayType.shape[0])
    self.assertEqual(32, spec.description.output[1].type.multiArrayType.shape[0])

    # Test the layer parameters.
    layers = spec.neuralNetwork.layers
    layer_0 = layers[0]
    self.assertIsNotNone(layer_0.simpleRecurrent)
    self.assertEqual(len(layer_0.input), 2)
    self.assertEqual(len(layer_0.output), 2)
def test_SimpleRNN(self):
    """Evaluate a single SimpleRNN layer and bound its relative error by 0.01.

    The layer is built with the Keras 2 keyword names when the installed
    version string starts with '2.', and with the older names otherwise.
    """
    # The trailing comma makes `params` a one-element tuple wrapping the
    # dict; it is indexed with [0] below and passed on unchanged to
    # simple_model_eval.
    params = dict(
        input_dims=[1, 2, 100], go_backwards=False, activation='tanh',
        stateful=False, unroll=False, return_sequences=True, output_dim=4  # Passes for < 3
    ),
    options = params[0]
    sequence_length = options['input_dims'][1]
    feature_dim = options['input_dims'][2]

    # Arguments spelled the same way in both Keras API generations. Note
    # that unroll is forced to True regardless of options['unroll'].
    shared_kwargs = dict(
        activation=options['activation'],
        return_sequences=options['return_sequences'],
        go_backwards=options['go_backwards'],
        unroll=True,
    )

    model = Sequential()
    if keras.__version__.startswith('2.'):
        rnn_layer = SimpleRNN(units=options['output_dim'],
                              input_shape=(sequence_length, feature_dim),
                              **shared_kwargs)
    else:
        rnn_layer = SimpleRNN(output_dim=options['output_dim'],
                              input_length=sequence_length,
                              input_dim=feature_dim,
                              **shared_kwargs)
    model.add(rnn_layer)

    relative_error, _keras_preds, _coreml_preds = simple_model_eval(params, model)
    for error in relative_error:
        self.assertLessEqual(error, 0.01)
def test_tiny_sequence_simple_rnn_random(self):
    """Check a 3-unit SimpleRNN over 4 steps of 2 features with random weights."""
    np.random.seed(1988)
    feature_count = 2
    step_count = 4
    unit_count = 3

    # Single-layer model.
    model = Sequential()
    rnn_layer = SimpleRNN(unit_count, input_shape=(step_count, feature_count))
    model.add(rnn_layer)

    # Replace the weights with random values drawn from [-0.1, 0.1).
    random_weights = [np.random.rand(*w.shape)*0.2 - 0.1 for w in model.get_weights()]
    model.set_weights(random_weights)

    # Delegate the actual comparison to the shared helper.
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_tiny_seq2seq_rnn_random(self):
    """Check a 3-unit sequence-returning SimpleRNN with random weights."""
    np.random.seed(1988)
    feature_count = 2
    step_count = 4
    unit_count = 3

    # Single-layer model that emits an output for every time step.
    model = Sequential()
    rnn_layer = SimpleRNN(unit_count, input_shape=(step_count, feature_count), return_sequences=True)
    model.add(rnn_layer)

    # Replace the weights with random values drawn from [-0.1, 0.1).
    random_weights = [np.random.rand(*w.shape)*0.2 - 0.1 for w in model.get_weights()]
    model.set_weights(random_weights)

    # Delegate the actual comparison to the shared helper.
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_rnn_seq_backwards(self):
    """Check a 20-unit SimpleRNN built with go_backwards=True."""
    np.random.seed(1988)
    feature_count = 11
    step_count = 5

    # Single-layer model returning only the last output.
    model = Sequential()
    rnn_layer = SimpleRNN(20, input_shape=(step_count, feature_count),
                          return_sequences=False, go_backwards=True)
    model.add(rnn_layer)

    # Replace the weights with random values drawn from [-0.1, 0.1).
    random_weights = [np.random.rand(*w.shape)*0.2 - 0.1 for w in model.get_weights()]
    model.set_weights(random_weights)

    # Delegate the actual comparison to the shared helper.
    self._test_keras_model(model, input_blob='data', output_blob='output')
def test_medium_no_sequence_simple_rnn_random(self):
    """Check a 10-unit SimpleRNN over a single 10-feature step."""
    np.random.seed(1988)
    feature_count = 10
    step_count = 1
    unit_count = 10

    # Single-layer model.
    model = Sequential()
    rnn_layer = SimpleRNN(unit_count, input_shape=(step_count, feature_count))
    model.add(rnn_layer)

    # Replace the weights with random values drawn from [-0.1, 0.1).
    random_weights = [np.random.rand(*w.shape)*0.2 - 0.1 for w in model.get_weights()]
    model.set_weights(random_weights)

    # Delegate the actual comparison to the shared helper.
    self._test_keras_model(model, input_blob='data', output_blob='output')
def cnn3_full1_rnn1():
    """Build and compile a 3-conv + 1-dense + 1-SimpleRNN model on 120x160x3 images.

    Three Convolution2D/ReLU/MaxPooling2D stages (8, 16 and 32 filters) are
    flattened, projected by Dense(256) with Dropout(0.2), reshaped to a
    one-step sequence and fed to a linear SimpleRNN(256). Only ``angle_out``
    is used as the model output.

    Returns:
        The Model compiled with the adam optimizer and mean squared error.
    """
    img_input = Input(shape=(120, 160, 3), name="img_input")

    x = Convolution2D(8, 3, 3)(img_input)
    x = Activation('relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Convolution2D(16, 3, 3)(x)
    x = Activation('relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    x = Convolution2D(32, 3, 3)(x)
    x = Activation('relu')(x)
    x = MaxPooling2D(pool_size=(2, 2))(x)

    merged = Flatten()(x)

    x = Dense(256)(merged)
    x = Activation('linear')(x)
    x = Dropout(.2)(x)

    # Fixed: reshape the 256-wide dense output `x`. The previous code
    # reshaped `merged`, which bypassed the Dense/Dropout stage and is not
    # 256 elements wide.
    x = Reshape((1, 256))(x)
    x = SimpleRNN(256, activation='linear')(x)

    # NOTE(review): throttle_out is created but not listed in the model
    # outputs below; kept as in the original.
    throttle_out = Dense(1, name="throttle_out")(x)
    angle_out = Dense(1, name="angle_out")(x)

    model = Model(input=[img_input], output=[angle_out])
    model.compile(optimizer='adam', loss='mean_squared_error')
    return model
def define_sequence_model(self):
    """Add the hidden and output layers to self.model and compile it.

    For each of the self.n_layers hidden layers the type string in
    self.hidden_layer_type selects SimpleRNN ('rnn'), GRU ('gru'), LSTM
    ('lstm'), or LSTM with go_backwards=True ('blstm'); any other string is
    used as the activation of a Dense layer. A Dense output layer with
    self.n_out units follows, then self.compile_model() is called.
    """
    seed = 12345
    np.random.seed(seed)

    # add hidden layers
    for index in range(self.n_layers):
        # The first layer reads the network input; later ones read the
        # previous hidden layer.
        input_size = self.n_in if index == 0 else self.hidden_layer_size[index - 1]
        layer_type = self.hidden_layer_type[index]
        input_shape = (None, input_size)

        if layer_type == 'rnn':
            layer = SimpleRNN(
                units=self.hidden_layer_size[index],
                input_shape=input_shape,
                return_sequences=True)
        elif layer_type == 'gru':
            layer = GRU(
                units=self.hidden_layer_size[index],
                input_shape=input_shape,
                return_sequences=True)
        elif layer_type == 'lstm':
            layer = LSTM(
                units=self.hidden_layer_size[index],
                input_shape=input_shape,
                return_sequences=True)
        elif layer_type == 'blstm':
            # This is a single LSTM that processes the sequence in reverse.
            layer = LSTM(
                units=self.hidden_layer_size[index],
                input_shape=input_shape,
                return_sequences=True,
                go_backwards=True)
        else:
            layer = Dense(
                units=self.hidden_layer_size[index],
                activation=layer_type,
                kernel_initializer="normal",
                input_shape=input_shape)
        self.model.add(layer)

    # add output layer
    # NOTE(review): final_layer receives whatever self.model.add returns;
    # if self.model is a Keras Sequential that is None. Confirm intent.
    output_layer = Dense(
        units=self.n_out,
        input_dim=self.hidden_layer_size[-1],
        kernel_initializer='normal',
        activation=self.output_type.lower())
    self.final_layer = self.model.add(output_layer)

    # Compile the model
    self.compile_model()
def build_stateful_lstm_model_with_normalization(batch_size,
                                                 time_step,
                                                 input_dim,
                                                 output_dim,
                                                 dropout=0.2,
                                                 rnn_layer_num=2,
                                                 hidden_dim=128,
                                                 hidden_num=0,
                                                 rnn_type='LSTM'):
    """Build and compile a stacked recurrent model with batch normalization.

    Layout: a first recurrent layer with a fixed batch input shape followed
    by BatchNormalization, ``rnn_layer_num - 2`` intermediate recurrent
    layers (each followed by Dropout and BatchNormalization), a last
    recurrent layer that returns only its final output, ``hidden_num``
    Dense(hidden_dim)+Dropout pairs, and a Dense(output_dim) output.

    NOTE(review): the source this was recovered from had lost its
    indentation; the Dropout/BatchNormalization calls are assumed to sit
    inside their loops. Confirm against the original file. Also note that
    no layer is created with ``stateful=True`` here despite the name.

    Args:
        batch_size: Fixed batch size used in ``batch_input_shape``.
        time_step: Number of time steps per sample.
        input_dim: Number of features per time step.
        output_dim: Number of units of the output Dense layer.
        dropout: Rate passed to every Dropout layer.
        rnn_layer_num: Total number of recurrent layers requested.
        hidden_dim: Units of every recurrent and hidden Dense layer.
        hidden_num: Number of hidden Dense layers before the output.
        rnn_type: One of 'LSTM', 'GRU' or 'SimpleRNN'.

    Returns:
        The compiled Sequential model (mse loss, RMSprop with lr=0.01).

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """
    # Resolve the layer class first so that a bad rnn_type fails before any
    # model object is created.
    if rnn_type == 'LSTM':
        rnn_cell = LSTM
    elif rnn_type == 'GRU':
        rnn_cell = GRU
    elif rnn_type == 'SimpleRNN':
        rnn_cell = SimpleRNN
    else:
        raise ValueError('Option rnn_type could only be configured as LSTM, GRU or SimpleRNN')

    model = Sequential()
    # First recurrent layer carries the fixed batch input shape.
    model.add(rnn_cell(hidden_dim, return_sequences=True,
                       batch_input_shape=(batch_size, time_step, input_dim)))
    model.add(BatchNormalization())
    for _ in range(rnn_layer_num - 2):
        # Fixed: the keyword is `return_sequences`; the previous
        # `return_sequence` spelling is not a recurrent-layer argument.
        model.add(rnn_cell(hidden_dim, return_sequences=True))
        # prevent over fitting
        model.add(Dropout(dropout))
        model.add(BatchNormalization())
    model.add(rnn_cell(hidden_dim, return_sequences=False))

    # Optional fully connected hidden layers.
    for _ in range(hidden_num):
        model.add(Dense(hidden_dim))
        model.add(Dropout(dropout))
    model.add(Dense(output_dim))

    rmsprop = RMSprop(lr=0.01)
    model.compile(loss='mse', metrics=['acc'], optimizer=rmsprop)
    return model
def build_real_stateful_lstm_model_with_normalization(batch_size,
                                                      time_step,
                                                      input_dim,
                                                      output_dim,
                                                      dropout=0.2,
                                                      rnn_layer_num=2,
                                                      hidden_dim=128,
                                                      hidden_num=0,
                                                      rnn_type='LSTM'):
    """Build and compile a stateful stacked recurrent model with batch normalization.

    Every recurrent layer is created with ``stateful=True``. Layout: a first
    recurrent layer with a fixed batch input shape followed by
    BatchNormalization, ``rnn_layer_num - 2`` intermediate recurrent layers
    (each followed by Dropout and BatchNormalization), a last recurrent
    layer that returns only its final output, ``hidden_num``
    Dense(hidden_dim)+Dropout pairs, and a Dense(output_dim) output.

    NOTE(review): the source this was recovered from had lost its
    indentation; the Dropout/BatchNormalization calls are assumed to sit
    inside their loops. Confirm against the original file.

    Args:
        batch_size: Fixed batch size used in ``batch_input_shape``.
        time_step: Number of time steps per sample.
        input_dim: Number of features per time step.
        output_dim: Number of units of the output Dense layer.
        dropout: Rate passed to every Dropout layer.
        rnn_layer_num: Total number of recurrent layers requested.
        hidden_dim: Units of every recurrent and hidden Dense layer.
        hidden_num: Number of hidden Dense layers before the output.
        rnn_type: One of 'LSTM', 'GRU' or 'SimpleRNN'.

    Returns:
        The compiled Sequential model (mse loss, RMSprop with lr=0.01).

    Raises:
        ValueError: If ``rnn_type`` is not one of the supported names.
    """
    # Resolve the layer class first so that a bad rnn_type fails before any
    # model object is created.
    if rnn_type == 'LSTM':
        rnn_cell = LSTM
    elif rnn_type == 'GRU':
        rnn_cell = GRU
    elif rnn_type == 'SimpleRNN':
        rnn_cell = SimpleRNN
    else:
        raise ValueError('Option rnn_type could only be configured as LSTM, GRU or SimpleRNN')

    model = Sequential()
    # First recurrent layer carries the fixed batch input shape.
    model.add(rnn_cell(hidden_dim, stateful=True, return_sequences=True,
                       batch_input_shape=(batch_size, time_step, input_dim)))
    model.add(BatchNormalization())
    for _ in range(rnn_layer_num - 2):
        # Fixed: the keyword is `return_sequences`; the previous
        # `return_sequence` spelling is not a recurrent-layer argument.
        model.add(rnn_cell(hidden_dim, stateful=True, return_sequences=True))
        # prevent over fitting
        model.add(Dropout(dropout))
        model.add(BatchNormalization())
    model.add(rnn_cell(hidden_dim, stateful=True, return_sequences=False))

    # Optional fully connected hidden layers.
    for _ in range(hidden_num):
        model.add(Dense(hidden_dim))
        model.add(Dropout(dropout))
    model.add(Dense(output_dim))

    rmsprop = RMSprop(lr=0.01)
    model.compile(loss='mse', metrics=['acc'], optimizer=rmsprop)
    return model
def test_rnn_layer(self):
    """Stress-test SimpleRNN conversion over every base/rnn parameter combination.

    For each combination a one-layer Keras model is built and converted, and
    both models predict on the same generated input. Shape mismatches and
    elements whose relative error exceeds 0.01 are collected per model and
    reported together at the end, so one bad configuration does not hide
    the others.
    """
    model_count = 0
    numerical_err_models = []
    shape_err_models = []
    numerical_failiure = 0
    for base_params in self.base_layer_params:
        base_params = dict(zip(self.params_dict.keys(), base_params))
        for rnn_params in self.rnn_layer_params:
            # rnn_params is built but not consumed by SimpleRNN below.
            rnn_params = dict(zip(self.simple_rnn_params_dict.keys(), rnn_params))
            model = Sequential()
            model.add(
                SimpleRNN(
                    base_params['output_dim'],
                    input_length=base_params['input_dims'][1],
                    input_dim=base_params['input_dims'][2],
                    activation=base_params['activation'],
                    return_sequences=base_params['return_sequences'],
                    go_backwards=base_params['go_backwards'],
                    unroll=base_params['unroll'],
                )
            )
            mlkitmodel = get_mlkit_model_from_path(model)
            input_data = generate_input(base_params['input_dims'][0], base_params['input_dims'][1],
                                        base_params['input_dims'][2])
            keras_preds = model.predict(input_data).flatten()
            # Drop the TensorFlow session/graph between models, when one exists.
            if K.tensorflow_backend._SESSION:
                import tensorflow as tf
                tf.reset_default_graph()
                K.tensorflow_backend._SESSION.close()
                K.tensorflow_backend._SESSION = None
            # Swap the first two axes before feeding the converted model.
            input_data = np.transpose(input_data, [1, 0, 2])
            coreml_preds = mlkitmodel.predict({'data': input_data})['output'].flatten()
            try:
                self.assertEqual(coreml_preds.shape, keras_preds.shape)
            except AssertionError:
                print("Shape error:\nbase_params: {}\nkeras_preds.shape: {}\ncoreml_preds.shape: {}".format(
                    base_params, keras_preds.shape, coreml_preds.shape))
                shape_err_models.append(base_params)
                model_count += 1
                continue
            try:
                max_denominator = np.maximum(np.maximum(np.abs(coreml_preds), np.abs(keras_preds)), 1.0)
                relative_error = coreml_preds / max_denominator - keras_preds / max_denominator
                # Fixed: compare the magnitude of the error so negative
                # deviations are caught too, and iterate without reusing the
                # model counter as the loop variable (the old inner
                # `for i in ...` overwrote it).
                for error in np.abs(relative_error):
                    self.assertLessEqual(error, 0.01)
            except AssertionError:
                print("Assertion error:\nbase_params: {}\nkeras_preds: {}\ncoreml_preds: {}".format(base_params,
                                                                                                    keras_preds,
                                                                                                    coreml_preds))
                numerical_failiure += 1
                numerical_err_models.append(base_params)
            model_count += 1

    self.assertEqual(shape_err_models, [], msg='Shape error models {}'.format(shape_err_models))
    self.assertEqual(numerical_err_models, [], msg='Numerical error models {}\n'
                                                   'Total numerical failiures: {}/{}\n'.format(
                                                       numerical_err_models,
                                                       numerical_failiure, model_count)
                     )