def test_sequential_model_saving_2():
    """Round-trip save/load of a Sequential model compiled with a custom
    optimizer and loss; predictions must be identical after reloading."""
    # test with custom optimizer, loss
    custom_opt = optimizers.rmsprop
    custom_loss = objectives.mse

    model = Sequential()
    model.add(Dense(2, input_dim=3))
    model.add(Dense(3))
    model.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    x = np.random.random((1, 3))
    y = np.random.random((1, 3))
    model.train_on_batch(x, y)
    out = model.predict(x)

    _, fname = tempfile.mkstemp('.h5')
    save_model(model, fname)
    # custom objects must be supplied by name when deserializing
    model = load_model(fname,
                       custom_objects={'custom_opt': custom_opt,
                                       'custom_loss': custom_loss})
    os.remove(fname)

    out2 = model.predict(x)
    assert_allclose(out, out2, atol=1e-05)
# Example source code for Python's mse()  (translated scraped page header)
def test_sequential_model_saving_2():
    """Save/load round-trip with a custom optimizer and loss function."""
    opt_cls = optimizers.rmsprop
    loss_fn = objectives.mse

    model = Sequential()
    model.add(Dense(2, input_dim=3))
    model.add(Dense(3))
    model.compile(loss=loss_fn, optimizer=opt_cls(), metrics=['acc'])

    inputs = np.random.random((1, 3))
    targets = np.random.random((1, 3))
    model.train_on_batch(inputs, targets)
    before = model.predict(inputs)

    _, path = tempfile.mkstemp('.h5')
    save_model(model, path)
    # map serialized names back to the custom objects on reload
    custom = {'custom_opt': opt_cls, 'custom_loss': loss_fn}
    model = load_model(path, custom_objects=custom)
    os.remove(path)

    after = model.predict(inputs)
    assert_allclose(before, after, atol=1e-05)
def test_sequential_model_saving_2():
    # test with custom optimizer, loss
    custom_opt = optimizers.rmsprop
    custom_loss = objectives.mse

    net = Sequential()
    net.add(Dense(2, input_dim=3))
    net.add(Dense(3))
    net.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    x = np.random.random((1, 3))
    net.train_on_batch(x, np.random.random((1, 3)))
    reference = net.predict(x)

    _, tmp_path = tempfile.mkstemp('.h5')
    save_model(net, tmp_path)
    reloaded = load_model(tmp_path,
                          custom_objects={'custom_opt': custom_opt,
                                          'custom_loss': custom_loss})
    os.remove(tmp_path)

    # the reloaded model must reproduce the original predictions
    assert_allclose(reference, reloaded.predict(x), atol=1e-05)
def build_keras_model_sg(index_size,vector_size,
                         context_size,
                         #code_dim,
                         sub_batch_size=256,
                         learn_vectors=True,learn_hidden=True,
                         model=None):
    """Build a skip-gram word2vec network using the legacy Keras Graph API.

    Args:
        index_size: vocabulary size of the input-word embedding.
        vector_size: embedding dimensionality.
        context_size: vocabulary size of the context/output embedding.
        sub_batch_size: number of (word, point) pairs per sample.
        learn_vectors, learn_hidden: accepted but not used in the body —
            NOTE(review): presumably meant to control trainability; confirm.
        model: gensim-style model providing initial weights via
            ``model.syn0`` and ``model.keras_syn1``.

    Returns:
        A compiled Graph mapping inputs 'index' and 'point' to output 'code'.
    """
    kerasmodel = Graph()
    # integer-id inputs for the target word ('index') and the context node ('point')
    kerasmodel.add_input(name='point' , input_shape=(1,), dtype=int)
    kerasmodel.add_input(name='index' , input_shape=(1,), dtype=int)
    # embeddings seeded with the pre-trained gensim matrices
    kerasmodel.add_node(Embedding(index_size, vector_size, input_length=sub_batch_size,weights=[model.syn0]),name='embedding', input='index')
    kerasmodel.add_node(Embedding(context_size, vector_size, input_length=sub_batch_size,weights=[model.keras_syn1]),name='embedpoint', input='point')
    # elementwise product, then sum over the vector axis == per-pair dot product
    kerasmodel.add_node(Lambda(lambda x:x.sum(2)) , name='merge',inputs=['embedding','embedpoint'], merge_mode='mul')
    kerasmodel.add_node(Activation('sigmoid'), name='sigmoid', input='merge')
    kerasmodel.add_output(name='code',input='sigmoid')
    # trained with MSE against 0/1 negative-sampling codes
    kerasmodel.compile('rmsprop', {'code':'mse'})
    return kerasmodel
