def test_merge_mask_2d():
    import numpy as np
    from keras.layers import Input, merge, Masking
    from keras.models import Model
    # random binary tensors of the requested shape
    rand = lambda *shape: np.asarray(np.random.random(shape) > 0.5, dtype='int32')
    # inputs
    input_a = Input(shape=(3,))
    input_b = Input(shape=(3,))
    # masks
    masked_a = Masking(mask_value=0)(input_a)
    masked_b = Masking(mask_value=0)(input_b)
    # three different kinds of merging
    merged_sum = merge([masked_a, masked_b], mode='sum')
    merged_concat = merge([masked_a, masked_b], mode='concat', concat_axis=1)
    merged_concat_mixed = merge([masked_a, input_b], mode='concat', concat_axis=1)
    # test sum
    model_sum = Model([input_a, input_b], [merged_sum])
    model_sum.compile(loss='mse', optimizer='sgd')
    model_sum.fit([rand(2, 3), rand(2, 3)], [rand(2, 3)], nb_epoch=1)
    # test concatenation
    model_concat = Model([input_a, input_b], [merged_concat])
    model_concat.compile(loss='mse', optimizer='sgd')
    model_concat.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)
    # test concatenation of masked and non-masked inputs
    model_concat_mixed = Model([input_a, input_b], [merged_concat_mixed])
    model_concat_mixed.compile(loss='mse', optimizer='sgd')
    model_concat_mixed.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)
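Note that this snippet uses the Keras 1.x functional API: merge(mode=..., concat_axis=...) and the nb_epoch argument. Under Keras 2 the equivalents are the keras.layers.add / keras.layers.concatenate functions and the epochs argument.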
Example source code for the Python Masking() class
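As a minimal sketch of the semantics the snippets below rely on (array values are taken from the tests further down; assuming a Keras 2 Masking layer with a TensorFlow backend): a timestep is masked only when every one of its features equals mask_value, and the resulting mask has one dimension fewer than the input.

import numpy as np
from keras.layers import Masking

# two samples, four timesteps, two features; all-zero rows are padding
x = np.array([[[1, 1], [2, 1], [3, 1], [5, 5]],
              [[1, 5], [5, 0], [0, 0], [0, 0]]], dtype='float32')
layer = Masking(mask_value=0.0)
mask = layer.compute_mask(x)  # shape (2, 4), one boolean per timestep
# -> [[True, True, True, True],
#     [True, True, False, False]]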
def build_lstm(input_shape):
    from keras.models import Sequential
    from keras.layers import Masking, GRU, Dense, Activation
    model = Sequential()
    # skip timesteps whose features are all -1 (the padding value)
    model.add(Masking(input_shape=input_shape, mask_value=-1.))
    # model.add(GRU(128, return_sequences=True))
    model.add(GRU(128, return_sequences=False))
    # add dropout if overfitting
    # model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model
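A hypothetical usage sketch (the shapes and labels are assumptions; pad_sequences is the standard Keras helper, and recent versions also pad sequences of feature vectors): sequences padded with -1 are skipped by the Masking layer above.

import numpy as np
from keras.preprocessing.sequence import pad_sequences

# two variable-length sequences of 3-dimensional feature vectors
seqs = [np.random.rand(5, 3), np.random.rand(8, 3)]
X = pad_sequences(seqs, maxlen=8, dtype='float32', padding='post', value=-1.)
model = build_lstm(input_shape=(8, 3))
model.fit(X, np.array([0, 1]), epochs=1)  # 'epochs' assumes Keras 2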
def test_loss_masking(self):
    X = np.array(
        [[[1, 1], [2, 1], [3, 1], [5, 5]],
         [[1, 5], [5, 0], [0, 0], [0, 0]]], dtype=np.int32)
    model = Sequential()
    model.add(Masking(mask_value=0, input_shape=(None, 2)))
    model.add(TimeDistributedDense(1, init='one'))
    model.compile(loss='mse', optimizer='sgd')
    y = model.predict(X)
    loss = model.fit(X, 4 * y, nb_epoch=1, batch_size=2, verbose=1).history['loss'][0]
    assert loss == 285.
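The asserted value follows from the mask. With all-ones weights and zero biases, the prediction for a timestep is the sum of its two features, so y is [[2, 3, 4, 10], [6, 5, 0, 0]]; fitting against 4*y gives a per-timestep squared error of (3y)^2 = 9y^2. The two all-zero timesteps of the second sample are masked out, leaving 6 valid timesteps, so the initial loss is (9*(4+9+16+100) + 9*(36+25)) / 6 = 1710 / 6 = 285.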
def test_sequences(self):
    """Test masking sequences with zeroes as padding"""
    # integer inputs, one per timestep, like embeddings
    layer = core.Masking()
    func = theano.function([layer.input], layer.get_output_mask())
    self.assertTrue(np.all(
        # get the mask for this input
        func(np.array([[[1], [2], [3], [0]],
                       [[0], [4], [5], [0]]], dtype=np.int32)) ==
        # the expected output mask, one dimension less than the input
        np.array([[1, 1, 1, 0], [0, 1, 1, 0]])))
def test_non_zero(self):
    """Test masking with a non-zero mask value"""
    layer = core.Masking(5)
    func = theano.function([layer.input], layer.get_output_mask())
    self.assertTrue(np.all(
        # a timestep is masked only when every one of its values equals 5
        func(np.array([[[1, 1], [2, 1], [3, 1], [5, 5]],
                       [[1, 5], [5, 0], [0, 0], [0, 0]]], dtype=np.int32)) ==
        # the expected output mask, one dimension less than the input
        np.array([[1, 1, 1, 0], [1, 1, 1, 1]])))
def test_non_zero_output(self):
    """Test output of masking layer with non-zero mask value"""
    layer = core.Masking(5)
    func = theano.function([layer.input], layer.get_output())
    self.assertTrue(np.all(
        # get the output for this input; padding is replaced with 0
        func(np.array([[[1, 1], [2, 1], [3, 1], [5, 5]],
                       [[1, 5], [5, 0], [0, 0], [0, 0]]], dtype=np.int32)) ==
        # the expected output
        np.array([[[1, 1], [2, 1], [3, 1], [0, 0]],
                  [[1, 5], [5, 0], [0, 0], [0, 0]]])))
def RecurrenPT(Inputs, nclasses, dropoutRate=-1):
    from keras.layers import Masking, LSTM, Dense, concatenate
    from keras.models import Model
    # note: dropoutRate is accepted for interface compatibility but unused here
    x_pt = Masking()(Inputs[1])
    x_pt = LSTM(100)(x_pt)
    # Keras 2: concatenate() replaces the Keras 1 merge(..., mode='concat')
    x = concatenate([x_pt, Inputs[0]])
    x = Dense(200, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = concatenate([x, Inputs[2]])
    # two heads: a linear regression output and a softmax classification output
    predictions = [Dense(2, activation='linear', kernel_initializer='normal')(x),
                   Dense(nclasses, activation='softmax', kernel_initializer='lecun_uniform')(x)]
    model = Model(inputs=Inputs, outputs=predictions)
    return model
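A hypothetical call, purely to illustrate the three expected inputs (all shapes are made up): Inputs[0] carries global features, Inputs[1] a padded sequence for the Masking/LSTM branch, and Inputs[2] extra features concatenated just before the two output heads.

from keras.layers import Input
inputs = [Input(shape=(10,)),    # global features
          Input(shape=(25, 8)),  # padded sequence for the LSTM branch
          Input(shape=(4,))]     # late-concatenated features
model = RecurrenPT(inputs, nclasses=5)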
def model(input_shapes, nclasses):
    from keras.layers import Input, Dense
    from keras.layers.core import Masking  # note: imported but unused in this dense-only variant
    from keras.models import Model
    x_global = Input(shape=input_shapes[0])
    x = Dense(10, activation='relu', kernel_initializer='lecun_uniform')(x_global)
    for _ in range(6):
        x = Dense(10, activation='relu', kernel_initializer='lecun_uniform')(x)
    predictions = Dense(
        nclasses, activation='softmax',
        kernel_initializer='lecun_uniform',
        name='classification_out'
    )(x)
    return Model(inputs=x_global, outputs=predictions)
def build_main_residual_network_with_lstm(batch_size,
                                          time_step,
                                          input_dim,
                                          output_dim,
                                          loop_depth=15,
                                          rnn_layer_num=2,
                                          dropout=0.3):
    from keras.layers import (Input, TimeDistributed, Masking, LSTM, Conv1D,
                              BatchNormalization, Activation, Flatten, Dense)
    from keras.models import Model
    inp = Input(shape=(time_step, input_dim))
    # add a mask to filter out invalid (all-zero) timesteps
    out = TimeDistributed(Masking(mask_value=0))(inp)
    # add the LSTM module
    for _ in range(rnn_layer_num):
        out = LSTM(128, return_sequences=True)(out)
    out = Conv1D(128, 5)(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)
    # first_block and repeated_block are residual-block helpers defined elsewhere in the source repo
    out = first_block(out, (64, 128), dropout=dropout)
    for _ in range(loop_depth):
        out = repeated_block(out, (64, 128), dropout=dropout)
    # flatten before the dense output
    out = Flatten()(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)
    out = Dense(output_dim)(out)
    model = Model(inp, out)
    model.compile(loss='mse', optimizer='adam', metrics=['mse', 'mae'])
    return model
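A hypothetical invocation (all dimensions are assumptions); note that Flatten forces a fixed time_step, and first_block / repeated_block must be importable from the surrounding repo:

model = build_main_residual_network_with_lstm(
    batch_size=32, time_step=100, input_dim=8, output_dim=1)
model.summary()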