# Assumes LSTMCell, GRUCell, DropoutWrapper and ResidualWrapper from the legacy tf.contrib.rnn API.
def build_single_cell(self):
    # Choose the recurrent cell type (LSTM by default, GRU if configured).
    cell_type = LSTMCell
    if self.cell_type.lower() == 'gru':
        cell_type = GRUCell
    cell = cell_type(self.hidden_units)
    if self.use_dropout:
        # Dropout on the cell outputs, controlled by a keep_prob placeholder.
        cell = DropoutWrapper(cell, dtype=self.dtype,
                              output_keep_prob=self.keep_prob_placeholder)
    if self.use_residual:
        cell = ResidualWrapper(cell)
    return cell

# Building encoder cell
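A minimal sketch of how this per-layer cell could be stacked into the encoder cell the comment above refers to, assuming the same legacy tf.contrib.rnn API; the depth attribute and the MultiRNNCell stacking are assumptions, not part of the original snippet.

# Sketch (assumption): stack `self.depth` copies of the single cell into the
# multi-layer encoder cell; MultiRNNCell comes from the same legacy RNN API.
def build_encoder_cell(self):
    return MultiRNNCell([self.build_single_cell() for _ in range(self.depth)])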
def apply_dropout(
        cell, input_keep_probability, output_keep_probability, random_seed=None):
    """Apply dropout to the outputs and inputs of `cell`.

    Args:
      cell: An `RNNCell`.
      input_keep_probability: Probability to keep inputs to `cell`. If `None`,
        no dropout is applied.
      output_keep_probability: Probability to keep outputs to `cell`. If `None`,
        no dropout is applied.
      random_seed: Seed for random dropout.

    Returns:
      An `RNNCell`, the result of applying the supplied dropouts to `cell`.
    """
    input_prob_none = input_keep_probability is None
    output_prob_none = output_keep_probability is None
    if input_prob_none and output_prob_none:
        return cell
    if input_prob_none:
        input_keep_probability = 1.0
    if output_prob_none:
        output_keep_probability = 1.0
    return rnn_cell.DropoutWrapper(
        cell, input_keep_probability, output_keep_probability, random_seed)
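A short usage sketch for apply_dropout, assuming the same rnn_cell module used above; the cell size and keep probabilities are arbitrary example values.

# Example (assumption): wrap an LSTM cell with input and output dropout.
base_cell = rnn_cell.LSTMCell(256)
dropped_cell = apply_dropout(base_cell,
                             input_keep_probability=0.9,
                             output_keep_probability=0.5)
# With both probabilities set to None, the original cell is returned unchanged.
assert apply_dropout(base_cell, None, None) is base_cell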
# Assumes the legacy modules, typically imported as: from tensorflow.python.ops import rnn, rnn_cell
def compute_states(self, emb):
    def unpack_sequence(tensor):
        # Split a [batch, time, dim] tensor into a time-major list of [batch, dim] tensors.
        return tf.unpack(tf.transpose(tensor, perm=[1, 0, 2]))

    with tf.variable_scope("Composition",
                           initializer=tf.contrib.layers.xavier_initializer(),
                           regularizer=tf.contrib.layers.l2_regularizer(self.reg)):
        cell = rnn_cell.LSTMCell(self.hidden_dim)
        # Dropout on both inputs and outputs of the LSTM cell.
        cell = rnn_cell.DropoutWrapper(cell,
                                       output_keep_prob=self.dropout,
                                       input_keep_prob=self.dropout)
        outputs, _ = rnn.rnn(cell, unpack_sequence(emb),
                             sequence_length=self.lngths, dtype=tf.float32)
        # Mean-pool the per-timestep outputs into a single sentence representation.
        sum_out = tf.reduce_sum(tf.pack(outputs), [0])
        sent_rep = tf.div(sum_out, tf.expand_dims(tf.to_float(self.lngths), 1))
        final_state = sent_rep
    return final_state
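A hedged sketch of the tensors this method assumes: self.dropout is a scalar keep probability and self.lngths holds per-example sequence lengths; the placeholder shapes and the max_len and emb_dim names are illustrative, not taken from the original model.

# Sketch (assumption): inputs compute_states expects, typically created in __init__.
self.dropout = tf.placeholder(tf.float32, shape=[], name='dropout_keep_prob')
self.lngths = tf.placeholder(tf.int32, shape=[None], name='sequence_lengths')
emb = tf.placeholder(tf.float32, shape=[None, max_len, emb_dim], name='embeddings')
sent_rep = self.compute_states(emb)  # [batch, hidden_dim] mean-pooled representation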
def createRNN(self):
    with self.sess.graph.as_default():
        self.prob = tf.placeholder("float", name="keep_prob")

        # input layer #
        with tf.name_scope("input"):
            self.s = tf.placeholder("float", [None, DAYS_RANGE, INPUT_DIM], name='input_state')
            s_tran = tf.transpose(self.s, [1, 0, 2])
            s_re = tf.reshape(s_tran, [-1, INPUT_DIM])
            # Split into a list of DAYS_RANGE tensors of shape [BATCH, INPUT_DIM].
            s_list = tf.split(0, DAYS_RANGE, s_re)

        lstm_cell = rnn_cell.LSTMCell(1024, use_peepholes=True, forget_bias=1.0, state_is_tuple=True)
        # Output dropout driven by the keep_prob placeholder.
        lstm_drop = rnn_cell.DropoutWrapper(lstm_cell, output_keep_prob=self.prob)
        # Stack three dropout-wrapped LSTM layers.
        lstm_stack = rnn_cell.MultiRNNCell([lstm_drop] * 3, state_is_tuple=True)

        # out: [timestep, batch, hidden], state: [cell, c+h, batch, hidden]
        lstm_output, hidden_states = rnn.rnn(lstm_stack, s_list, dtype='float', scope='LSTMStack')

        h_fc1 = self.FC_layer(lstm_output[-1], [1024, 1024], name='h_fc1', activate=True)
        h_fc1_d = tf.nn.dropout(h_fc1, keep_prob=self.prob, name='h_fc1_drop')
        h_fc2 = self.FC_layer(h_fc1_d, [1024, ACTIONS], name='h_fc2', activate=False)

        # output layer #
        self.pred_action = tf.nn.softmax(h_fc2)
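Since the DropoutWrapper here reads its keep probability from the keep_prob placeholder, dropout can be turned on or off per session run; a hedged usage sketch, where batch_states is an assumed numpy batch of shape [batch, DAYS_RANGE, INPUT_DIM].

# Example (assumption): feed keep_prob < 1.0 while training, 1.0 when evaluating.
train_probs = self.sess.run(self.pred_action,
                            feed_dict={self.s: batch_states, self.prob: 0.8})
eval_probs = self.sess.run(self.pred_action,
                           feed_dict={self.s: batch_states, self.prob: 1.0})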
def sentence_embedding(self, inputs, keep_prob, w):
    # Look up word embeddings on the CPU: [batch_size, max_len, word_embedding].
    with tf.device('/cpu:0'):
        embedding_layer = tf.nn.embedding_lookup(w['word_embedding_w'], inputs)
    # Convert to a time-major list of max_len tensors of shape [batch_size, word_embedding].
    cell_input = tf.transpose(embedding_layer, [1, 0, 2])
    cell_input = tf.reshape(cell_input, [-1, self.hiddensize])
    cell_input = tf.split(0, self.max_len, cell_input)
    # Bidirectional LSTM with dropout on the inputs and outputs of each direction.
    with tf.variable_scope('forward'):
        lstm_fw_cell = rnn_cell.DropoutWrapper(
            rnn_cell.BasicLSTMCell(self.rnnsize, forget_bias=1.0, state_is_tuple=True),
            input_keep_prob=keep_prob, output_keep_prob=keep_prob)
    with tf.variable_scope('backward'):
        lstm_bw_cell = rnn_cell.DropoutWrapper(
            rnn_cell.BasicLSTMCell(self.rnnsize, forget_bias=1.0, state_is_tuple=True),
            input_keep_prob=keep_prob, output_keep_prob=keep_prob)
    outputs, _, _ = rnn.bidirectional_rnn(lstm_fw_cell, lstm_bw_cell, cell_input, dtype=tf.float32)
    # outputs shape: seq_len x [batch_size x (fw_cell_size + bw_cell_size)]
    att = self.attention_layer(outputs, w)
    return att
def build(self, inputs, keep_prob, n_classes, word_embedding):
    # inputs: [batch, max_sen, max_len] word ids -> list of max_sen tensors of shape [batch, max_len]
    inputs = tf.transpose(inputs, [1, 0, 2])
    inputs = tf.reshape(inputs, [-1, self.max_len])
    inputs = tf.split(0, self.max_sen, inputs)
    variable_dict = {
        "word_embedding_w": tf.get_variable(name="word_embedding",
                                            shape=[self.vocabsize, self.hiddensize],
                                            initializer=tf.constant_initializer(word_embedding),
                                            trainable=True),
        "attention_w": tf.get_variable(name="word_attention_weights", shape=[2 * self.rnnsize, 2 * self.rnnsize]),
        "attention_b": tf.get_variable(name="word_attention_bias", shape=[2 * self.rnnsize]),
        "attention_c": tf.get_variable(name="word_attention_context", shape=[2 * self.rnnsize, 1]),
    }
    # Word-level encoder: embed each sentence with shared variables.
    sent_embeddings = []
    with tf.variable_scope("embedding_scope") as scope:
        for x in inputs:
            embedding = self.sentence_embedding(x, keep_prob, variable_dict)
            sent_embeddings.append(embedding)
            scope.reuse_variables()
    # Sentence-level bidirectional LSTM with dropout, followed by sentence attention.
    with tf.variable_scope('forward'):
        lstm_fw_cell = rnn_cell.DropoutWrapper(
            rnn_cell.BasicLSTMCell(self.docsize, forget_bias=1.0, state_is_tuple=True),
            input_keep_prob=keep_prob, output_keep_prob=keep_prob)
    with tf.variable_scope('backward'):
        lstm_bw_cell = rnn_cell.DropoutWrapper(
            rnn_cell.BasicLSTMCell(self.docsize, forget_bias=1.0, state_is_tuple=True),
            input_keep_prob=keep_prob, output_keep_prob=keep_prob)
    outputs, _, _ = rnn.bidirectional_rnn(lstm_fw_cell, lstm_bw_cell, sent_embeddings, dtype=tf.float32)
    atten_variable_dict = {
        "attention_w": tf.get_variable(name="sent_attention_weights", shape=[2 * self.docsize, 2 * self.docsize]),
        "attention_b": tf.get_variable(name="sent_attention_bias", shape=[2 * self.docsize]),
        "attention_c": tf.get_variable(name="sent_attention_context", shape=[2 * self.docsize, 1]),
    }
    att = self.attention_layer(outputs, atten_variable_dict)
    # Fully connected output layer.
    W = tf.get_variable("fullconnect_weights", shape=[2 * self.docsize, n_classes])
    B = tf.get_variable("fullconnect_bias", shape=[n_classes])
    output = tf.add(tf.matmul(att, W), B, name="output")
    return output
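A hedged sketch of how build might be driven, assuming word-id inputs of shape [batch, max_sen, max_len] as implied by the transpose/reshape/split above; the placeholder names, the class count, and the pretrained_embeddings array are illustrative.

# Sketch (assumption): placeholders feeding build() for document classification.
doc_ids = tf.placeholder(tf.int32, shape=[None, model.max_sen, model.max_len], name='doc_word_ids')
keep_prob = tf.placeholder(tf.float32, shape=[], name='keep_prob')
logits = model.build(doc_ids, keep_prob, n_classes=5, word_embedding=pretrained_embeddings)
# pretrained_embeddings: an assumed numpy array of shape [vocabsize, hiddensize].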
def compute_states(self, emb):
    def unpack_sequence(tensor):
        # Split a [batch, time, dim] tensor into a time-major list of [batch, dim] tensors.
        return tf.unpack(tf.transpose(tensor, perm=[1, 0, 2]))

    with tf.variable_scope("Composition",
                           initializer=tf.contrib.layers.xavier_initializer(),
                           regularizer=tf.contrib.layers.l2_regularizer(self.reg)):
        cell_fw = rnn_cell.LSTMCell(self.hidden_dim)
        cell_bw = rnn_cell.LSTMCell(self.hidden_dim)
        # Dropout on the inputs and outputs of both directions.
        cell_fw = rnn_cell.DropoutWrapper(cell_fw,
                                          output_keep_prob=self.dropout,
                                          input_keep_prob=self.dropout)
        cell_bw = rnn_cell.DropoutWrapper(cell_bw,
                                          output_keep_prob=self.dropout,
                                          input_keep_prob=self.dropout)
        outputs, _, _ = rnn.bidirectional_rnn(cell_fw, cell_bw, unpack_sequence(emb),
                                              sequence_length=self.lngths, dtype=tf.float32)
        # Mean-pool the per-timestep outputs into a single sentence representation.
        sum_out = tf.reduce_sum(tf.pack(outputs), [0])
        sent_rep = tf.div(sum_out, tf.expand_dims(tf.to_float(self.lngths), 1))
        final_state = sent_rep
    return final_state
def createMultiRNN(self, n_layer, n_hidden):
    with self.sess.graph.as_default():
        self.prob = tf.placeholder("float", name="keep_prob")

        # input #
        with tf.name_scope('input'):
            self.s = tf.placeholder('float', shape=[None, INPUT_DIM, DAYS_RANGE], name='input_state')
            input_trans = tf.transpose(self.s, [2, 0, 1])         # [DAYS_RANGE, None, INPUT_DIM]
            input_reshape = tf.reshape(input_trans, [-1, INPUT_DIM])
            input_list = tf.split(0, DAYS_RANGE, input_reshape)   # split into DAYS_RANGE elements
        with tf.name_scope('tg_input'):
            self.target_s = tf.placeholder('float', shape=[None, INPUT_DIM, DAYS_RANGE], name='input_state')
            tg_input_trans = tf.transpose(self.target_s, [2, 0, 1])  # [DAYS_RANGE, None, INPUT_DIM]
            tg_input_reshape = tf.reshape(tg_input_trans, [-1, INPUT_DIM])
            tg_input_list = tf.split(0, DAYS_RANGE, tg_input_reshape)

        # multi-layer LSTM: online and target networks, each with output dropout #
        lstm_cell = rnn_cell.LSTMCell(n_hidden, use_peepholes=True, forget_bias=1.0, state_is_tuple=True)
        lstm_drop = rnn_cell.DropoutWrapper(lstm_cell, output_keep_prob=self.prob)
        lstm_stack = rnn_cell.MultiRNNCell([lstm_drop] * n_layer, state_is_tuple=True)

        tg_lstm_cell = rnn_cell.LSTMCell(n_hidden, use_peepholes=True, forget_bias=1.0, state_is_tuple=True)
        tg_lstm_drop = rnn_cell.DropoutWrapper(tg_lstm_cell, output_keep_prob=self.prob)
        tg_lstm_stack = rnn_cell.MultiRNNCell([tg_lstm_drop] * n_layer, state_is_tuple=True)

        # out: [timestep, batch, hidden], state: [cell, 2 (c, h), batch, hidden]
        lstm_output, hidden_states = rnn.rnn(lstm_stack, input_list, dtype='float', scope='LSTMStack')
        tg_lstm_output, tg_hidden_states = rnn.rnn(tg_lstm_stack, tg_input_list, dtype='float', scope='tg_LSTMStack')

        # Collect the online stack's weights for L2 regularization.
        for var in tf.get_collection(tf.GraphKeys.TRAINABLE_VARIABLES, scope="LSTMStack"):
            tf.add_to_collection("L2_VARIABLES", var)

        h_fc1 = self.FC_layer(lstm_output[-1], tg_lstm_output[-1], [n_hidden, 1024], name='h_fc1', activate=True)
        h_fc2 = self.FC_layer(h_fc1[0], h_fc1[1], [1024, ACTIONS], name='h_fc2', activate=False)

        # Assign ops that copy the online-network weights into the target network.
        key = tf.GraphKeys.TRAINABLE_VARIABLES
        update_pair = zip(tf.get_collection(key, scope="LSTMStack"), tf.get_collection(key, scope="tg_LSTMStack"))
        for var, tg_var in update_pair:
            self.update_list.append(tg_var.assign(var))

        # readout layer
        self.readout = h_fc2[0]
        self.target_readout = h_fc2[1]
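update_list collects assign ops that copy the online LSTMStack weights into tg_LSTMStack; a hedged sketch of how they might be used in a training loop, where step, SYNC_INTERVAL, and batch_states are assumed names.

# Example (assumption): periodically sync the target network, then evaluate Q-values
# with dropout active during training (keep_prob < 1.0).
if step % SYNC_INTERVAL == 0:
    self.sess.run(self.update_list)
q_values = self.sess.run(self.readout,
                         feed_dict={self.s: batch_states, self.prob: 0.8})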