def build_keras_model_cbow(index_size,vector_size,
                           context_size,
                           #code_dim,
                           sub_batch_size=1,
                           model=None,cbow_mean=False):
    """Build a CBOW word2vec network using the legacy Keras Graph API.

    Args:
        index_size: vocabulary size of the input-word embedding.
        vector_size: embedding dimensionality.
        context_size: vocabulary size of the context/output embedding.
        sub_batch_size: number of output points per sample.
        model: gensim-style model providing ``model.syn0`` and
            ``model.keras_syn1`` as initial weights.
        cbow_mean: average the context vectors instead of summing them.

    Returns:
        A compiled Graph mapping inputs 'index' and 'point' to output 'code'.
    """
    kerasmodel = Graph()
    kerasmodel.add_input(name='point' , input_shape=(sub_batch_size,), dtype='int')
    kerasmodel.add_input(name='index' , input_shape=(1,), dtype='int')
    kerasmodel.add_node(Embedding(index_size, vector_size, weights=[model.syn0]),name='embedding', input='index')
    kerasmodel.add_node(Embedding(context_size, vector_size, input_length=sub_batch_size,weights=[model.keras_syn1]),name='embedpoint', input='point')
    # combine the context-window embeddings: mean or sum along the word axis
    if cbow_mean:
        kerasmodel.add_node(Lambda(lambda x:x.mean(1),output_shape=(vector_size,)),name='average',input='embedding')
    else:
        kerasmodel.add_node(Lambda(lambda x:x.sum(1),output_shape=(vector_size,)),name='average',input='embedding')
    # dot the combined context vector with each output point's vector
    kerasmodel.add_node(Activation('sigmoid'), name='sigmoid',inputs=['average','embedpoint'], merge_mode='dot',dot_axes=-1)
    kerasmodel.add_output(name='code',input='sigmoid')
    kerasmodel.compile('rmsprop', {'code':'mse'})
    return kerasmodel
def vae_loss(x, x_hat):
    """VAE loss: reconstruction term plus KL divergence of the latent code.

    The reconstruction term is selected by the module-level ``use_loss``
    flag ('xent' or 'mse') and scaled by the module-level ``n``; the KL
    term uses the module-level ``z_mean`` / ``z_log_var`` tensors.

    Raises:
        ValueError: if ``use_loss`` is neither 'xent' nor 'mse'.
    """
    # KL( N(z_mean, exp(z_log_var)) || N(0, I) ), summed over latent dims
    kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    xent_loss = n * objectives.binary_crossentropy(x, x_hat)
    mse_loss = n * objectives.mse(x, x_hat)
    if use_loss == 'xent':
        return xent_loss + kl_loss
    elif use_loss == 'mse':
        return mse_loss + kl_loss
    else:
        # fixed: original used Python-2 raise syntax (`raise X, msg`) with a
        # misspelled, undefined name `Expception` (NameError at runtime)
        raise ValueError('Unknown loss: %r' % use_loss)
def report(self,train_data,
           test_data=None,
           train_data_to=None,
           test_data_to=None,
           batch_size=1000,
           **kwargs):
    """Print reconstruction-MSE diagnostics of ``self.autoencoder`` on clean
    and noise-corrupted inputs, plus the mean latent activation.

    Args:
        train_data: array of training inputs.
        test_data: optional validation inputs; defaults to ``train_data``.
        train_data_to / test_data_to: accepted and defaulted but not used
            below — NOTE(review): likely intended as reconstruction targets.
        batch_size: batch size passed to ``evaluate``.
        **kwargs: ignored.

    Returns:
        self (allows chaining).
    """
    # fall back to the training set when no held-out data is given
    test_data = train_data if test_data is None else test_data
    train_data_to = train_data if train_data_to is None else train_data_to
    test_data_to = test_data if test_data_to is None else test_data_to
    opts = {'verbose':0,'batch_size':batch_size}
    def test_both(msg, fn):
        # report fn on the training data, and on the validation data too.
        # NOTE(review): test_data can no longer be None here because of the
        # fallback above, so the guard always passes.
        print(msg.format(fn(train_data)))
        if test_data is not None:
            print((msg+" (validation)").format(fn(test_data)))
    # recompile with plain MSE so evaluate() reports reconstruction error
    self.autoencoder.compile(optimizer='adam', loss=mse)
    test_both("Reconstruction MSE: {}",
              lambda data: self.autoencoder.evaluate(data,data,**opts))
    # robustness to three corruption types (gaussian / salt / pepper noise)
    test_both("Reconstruction MSE (gaussian 0.3): {}",
              lambda data: self.autoencoder.evaluate(gaussian(data),data,**opts))
    test_both("Reconstruction MSE (salt 0.06): {}",
              lambda data: self.autoencoder.evaluate(salt(data),data,**opts))
    test_both("Reconstruction MSE (pepper 0.06): {}",
              lambda data: self.autoencoder.evaluate(pepper(data),data,**opts))
    # self.autoencoder.compile(optimizer=optimizer, loss=bce)
    # test_both("Reconstruction BCE: {}",
    #           lambda data: self.autoencoder.evaluate(data,data,**opts))
    # test_both("Noise reconstruction BCE (gaussian 0.3): {}",
    #           lambda data: self.autoencoder.evaluate(gaussian(data),data,**opts))
    # test_both("Noise reconstruction BCE (salt 0.1): {}",
    #           lambda data: self.autoencoder.evaluate(salt(data),data,**opts))
    # test_both("Noise reconstruction BCE (pepper 0.1): {}",
    #           lambda data: self.autoencoder.evaluate(pepper(data),data,**opts))
    test_both("Latent activation: {}",
              lambda data: self.encode_binary(train_data,batch_size=batch_size,).mean())
    return self
