def create_rnn():
"""Create a recurrent neural network to compute a control policy.
Reference:
Koutnik, Jan, Jurgen Schmidhuber, and Faustino Gomez. "Evolving deep
unsupervised convolutional networks for vision-based reinforcement
learning." Proceedings of the 2014 conference on Genetic and
evolutionary computation. ACM, 2014.
"""
model = Sequential()
model.add(SimpleRNN(output_dim=3, stateful=True, batch_input_shape=(1, 1, 3)))
model.add(Dense(input_dim=3, output_dim=3))
model.compile(loss='mse', optimizer='rmsprop')
return model
python类SimpleRNN()的实例源码
def test_regularizer(layer_class):
layer = layer_class(output_dim, return_sequences=False, weights=None,
batch_input_shape=(nb_samples, timesteps, embedding_dim),
W_regularizer=regularizers.WeightRegularizer(l1=0.01),
U_regularizer=regularizers.WeightRegularizer(l1=0.01),
b_regularizer='l2')
shape = (nb_samples, timesteps, embedding_dim)
layer.build(shape)
output = layer(K.variable(np.ones(shape)))
K.eval(output)
if layer_class == recurrent.SimpleRNN:
assert len(layer.losses) == 3
if layer_class == recurrent.GRU:
assert len(layer.losses) == 9
if layer_class == recurrent.LSTM:
assert len(layer.losses) == 12
def test_regularizer(layer_class):
layer = layer_class(output_dim, return_sequences=False, weights=None,
batch_input_shape=(nb_samples, timesteps, embedding_dim),
W_regularizer=regularizers.WeightRegularizer(l1=0.01),
U_regularizer=regularizers.WeightRegularizer(l1=0.01),
b_regularizer='l2')
shape = (nb_samples, timesteps, embedding_dim)
layer.build(shape)
output = layer(K.variable(np.ones(shape)))
K.eval(output)
if layer_class == recurrent.SimpleRNN:
assert len(layer.losses) == 3
if layer_class == recurrent.GRU:
assert len(layer.losses) == 9
if layer_class == recurrent.LSTM:
assert len(layer.losses) == 12
def rnn_test(f):
"""
All the recurrent layers share the same interface,
so we can run through them with a single function.
"""
f = keras_test(f)
return pytest.mark.parametrize("layer_class", [
recurrent.SimpleRNN,
recurrent.GRU,
recurrent.LSTM
])(f)
def naiveQN(layer,activation = 'tanh'):
model = Sequential()
model.add(SimpleRNN(layer[1], input_shape = (None, layer[0]),return_sequences=True))
for i in range(2,len(layer) - 1):
model.add(Dense(layer[i], input_dim = layer[i-1]))
model.add(Activation(activation))
model.add(Dense(layer[-1], input_dim = layer[-2]))
optimizer = RMSprop(lr = ETA, decay = 0.01)
model.compile(optimizer= optimizer, loss='mean_squared_error')
return model
def rnn_test(f):
"""
All the recurrent layers share the same interface,
so we can run through them with a single function.
"""
f = keras_test(f)
return pytest.mark.parametrize("layer_class", [
recurrent.SimpleRNN,
recurrent.GRU,
recurrent.LSTM
])(f)
def build(self, input_shape):
self.input_spec = [InputSpec(shape=input_shape)]
if self.stateful:
self.reset_states()
else:
# initial states: all-zero tensor of shape (output_dim)
self.states = [None]
input_dim = input_shape[2]
self.input_dim = input_dim
self.W = self.init((input_dim, self.output_dim),
name='{}_W'.format(self.name))
# Only change in build compared to SimpleRNN:
# U is of shape (inner_input_dim, output_dim) now.
self.U = self.inner_init((self.inner_input_dim, self.output_dim),
name='{}_U'.format(self.name))
self.b = K.zeros((self.output_dim,), name='{}_b'.format(self.name))
self.regularizers = []
if self.W_regularizer:
self.W_regularizer.set_param(self.W)
self.regularizers.append(self.W_regularizer)
if self.U_regularizer:
self.U_regularizer.set_param(self.U)
self.regularizers.append(self.U_regularizer)
if self.b_regularizer:
self.b_regularizer.set_param(self.b)
self.regularizers.append(self.b_regularizer)
self.trainable_weights = [self.W, self.U, self.b]
if self.initial_weights is not None:
self.set_weights(self.initial_weights)
del self.initial_weights
def rnn_test(f):
"""
All the recurrent layers share the same interface,
so we can run through them with a single function.
"""
f = keras_test(f)
return pytest.mark.parametrize("layer_class", [
recurrent.SimpleRNN,
recurrent.GRU,
recurrent.LSTM
])(f)
def test_simple(self):
_runner(recurrent.SimpleRNN)
def test_SimpleRNN(self):
# all recurrent layers inherit output_shape
# from the same base recurrent layer
layer = SimpleRNN(2)
input_data = np.random.random((2, 2, 3))
check_layer_output_shape(layer, input_data)
def SimpleRNNModel(self, nHidden=120, lr = 0.01):
self.rnnModel.add(SimpleRNN( nHidden, input_shape =( None, self.maxFeatures), activation='sigmoid', return_sequences=True))
self.rnnModel.add(TimeDistributedDense(self.maxFeatures))
self.rnnModel.add(Activation('softmax'))
rmsprop = RMSprop(lr=lr, rho=0.9, epsilon=1e-06)
self.rnnModel.compile(loss='categorical_crossentropy', optimizer=rmsprop)
def test_Bidirectional():
rnn = recurrent.SimpleRNN
nb_sample = 2
dim = 2
timesteps = 2
output_dim = 2
for mode in ['sum', 'concat']:
x = np.random.random((nb_sample, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((nb_sample, target_dim))
# test with Sequential model
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim),
merge_mode=mode, input_shape=(timesteps, dim)))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test config
model.get_config()
model = model_from_json(model.to_json())
model.summary()
# test stacked bidirectional layers
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim, return_sequences=True),
merge_mode=mode, input_shape=(timesteps, dim)))
model.add(wrappers.Bidirectional(rnn(output_dim), merge_mode=mode))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test with functional API
input = Input((timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# Bidirectional and stateful
input = Input(batch_shape=(1, timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim, stateful=True), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
def test_Bidirectional():
rnn = recurrent.SimpleRNN
nb_sample = 2
dim = 2
timesteps = 2
output_dim = 2
for mode in ['sum', 'concat']:
x = np.random.random((nb_sample, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((nb_sample, target_dim))
# test with Sequential model
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim),
merge_mode=mode, input_shape=(timesteps, dim)))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test config
model.get_config()
model = model_from_json(model.to_json())
model.summary()
# test stacked bidirectional layers
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim, return_sequences=True),
merge_mode=mode, input_shape=(timesteps, dim)))
model.add(wrappers.Bidirectional(rnn(output_dim), merge_mode=mode))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test with functional API
input = Input((timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# Bidirectional and stateful
input = Input(batch_shape=(1, timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim, stateful=True), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
def test_Bidirectional():
rnn = recurrent.SimpleRNN
nb_sample = 2
dim = 2
timesteps = 2
output_dim = 2
for mode in ['sum', 'concat']:
x = np.random.random((nb_sample, timesteps, dim))
target_dim = 2 * output_dim if mode == 'concat' else output_dim
y = np.random.random((nb_sample, target_dim))
# test with Sequential model
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim),
merge_mode=mode, input_shape=(timesteps, dim)))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test config
model.get_config()
model = model_from_json(model.to_json())
model.summary()
# test stacked bidirectional layers
model = Sequential()
model.add(wrappers.Bidirectional(rnn(output_dim, return_sequences=True),
merge_mode=mode, input_shape=(timesteps, dim)))
model.add(wrappers.Bidirectional(rnn(output_dim), merge_mode=mode))
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# test with functional API
input = Input((timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)
# Bidirectional and stateful
input = Input(batch_shape=(1, timesteps, dim))
output = wrappers.Bidirectional(rnn(output_dim, stateful=True), merge_mode=mode)(input)
model = Model(input, output)
model.compile(loss='mse', optimizer='sgd')
model.fit(x, y, nb_epoch=1, batch_size=1)