def create_model(self, rnn_layer):
    inputs = Input(shape=(self.max_length, self.feature_size))
    # All-zero timesteps are treated as padding and skipped downstream.
    masked_inputs = Masking(0.0)(inputs)
    outputs = RNNEncoder(
        RNNCell(
            rnn_layer(self.hidden_size),
            Dense(self.encoding_size),
            dense_dropout=0.1
        )
    )(masked_inputs)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model
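The snippets on this page all pair Masking(0.0) with zero-padded float batches. A minimal sketch of preparing such a batch with Keras' pad_sequences, assuming a Keras version whose pad_sequences accepts per-timestep feature vectors (the shapes stand in for self.max_length and self.feature_size above):

import numpy as np
from keras.preprocessing.sequence import pad_sequences

max_length, feature_size = 10, 4
seqs = [np.random.random((n, feature_size)) for n in (3, 7, 10)]
# Pad at the end with zeros so Masking(0.0) skips the padded timesteps.
x = pad_sequences(seqs, maxlen=max_length, dtype='float32',
                  padding='post', value=0.0)
print(x.shape)  # (3, 10, 4); the tail rows of shorter sequences are all zeros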
Python Masking() class: example source code
def create_model(self, rnn_layer):
    inputs = Input(shape=(self.max_length, self.feature_size))
    masked_inputs = Masking(0.0)(inputs)
    # Encode the masked sequence, then decode back to the feature space.
    encoded = RNNEncoder(
        rnn_layer(self.encoding_size)
    )(masked_inputs)
    outputs = RNNDecoder(
        rnn_layer(self.feature_size)
    )(encoded)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model
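RNNEncoder, RNNDecoder and RNNCell are not Keras built-ins; they appear to come from the project's own seq2vec-style helpers. A hedged usage sketch that trains this pair as a sequence autoencoder on zero-padded data (builder, the shapes and the choice of LSTM are illustrative assumptions):

import numpy as np
from keras.layers import LSTM

x = np.random.random((32, 10, 4)).astype('float32')  # (batch, max_length, feature_size)
x[:, 7:, :] = 0.0                                     # simulated padding, ignored via Masking(0.0)
model = builder.create_model(LSTM)                    # `builder`: hypothetical instance of the class above
model.fit(x, x, nb_epoch=3, batch_size=8)             # reconstruct the input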
def test_masking_layer():
    '''This test is based on a previously failing issue:
    https://github.com/fchollet/keras/issues/1567
    '''
    I = np.random.random((6, 3, 4))
    V = np.abs(np.random.random((6, 3, 5)))
    V /= V.sum(axis=-1, keepdims=True)  # rows sum to 1 for categorical crossentropy

    model = Sequential()
    model.add(Masking(input_shape=(3, 4)))
    model.add(recurrent.LSTM(output_dim=5, return_sequences=True, unroll=False))
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    model.fit(I, V, nb_epoch=1, batch_size=100, verbose=1)

    # Same model again with the unrolled LSTM implementation.
    model = Sequential()
    model.add(Masking(input_shape=(3, 4)))
    model.add(recurrent.LSTM(output_dim=5, return_sequences=True, unroll=True))
    model.compile(loss='categorical_crossentropy', optimizer='adam')
    model.fit(I, V, nb_epoch=1, batch_size=100, verbose=1)
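To see exactly which timesteps Masking flags, its compute_mask method can be called directly; a small sketch (assumes a standard Keras backend where K.eval can evaluate the resulting tensor):

import numpy as np
from keras import backend as K
from keras.layers import Masking

# A timestep is masked only when ALL of its features equal mask_value.
x = np.array([[[1., 2.], [0., 0.], [3., 0.]]])  # 1 sample, 3 timesteps, 2 features
mask = Masking(mask_value=0.0).compute_mask(K.variable(x))
print(K.eval(mask))  # [[ True False  True]] -- only the all-zero timestep is masked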
def build_lstm(input_shape):
    model = Sequential()
    # Masking is left commented out here, presumably because the
    # Convolution1D and pooling layers below do not propagate masks.
    # model.add(Masking(input_shape=input_shape, mask_value=-1.))
    model.add(Embedding(input_shape[0], 128, input_length=input_shape[1]))
    model.add(Convolution1D(nb_filter=64,
                            filter_length=5,
                            border_mode='valid',
                            activation='relu',
                            subsample_length=1))
    model.add(MaxPooling1D(pool_length=4))
    model.add(GRU(128))
    # model.add(GRU(128, return_sequences=False))
    # Add dropout if overfitting
    # model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model
def build_lstm(input_shape):
    model = Sequential()
    # model.add(Masking(input_shape=input_shape, mask_value=-1.))
    model.add(Embedding(input_shape[0], 128, input_length=input_shape[1]))
    model.add(Convolution1D(nb_filter=64,
                            filter_length=5,
                            border_mode='valid',
                            activation='relu',
                            subsample_length=1))
    # Global max pooling: pool over the full remaining sequence length.
    model.add(MaxPooling1D(pool_length=model.output_shape[1]))
    model.add(Flatten())
    model.add(Dense(128))
    # model.add(GRU(128, return_sequences=False))
    # Add dropout if overfitting
    # model.add(Dropout(0.5))
    model.add(Dense(1))
    model.add(Activation('sigmoid'))
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    return model
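A hedged usage sketch for these builders; as the Embedding call implies, input_shape here is (vocabulary size, sequence length), and the data values below are made up:

import numpy as np

vocab_size, seq_len = 5000, 100
model = build_lstm((vocab_size, seq_len))
X = np.random.randint(0, vocab_size, size=(64, seq_len))  # integer-encoded text
y = np.random.randint(0, 2, size=(64, 1))                 # binary labels
model.fit(X, y, nb_epoch=1, batch_size=32)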
def base_model(input_shapes):
    from keras.layers import Input
    from keras.layers.core import Masking

    x_global = Input(shape=input_shapes[0])
    x_charged = Input(shape=input_shapes[1])
    x_neutral = Input(shape=input_shapes[2])
    x_ptreco = Input(shape=input_shapes[3])

    # Mask zero-padded entries before the recurrent layers.
    lstm_c = Masking()(x_charged)
    lstm_c = LSTM(100, go_backwards=True, implementation=2)(lstm_c)
    lstm_n = Masking()(x_neutral)
    lstm_n = LSTM(100, go_backwards=True, implementation=2)(lstm_n)

    x = concatenate([lstm_c, lstm_n, x_global])
    x = Dense(200, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = Dense(100, activation='relu', kernel_initializer='lecun_uniform')(x)
    x = concatenate([x, x_ptreco])
    return [x_global, x_charged, x_neutral, x_ptreco], x
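base_model returns the input tensors and the last hidden tensor rather than a finished network. A hedged sketch of completing it (the Dense(1) regression head and the input shapes are assumptions, not part of the original project):

from keras.models import Model
from keras.layers import Dense

shapes = [(10,), (25, 8), (25, 8), (1,)]  # global, charged, neutral, ptreco (assumed)
inputs, x = base_model(shapes)
out = Dense(1, activation='linear')(x)    # assumed regression head
model = Model(inputs=inputs, outputs=out)
model.compile(loss='mse', optimizer='adam')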
Source: model_zoo.py, from the project visual_turing_test-tutorial by mateuszmalinowski.
def textual_embedding(self, language_model, mask_zero):
    """
    Note:
        * mask_zero only makes sense if the embedding is learnt.
    """
    if self._config.textual_embedding_dim > 0:
        print('Textual Embedding is on')
        language_model.add(Embedding(
            self._config.input_dim,
            self._config.textual_embedding_dim,
            mask_zero=mask_zero))
    else:
        print('Textual Embedding is off')
        language_model.add(Reshape(
            input_shape=(self._config.max_input_time_steps, self._config.input_dim),
            dims=(self._config.max_input_time_steps, self._config.input_dim)))
        # Without a learnt embedding, masking has to be added explicitly.
        if mask_zero:
            language_model.add(Masking(0))
    return language_model
Source: model_zoo.py, from the project visual_turing_test-tutorial by mateuszmalinowski.
def textual_embedding_fixed_length(self, language_model, mask_zero):
    """
    In contrast to textual_embedding, this produces a fixed-length output.
    """
    if self._config.textual_embedding_dim > 0:
        print('Textual Embedding with fixed length is on')
        language_model.add(Embedding(
            self._config.input_dim,
            self._config.textual_embedding_dim,
            input_length=self._config.max_input_time_steps,
            mask_zero=mask_zero))
    else:
        print('Textual Embedding with fixed length is off')
        language_model.add(Reshape(
            input_shape=(self._config.max_input_time_steps, self._config.input_dim),
            dims=(self._config.max_input_time_steps, self._config.input_dim)))
        if mask_zero:
            language_model.add(Masking(0))
    return language_model
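When mask_zero=True, Keras reserves token index 0 for padding, so input_dim has to cover the vocabulary plus one. A brief sketch of that convention (the vocabulary numbers are made up):

from keras.models import Sequential
from keras.layers import Embedding

vocab_size = 1000
model = Sequential()
# Real tokens occupy indices 1..vocab_size; 0 is the pad token.
model.add(Embedding(input_dim=vocab_size + 1, output_dim=64,
                    input_length=20, mask_zero=True))
# Downstream mask-aware layers (e.g. LSTM) will skip the 0-padded steps.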
def create_model(self, rnn_layer):
    inputs = Input(shape=(self.max_length, self.feature_size))
    masked_inputs = Masking(0.0)(inputs)
    # Plain recurrent layer returning the full (masked) sequence.
    outputs = rnn_layer(
        self.encoding_size,
        return_sequences=True
    )(masked_inputs)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model

def create_model(self, rnn_layer):
    inputs = Input(shape=(self.max_length, self.feature_size))
    masked_inputs = Masking(0.0)(inputs)
    outputs = RNNEncoder(
        rnn_layer(self.encoding_size)
    )(masked_inputs)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model

def create_model(self, rnn_layer):
    inputs = Input(shape=(self.max_length, self.feature_size))
    masked_inputs = Masking(0.0)(inputs)
    # Encoder and decoder both wrap the recurrent layer in an RNNCell
    # that applies a Dense projection with dropout to each output.
    encoded = RNNEncoder(
        RNNCell(
            rnn_layer(self.hidden_size),
            Dense(self.encoding_size),
            dense_dropout=0.1
        )
    )(masked_inputs)
    outputs = RNNDecoder(
        RNNCell(
            rnn_layer(self.hidden_size),
            Dense(self.feature_size),
            dense_dropout=0.1
        )
    )(encoded)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model
def create_model(self):
    inputs = Input(shape=(self.max_length, self.feature_size))
    masked_inputs = Masking(0.0)(inputs)
    encoded = RNNEncoder(
        LSTM(
            self.encoding_size,
            return_sequences=True
        )
    )(masked_inputs)
    # Pick() is a project helper, presumably selecting one timestep
    # from the encoded sequence.
    outputs = Pick()(encoded)
    model = Model(inputs, outputs)
    model.compile('sgd', 'mean_squared_error')
    return model
def test_masking():
    np.random.seed(1337)
    X = np.array([[[1], [1]],
                  [[0], [0]]])
    model = Sequential()
    model.add(Masking(mask_value=0, input_shape=(2, 1)))
    model.add(TimeDistributedDense(1, init='one'))
    model.compile(loss='mse', optimizer='sgd')
    # The second sample is fully masked, so its mismatched targets add no
    # loss; with unit weights the unmasked sample is already predicted
    # perfectly, hence a total loss of exactly zero.
    y = np.array([[[1], [1]],
                  [[1], [1]]])
    loss = model.train_on_batch(X, y)
    assert loss == 0

def test_masking():
    # layer_test comes from keras.utils.test_utils.
    layer_test(core.Masking,
               kwargs={},
               input_shape=(3, 2, 3))
def test_merge_mask_2d():
    from keras.layers import Input, merge, Masking
    from keras.models import Model

    rand = lambda *shape: np.asarray(np.random.random(shape) > 0.5, dtype='int32')

    # inputs
    input_a = Input(shape=(3,))
    input_b = Input(shape=(3,))

    # masks
    masked_a = Masking(mask_value=0)(input_a)
    masked_b = Masking(mask_value=0)(input_b)

    # three different types of merging
    merged_sum = merge([masked_a, masked_b], mode='sum')
    merged_concat = merge([masked_a, masked_b], mode='concat', concat_axis=1)
    merged_concat_mixed = merge([masked_a, input_b], mode='concat', concat_axis=1)

    # test sum
    model_sum = Model([input_a, input_b], [merged_sum])
    model_sum.compile(loss='mse', optimizer='sgd')
    model_sum.fit([rand(2, 3), rand(2, 3)], [rand(2, 3)], nb_epoch=1)

    # test concatenation
    model_concat = Model([input_a, input_b], [merged_concat])
    model_concat.compile(loss='mse', optimizer='sgd')
    model_concat.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)

    # test concatenation with masked and non-masked inputs
    model_concat = Model([input_a, input_b], [merged_concat_mixed])
    model_concat.compile(loss='mse', optimizer='sgd')
    model_concat.fit([rand(2, 3), rand(2, 3)], [rand(2, 6)], nb_epoch=1)
def build_main_residual_network(batch_size,
                                time_step,
                                input_dim,
                                output_dim,
                                loop_depth=15,
                                dropout=0.3):
    inp = Input(shape=(time_step, input_dim))

    # add a mask to filter out invalid (zero-padded) data
    out = TimeDistributed(Masking(mask_value=0))(inp)

    out = Conv1D(128, 5)(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)

    out = first_block(out, (64, 128), dropout=dropout)
    for _ in range(loop_depth):
        out = repeated_block(out, (64, 128), dropout=dropout)

    # flatten and attach the regression head
    out = Flatten()(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)
    out = Dense(output_dim)(out)

    model = Model(inp, out)
    model.compile(loss='mse', optimizer='adam', metrics=['mse', 'mae'])
    return model
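first_block and repeated_block are project-local helpers that this page does not include. Purely as an illustration of what a matching 1-D residual block could look like, a hypothetical reconstruction (not the project's actual code):

from keras.layers import Conv1D, BatchNormalization, Activation, Dropout, add

def repeated_block(x, filters, dropout=0.3):
    # Hypothetical pre-activation residual block; the identity branch
    # requires the input to already carry f_outer channels.
    f_inner, f_outer = filters  # e.g. (64, 128)
    shortcut = x
    y = BatchNormalization()(x)
    y = Activation('relu')(y)
    y = Conv1D(f_inner, 3, padding='same')(y)
    y = Dropout(dropout)(y)
    y = BatchNormalization()(y)
    y = Activation('relu')(y)
    y = Conv1D(f_outer, 3, padding='same')(y)
    return add([shortcut, y])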
def build_2d_main_residual_network(batch_size,
                                   width,
                                   height,
                                   channel_size,
                                   output_dim,
                                   loop_depth=15,
                                   dropout=0.3):
    inp = Input(shape=(width, height, channel_size))

    # add a mask to filter out invalid (zero-padded) data
    out = TimeDistributed(Masking(mask_value=0))(inp)

    out = Conv2D(128, 5, data_format='channels_last')(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)

    out = first_2d_block(out, (64, 128), dropout=dropout)
    for _ in range(loop_depth):
        out = repeated_2d_block(out, (64, 128), dropout=dropout)

    # flatten and attach the regression head
    out = Flatten()(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)
    out = Dense(output_dim)(out)

    model = Model(inp, out)
    model.compile(loss='mse', optimizer='adam', metrics=['mse', 'mae'])
    return model
def build_main_residual_network_with_lstm(batch_size,
                                          time_step,
                                          input_dim,
                                          output_dim,
                                          loop_depth=15,
                                          rnn_layer_num=2,
                                          dropout=0.3):
    inp = Input(shape=(time_step, input_dim))

    # add a mask to filter out invalid (zero-padded) data
    out = TimeDistributed(Masking(mask_value=0))(inp)

    # add the LSTM module
    for _ in range(rnn_layer_num):
        out = LSTM(128, return_sequences=True)(out)

    out = Conv1D(128, 5)(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)

    out = first_block(out, (64, 128), dropout=dropout)
    for _ in range(loop_depth):
        out = repeated_block(out, (64, 128), dropout=dropout)

    # flatten and attach the regression head
    out = Flatten()(out)
    out = BatchNormalization()(out)
    out = Activation('relu')(out)
    out = Dense(output_dim)(out)

    model = Model(inp, out)
    model.compile(loss='mse', optimizer='adam', metrics=['mse', 'mae'])
    return model
def build_model(self):
    assert self.seq_len > 1
    assert len(self.alphabet.alphabet) > 0

    bits_per_char = self.alphabet.nb_chars
    rnn_size = bits_per_char

    model = Sequential()
    # All-zero one-hot rows are padding; mask them out.
    model.add(Masking(mask_value=0, input_shape=(self.seq_len, bits_per_char), name='input_layer'))
    model.add(recurrent.LSTM(rnn_size, input_shape=(self.seq_len, bits_per_char), return_sequences=False))
    model.add(Dense(units=rnn_size, activation='sigmoid'))
    model.add(Dense(units=bits_per_char, activation='softmax', name='output_layer'))
    model.compile(loss='categorical_crossentropy', optimizer='rmsprop', metrics=['accuracy'])
    return model
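Since the mask value is 0 and the inputs are one-hot rows, any all-zero row acts as padding. A hedged sketch of encoding character sequences accordingly (char_to_index is a hypothetical stand-in for self.alphabet's encoding):

import numpy as np

def encode(seqs, seq_len, nb_chars, char_to_index):
    # Rows left at zero are padding and get masked by Masking(mask_value=0).
    X = np.zeros((len(seqs), seq_len, nb_chars))
    for i, s in enumerate(seqs):
        for t, ch in enumerate(s[-seq_len:]):  # keep the last seq_len chars
            X[i, t, char_to_index[ch]] = 1.0
    return X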
def build_lstm(n_con, n_emb, vocabs_size, n_dis, emb_size, cluster_size):
    hidden_size = 800

    # continuous features
    con = Sequential()
    con.add(Dense(input_dim=n_con, output_dim=emb_size))

    # one embedding branch per discrete feature
    emb_list = []
    for i in range(n_emb):
        emb = Sequential()
        emb.add(Embedding(input_dim=vocabs_size[i], output_dim=emb_size, input_length=n_dis))
        emb.add(Flatten())
        emb_list.append(emb)

    # sequence branch: 2-D points, padded with [0, 0] entries
    in_dimension = 2
    seq = Sequential()
    seq.add(BatchNormalization(input_shape=(MAX_LENGTH, in_dimension)))
    seq.add(Masking([0] * in_dimension, input_shape=(MAX_LENGTH, in_dimension)))
    seq.add(LSTM(emb_size, return_sequences=False, input_shape=(MAX_LENGTH, in_dimension)))

    model = Sequential()
    model.add(Merge([con] + emb_list + [seq], mode='concat'))
    model.add(BatchNormalization())
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dropout(0.5))
    model.add(Dense(cluster_size, activation='softmax'))
    # caluate_point [sic] is defined elsewhere in the project; it presumably
    # maps the cluster distribution to a 2-D point prediction.
    model.add(Lambda(caluate_point, output_shape=[2]))
    return model
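A hedged usage sketch: the merged model expects, in order, one continuous block, n_emb integer-encoded discrete blocks and one padded 2-D point sequence. All sizes below are made up, and MAX_LENGTH and caluate_point come from the surrounding module:

import numpy as np

model = build_lstm(n_con=10, n_emb=2, vocabs_size=[50, 30], n_dis=5,
                   emb_size=100, cluster_size=20)
model.compile(loss='mse', optimizer='adam')

X_con = np.random.random((64, 10))
X_emb = [np.random.randint(0, v, size=(64, 5)) for v in (50, 30)]
X_seq = np.random.random((64, MAX_LENGTH, 2))
y = np.random.random((64, 2))  # 2-D target points
model.fit([X_con] + X_emb + [X_seq], y, nb_epoch=1)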