def test_saving_without_compilation():
    """A compiled Sequential model can be saved and reloaded."""
    model = Sequential()
    model.add(Dense(2, input_dim=3))
    model.add(Dense(3))
    model.compile(loss='mse', optimizer='sgd', metrics=['acc'])

    _, path = tempfile.mkstemp('.h5')
    save_model(model, path)
    model = load_model(path)
    os.remove(path)
def test_saving_right_after_compilation():
    """Saving works even when the train function was just built."""
    model = Sequential()
    model.add(Dense(2, input_dim=3))
    model.add(Dense(3))
    model.compile(loss='mse', optimizer='sgd', metrics=['acc'])
    # force creation of the training function before saving
    model.model._make_train_function()

    _, path = tempfile.mkstemp('.h5')
    save_model(model, path)
    model = load_model(path)
    os.remove(path)
def test_saving_without_compilation():
    """Round-trip save/load of a small compiled Sequential model."""
    net = Sequential()
    net.add(Dense(2, input_dim=3))
    net.add(Dense(3))
    net.compile(loss='mse', optimizer='sgd', metrics=['acc'])

    _, h5_path = tempfile.mkstemp('.h5')
    save_model(net, h5_path)
    net = load_model(h5_path)
    os.remove(h5_path)
def test_saving_right_after_compilation():
    """Round-trip save/load immediately after the train function is built."""
    net = Sequential()
    net.add(Dense(2, input_dim=3))
    net.add(Dense(3))
    net.compile(loss='mse', optimizer='sgd', metrics=['acc'])
    # materialize the training function so its state is present at save time
    net.model._make_train_function()

    _, h5_path = tempfile.mkstemp('.h5')
    save_model(net, h5_path)
    net = load_model(h5_path)
    os.remove(h5_path)
def vae_loss(input_motion, x):
    """Reconstruction MSE plus a 0.5-weighted KL divergence term
    (uses the module-level z_mean / z_log_std tensors)."""
    reconstruction = objectives.mse(input_motion, x)
    kl = - 0.5 * K.mean(1 + z_log_std - K.square(z_mean) - K.exp(z_log_std))
    return reconstruction + 0.5 * kl
def test_saving_without_compilation():
    """Verify save_model/load_model on a freshly compiled model."""
    m = Sequential()
    m.add(Dense(2, input_dim=3))
    m.add(Dense(3))
    m.compile(loss='mse', optimizer='sgd', metrics=['acc'])

    _, fname = tempfile.mkstemp('.h5')
    save_model(m, fname)
    m = load_model(fname)
    os.remove(fname)
def test_saving_right_after_compilation():
    """Verify save_model/load_model right after building the train fn."""
    m = Sequential()
    m.add(Dense(2, input_dim=3))
    m.add(Dense(3))
    m.compile(loss='mse', optimizer='sgd', metrics=['acc'])
    m.model._make_train_function()  # build optimizer state before saving

    _, fname = tempfile.mkstemp('.h5')
    save_model(m, fname)
    m = load_model(fname)
    os.remove(fname)
