def sparse_hermitian_product(emb, tuples):
"""
Compute the Hermitian inner product between selected complex embeddings
This corresponds to the usual dot product applied on the conjugate of the first vector: <conj(x), y>
where conj is the complex conjugate (obtained by inverting the imaginary part)
We consider that the embedding dimension is twice the rank, where the first part is in embeddings[:,:rk] and
the imaginary part is in embeddings[:,rk:].
It computes
S[i] = <conj(E[I[i,1]], E[I[i,2]]>
Usage:
S = sparse_hermitian_product(E, I):
:param emb: embedding matrix of size [n_emb, 2 * r] containing float numbers where r is the complex rank
:param tuples: tuple matrix of size [n_t, 2] containing integers that correspond to the indices of the embeddings
:return: a pair containing the real and imaginary parts of the Hermitian dot products
"""
rk = emb.get_shape()[1].value // 2
emb_re = emb[:, :rk]
emb_im = emb[:, rk:]
emb_sel_a_re = tf.gather(emb_re, tuples[:, 0])
emb_sel_a_im = tf.gather(emb_im, tuples[:, 0])
emb_sel_b_re = tf.gather(emb_re, tuples[:, 1])
emb_sel_b_im = tf.gather(emb_im, tuples[:, 1])
pred_re = tf.reduce_sum(tf.mul(emb_sel_a_re, emb_sel_b_re) + tf.mul(emb_sel_a_im, emb_sel_b_im), 1)
pred_im = tf.reduce_sum(tf.mul(emb_sel_a_re, emb_sel_b_im) - tf.mul(emb_sel_a_im, emb_sel_b_re), 1)
return pred_re, pred_im
python类real()的实例源码
def hermitian_dot(u, v):
"""
Hermitian dot product between multiple embeddings given by rows.
:param u: first matrix of n embeddings
:param v: second matrix of m embeddings
:param alpha: weight of the real part in the response
:return: a pair of n * m matrix of Hermitian inner products between all vector combinations:
- Re(<u_i, v_j>) for the first output
- Im(<u_i, v_j>) for the second output
>>> embeddings = np.array([[1., 1, 0, 3], [0, 1, 0, 1], [-1, 1, 1, 5]])
>>> print(hermitian_dot(embeddings, embeddings.T))
(array([[ 11., 4., 15.],
[ 4., 2., 6.],
[ 15., 6., 28.]]), array([[ 0., -2., 3.],
[ 2., 0., 4.],
[-3., -4., 0.]]))
"""
rk = u.shape[1] // 2
u_re = u[:, :rk]
u_im = u[:, rk:]
v_re = v[:rk, :]
v_im = v[rk:, :]
return np.dot(u_re, v_re) + np.dot(u_im, v_im), np.dot(u_re, v_im) - np.dot(u_im, v_re)
unitary_rnn_cell_modern.py 文件源码
项目:tensorflow_with_latest_papers
作者: NickShahML
项目源码
文件源码
阅读 32
收藏 0
点赞 0
评论 0
def __call__(self, inputs, state, scope=None ):
zero_initer = tf.constant_initializer(0.)
with tf.variable_scope(scope or type(self).__name__):
#nick there are these two matrix multiplications and they are used to convert regular input sizes to complex outputs -- makes sense -- we can further modify this for lstm configurations
mat_in = tf.get_variable('W_in', [self.input_size, self.state_size*2])
mat_out = tf.get_variable('W_out', [self.state_size*2, self.output_size])
in_proj = tf.matmul(inputs, mat_in)
in_proj_c = tf.complex( in_proj[:, :self.state_size], in_proj[:, self.state_size:] )
out_state = modrelu_c( in_proj_c +
ulinear_c(state,transform=self.transform),
tf.get_variable(name='B', dtype=tf.float32, shape=[self.state_size], initializer=zero_initer)
)
out_bias = tf.get_variable(name='B_out', dtype=tf.float32, shape=[self.output_size], initializer = zero_initer)
out = tf.matmul( tf.concat(1,[tf.real(out_state), tf.imag(out_state)] ), mat_out ) + out_bias
return out, out_state
def normalize(z):
norm = tf.sqrt(tf.reduce_sum(tf.abs(z)**2))
factor = (norm + 1e-6)
return tf.complex(tf.real(z) / factor, tf.imag(z) / factor)
# z: complex[batch_sz, num_units]
# bias: real[num_units]
def modReLU(z, bias): # relu(|z|+b) * (z / |z|)
norm = tf.abs(z)
scale = tf.nn.relu(norm + bias) / (norm + 1e-6)
scaled = tf.complex(tf.real(z)*scale, tf.imag(z)*scale)
return scaled
###################################################################################################333
# 4k / 7k trainable params
def bound(x):
bound = tf.maximum(tf.sqrt(tf.mul(tf.real(x), tf.real(x)) \
+ tf.mul(tf.imag(x), tf.imag(x))),
1.0)
return tf.complex(tf.real(x) / bound, tf.imag(x) / bound)
def unsplit_from_complex_ri(x):
return tf.concat(1, [tf.real(x), tf.imag(x)])
def unsplit_from_complex_ir(x):
#return tf.concat(1, [tf.imag(x), tf.abs(tf.real(x))])
return tf.abs(tf.concat(1, [tf.imag(x), tf.real(x)]))
#mag = tf.maximum(1.0, tf.complex_abs(x))
#x = tf.complex(tf.real(x) / (mag + 1e-10), tf.imag(x) / (mag + 1e-10))
# real = tf.concat(1, [tf.imag(x), tf.real(x)])
# return tf.abs(HolographicMemory.normalize_real_by_complex_abs([real])[0])
def hermitian_tuple_scorer(tuples_var, rank=None, n_emb=None, emb0=None, symmetry_coef=(1.0, 1.0),
learn_symmetry_coef=True):
"""
The Hermitian Scorer can learn embeddings for non-symmetric relations
:param tuples_var: TensorFlow variable that encodes the tuples as inputs
:param rank: size of the embeddings, including real and imaginary parts. The complex rank is half of it.
not needed if emb0 is given
:param n_emb: number of embeddings (not needed if initial embeddings are given)
:param emb0: initial embeddings (optional)
:param symmetry_coef: symmetry coefficient that equals np.inf for symmetric matrices, -np.inf for anti-symmetric
matrices and a real scalar for other cases.
:param learn_symmetry_coef: False if the symmetry coefficient is not learned [True by default]
:return: a pair (scoring TensorFlow graph, parameters). The parameters have the form
([n_emd*rank] float matrix, symmetry coef)
>>> embeddings = [[1., 1, 0, 3], [0, 1, 0, 1], [-1, 1, 1, 5]]
>>> tuples_var = tf.Variable([[0, 1], [1, 0], [0, 2], [2, 0], [1, 2], [2, 1]])
>>> (g, params) = hermitian_tuple_scorer(tuples_var, emb0=embeddings, symmetry_coef=(1.0, 0.0))
>>> print(tf_eval(g)) # symmetric form
[ 4. 4. 15. 15. 6. 6.]
>>> (g, params) = hermitian_tuple_scorer(tuples_var, emb0=embeddings, symmetry_coef=(0.0, 1.0))
>>> print(tf_eval(g)) # skewed (anti-symmetric) form
[-2. 2. 3. -3. 4. -4.]
>>> (g, params) = hermitian_tuple_scorer(tuples_var, emb0=embeddings, symmetry_coef=(1.0, 1.0))
>>> print(tf_eval(g)) # combination of the previous two forms
[ 2. 6. 18. 12. 10. 2.]
>>> (g, params) = hermitian_tuple_scorer(tuples_var, emb0=embeddings, symmetry_coef=(0.9, 0.1))
>>> print(tf_eval(g)) # close to symmetric
[ 3.39999986 3.79999995 13.80000019 13.19999981 5.79999971
4.99999952]
"""
emb0 = emb0 if emb0 is not None else np.random.normal(size=(n_emb, rank))
embeddings = tf.Variable(tf.cast(emb0, 'float32'), 'embeddings')
symmetry_coef = tf.Variable(symmetry_coef, name='symmetry_coef', trainable=learn_symmetry_coef)
params = (embeddings, symmetry_coef)
return sparse_hermitian_scoring(params, tuples_var), params
def sparse_hermitian_scoring(params, tuples):
"""
TensorFlow operator that scores tuples by the dot product of their complex embeddings.
It is the same a the multilinear function, but uses complex embeddings instead. The complex
embeddings are of size 2 * R where R is the complex
dimension. They are encoded such that the first columns correspond to the real part, and the last R correspond to
the imaginary part. The result of this function is a length-N vector with values:
S[i] = alpha_0 * Re(sum_j E[I[i,1],j] * E[I[i,2],j]) + alpha_1 * Im(sum_j E[I[i,1],j] * E[I[i,2],j]))
Where:
- I is the tuple tensor of integers with shape (T, 2)
- E is the N * 2R tensor of complex embeddings (R first columns: real part, the last R columsn: imaginary part)
- alpha_0 and alpha_1 are the symmetry coefficients
:param params: tuple (embeddings, symm_coef) containing:
- embeddings: a real tensor of size [N, 2*R] containing the N rank-R embeddings by row (real part in the rank first R
columns, imaginary part in the last R columns)
- the 2-tuple (s0, s1) of symmetry coefficients (or complex-to-real projection coefficients) that are used to
transform the complex result of the dot product into a real number, as used by most statistical models (e.g.
mean of a Gaussian or Poisson distributions, natural parameter of a Bernouilli distribution). The conversion
from complexto real is a simple weighted sum: results = s0 * Re(<e_i, e_j>) + s1 * Im(<e_i, e_j>
:param tuples: tuple matrix of size [T, 2] containing T pairs of integers corresponding to the indices of the
embeddings.
:return: Hermitian dot products of selected embeddings
>>> embeddings = (tf.Variable([[1., 1, 0, 3], [0, 1, 0, 1], [-1, 1, 1, 5]]), (0.0, 1.0))
>>> idx = tf.Variable([[0, 1], [1, 0], [0, 2], [2, 0], [1, 2], [2, 1]])
>>> g = sparse_hermitian_scoring(embeddings, idx)
>>> print(tf_eval(g))
[-2. 2. 3. -3. 4. -4.]
"""
emb, symmetry = params
pred_re, pred_im = sparse_hermitian_product(emb, tuples)
return symmetry[0] * pred_re + symmetry[1] * pred_im
def abs2_c(z):
return tf.real(z)*tf.real(z)+tf.imag(z)*tf.imag(z)
def complex_mul_real( z, r ):
return tf.complex(tf.real(z)*r, tf.imag(z)*r)
def modrelu_c(in_c, bias):
if not in_c.dtype.is_complex:
raise(ValueError('modrelu_c: Argument in_c must be complex type'))
if bias.dtype.is_complex:
raise(ValueError('modrelu_c: Argument bias must be real type'))
n = tf.complex_abs(in_c)
scale = 1./(n+1e-5)
return complex_mul_real(in_c, ( tf.nn.relu(n+bias)*scale ))
unitary_rnn_cell_modern.py 文件源码
项目:tensorflow_with_latest_papers
作者: NickShahML
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def __call__(self, inputs, state, scope=None ):
with tf.variable_scope(scope or type(self).__name__):
unitary_hidden_state, secondary_cell_hidden_state = tf.split(1,2,state)
mat_in = tf.get_variable('mat_in', [self.input_size, self.state_size*2])
mat_out = tf.get_variable('mat_out', [self.state_size*2, self.output_size])
in_proj = tf.matmul(inputs, mat_in)
in_proj_c = tf.complex(tf.split(1,2,in_proj))
out_state = modReLU( in_proj_c +
ulinear(unitary_hidden_state, self.state_size),
tf.get_variable(name='bias', dtype=tf.float32, shape=tf.shape(unitary_hidden_state), initializer = tf.constant_initalizer(0.)),
scope=scope)
with tf.variable_scope('unitary_output'):
'''computes data linear, unitary linear and summation -- TODO: should be complex output'''
unitary_linear_output_real = linear.linear([tf.real(out_state), tf.imag(out_state), inputs], True, 0.0)
with tf.variable_scope('scale_nonlinearity'):
modulus = tf.complex_abs(unitary_linear_output_real)
rescale = tf.maximum(modulus + hidden_bias, 0.) / (modulus + 1e-7)
#transition to data shortcut connection
#out_ = tf.matmul(tf.concat(1,[tf.real(out_state), tf.imag(out_state), ] ), mat_out) + out_bias
#hidden state is complex but output is completely real
return out_, out_state #complex
def s_binary_crossentropy(self, y_true, y_pred):
if self.p:
y_pred = undo_normcentererr(y_pred, self.p)
y_true = undo_normcentererr(y_true, self.p)
s_true = K.dot(y_true, K.transpose(self.H))%2
twopminusone = 2*y_pred-1
s_pred = ( 1 - tf.real(K.exp(K.dot(K.log(tf.cast(twopminusone, tf.complex64)), tf.cast(K.transpose(self.H), tf.complex64)))) ) / 2
return K.mean(K.binary_crossentropy(s_pred, s_true), axis=-1)
def test_Real(self):
t = tf.real(tf.Variable(self.random(3, 4, complex=True)))
self.check(t)
#
# Fourier transform ops
#
def call(self, inputs, state):
"""The most basic URNN cell.
Args:
inputs (Tensor - batch_sz x num_in): One batch of cell input.
state (Tensor - batch_sz x num_units): Previous cell state: COMPLEX
Returns:
A tuple (outputs, state):
outputs (Tensor - batch_sz x num_units*2): Cell outputs on the whole batch.
state (Tensor - batch_sz x num_units): New state of the cell.
"""
#print("cell.call inputs:", inputs.shape, inputs.dtype)
#print("cell.call state:", state.shape, state.dtype)
# prepare input linear combination
inputs_mul = tf.matmul(inputs, tf.transpose(self.w_ih)) # [batch_sz, 2*num_units]
inputs_mul_c = tf.complex( inputs_mul[:, :self._num_units],
inputs_mul[:, self._num_units:] )
# [batch_sz, num_units]
# prepare state linear combination (always complex!)
state_c = tf.complex( state[:, :self._num_units],
state[:, self._num_units:] )
state_mul = self.D1.mul(state_c)
state_mul = FFT(state_mul)
state_mul = self.R1.mul(state_mul)
state_mul = self.P.mul(state_mul)
state_mul = self.D2.mul(state_mul)
state_mul = IFFT(state_mul)
state_mul = self.R2.mul(state_mul)
state_mul = self.D3.mul(state_mul)
# [batch_sz, num_units]
# calculate preactivation
preact = inputs_mul_c + state_mul
# [batch_sz, num_units]
new_state_c = modReLU(preact, self.b_h) # [batch_sz, num_units] C
new_state = tf.concat([tf.real(new_state_c), tf.imag(new_state_c)], 1) # [batch_sz, 2*num_units] R
# outside network (last dense layer) is ready for 2*num_units -> num_out
output = new_state
# print("cell.call output:", output.shape, output.dtype)
# print("cell.call new_state:", new_state.shape, new_state.dtype)
return output, new_state
def fft_circ_conv1d(X, keys, batch_size, num_copies, num_keys=None, conj=False):
if conj:
keys = HolographicMemory.conj_real_by_complex(keys)
# Get our original shapes
xshp = X.get_shape().as_list()
kshp = keys.get_shape().as_list()
kshp[0] = num_keys if num_keys is not None else kshp[0]
kshp[1] = xshp[1] if kshp[1] is None else kshp[1]
print 'X : ', xshp, ' | keys : ', kshp, ' | batch_size = ', batch_size
# duplicate out input data by the ratio: number_keys / batch_size
# eg: |input| = [2, 784] ; |keys| = 3*[2, 784] ; (3 is the num_copies)
# |new_input| = 6/2 |input| = [input; input; input]
#
# At test: |memories| = [3, 784] ; |keys| = 3*[n, 784] ;
# |new_input| = 3n / 3 = n [where n is the number of desired parallel retrievals]
num_dupes = kshp[0] / batch_size
print 'num dupes = ', num_dupes
xcplx = HolographicMemory.split_to_complex(tf.tile(X, [num_dupes, 1]) \
if num_dupes > 1 else X)
xshp = xcplx.get_shape().as_list()
kcplx = HolographicMemory.split_to_complex(keys, kshp)
# Convolve & re-cast to a real valued function
unsplit_func = HolographicMemory.unsplit_from_complex_ri if not conj \
else HolographicMemory.unsplit_from_complex_ir
#fft_mul = HolographicMemory.bound(tf.mul(tf.fft(xcplx), tf.fft(kcplx)))
fft_mul = tf.mul(tf.fft(xcplx), tf.fft(kcplx))
conv = unsplit_func(tf.ifft(fft_mul))
print 'full conv = ', conv.get_shape().as_list()
batch_iter = min(batch_size, xshp[0]) if xshp[0] is not None else batch_size
print 'batch = ', batch_size, ' | num_copies = ', num_copies, '| num_keys = ', num_keys, \
'| xshp[0] = ', xshp[0], ' | len(keys) = ', kshp[0], ' | batch iter = ', batch_iter
conv_concat = [tf.expand_dims(tf.reduce_mean(conv[begin:end], 0), 0)
for begin, end in zip(range(0, kshp[0], batch_iter),
range(batch_iter, kshp[0]+1, batch_iter))]
print 'conv concat = ', len(conv_concat), ' x ', conv_concat[0].get_shape().as_list()
# return a single concatenated tensor:
# C = [c0; c1; ...]
C = tf.concat(0, conv_concat)
return C
#C = tf_mean_std_normalize(C)
#return C / tf.maximum(tf.reduce_max(C), 1e-20)
#return tf.nn.sigmoid(C)
#return tf_mean_std_normalize(C)
def sparse_relational_hermitian_scoring(emb, tuples):
"""
TensorFlow operator that scores triples where relations are defined by a complex vector w
It is the same a the multilinear function, but uses complex embeddings instead. The complex
embeddings are of size 2 * R where R is the complex
dimension. They are encoded such that the first columns correspond to the real part, and the last R correspond to
the imaginary part. The result of this function is a length-N vector with values:
S[i] = sum(Re(E[I[i,2],j]) * (Re(E[I[i,1],j]) * Re(E[I[i,3],j]) + Im(E[I[i,1],j]) * Im(E[I[i,3],j]))
+ Im(E[I[i,2],j]) * (Re(E[I[i,1],j]) * Im(E[I[i,3],j]) - Im(E[I[i,1],j]) * Re(E[I[i,3],j])) )
Where:
- I is the tuple tensor of integers with shape (T, 2)
- E is the N * 2R tensor of complex embeddings (R first columns: real part, the last R columsn: imaginary part)
- alpha_0 and alpha_1 are the symmetry coefficients
:param params: tuple (embeddings, symm_coef) containing:
- embeddings: a real tensor of size [N, 2*R] containing the N rank-R embeddings by row (real part in the rank first R
columns, imaginary part in the last R columns)
- the 2-tuple (s0, s1) of symmetry coefficients (or complex-to-real projection coefficients) that are used to
transform the complex result of the dot product into a real number, as used by most statistical models (e.g.
mean of a Gaussian or Poisson distributions, natural parameter of a Bernouilli distribution). The conversion
from complexto real is a simple weighted sum: results = s0 * Re(<e_i, e_j>) + s1 * Im(<e_i, e_j>
:param tuples: tuple matrix of size [T, 2] containing T pairs of integers corresponding to the indices of the
embeddings.
:return: Hermitian dot products of selected embeddings
>>> embeddings = tf.Variable([[1., 1, 0, 3], [0, 1, 0, 1], [-1, 1, 1, 5], [-3, 1, 0, 2], [-1, 2, -1, -5]])
>>> idx = tf.Variable([[0, 3, 1], [1, 3, 0], [0, 3, 2], [2, 4, 0], [1, 4, 2], [2, 4, 1]])
>>> g = sparse_relational_hermitian_scoring(embeddings, idx)
>>> print(tf_eval(g))
[ 0. 8. 23. 44. -8. 32.]
"""
rk = emb.get_shape()[1].value // 2
emb_re = emb[:, :rk]
emb_im = emb[:, rk:]
emb_sel_a_re = tf.gather(emb_re, tuples[:, 0])
emb_sel_a_im = tf.gather(emb_im, tuples[:, 0])
emb_sel_b_re = tf.gather(emb_re, tuples[:, 2])
emb_sel_b_im = tf.gather(emb_im, tuples[:, 2])
emb_rel_re = tf.gather(emb_re, tuples[:, 1])
emb_rel_im = tf.gather(emb_im, tuples[:, 1])
pred_re = tf.mul(emb_sel_a_re, emb_sel_b_re) + tf.mul(emb_sel_a_im, emb_sel_b_im)
pred_im = tf.mul(emb_sel_a_re, emb_sel_b_im) - tf.mul(emb_sel_a_im, emb_sel_b_re)
tmp = emb_rel_re * pred_re + emb_rel_im * pred_im
return tf.reduce_sum(tmp, 1)
def create_network():
dp = tflearn.data_preprocessing.DataPreprocessing()
dp.add_featurewise_zero_center()
dp.add_featurewise_stdnorm()
#dp.add_samplewise_zero_center()
#dp.add_samplewise_stdnorm()
network = tflearn.input_data(shape=[None, chunk_size])#, data_preprocessing=dp)
# input is a real signal
network = tf.complex(network, 0.0)
# fft the input
input_fft = tf.fft(network)
input_orig_fft = input_fft
input_fft = tf.stack([tf.real(input_fft), tf.imag(input_fft)], axis=2)
fft_size = int(input_fft.shape[1])
network = input_fft
print("fft shape: " + str(input_fft.get_shape()))
omg = fft_size
nn_reg = None
mask = network
mask = tflearn.layers.fully_connected(mask, omg*2, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
mask = tflearn.layers.fully_connected(mask, omg, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
mask = tflearn.layers.fully_connected(mask, omg/2, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
#mask = tflearn.layers.fully_connected(mask, omg/4, activation="tanh")
mask = tflearn.reshape(mask, [-1, 1, omg/2])
mask = tflearn.layers.recurrent.lstm(mask, omg/4)
mask = tflearn.layers.fully_connected(mask, omg/2, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
mask = tflearn.layers.fully_connected(mask, omg, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
mask = tflearn.layers.fully_connected(mask, omg*2, activation="tanh", regularizer=nn_reg)
mask = tflearn.layers.normalization.batch_normalization(mask)
mask = tflearn.layers.fully_connected(mask, omg, activation="sigmoid", regularizer=nn_reg)
real = tf.multiply(tf.real(input_orig_fft), mask)
imag = tf.multiply(tf.imag(input_orig_fft), mask)
network = tf.real(tf.ifft(tf.complex(real, imag)))
print("final shape: " + str(network.get_shape()))
network = tflearn.regression(network, optimizer="adam", learning_rate=learning_rate, loss="mean_square")
return network