def test_medium_no_sequence_lstm_random(self):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 10
# Define a model
model = Sequential()
model.add(LSTM(num_channels, input_shape = (input_length, input_dim),
recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
python类add()的实例源码
def test_tiny_no_sequence_lstm_zeros_gpu(self):
np.random.seed(1988)
input_dim = 1
input_length = 1
num_channels = 1
# Define a model
model = Sequential()
model.add(LSTM(num_channels, input_shape = (input_length, input_dim),
implementation = 2, recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, mode = 'zeros', input_blob = 'data', output_blob = 'output')
def test_small_no_sequence_lstm_random(self):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 1
# Define a model
model = Sequential()
model.add(LSTM(num_channels, input_shape = (input_length, input_dim),
implementation = 2, recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
def test_small_no_sequence_gru_random(self):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 1
# Define a model
model = Sequential()
model.add(GRU(num_channels, input_shape = (input_length, input_dim),
recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
def test_medium_no_sequence_gru_random(self,
model_precision=_MLMODEL_FULL_PRECISION):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 10
# Define a model
model = Sequential()
model.add(GRU(num_channels, input_shape = (input_length, input_dim), recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob='data', output_blob='output',
model_precision=model_precision)
def test_tiny_no_sequence_bidir_random(self,
model_precision=_MLMODEL_FULL_PRECISION):
np.random.seed(1988)
input_dim = 1
input_length = 1
num_channels = 1
num_samples = 1
# Define a model
model = Sequential()
model.add(Bidirectional(LSTM(num_channels,
implementation = 1, recurrent_activation = 'sigmoid'),
input_shape=(input_length, input_dim)))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output',
model_precision=model_precision)
def test_small_no_sequence_bidir_random(self):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 1
# Define a model
model = Sequential()
model.add(Bidirectional(LSTM(num_channels,
implementation = 2, recurrent_activation = 'sigmoid'),
input_shape=(input_length, input_dim)))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
def test_medium_no_sequence_bidir_random(self):
np.random.seed(1988)
input_dim = 10
input_length = 1
num_channels = 10
# Define a model
model = Sequential()
model.add(Bidirectional(LSTM(num_channels,
implementation = 2, recurrent_activation = 'sigmoid'),
input_shape=(input_length, input_dim)))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
def test_medium_bidir_random_return_seq_false(self):
np.random.seed(1988)
input_dim = 7
input_length = 5
num_channels = 10
# Define a model
model = Sequential()
model.add(Bidirectional(LSTM(num_channels,
return_sequences=False, implementation=2, recurrent_activation='sigmoid'),
input_shape=(input_length, input_dim)))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob='data', output_blob='output')
def test_medium_bidir_random_return_seq_true(self):
np.random.seed(1988)
input_dim = 7
input_length = 5
num_channels = 10
# Define a model
model = Sequential()
model.add(Bidirectional(LSTM(num_channels,
return_sequences = True, implementation = 2, recurrent_activation = 'sigmoid'),
input_shape=(input_length, input_dim)))
# Set some random weights
model.set_weights([np.random.rand(*w.shape)*0.2-0.1 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output')
def test_tiny_add_random(self):
np.random.seed(1988)
input_dim = 10
num_channels = 6
# Define a model
input_tensor = Input(shape = (input_dim, ))
x1 = Dense(num_channels)(input_tensor)
x2 = Dense(num_channels)(x1)
x3 = Dense(num_channels)(x1)
x4 = add([x2, x3])
x5 = Dense(num_channels)(x4)
model = Model(inputs=[input_tensor], outputs=[x5])
# Set some random weights
model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])
# Get the coreml model
self._test_keras_model(model)
def test_tiny_conv_dense_random(self):
np.random.seed(1988)
num_samples = 1
input_dim = 8
input_shape = (input_dim, input_dim, 3)
num_kernels = 2
kernel_height = 5
kernel_width = 5
hidden_dim = 4
# Define a model
model = Sequential()
model.add(Conv2D(input_shape = input_shape,
filters = num_kernels, kernel_size=(kernel_height, kernel_width)))
model.add(Dropout(0.5))
model.add(Flatten())
model.add(Dense(hidden_dim))
# Set some random weights
model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])
# Get the coreml model
self._test_keras_model(model)
def test_tiny_conv_dropout_random(self):
np.random.seed(1988)
num_samples = 1
input_dim = 8
input_shape = (input_dim, input_dim, 3)
num_kernels = 2
kernel_height = 5
kernel_width = 5
hidden_dim = 4
# Define a model
model = Sequential()
model.add(Conv2D(input_shape = input_shape,
filters = num_kernels, kernel_size=(kernel_height, kernel_width)))
model.add(SpatialDropout2D(0.5))
model.add(Flatten())
model.add(Dense(hidden_dim))
# Set some random weights
model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])
# Get the coreml model
self._test_keras_model(model)
def test_tiny_sequence_lstm(self, model_precision=_MLMODEL_FULL_PRECISION):
np.random.seed(1988)
input_dim = 1
input_length = 2
num_channels = 1
# Define a model
model = Sequential()
model.add(LSTM(num_channels, input_shape = (input_length, input_dim),
implementation = 1, recurrent_activation = 'sigmoid'))
# Set some random weights
model.set_weights([(np.random.rand(*w.shape)-0.5)*0.2 for w in model.get_weights()])
# Test the keras model
self._test_keras_model(model, input_blob = 'data', output_blob = 'output', delta=1e-4,
model_precision=model_precision)
def test_activation_layer_params(self):
options = dict(
activation = ['tanh', 'relu', 'sigmoid', 'softmax', 'softplus', 'softsign', 'hard_sigmoid', 'elu']
)
# Define a function that tests a model
num_channels = 10
input_dim = 10
def build_model(x):
model = Sequential()
model.add(Dense(num_channels, input_dim = input_dim))
model.add(Activation(**dict(zip(options.keys(), x))))
return x, model
# Iterate through all combinations
product = itertools.product(*options.values())
args = [build_model(p) for p in product]
# Test the cases
print("Testing a total of %s cases. This could take a while" % len(args))
for param, model in args:
model.set_weights([np.random.rand(*w.shape) for w in model.get_weights()])
self._run_test(model, param)
def test_dense_layer_params(self):
options = dict(
activation = ['relu', 'softmax', 'tanh', 'sigmoid', 'softplus', 'softsign', 'elu','hard_sigmoid'],
use_bias = [True, False],
)
# Define a function that tests a model
input_shape = (10,)
num_channels = 10
def build_model(x):
kwargs = dict(zip(options.keys(), x))
model = Sequential()
model.add(Dense(num_channels, input_shape = input_shape, **kwargs))
return x, model
# Iterate through all combinations
product = itertools.product(*options.values())
args = [build_model(p) for p in product]
# Test the cases
print("Testing a total of %s cases. This could take a while" % len(args))
for param, model in args:
self._run_test(model, param)
def test_conv_layer_params(self, model_precision=_MLMODEL_FULL_PRECISION):
options = dict(
activation = ['relu', 'tanh', 'sigmoid'], # keras does not support softmax on 4-D
use_bias = [True, False],
padding = ['same', 'valid'],
filters = [1, 3, 5],
kernel_size = [[5,5]], # fails when sizes are different
)
# Define a function that tests a model
input_shape = (10, 10, 1)
def build_model(x):
kwargs = dict(zip(options.keys(), x))
model = Sequential()
model.add(Conv2D(input_shape = input_shape, **kwargs))
return x, model
# Iterate through all combinations
product = itertools.product(*options.values())
args = [build_model(p) for p in product]
# Test the cases
print("Testing a total of %s cases. This could take a while" % len(args))
for param, model in args:
self._run_test(model, param, model_precision=model_precision)
def test_dense_elementwise_params(self):
options = dict(
modes = [add, multiply, concatenate, average, maximum]
)
def build_model(mode):
x1 = Input(shape=(3,))
x2 = Input(shape=(3,))
y1 = Dense(4)(x1)
y2 = Dense(4)(x2)
z = mode([y1, y2])
model = Model([x1,x2], z)
return mode, model
product = itertools.product(*options.values())
args = [build_model(p[0]) for p in product]
print("Testing a total of %s cases. This could take a while" % len(args))
for param, model in args:
self._run_test(model, param)
def test_tiny_babi_rnn(self):
vocab_size = 10
embed_hidden_size = 8
story_maxlen = 5
query_maxlen = 5
input_tensor_1 = Input(shape=(story_maxlen,))
x1 = Embedding(vocab_size, embed_hidden_size)(input_tensor_1)
x1 = Dropout(0.3)(x1)
input_tensor_2 = Input(shape=(query_maxlen,))
x2 = Embedding(vocab_size, embed_hidden_size)(input_tensor_2)
x2 = Dropout(0.3)(x2)
x2 = LSTM(embed_hidden_size, return_sequences=False)(x2)
x2 = RepeatVector(story_maxlen)(x2)
x3 = add([x1, x2])
x3 = LSTM(embed_hidden_size, return_sequences=False)(x3)
x3 = Dropout(0.3)(x3)
x3 = Dense(vocab_size, activation='softmax')(x3)
model = Model(inputs=[input_tensor_1,input_tensor_2], outputs=[x3])
self._test_keras_model(model, one_dim_seq_flags=[True, True])
def ResXceptionBlock(input, size):
# residual component
r = Conv2D(size, (1, 1), strides=(2, 2),
padding='same', use_bias=False)(input)
r = BatchNormalization()(r)
# depth-wise separable conv
x = SeparableConv2D(size, (3, 3), padding='same',
kernel_regularizer=l2(0.01),
use_bias=False)(input)
x = BatchNormalization()(x)
x = Activation('relu')(x)
x = SeparableConv2D(size, (3, 3), padding='same',
kernel_regularizer=l2(0.01),
use_bias=False)(x)
x = BatchNormalization()(x)
x = MaxPooling2D((3, 3), strides=(2, 2), padding='same')(x)
# sum the two components
output = add([x, r])
return output
def _shortcut(inputs, x):
# shortcut path
_, inputs_w, inputs_h, inputs_ch = K.int_shape(inputs)
_, x_w, x_h, x_ch = K.int_shape(x)
stride_w = int(round(inputs_w / x_w))
stride_h = int(round(inputs_h / x_h))
equal_ch = inputs_ch == x_ch
if stride_w>1 or stride_h>1 or not equal_ch:
shortcut = Conv2D(x_ch, (1, 1),
strides = (stride_w, stride_h),
kernel_initializer = init, padding = 'valid')(inputs)
else:
shortcut = inputs
merged = Add()([shortcut, x])
return merged
def ResidualBlock1D_helper(layers, kernel_size, filters, final_stride=1):
def f(_input):
basic = _input
for ln in range(layers):
#basic = BatchNormalization()( basic ) # triggers known keras bug w/ TimeDistributed: https://github.com/fchollet/keras/issues/5221
basic = ELU()(basic)
basic = Conv1D(filters, kernel_size, kernel_initializer='he_normal',
kernel_regularizer=l2(1.e-4), padding='same')(basic)
# note that this strides without averaging
return AveragePooling1D(pool_size=1, strides=final_stride)(Add()([_input, basic]))
return f
def deflating_convolution(inputs, n_deflation_layers, n_filters_init=32, noise=None, name_prefix=None):
def add_linear_noise(x, eps, ind):
flattened_deflated = Reshape((-1,), name=name_prefix + '_conv_flatten_{}'.format(ind))(x)
deflated_shape = ker.int_shape(x)
deflated_size = deflated_shape[1] * deflated_shape[2] * deflated_shape[3]
noise_transformed = Dense(deflated_size, activation=None,
name=name_prefix + '_conv_noise_dense_{}'.format(ind))(eps)
added_noise = Add(name=name_prefix + '_conv_add_noise_{}'.format(ind))([noise_transformed, flattened_deflated])
x = Reshape((deflated_shape[1], deflated_shape[2], deflated_shape[3]),
name=name_prefix + '_conv_backreshape_{}'.format(ind))(added_noise)
return x
deflated = Conv2D(filters=n_filters_init, kernel_size=(5, 5), strides=(2, 2),
padding='same', activation='relu', name=name_prefix + '_conv_0')(inputs)
if noise is not None:
deflated = add_linear_noise(deflated, noise, 0)
for i in range(1, n_deflation_layers):
deflated = Conv2D(filters=n_filters_init * (2**i), kernel_size=(5, 5), strides=(2, 2),
padding='same', activation='relu', name=name_prefix + '_conv_{}'.format(i))(deflated)
# if noise is not None:
# deflated = add_linear_noise(deflated, noise, i)
return deflated
def synthetic_adaptive_prior_discriminator(data_dim, latent_dim):
data_input = Input(shape=(data_dim,), name='disc_internal_data_input')
# center the data around 0 in [-1, 1] as it is in [0, 1].
centered_data = Lambda(lambda x: 2 * x - 1, name='disc_centering_data_input')(data_input)
discriminator_body_data = repeat_dense(centered_data, n_layers=2, n_units=256, name_prefix='disc_body_data')
theta = Dense(4*256, activation='relu', name='disc_theta')(discriminator_body_data)
discriminator_body_data_t = repeat_dense(centered_data, n_layers=2, n_units=256, name_prefix='disc_body_data_t')
discriminator_body_data_t = Dense(1, activation=None, name='disc_data_squash')(discriminator_body_data_t)
latent_input = Input(shape=(latent_dim,), name='disc_internal_latent_input')
discriminator_body_latent = repeat_dense(latent_input, n_layers=2, n_units=256, name_prefix='disc_body_latent')
sigma = Dense(4*256, activation='relu', name='disc_sigma')(discriminator_body_latent)
discriminator_body_latent_t = repeat_dense(latent_input, n_layers=2, n_units=256, name_prefix='disc_body_latent_t')
discriminator_body_latent_t = Dense(1, activation=None, name='disc_latent_squash')(discriminator_body_latent_t)
merged_data_latent = Multiply(name='disc_mul_sigma_theta')([theta, sigma])
merged_data_latent = Lambda(lambda x: ker.sum(x, axis=-1), name='disc_add_activ_sig_the')(merged_data_latent)
discriminator_output = Add(name='disc_add_data_latent_t')([discriminator_body_data_t,
discriminator_body_latent_t,
merged_data_latent])
collapsed_noise = Lambda(lambda x: 0.5 * ker.sum(x ** 2, axis=-1), name='disc_noise_addition')(latent_input)
discriminator_output = Add(name='disc_add_all_toghether')([discriminator_output, collapsed_noise])
discriminator_model = Model(inputs=[data_input, latent_input], outputs=discriminator_output,
name='disc_internal_model')
return discriminator_model
def mnist_adaptive_prior_discriminator(data_dim, latent_dim):
data_input = Input(shape=(data_dim,), name='disc_internal_data_input')
# center the data around 0 in [-1, 1] as it is in [0, 1].
centered_data = Lambda(lambda x: 2 * x - 1, name='disc_centering_data_input')(data_input)
discriminator_body_data = repeat_dense(centered_data, n_layers=3, n_units=512, name_prefix='disc_body_data')
theta = Dense(4*512, activation='relu', name='disc_theta')(discriminator_body_data)
discriminator_body_data_t = repeat_dense(centered_data, n_layers=3, n_units=512, name_prefix='disc_body_data_t')
discriminator_body_data_t = Dense(1, activation=None, name='disc_data_squash')(discriminator_body_data_t)
latent_input = Input(shape=(latent_dim,), name='disc_internal_latent_input')
discriminator_body_latent = repeat_dense(latent_input, n_layers=3, n_units=512, name_prefix='disc_body_latent')
sigma = Dense(4*512, activation='relu', name='disc_sigma')(discriminator_body_latent)
discriminator_body_latent_t = repeat_dense(latent_input, n_layers=3, n_units=512, name_prefix='disc_body_latent_t')
discriminator_body_latent_t = Dense(1, activation=None, name='disc_latent_squash')(discriminator_body_latent_t)
merged_data_latent = Multiply(name='disc_mul_sigma_theta')([theta, sigma])
merged_data_latent = Lambda(lambda x: ker.sum(x, axis=-1), name='disc_add_activ_sig_the')(merged_data_latent)
discriminator_output = Add(name='disc_add_data_latent_t')([discriminator_body_data_t,
discriminator_body_latent_t,
merged_data_latent])
collapsed_noise = Lambda(lambda x: 0.5 * ker.sum(x**2, axis=-1), name='disc_noise_addition')(latent_input)
discriminator_output = Add(name='disc_add_all_toghether')([discriminator_output, collapsed_noise])
discriminator_model = Model(inputs=[data_input, latent_input], outputs=discriminator_output,
name='disc_internal_model')
return discriminator_model
def sample_standard_normal_noise(inputs, **kwargs):
from keras.backend import shape, random_normal
n_samples = kwargs.get('n_samples', shape(inputs)[0])
n_basis_noise_vectors = kwargs.get('n_basis', -1)
data_dim = kwargs.get('data_dim', 1)
noise_dim = kwargs.get('noise_dim', data_dim)
seed = kwargs.get('seed', 7)
if n_basis_noise_vectors > 0:
samples_isotropic = random_normal(shape=(n_samples, n_basis_noise_vectors, noise_dim),
mean=0, stddev=1, seed=seed)
else:
samples_isotropic = random_normal(shape=(n_samples, noise_dim),
mean=0, stddev=1, seed=seed)
op_mode = kwargs.get('mode', 'none')
if op_mode == 'concatenate':
concat = Concatenate(axis=1, name='enc_noise_concatenation')([inputs, samples_isotropic])
return concat
elif op_mode == 'add':
resized_noise = Dense(data_dim, activation=None, name='enc_resized_noise_sampler')(samples_isotropic)
added_noise_data = Add(name='enc_adding_noise_data')([inputs, resized_noise])
return added_noise_data
return samples_isotropic
def mnist_encoder_simple(data_dim, noise_dim, latent_dim=8):
data_input = Input(shape=(data_dim,), name='enc_internal_data_input')
noise_input = Input(shape=(noise_dim,), name='enc_internal_noise_input')
# center the input around 0
# centered_data = Lambda(lambda x: 2 * x - 1, name='enc_centering_data_input')(data_input)
# concat_input = Concatenate(axis=-1, name='enc_noise_data_concat')([centered_data, noise_input])
enc_body = repeat_dense(data_input, n_layers=2, n_units=256, activation='relu', name_prefix='enc_body')
enc_output = Dense(100, activation='relu', name='enc_dense_before_latent')(enc_body)
enc_output = Dense(latent_dim, name='enc_latent_features')(enc_output)
noise_resized = Dense(latent_dim, activation=None, name='enc_noise_resizing')(noise_input)
enc_output = Add(name='enc_add_noise_data')([enc_output, noise_resized])
latent_factors = Model(inputs=[data_input, noise_input], outputs=enc_output, name='enc_internal_model')
return latent_factors
def test_delete_channels_merge_others(channel_index, data_format):
layer_test_helper_merge_2d(Add(), channel_index, data_format)
layer_test_helper_merge_2d(Multiply(), channel_index, data_format)
layer_test_helper_merge_2d(Average(), channel_index, data_format)
layer_test_helper_merge_2d(Maximum(), channel_index, data_format)
def identity_block(input_tensor, kernel_size, filters, stage, block, trainable=True):
nb_filter1, nb_filter2, nb_filter3 = filters
if K.image_dim_ordering() == 'tf':
bn_axis = 3
else:
bn_axis = 1
conv_name_base = 'res' + str(stage) + block + '_branch'
bn_name_base = 'bn' + str(stage) + block + '_branch'
x = Convolution2D(nb_filter1, (1, 1), name=conv_name_base + '2a', trainable=trainable)(input_tensor)
x = FixedBatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
x = Activation('relu')(x)
x = Convolution2D(nb_filter2, (1, kernel_size), padding='same', name=conv_name_base + '2b', trainable=trainable)(x)
x = FixedBatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
x = Activation('relu')(x)
x = Convolution2D(nb_filter3, (1, 1), name=conv_name_base + '2c', trainable=trainable)(x)
x = FixedBatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)
x = Add()([x, input_tensor])
x = Activation('relu')(x)
return x
def identity_block_td(input_tensor, kernel_size, filters, stage, block, trainable=True):
# identity block time distributed
nb_filter1, nb_filter2, nb_filter3 = filters
if K.image_dim_ordering() == 'tf':
bn_axis = 3
else:
bn_axis = 1
conv_name_base = 'res' + str(stage) + block + '_branch'
bn_name_base = 'bn' + str(stage) + block + '_branch'
x = TimeDistributed(Convolution2D(nb_filter1, (1, 1), trainable=trainable, kernel_initializer='normal'), name=conv_name_base + '2a')(input_tensor)
x = TimeDistributed(FixedBatchNormalization(axis=bn_axis), name=bn_name_base + '2a')(x)
x = Activation('relu')(x)
x = TimeDistributed(Convolution2D(nb_filter2, (1, kernel_size), trainable=trainable, kernel_initializer='normal',padding='same'), name=conv_name_base + '2b')(x)
x = TimeDistributed(FixedBatchNormalization(axis=bn_axis), name=bn_name_base + '2b')(x)
x = Activation('relu')(x)
x = TimeDistributed(Convolution2D(nb_filter3, (1, 1), trainable=trainable, kernel_initializer='normal'), name=conv_name_base + '2c')(x)
x = TimeDistributed(FixedBatchNormalization(axis=bn_axis), name=bn_name_base + '2c')(x)
x = Add()([x, input_tensor])
x = Activation('relu')(x)
return x