def vae_loss(x, x_hat):
    """VAE loss: a scaled reconstruction term plus the KL divergence.

    ``use_loss`` (module-level) selects the reconstruction term ('xent' or
    'mse'), scaled by the module-level ``n``; the KL term is computed from
    the module-level ``z_mean`` / ``z_log_var`` tensors.

    Raises:
        ValueError: if ``use_loss`` is neither 'xent' nor 'mse'.
    """
    # KL divergence between the approximate posterior and a unit Gaussian
    kl_loss = - 0.5 * K.sum(1 + z_log_var - K.square(z_mean) - K.exp(z_log_var), axis=-1)
    xent_loss = n * objectives.binary_crossentropy(x, x_hat)
    mse_loss = n * objectives.mse(x, x_hat)
    if use_loss == 'xent':
        return xent_loss + kl_loss
    elif use_loss == 'mse':
        return mse_loss + kl_loss
    else:
        # fixed: original `raise Expception, '...'` was Python-2-only syntax
        # and referenced an undefined (misspelled) name
        raise ValueError('Unknown loss: %r' % use_loss)
def test_loading_weights_by_name():
    """
    test loading model weights by name on:
        - sequential model
    """
    # custom optimizer and loss
    opt = optimizers.rmsprop
    loss_fn = objectives.mse

    def make_model():
        # two named layers so weights can be matched by name on reload
        m = Sequential()
        m.add(Dense(2, input_dim=3, name="rick"))
        m.add(Dense(3, name="morty"))
        m.compile(loss=loss_fn, optimizer=opt(), metrics=['acc'])
        return m

    model = make_model()
    x = np.random.random((1, 3))
    y = np.random.random((1, 3))
    model.train_on_batch(x, y)
    out = model.predict(x)
    old_weights = [layer.get_weights() for layer in model.layers]

    _, fname = tempfile.mkstemp('.h5')
    model.save_weights(fname)

    # delete and recreate an identical model, then restore by name
    del model
    model = make_model()
    model.load_weights(fname, by_name=True)
    os.remove(fname)

    out2 = model.predict(x)
    assert_allclose(out, out2, atol=1e-05)
    # every layer's weights must match the saved ones exactly
    for saved, layer in zip(old_weights, model.layers):
        for saved_w, new_w in zip(saved, layer.get_weights()):
            assert_allclose(saved_w, new_w, atol=1e-05)
def test_loading_weights_by_name_2():
    """
    test loading model weights by name on:
        - both sequential and functional api models
        - different architecture with shared names
    """
    opt = optimizers.rmsprop
    loss_fn = objectives.mse

    # sequential model with two named layers
    model = Sequential()
    model.add(Dense(2, input_dim=3, name="rick"))
    model.add(Dense(3, name="morty"))
    model.compile(loss=loss_fn, optimizer=opt(), metrics=['acc'])

    x = np.random.random((1, 3))
    y = np.random.random((1, 3))
    model.train_on_batch(x, y)
    out = model.predict(x)
    old_weights = [layer.get_weights() for layer in model.layers]

    _, fname = tempfile.mkstemp('.h5')
    model.save_weights(fname)

    # recreate with the functional API: two extra layers inserted, but
    # "rick" and "morty" keep their names and shapes
    del model
    data = Input(shape=(3,))
    rick = Dense(2, name="rick")(data)
    jerry = Dense(3, name="jerry")(rick)
    jessica = Dense(2, name="jessica")(jerry)
    morty = Dense(3, name="morty")(jessica)
    model = Model(input=[data], output=[morty])
    model.compile(loss=loss_fn, optimizer=opt(), metrics=['acc'])

    model.load_weights(fname, by_name=True)
    os.remove(fname)

    # the changed architecture must produce different outputs
    out2 = model.predict(x)
    assert np.max(np.abs(out - out2)) > 1e-05

    rick = model.layers[1].get_weights()
    jerry = model.layers[2].get_weights()
    jessica = model.layers[3].get_weights()
    morty = model.layers[4].get_weights()
    # named matches were restored from the saved weights
    assert_allclose(old_weights[0][0], rick[0], atol=1e-05)
    assert_allclose(old_weights[0][1], rick[1], atol=1e-05)
    assert_allclose(old_weights[1][0], morty[0], atol=1e-05)
    assert_allclose(old_weights[1][1], morty[1], atol=1e-05)
    # unmatched layers keep freshly initialized (zero) biases
    assert_allclose(np.zeros_like(jerry[1]), jerry[1])
    assert_allclose(np.zeros_like(jessica[1]), jessica[1])
