def create_network(self, state_dim, action_dim, scope):
    with tf.variable_scope(scope, reuse=False) as s:
        # [batch_size, time_steps, state_dim]
        state_input = tf.placeholder("float", [None, None, state_dim])
        # creating the recurrent part
        lstm_cell = rnn.BasicLSTMCell(LSTM_HIDDEN_UNIT)
        lstm_output, lstm_state = tf.nn.dynamic_rnn(cell=lstm_cell, inputs=state_input, dtype=tf.float32)
        # BasicLSTMCell returns an LSTMStateTuple, so project the final hidden state
        # (lstm_state.h, of size LSTM_HIDDEN_UNIT) rather than the tuple-valued
        # lstm_cell.state_size, which cannot be used as a shape or matmul operand.
        W3 = tf.Variable(tf.random_uniform([LSTM_HIDDEN_UNIT, action_dim], -3e-3, 3e-3))
        b3 = tf.Variable(tf.random_uniform([action_dim], -3e-3, 3e-3))
        action_output = tf.tanh(tf.matmul(lstm_state.h, W3) + b3)
        net = [v for v in tf.trainable_variables() if scope in v.name]
    return state_input, action_output, net
Python tanh() usage examples (source code)
def ae(x):
    if nonlinearity_name == 'relu':
        f = tf.nn.relu
    elif nonlinearity_name == 'elu':
        f = tf.nn.elu
    elif nonlinearity_name == 'gelu':
        # def gelu(x):
        #     return tf.mul(x, tf.erfc(-x / tf.sqrt(2.)) / 2.)
        # f = gelu
        def gelu_fast(_x):
            return 0.5 * _x * (1 + tf.tanh(tf.sqrt(2 / np.pi) * (_x + 0.044715 * tf.pow(_x, 3))))
        f = gelu_fast
    elif nonlinearity_name == 'silu':
        def silu(_x):
            return _x * tf.sigmoid(_x)
        f = silu
    # elif nonlinearity_name == 'soi':
    #     def soi_map(x):
    #         u = tf.random_uniform(tf.shape(x))
    #         mask = tf.to_float(tf.less(u, (1 + tf.erf(x / tf.sqrt(2.))) / 2.))
    #         return tf.cond(is_training, lambda: tf.mul(mask, x),
    #                        lambda: tf.mul(x, tf.erfc(-x / tf.sqrt(2.)) / 2.))
    #     f = soi_map
    else:
        raise NameError("Need 'relu', 'elu', 'gelu', or 'silu' for nonlinearity_name")

    h1 = f(tf.matmul(x, W['1']) + b['1'])
    h2 = f(tf.matmul(h1, W['2']) + b['2'])
    h3 = f(tf.matmul(h2, W['3']) + b['3'])
    h4 = f(tf.matmul(h3, W['4']) + b['4'])
    h5 = f(tf.matmul(h4, W['5']) + b['5'])
    h6 = f(tf.matmul(h5, W['6']) + b['6'])
    h7 = f(tf.matmul(h6, W['7']) + b['7'])
    return tf.matmul(h7, W['8']) + b['8']
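A quick NumPy check (not part of the original file) that the tanh-based gelu_fast above closely tracks the exact GELU, x * Phi(x):

import math
import numpy as np

def gelu_exact(x):
    # Exact GELU: x * Phi(x), with Phi the standard normal CDF.
    return 0.5 * x * (1.0 + math.erf(x / math.sqrt(2.0)))

def gelu_tanh(x):
    # Same expression as gelu_fast above, evaluated with math/NumPy.
    return 0.5 * x * (1.0 + math.tanh(math.sqrt(2.0 / math.pi) * (x + 0.044715 * x ** 3)))

xs = np.linspace(-5.0, 5.0, 101)
max_err = max(abs(gelu_exact(x) - gelu_tanh(x)) for x in xs)
print(max_err)  # the two curves agree closely (well below 1e-2 everywhere)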
tree_encoder.py (project: almond-nnparser, author: Stanford-Mobisocial-IoT-Lab)
def __call__(self, left_state, right_state, extra_input=None):
    with tf.variable_scope('TreeLSTM'):
        c1, h1 = left_state
        c2, h2 = right_state
        if extra_input is not None:
            input_concat = tf.concat((extra_input, h1, h2), axis=1)
        else:
            input_concat = tf.concat((h1, h2), axis=1)
        concat = tf.layers.dense(input_concat, 5 * self._num_cells)
        i, f1, f2, o, g = tf.split(concat, 5, axis=1)
        i = tf.sigmoid(i)
        f1 = tf.sigmoid(f1)
        f2 = tf.sigmoid(f2)
        o = tf.sigmoid(o)
        g = tf.tanh(g)
        cnew = f1 * c1 + f2 * c2 + i * g
        hnew = o * cnew
        newstate = LSTMStateTuple(c=cnew, h=hnew)
        return hnew, newstate
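For reference, a small NumPy sketch (mine, not from almond-nnparser) of the binary Tree-LSTM combination implemented above; note the snippet computes hnew = o * cnew directly, whereas the textbook Tree-LSTM applies a tanh to the new cell first.

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def tree_lstm_combine(c1, h1, c2, h2, W, b):
    # W: [2*d, 5*d], b: [5*d]; mirrors the dense layer + split above (no extra_input).
    concat = np.concatenate([h1, h2], axis=1) @ W + b
    i, f1, f2, o, g = np.split(concat, 5, axis=1)
    i, f1, f2, o = sigmoid(i), sigmoid(f1), sigmoid(f2), sigmoid(o)
    g = np.tanh(g)
    c_new = f1 * c1 + f2 * c2 + i * g
    h_new = o * c_new  # the snippet's choice; the standard form is o * np.tanh(c_new)
    return c_new, h_new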
def __init__(self, num_units, forget_bias=1.0, activation=tf.tanh, layer_norm=False, update_bias=1.0):
    """
    Initialize the stack of Skip LSTM cells.

    :param num_units: list of int, the number of units in each LSTM cell
    :param forget_bias: float, the bias added to forget gates
    :param activation: activation function of the inner states
    :param layer_norm: bool, whether to use layer normalization
    :param update_bias: float, initial value for the bias added to the update state gate
    """
    if not isinstance(num_units, list):
        num_units = [num_units]
    self._num_units = num_units
    self._num_layers = len(self._num_units)
    self._forget_bias = forget_bias
    self._activation = activation
    self._layer_norm = layer_norm
    self._update_bias = update_bias
def __init__(self, rnd_vec_dim, hidden_units, output_dim, alpha):
    #-----------------------------------------------------------------------
    # Inputs
    #-----------------------------------------------------------------------
    self.inputs_rnd = tf.placeholder(tf.float32, (None, rnd_vec_dim),
                                     name='inputs_rnd')

    #-----------------------------------------------------------------------
    # The generator
    #-----------------------------------------------------------------------
    self.alpha = alpha
    with tf.variable_scope('generator'):
        h1 = tf.layers.dense(self.inputs_rnd, hidden_units, activation=None)
        h1 = LeakyReLU(h1, self.alpha)
        self.gen_logits = tf.layers.dense(h1, output_dim, activation=None)
        self.gen_out = tf.tanh(self.gen_logits)
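LeakyReLU is defined elsewhere in that project; a minimal stand-in, assuming the usual leaky-ReLU definition max(alpha * x, x) (an assumption, not the original helper):

import tensorflow as tf

def LeakyReLU(x, alpha):
    # Leaky ReLU: pass positive values through, scale negatives by alpha.
    return tf.maximum(alpha * x, x)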
#---------------------------------------------------------------------------
def __init__(self, sess, num_user, num_item,
             hidden_encoder_dim=216, hidden_decoder_dim=216, latent_dim=24,
             learning_rate=0.002, batch_size=64, reg_param=0,
             user_embed_dim=216, item_embed_dim=216, activate_fn=tf.tanh, vae=True):
    if reg_param < 0 or reg_param > 1:
        raise ValueError("regularization parameter must be in [0,1]")
    self.sess = sess
    self.num_user = num_user
    self.num_item = num_item
    self.hidden_encoder_dim = hidden_encoder_dim
    self.hidden_decoder_dim = hidden_decoder_dim
    self.latent_dim = latent_dim
    self.learning_rate = learning_rate
    self.batch_size = batch_size
    self.reg_param = reg_param
    self.user_embed_dim = user_embed_dim
    self.item_embed_dim = item_embed_dim
    self.activate_fn = activate_fn
    self.vae = vae
    self.build_model()
def _score(self, prev_decoder_state, prev_embedding):
    # Returns scores in a tensor of shape [batch_size, input_sequence_length]
    if self.mode == 'decode':
        query_part = self.query_attention_partial_score_placeholder
        encoder_part = self.encoder_state_attention_partial_scores_placeholder
    else:
        query_part = self.query_attention_partial_score
        encoder_part = self.encoder_state_attention_partial_scores

    embedding_part = tf.matmul(prev_embedding, self.attention_w_e)

    output = tf.matmul(prev_decoder_state,
                       self.attention_w) + embedding_part + query_part + encoder_part + self.attention_b
    output = tf.tanh(output)
    output = tf.reduce_sum(self.attention_v * output, axis=2)
    output = tf.transpose(output, [1, 0])

    # Handle input document padding by giving a large penalty, eliminating it from the weighted average
    padding_penalty = -1e20 * tf.to_float(1 - tf.sign(self.documents_placeholder))
    masked = output + padding_penalty
    return masked
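A tiny NumPy illustration (my own, not part of the original file) of why the -1e20 penalty works: after the softmax, padded positions receive essentially zero weight.

import numpy as np

scores = np.array([2.0, 1.0, 0.5, 0.0])   # raw attention scores
mask = np.array([1, 1, 1, 0])             # 1 = real token, 0 = padding
penalized = scores + (-1e20) * (1 - mask) # same trick as above
weights = np.exp(penalized - penalized.max())
weights /= weights.sum()
print(weights)                            # the padded position gets ~0 weight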
def __call__(self, inputs, state, scope=None):
    num_proj = self._num_units if self._num_proj is None else self._num_proj
    c_prev = tf.slice(state, [0, 0], [-1, self._num_units])
    m_prev = tf.slice(state, [0, self._num_units], [-1, num_proj])

    input_size = inputs.get_shape().with_rank(2)[1]
    if input_size.value is None:
        raise ValueError("Could not infer input size from inputs.get_shape()[-1]")

    with tf.variable_scope(type(self).__name__,
                           initializer=self._initializer):  # "LSTMCell"
        # i = input_gate, j = new_input, f = forget_gate, o = output_gate
        cell_inputs = tf.concat(1, [inputs, m_prev])
        lstm_matrix = tf.nn.bias_add(tf.matmul(cell_inputs, self._concat_w), self._b)
        i, j, f, o = tf.split(1, 4, lstm_matrix)
        c = tf.sigmoid(f + 1.0) * c_prev + tf.sigmoid(i) * tf.tanh(j)
        m = tf.sigmoid(o) * tf.tanh(c)
        if self._num_proj is not None:
            m = tf.matmul(m, self._concat_w_proj)

    new_state = tf.concat(1, [c, m])
    return m, new_state
def pre(self, inputs, scope=None):
    """Preprocess inputs to be used by the cell. Assumes [N, J, *]
    [x, u]"""
    is_train = self._is_train
    keep_prob = self._keep_prob
    gate_size = self._gate_size
    with tf.variable_scope(scope or "pre"):
        x, u, _, _ = tf.split(2, 4, tf.slice(inputs, [0, 0, gate_size], [-1, -1, -1]))  # [N, J, d]
        a_raw = linear([x * u], gate_size, True, scope='a_raw', var_on_cpu=self._var_on_cpu,
                       wd=self._wd, initializer=self._initializer)
        a = tf.sigmoid(a_raw - self._forget_bias, name='a')
        if keep_prob < 1.0:
            x = tf.cond(is_train, lambda: tf.nn.dropout(x, keep_prob), lambda: x)
            u = tf.cond(is_train, lambda: tf.nn.dropout(u, keep_prob), lambda: u)
        v_t = tf.nn.tanh(linear([x, u], self._num_units, True,
                                var_on_cpu=self._var_on_cpu, wd=self._wd, scope='v_raw'), name='v')
        new_inputs = tf.concat(2, [a, x, u, v_t])  # [N, J, 3*d + 1]
        return new_inputs
def compute_energy(hidden, state, attn_size, attn_keep_prob=None, pervasive_dropout=False, layer_norm=False,
                   mult_attn=False, **kwargs):
    if attn_keep_prob is not None:
        state_noise_shape = [1, tf.shape(state)[1]] if pervasive_dropout else None
        state = tf.nn.dropout(state, keep_prob=attn_keep_prob, noise_shape=state_noise_shape)
        hidden_noise_shape = [1, 1, tf.shape(hidden)[2]] if pervasive_dropout else None
        hidden = tf.nn.dropout(hidden, keep_prob=attn_keep_prob, noise_shape=hidden_noise_shape)

    if mult_attn:
        state = dense(state, attn_size, use_bias=False, name='state')
        hidden = dense(hidden, attn_size, use_bias=False, name='hidden')
        return tf.einsum('ijk,ik->ij', hidden, state)
    else:
        y = dense(state, attn_size, use_bias=not layer_norm, name='W_a')
        y = tf.expand_dims(y, axis=1)

        if layer_norm:
            y = tf.contrib.layers.layer_norm(y, scope='layer_norm_state')
            hidden = tf.contrib.layers.layer_norm(hidden, center=False, scope='layer_norm_hidden')

        f = dense(hidden, attn_size, use_bias=False, name='U_a')

        v = get_variable('v_a', [attn_size])
        s = f + y
        return tf.reduce_sum(v * tf.tanh(s), axis=2)
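The else branch above is the additive (Bahdanau-style) score e_j = v^T tanh(W_a s + U_a h_j); a compact NumPy version of that score, with plain matrices standing in for the project's dense/get_variable helpers:

import numpy as np

def additive_scores(hidden, state, W_a, U_a, v):
    # hidden: [B, T, H], state: [B, S]; W_a: [S, A], U_a: [H, A], v: [A]
    y = state @ W_a                 # [B, A]
    f = hidden @ U_a                # [B, T, A]
    s = np.tanh(f + y[:, None, :])  # broadcast the state term over time
    return np.sum(v * s, axis=2)    # [B, T] attention energies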
def lstm(xs, ms, s, scope, nh, init_scale=1.0):
    nbatch, nin = [v.value for v in xs[0].get_shape()]
    nsteps = len(xs)
    with tf.variable_scope(scope):
        wx = tf.get_variable("wx", [nin, nh*4], initializer=ortho_init(init_scale))
        wh = tf.get_variable("wh", [nh, nh*4], initializer=ortho_init(init_scale))
        b = tf.get_variable("b", [nh*4], initializer=tf.constant_initializer(0.0))

    c, h = tf.split(axis=1, num_or_size_splits=2, value=s)
    for idx, (x, m) in enumerate(zip(xs, ms)):
        c = c*(1-m)
        h = h*(1-m)
        z = tf.matmul(x, wx) + tf.matmul(h, wh) + b
        i, f, o, u = tf.split(axis=1, num_or_size_splits=4, value=z)
        i = tf.nn.sigmoid(i)
        f = tf.nn.sigmoid(f)
        o = tf.nn.sigmoid(o)
        u = tf.tanh(u)
        c = f*c + i*u
        h = o*tf.tanh(c)
        xs[idx] = h
    s = tf.concat(axis=1, values=[c, h])
    return xs, s
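A plain-NumPy sketch (not from the original repository) of a single masked step from the loop above; m == 1 marks an episode start, so the carried cell and hidden state are zeroed before the gated update.

import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def lstm_step(x, m, c, h, wx, wh, b):
    # x: [B, nin], m: [B, 1] episode-start mask, c/h: [B, nh], wx: [nin, 4*nh], wh: [nh, 4*nh], b: [4*nh]
    c, h = c * (1 - m), h * (1 - m)        # reset carried state at episode boundaries
    z = x @ wx + h @ wh + b
    i, f, o, u = np.split(z, 4, axis=1)
    c = sigmoid(f) * c + sigmoid(i) * np.tanh(u)
    h = sigmoid(o) * np.tanh(c)
    return c, h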
def vae(observed, n, n_x, n_z, n_k, tau, n_particles, relaxed=False):
    with zs.BayesianNet(observed=observed) as model:
        z_stacked_logits = tf.zeros([n, n_z, n_k])
        if relaxed:
            z = zs.ExpConcrete('z', tau, z_stacked_logits,
                               n_samples=n_particles, group_ndims=1)
            z = tf.exp(tf.reshape(z, [n_particles, n, n_z * n_k]))
        else:
            z = zs.OnehotCategorical(
                'z', z_stacked_logits, n_samples=n_particles, group_ndims=1,
                dtype=tf.float32)
            z = tf.reshape(z, [n_particles, n, n_z * n_k])
        lx_z = tf.layers.dense(z, 200, activation=tf.tanh)
        lx_z = tf.layers.dense(lx_z, 200, activation=tf.tanh)
        x_logits = tf.layers.dense(lx_z, n_x)
        x = zs.Bernoulli('x', x_logits, group_ndims=1)
    return model
text_classification_model_han.py (project: kaggle_redefining_cancer_treatment, author: jorgemf)
def _attention(self, inputs, output_size, gene, variation, activation_fn=tf.tanh):
    inputs_shape = inputs.get_shape()
    if len(inputs_shape) != 3 and len(inputs_shape) != 4:
        raise ValueError('Shape of input must have 3 or 4 dimensions')
    input_projection = layers.fully_connected(inputs, output_size,
                                              activation_fn=activation_fn)

    doc_context = tf.concat([gene, variation], axis=1)
    doc_context_vector = layers.fully_connected(doc_context, output_size,
                                                activation_fn=activation_fn)
    doc_context_vector = tf.expand_dims(doc_context_vector, 1)
    if len(inputs_shape) == 4:
        doc_context_vector = tf.expand_dims(doc_context_vector, 1)

    vector_attn = input_projection * doc_context_vector
    vector_attn = tf.reduce_sum(vector_attn, axis=-1, keep_dims=True)
    attention_weights = tf.nn.softmax(vector_attn, dim=1)
    weighted_projection = input_projection * attention_weights

    outputs = tf.reduce_sum(weighted_projection, axis=-2)
    return outputs
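A NumPy sketch (my own illustration) of the context-vector attention above for the 3-D case: each position is scored by its dot product with the (gene, variation) context vector, softmaxed over the sequence axis, and used to form a weighted sum.

import numpy as np

def context_attention(proj, context):
    # proj: [B, T, D] projected inputs, context: [B, D] document context vector
    scores = np.sum(proj * context[:, None, :], axis=-1, keepdims=True)  # [B, T, 1]
    scores -= scores.max(axis=1, keepdims=True)
    weights = np.exp(scores)
    weights /= weights.sum(axis=1, keepdims=True)   # softmax over the time axis
    return np.sum(proj * weights, axis=-2)          # [B, D] weighted sum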
def align(hid_align, h_dec, scope):
    h_dec_align = linear3(h_dec, dim_align, "h_dec_align_" + scope)  # batch_size x dim_align
    h_dec_align = tf.reshape(h_dec_align, [batch_size, 1, dim_align])
    h_dec_align_tiled = tf.tile(h_dec_align, [1, sentence_length, 1])
    all_align = tf.tanh(h_dec_align + hid_align)

    with tf.variable_scope("v_align_" + scope, reuse=DO_SHARE):
        v_align = tf.get_variable("v_align_" + scope, [dim_align],
                                  initializer=tf.constant_initializer(0.0))
    e_t = all_align * v_align
    e_t = tf.reduce_sum(e_t, 2)

    # normalise
    alpha = tf.nn.softmax(e_t)  # batch_size x sentence_length
    alpha_t = tf.reshape(alpha, [batch_size, sentence_length, 1])
    alpha_tile = tf.tile(alpha_t, [1, 1, 2*y_enc_size])
    s_t = tf.multiply(alpha_tile, h_t_lang)
    s_t = tf.reduce_sum(s_t, 1)
    return s_t, alpha
def create_network(self, state_dim, action_dim):
    layer1_size = LAYER1_SIZE
    layer2_size = LAYER2_SIZE

    state_input = tf.placeholder("float", [None, state_dim])

    W1 = self.variable([state_dim, layer1_size], state_dim)
    b1 = self.variable([layer1_size], state_dim)
    W2 = self.variable([layer1_size, layer2_size], layer1_size)
    b2 = self.variable([layer2_size], layer1_size)
    W3 = tf.Variable(tf.random_uniform([layer2_size, action_dim], -3e-3, 3e-3))
    b3 = tf.Variable(tf.random_uniform([action_dim], -3e-3, 3e-3))

    layer1 = tf.nn.relu(tf.matmul(state_input, W1) + b1)
    layer2 = tf.nn.relu(tf.matmul(layer1, W2) + b2)
    action_output = tf.tanh(tf.matmul(layer2, W3) + b3)

    return state_input, action_output, [W1, b1, W2, b2, W3, b3]
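self.variable is defined elsewhere in this class; a plausible minimal sketch, assuming it implements the fan-in uniform initialization used in the DDPG paper (an assumption about the helper, not a copy of it):

import tensorflow as tf

def variable(shape, fan_in):
    # Uniform init in [-1/sqrt(fan_in), 1/sqrt(fan_in)], the common DDPG hidden-layer init.
    bound = 1.0 / (fan_in ** 0.5)
    return tf.Variable(tf.random_uniform(shape, -bound, bound))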
def nnet(X, Y):
    """Neural net with regularization."""
    lambda_ = 1e-4  # Weight regularizer
    noise = .5      # Likelihood st. dev.

    net = (
        ab.InputLayer(name="X", n_samples=1) >>
        ab.DenseMAP(output_dim=40, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DenseMAP(output_dim=20, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DenseMAP(output_dim=10, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DenseMAP(output_dim=1, l2_reg=lambda_, l1_reg=0.)
    )

    f, reg = net(X=X)
    lkhood = tf.distributions.Normal(loc=f, scale=noise)
    loss = ab.max_posterior(lkhood, Y, reg)
    return f, loss
def nnet_dropout(X, Y):
    """Neural net with dropout."""
    lambda_ = 1e-3  # Weight prior
    noise = .5      # Likelihood st. dev.

    net = (
        ab.InputLayer(name="X", n_samples=n_samples_) >>
        ab.DenseMAP(output_dim=40, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DropOut(keep_prob=0.9) >>
        ab.DenseMAP(output_dim=20, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DropOut(keep_prob=0.95) >>
        ab.DenseMAP(output_dim=10, l2_reg=lambda_, l1_reg=0.) >>
        ab.Activation(tf.tanh) >>
        ab.DenseMAP(output_dim=1, l2_reg=lambda_, l1_reg=0.)
    )

    f, reg = net(X=X)
    lkhood = tf.distributions.Normal(loc=f, scale=noise)
    loss = ab.max_posterior(lkhood, Y, reg)
    return f, loss
def nnet_bayesian(X, Y):
    """Bayesian neural net."""
    lambda_ = 1e-1              # Weight prior
    noise = tf.Variable(0.01)   # Likelihood st. dev. initialisation

    net = (
        ab.InputLayer(name="X", n_samples=n_samples_) >>
        ab.DenseVariational(output_dim=20, std=lambda_) >>
        ab.Activation(tf.nn.relu) >>
        ab.DenseVariational(output_dim=7, std=lambda_) >>
        ab.Activation(tf.nn.relu) >>
        ab.DenseVariational(output_dim=5, std=lambda_) >>
        ab.Activation(tf.tanh) >>
        ab.DenseVariational(output_dim=1, std=lambda_)
    )

    f, kl = net(X=X)
    lkhood = tf.distributions.Normal(loc=f, scale=ab.pos(noise))
    loss = ab.elbo(lkhood, Y, N, kl)
    return f, loss
def __init__(self, num_units, input_size=None, activation=tf.nn.tanh,
             bias=True, weights_init=None, trainable=True, restore=True,
             reuse=False):
    if input_size is not None:
        logging.warn("%s: The input_size parameter is deprecated." % self)
    self._num_units = num_units
    if isinstance(activation, str):
        self._activation = activations.get(activation)
    elif hasattr(activation, '__call__'):
        self._activation = activation
    else:
        raise ValueError("Invalid Activation.")
    self.bias = bias
    self.weights_init = weights_init
    if isinstance(weights_init, str):
        self.weights_init = initializations.get(weights_init)()
    self.trainable = trainable
    self.restore = restore
    self.reuse = reuse
def __init__(self, num_units, input_size=None, activation=tf.tanh,
             inner_activation=tf.sigmoid, bias=True, weights_init=None,
             trainable=True, restore=True, reuse=False):
    if input_size is not None:
        logging.warn("%s: The input_size parameter is deprecated." % self)
    self._num_units = num_units
    if isinstance(activation, str):
        self._activation = activations.get(activation)
    elif hasattr(activation, '__call__'):
        self._activation = activation
    else:
        raise ValueError("Invalid Activation.")
    if isinstance(inner_activation, str):
        self._inner_activation = activations.get(inner_activation)
    elif hasattr(inner_activation, '__call__'):
        self._inner_activation = inner_activation
    else:
        raise ValueError("Invalid Activation.")
    self.bias = bias
    self.weights_init = weights_init
    if isinstance(weights_init, str):
        self.weights_init = initializations.get(weights_init)()
    self.trainable = trainable
    self.restore = restore
    self.reuse = reuse
def _build(self, inputs, state):
    hidden, cell = state
    input_conv = self._convolutions["input"]
    hidden_conv = self._convolutions["hidden"]
    next_hidden = input_conv(inputs) + hidden_conv(hidden)
    gates = tf.split(value=next_hidden, num_or_size_splits=4,
                     axis=self._conv_ndims+1)

    input_gate, next_input, forget_gate, output_gate = gates

    next_cell = tf.sigmoid(forget_gate + self._forget_bias) * cell
    next_cell += tf.sigmoid(input_gate) * tf.tanh(next_input)
    output = tf.tanh(next_cell) * tf.sigmoid(output_gate)

    if self._skip_connection:
        output = tf.concat([output, inputs], axis=-1)
    return output, (output, next_cell)
def testComputation(self):
    np.random.seed(100)
    in_shape = [2, 3, 4]
    in_shape_flat = [6, 4]
    hidden_size = 5
    out_shape1 = in_shape[:2] + [hidden_size]
    out_shape2 = in_shape
    inputs = tf.random_uniform(shape=in_shape)
    inputs_flat = tf.reshape(inputs, shape=in_shape_flat)
    linear = snt.Linear(hidden_size,
                        initializers={"w": _test_initializer(),
                                      "b": _test_initializer()})

    merge_linear = snt.BatchApply(module_or_op=linear)
    outputs1 = merge_linear(inputs)
    outputs1_flat = linear(inputs_flat)

    merge_tanh = snt.BatchApply(module_or_op=tf.tanh)
    outputs2 = merge_tanh(inputs)
    outputs2_flat = merge_tanh(inputs_flat)

    with self.test_session() as sess:
        sess.run(tf.global_variables_initializer())
        out1, out_flat1 = sess.run([outputs1, outputs1_flat])
        out2, out_flat2 = sess.run([outputs2, outputs2_flat])
        self.assertAllClose(out1, out_flat1.reshape(out_shape1))
        self.assertAllClose(out2, out_flat2.reshape(out_shape2))
def __call__(self, inputs, state, scope=None):
    with tf.variable_scope(scope or "SHCell"):
        a_size = 1 if self._scalar else self._state_size
        h, u = tf.split(1, 2, inputs)
        if self._logit_func == 'mul_linear':
            args = [h * u, state * u]
            a = tf.nn.sigmoid(linear(args, a_size, True))
        elif self._logit_func == 'linear':
            args = [h, u, state]
            a = tf.nn.sigmoid(linear(args, a_size, True))
        elif self._logit_func == 'tri_linear':
            args = [h, u, state, h * u, state * u]
            a = tf.nn.sigmoid(linear(args, a_size, True))
        elif self._logit_func == 'double':
            args = [h, u, state]
            a = tf.nn.sigmoid(linear(tf.tanh(linear(args, a_size, True)), self._state_size, True))
        else:
            raise Exception()
        new_state = a * state + (1 - a) * h
        outputs = state
        return outputs, new_state
def __call__(self, inputs, state, scope=None):
    """
    :param inputs: [N, d + JQ + JQ * d]
    :param state: [N, d]
    :param scope:
    :return:
    """
    with tf.variable_scope(scope or self.__class__.__name__):
        c_prev, h_prev = state
        x = tf.slice(inputs, [0, 0], [-1, self._input_size])
        q_mask = tf.slice(inputs, [0, self._input_size], [-1, self._q_len])  # [N, JQ]
        qs = tf.slice(inputs, [0, self._input_size + self._q_len], [-1, -1])
        qs = tf.reshape(qs, [-1, self._q_len, self._input_size])  # [N, JQ, d]
        x_tiled = tf.tile(tf.expand_dims(x, 1), [1, self._q_len, 1])  # [N, JQ, d]
        h_prev_tiled = tf.tile(tf.expand_dims(h_prev, 1), [1, self._q_len, 1])  # [N, JQ, d]
        f = tf.tanh(linear([qs, x_tiled, h_prev_tiled], self._input_size, True, scope='f'))  # [N, JQ, d]
        a = tf.nn.softmax(exp_mask(linear(f, 1, True, squeeze=True, scope='a'), q_mask))  # [N, JQ]
        q = tf.reduce_sum(qs * tf.expand_dims(a, -1), 1)
        z = tf.concat(1, [x, q])  # [N, 2d]
        return self._cell(z, state)
def __call__(self, inputs, state, scope=None):
    """Long short-term memory cell (LSTM)."""
    with tf.variable_scope(scope or type(self).__name__):  # "DilatedLSTMCell"
        # Parameters of gates are concatenated into one multiply for efficiency.
        c, h = tf.split(state, 2, axis=1)
        concat = self._linear([inputs, h], 4 * self._num_units, True)

        # i = input_gate, j = new_input, f = forget_gate, o = output_gate
        i, j, f, o = tf.split(concat, 4, axis=1)

        new_c = c * tf.sigmoid(f + self._forget_bias) + tf.sigmoid(i) * tf.tanh(j)
        new_h = tf.tanh(new_c) * tf.sigmoid(o)

        # update relevant cores
        timestep = tf.assign_add(self._timestep, 1)
        core_to_update = tf.mod(timestep, self._cores)

        updated_h = self._hold_mask[core_to_update] * h + self._dilated_mask[core_to_update] * new_h

        return updated_h, tf.concat([new_c, updated_h], axis=1)
def attentive_pooling(self, input_left, input_right):
    Q = tf.reshape(input_left, [self.batch_size, self.max_input_left,
                                len(self.filter_sizes) * self.num_filters], name='Q')
    A = tf.reshape(input_right, [self.batch_size, self.max_input_right,
                                 len(self.filter_sizes) * self.num_filters], name='A')

    # G = tf.tanh(tf.matmul(tf.matmul(Q, self.U), A, transpose_b=True), name='G')
    first = tf.matmul(tf.reshape(Q, [-1, len(self.filter_sizes) * self.num_filters]), self.U)
    second_step = tf.reshape(first, [self.batch_size, -1, len(self.filter_sizes) * self.num_filters])
    result = tf.matmul(second_step, tf.transpose(A, perm=[0, 2, 1]))
    G = tf.tanh(result)

    # column-wise pooling, row-wise pooling
    row_pooling = tf.reduce_max(G, 1, True, name='row_pooling')
    col_pooling = tf.reduce_max(G, 2, True, name='col_pooling')
    attention_q = tf.nn.softmax(col_pooling, 1, name='attention_q')
    attention_a = tf.nn.softmax(row_pooling, name='attention_a')
    R_q = tf.reshape(tf.matmul(Q, attention_q, transpose_a=1),
                     [self.batch_size, self.num_filters * len(self.filter_sizes), -1], name='R_q')
    R_a = tf.reshape(tf.matmul(attention_a, A),
                     [self.batch_size, self.num_filters * len(self.filter_sizes), -1], name='R_a')
    return R_q, R_a
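In NumPy terms, the method above computes G = tanh(Q U A^T), max-pools G along each axis, and uses the softmaxed poolings to re-weight Q and A; a compact sketch of that math (an illustration, not the class above):

import numpy as np

def softmax(x, axis):
    e = np.exp(x - x.max(axis=axis, keepdims=True))
    return e / e.sum(axis=axis, keepdims=True)

def attentive_pool(Q, A, U):
    # Q: [B, LQ, D], A: [B, LA, D], U: [D, D]
    G = np.tanh(np.einsum('bqd,de,bae->bqa', Q, U, A))  # [B, LQ, LA]
    col_pool = G.max(axis=2, keepdims=True)             # [B, LQ, 1] -> weights over Q positions
    row_pool = G.max(axis=1, keepdims=True)             # [B, 1, LA] -> weights over A positions
    att_q = softmax(col_pool, axis=1)
    att_a = softmax(row_pool, axis=2)
    r_q = np.sum(Q * att_q, axis=1)                     # [B, D]
    r_a = np.sum(A * att_a.transpose(0, 2, 1), axis=1)  # [B, D]
    return r_q, r_a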
def test_basic(self):
    with tf.Graph().as_default(), self.test_session() as sess:
        rnd = np.random.RandomState(0)
        x = self.get_random_tensor([18, 12], rnd=rnd)
        y = tf.tanh(x)
        self.assert_bw_fw(sess, x, y, rnd=rnd)
def test_manual(self):
    with tf.Graph().as_default(), tf.device("/cpu:0"):
        with self.test_session() as sess:
            x_val = np.random.uniform(0, 1)
            x = tf.constant(x_val)
            y = tf.tanh(x)
            dy_dx = forward_gradients(y, x, gate_gradients=True)
            dy_dx_tf = sess.run(dy_dx)

            eps = 1e-5
            x_val = x_val - eps
            y_val_1 = np.tanh(x_val)
            x_val = x_val + 2 * eps
            y_val_2 = np.tanh(x_val)
            dy_dx_fd = (y_val_2 - y_val_1) / (2 * eps)

            np.testing.assert_allclose(dy_dx_tf, dy_dx_fd, rtol=1e-5)
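The test compares a forward-mode gradient against a central finite difference; the same style of check in plain NumPy, using the identity d/dx tanh(x) = 1 - tanh(x)^2:

import numpy as np

x = 0.3
eps = 1e-5
fd = (np.tanh(x + eps) - np.tanh(x - eps)) / (2 * eps)  # central difference
analytic = 1.0 - np.tanh(x) ** 2                        # exact derivative of tanh
np.testing.assert_allclose(fd, analytic, rtol=1e-5)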