def test_loading_weights_by_name():
    """
    test loading model weights by name on:
        - sequential model
    """
    # test with custom optimizer, loss
    custom_opt = optimizers.rmsprop
    custom_loss = objectives.mse

    # sequential model with named layers
    model = Sequential()
    model.add(Dense(2, input_dim=3, name="rick"))
    model.add(Dense(3, name="morty"))
    model.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    batch_x = np.random.random((1, 3))
    batch_y = np.random.random((1, 3))
    model.train_on_batch(batch_x, batch_y)
    reference_out = model.predict(batch_x)
    saved_weights = [layer.get_weights() for layer in model.layers]

    _, weights_path = tempfile.mkstemp('.h5')
    model.save_weights(weights_path)

    # delete and recreate an identical model
    del model
    model = Sequential()
    model.add(Dense(2, input_dim=3, name="rick"))
    model.add(Dense(3, name="morty"))
    model.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    # load weights from the first model by layer name
    model.load_weights(weights_path, by_name=True)
    os.remove(weights_path)

    assert_allclose(reference_out, model.predict(batch_x), atol=1e-05)
    for idx, layer in enumerate(model.layers):
        for old_w, new_w in zip(saved_weights[idx], layer.get_weights()):
            assert_allclose(old_w, new_w, atol=1e-05)
def test_loading_weights_by_name_2():
    """
    test loading model weights by name on:
        - both sequential and functional api models
        - different architecture with shared names
    """
    # custom optimizer and loss
    custom_opt = optimizers.rmsprop
    custom_loss = objectives.mse

    # start from a sequential model
    seq = Sequential()
    seq.add(Dense(2, input_dim=3, name="rick"))
    seq.add(Dense(3, name="morty"))
    seq.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    batch_x = np.random.random((1, 3))
    batch_y = np.random.random((1, 3))
    seq.train_on_batch(batch_x, batch_y)
    reference_out = seq.predict(batch_x)
    saved = [layer.get_weights() for layer in seq.layers]

    _, weights_path = tempfile.mkstemp('.h5')
    seq.save_weights(weights_path)
    del seq

    # rebuild with the functional API, adding two layers in the middle
    # while keeping the names/shapes of "rick" and "morty"
    data = Input(shape=(3,))
    rick = Dense(2, name="rick")(data)
    jerry = Dense(3, name="jerry")(rick)
    jessica = Dense(2, name="jessica")(jerry)
    morty = Dense(3, name="morty")(jessica)
    model = Model(input=[data], output=[morty])
    model.compile(loss=custom_loss, optimizer=custom_opt(), metrics=['acc'])

    model.load_weights(weights_path, by_name=True)
    os.remove(weights_path)

    # the extra layers change the forward pass, so outputs must differ
    assert np.max(np.abs(reference_out - model.predict(batch_x))) > 1e-05

    rick_w = model.layers[1].get_weights()
    jerry_w = model.layers[2].get_weights()
    jessica_w = model.layers[3].get_weights()
    morty_w = model.layers[4].get_weights()
    assert_allclose(saved[0][0], rick_w[0], atol=1e-05)
    assert_allclose(saved[0][1], rick_w[1], atol=1e-05)
    assert_allclose(saved[1][0], morty_w[0], atol=1e-05)
    assert_allclose(saved[1][1], morty_w[1], atol=1e-05)
    # layers with no saved counterpart keep zero-initialized biases
    assert_allclose(np.zeros_like(jerry_w[1]), jerry_w[1])
    assert_allclose(np.zeros_like(jessica_w[1]), jessica_w[1])
# a function to be called from the Lambda layer
def test_loading_weights_by_name():
    """
    test loading model weights by name on:
        - sequential model
    """
    opt_factory = optimizers.rmsprop
    mse_loss = objectives.mse

    def build():
        # named layers enable by-name weight restoration
        net = Sequential()
        net.add(Dense(2, input_dim=3, name="rick"))
        net.add(Dense(3, name="morty"))
        net.compile(loss=mse_loss, optimizer=opt_factory(), metrics=['acc'])
        return net

    net = build()
    sample = np.random.random((1, 3))
    target = np.random.random((1, 3))
    net.train_on_batch(sample, target)
    expected = net.predict(sample)
    snapshots = [layer.get_weights() for layer in net.layers]

    _, h5_file = tempfile.mkstemp('.h5')
    net.save_weights(h5_file)

    # throw away the model and restore weights into a fresh copy
    del net
    net = build()
    net.load_weights(h5_file, by_name=True)
    os.remove(h5_file)

    assert_allclose(expected, net.predict(sample), atol=1e-05)
    for snap, layer in zip(snapshots, net.layers):
        for w_before, w_after in zip(snap, layer.get_weights()):
            assert_allclose(w_before, w_after, atol=1e-05)
def test_loading_weights_by_name_2():
    """
    test loading model weights by name on:
        - both sequential and functional api models
        - different architecture with shared names
    """
    opt_factory = optimizers.rmsprop
    mse_loss = objectives.mse

    # sequential source model with named layers
    src = Sequential()
    src.add(Dense(2, input_dim=3, name="rick"))
    src.add(Dense(3, name="morty"))
    src.compile(loss=mse_loss, optimizer=opt_factory(), metrics=['acc'])

    sample = np.random.random((1, 3))
    target = np.random.random((1, 3))
    src.train_on_batch(sample, target)
    expected = src.predict(sample)
    snapshots = [layer.get_weights() for layer in src.layers]

    _, h5_file = tempfile.mkstemp('.h5')
    src.save_weights(h5_file)
    del src

    # a deeper functional model reusing the names "rick" and "morty"
    inp = Input(shape=(3,))
    rick = Dense(2, name="rick")(inp)
    jerry = Dense(3, name="jerry")(rick)      # 2 extra layers, same shapes
    jessica = Dense(2, name="jessica")(jerry)
    morty = Dense(3, name="morty")(jessica)
    dst = Model(input=[inp], output=[morty])
    dst.compile(loss=mse_loss, optimizer=opt_factory(), metrics=['acc'])

    dst.load_weights(h5_file, by_name=True)
    os.remove(h5_file)

    # different architecture -> different predictions
    assert np.max(np.abs(expected - dst.predict(sample))) > 1e-05

    rick = dst.layers[1].get_weights()
    jerry = dst.layers[2].get_weights()
    jessica = dst.layers[3].get_weights()
    morty = dst.layers[4].get_weights()
    assert_allclose(snapshots[0][0], rick[0], atol=1e-05)
    assert_allclose(snapshots[0][1], rick[1], atol=1e-05)
    assert_allclose(snapshots[1][0], morty[0], atol=1e-05)
    assert_allclose(snapshots[1][1], morty[1], atol=1e-05)
    # layers without a name match retain their zero bias initialization
    assert_allclose(np.zeros_like(jerry[1]), jerry[1])
    assert_allclose(np.zeros_like(jessica[1]), jessica[1])
# a function to be called from the Lambda layer
def build_keras_model_dbow(index_size,vector_size,
                           #vocab_size,
                           context_size,
                           sub_batch_size=1,
                           doctag_vectors=None,
                           hidden_vectors=None,
                           learn_doctags=True,
                           learn_hidden=True,
                           model=None,
                           ):
    """
    Build a PV-DBOW doc2vec network with the legacy Keras Graph API.

    When `doctag_vectors` / `hidden_vectors` are given, the corresponding
    embedding layers are initialized from them; otherwise they start from
    Keras' default initialization. NOTE(review): `learn_doctags`,
    `learn_hidden` and `model` are accepted but unused here — confirm intent.

    >>> index_size=3
    >>> vector_size=2
    >>> context_siz=3
    >>> sub_batch_size=2
    >>> doctag_vectors=np.array([[-1.1,2.2],[-3.2,-4.3],[-1.1,-1.4]],'float32')
    >>> hidden_vectors=np.array([[-1,2],[3,4],[5,6]],'float32')
    >>> kerasmodel=build_keras_model_dbow(index_size=3,vector_size=2,context_size=3,sub_batch_size=2,doctag_vectors=doctag_vectors,hidden_vectors=hidden_vectors)
    >>> ind=[[0,1],[1,0]]
    >>> ipt=[[0,1],[1,2]]
    >>> tmp1=kerasmodel.predict({'index':np.array(ind),'point':np.array(ipt)})['code']
    >>> tmp2=np.array([np.sum(doctag_vectors[ind[i]]*hidden_vectors[ipt[i]], axis=1) for i in range(2)])
    >>> np.linalg.norm(1/(1+np.exp(-tmp2))-tmp1) < 0.001
    True
    """
    kerasmodel = Graph()
    # integer-id inputs: 'index' are doctag ids, 'point' are output-node ids
    kerasmodel.add_input(name='point' , input_shape=(sub_batch_size,), dtype=int)
    kerasmodel.add_input(name='index' , input_shape=(sub_batch_size,), dtype=int)
    # output-node embedding, optionally seeded with pre-trained weights
    if hidden_vectors is None :
        kerasmodel.add_node(Embedding(context_size, vector_size, input_length=sub_batch_size, ),name='embedpoint', input='point')
    else:
        kerasmodel.add_node(Embedding(context_size, vector_size, input_length=sub_batch_size, weights=[hidden_vectors]),name='embedpoint', input='point')
    # doctag embedding, optionally seeded with pre-trained weights
    if doctag_vectors is None :
        kerasmodel.add_node(Embedding(index_size , vector_size, input_length=sub_batch_size, ),name='embedindex' , input='index')
    else:
        kerasmodel.add_node(Embedding(index_size , vector_size, input_length=sub_batch_size, weights=[doctag_vectors]),name='embedindex' , input='index')
    # elementwise product summed over the vector axis == per-pair dot product
    kerasmodel.add_node(Lambda(lambda x:x.sum(2)) , name='merge',inputs=['embedindex','embedpoint'], merge_mode='mul')
    kerasmodel.add_node(Activation('sigmoid'), name='sigmoid', input='merge')
    kerasmodel.add_output(name='code',input='sigmoid')
    # MSE against the 0/1 negative-sampling codes
    kerasmodel.compile('rmsprop', {'code':'mse'})
    return kerasmodel
def build_keras_model_dm(index_size,vector_size,vocab_size,
                         context_size,
                         maxwords,
                         cbow_mean=False,
                         learn_doctags=True, learn_words=True, learn_hidden=True,
                         model=None ,
                         word_vectors=None,doctag_vectors=None,hidden_vectors=None,
                         sub_batch_size=1
                         ):
    """
    Build a PV-DM doc2vec network (sum/mean variant) with the legacy Keras
    Graph API. Doctag and word embeddings are combined (sum or mean, per
    `cbow_mean`) and dotted with the output-point embedding.

    `learn_*` flags control embedding trainability; `*_vectors` arrays, when
    given, seed the corresponding embeddings. NOTE(review): `model` is
    accepted but unused here — confirm intent.

    >>> word_vectors=np.array([[1,2],[3,4],[5,6]])
    >>> doctag_vectors=np.array([[10,20],[30,40]])
    >>> hidden_vectors=np.array([[1,0],[0,1]])
    >>> sub_batch_size=2
    >>> kerasmodel=build_keras_model_dm(index_size=2,vector_size=2,vocab_size=3,context_size=2,maxwords=2,sub_batch_size=sub_batch_size,word_vectors=word_vectors,doctag_vectors=doctag_vectors,hidden_vectors=hidden_vectors, learn_words=True )
    >>> ind=[[0],[1]]
    >>> iwd=[[1,0],[1,1]]
    >>> ipt=[[1,0],[0,1]]
    >>> tmp1=kerasmodel.predict({'index':np.array(ind),'iword':np.array(iwd),'point':np.array(ipt)})['code']
    >>> tmp2=np.array([ [(word_vectors[iwd[i]].sum(0)+doctag_vectors[i]).dot(hidden_vectors[j]) for j in ipt[i] ] for i in range(2)])
    >>> np.linalg.norm(1/(1+np.exp(-tmp2))-tmp1) < 0.001
    True
    """
    kerasmodel = Graph()
    # doctag id input and its embedding (optionally pre-seeded)
    kerasmodel.add_input(name='index',input_shape=(1,) , dtype=int)
    if doctag_vectors is None :
        kerasmodel.add_node(Embedding(index_size, vector_size,trainable=learn_doctags,input_length=1 ),name='embedindex', input='index')
    else:
        kerasmodel.add_node(Embedding(index_size, vector_size,trainable=learn_doctags,input_length=1 ,weights=[doctag_vectors]),name='embedindex', input='index')
    # context-word id input and its embedding (optionally pre-seeded)
    kerasmodel.add_input(name='iword',input_shape=(maxwords,), dtype=int)
    if word_vectors is None :
        kerasmodel.add_node(Embedding(vocab_size, vector_size,trainable=learn_words ,input_length=maxwords ),name='embedword', input='iword')
    else:
        kerasmodel.add_node(Embedding(vocab_size, vector_size,trainable=learn_words ,input_length=maxwords,weights=[word_vectors ]),name='embedword', input='iword')
    # output-point id input and its embedding (optionally pre-seeded)
    kerasmodel.add_input(name='point',input_shape=(sub_batch_size,) , dtype=int)
    if hidden_vectors is None :
        kerasmodel.add_node(Embedding(context_size, vector_size,trainable=learn_hidden ,input_length=sub_batch_size ),name='embedpoint', input='point')
    else:
        kerasmodel.add_node(Embedding(context_size, vector_size,trainable=learn_hidden ,input_length=sub_batch_size ,weights=[hidden_vectors]),name='embedpoint', input='point')
    # combine doctag + word embeddings: mean or sum over the concat axis
    if cbow_mean:
        kerasmodel.add_node(Lambda(lambda x:x.mean(1),output_shape=(vector_size,)), name='merge',inputs=['embedindex','embedword'], merge_mode='concat', concat_axis=1)
    else:
        kerasmodel.add_node(Lambda(lambda x:x.sum(1),output_shape=(vector_size,)), name='merge',inputs=['embedindex','embedword'], merge_mode='concat', concat_axis=1)
    # dot the combined context vector with each output point's vector
    kerasmodel.add_node(Activation('sigmoid'), name='sigmoid',inputs=['merge','embedpoint'], merge_mode='dot',dot_axes=-1)
    kerasmodel.add_output(name='code',input='sigmoid')
    kerasmodel.compile('rmsprop', {'code':'mse'})
    return kerasmodel
def build_keras_model_dm_concat(index_size,vector_size,vocab_size,
                                #code_dim,
                                context_size,
                                window_size,
                                learn_doctags=True, learn_words=True, learn_hidden=True,
                                model=None ,
                                word_vectors=None,doctag_vectors=None,hidden_vectors=None
                                ):
    """
    Build a PV-DM doc2vec network (concatenation variant) with the legacy
    Keras Graph API: the doctag vector and the 2*window_size word vectors
    are concatenated and flattened, then dotted with an output embedding of
    matching width (2*window_size+1)*vector_size.

    `learn_*` flags control embedding trainability; `*_vectors` arrays, when
    given, seed the corresponding embeddings. NOTE(review): `model` is
    accepted but unused here — confirm intent.

    >>> syn0=np.array([[1,-2],[-1,2],[2,-2]],'float32')
    >>> word_vectors=syn0
    >>> syn1=np.array([[-1,2,1,-5,4,1,-2,3,-4,5],[3,4,-4,1,-2,6,-7,8,9,1],[5,-6,-8,7,6,-1,2,-3,4,5]],'float32')
    >>> hidden_vectors=syn1
    >>> doctag_vectors=np.array([[-1.1,2.2],[-3.2,-4.3],[-1.1,-1.4]],'float32')
    >>> kerasmodel=build_keras_model_dm_concat(index_size=3,vector_size=2,vocab_size=3,context_size=3,window_size=2,word_vectors=word_vectors,doctag_vectors=doctag_vectors,hidden_vectors=hidden_vectors)
    >>> ind=[[0],[1]]
    >>> iwd=[[0,0,1,2],[1,1,2,0]]
    >>> ipt=[[0],[1]]
    >>> tmp1=kerasmodel.predict({'index':np.array(ind),'iword':np.array(iwd),'point':np.array(ipt)})['code']
    >>> tmp2=np.array([[np.vstack((doctag_vectors[ind[i]],word_vectors[iwd[i]])).flatten().dot(hidden_vectors[j]) for j in ipt[i] ] for i in range(2)])
    >>> np.linalg.norm(1/(1+np.exp(-tmp2))-tmp1) < 0.001
    True
    """
    kerasmodel = Graph()
    # NOTE(review): 'iword' is declared with input_shape=(1,) but its
    # embedding uses input_length=2*window_size (and the doctest feeds
    # 2*window_size ids) — confirm the declared shape is intentional.
    kerasmodel.add_input(name='iword' , input_shape=(1,), dtype=int)
    kerasmodel.add_input(name='index' , input_shape=(1,), dtype=int)
    # context-word embedding (optionally pre-seeded)
    if word_vectors is None:
        kerasmodel.add_node(Embedding(vocab_size, vector_size,input_length=2*window_size,trainable=learn_words,),name='embedword', input='iword')
    else:
        kerasmodel.add_node(Embedding(vocab_size, vector_size,input_length=2*window_size,trainable=learn_words,weights=[word_vectors]),name='embedword', input='iword')
    # doctag embedding (optionally pre-seeded)
    if doctag_vectors is None:
        kerasmodel.add_node(Embedding(index_size, vector_size,input_length=1,trainable=learn_doctags,), name='embedindex', input='index')
    else:
        kerasmodel.add_node(Embedding(index_size, vector_size,input_length=1,trainable=learn_doctags,weights=[doctag_vectors]), name='embedindex', input='index')
    # output-point embedding, sized to match the concatenated context width
    kerasmodel.add_input(name='point',input_shape=(1,) , dtype=int)
    if hidden_vectors is None:
        kerasmodel.add_node(Embedding(context_size, (2*window_size+1)*vector_size,input_length=1, trainable=learn_hidden,),name='embedpoint', input='point')
    else:
        kerasmodel.add_node(Embedding(context_size, (2*window_size+1)*vector_size,input_length=1, trainable=learn_hidden,weights=[hidden_vectors]),name='embedpoint', input='point')
    # concatenate doctag + word embeddings and flatten to one long vector
    kerasmodel.add_node(Flatten(),name='merge',inputs=['embedindex','embedword'],merge_mode='concat', concat_axis=1)
    kerasmodel.add_node(Activation('sigmoid'), name='sigmoid',inputs=['merge','embedpoint'], merge_mode='dot',dot_axes=-1)
    kerasmodel.add_output(name='code',input='sigmoid')
    kerasmodel.compile('rmsprop', {'code':'mse'})
    return kerasmodel