def rnn_model(self):
    cell = rnn.BasicLSTMCell(num_units=self.n_units)
    multi_cell = rnn.MultiRNNCell([cell]*self.n_layers)
    # we only need one output per step, so wrap the stacked cell to project each step down to a single value (the next word index)
    cell_wrapped = rnn.OutputProjectionWrapper(multi_cell, output_size=1)
    # get input embed
    embedding = tf.Variable(initial_value=tf.random_uniform([self.vocab_size, self.n_units], -1.0, 1.0))
    inputs = tf.nn.embedding_lookup(embedding, self.inputs)
    # inputs has shape [batch_size, time_steps, n_units] after the embedding lookup
    outputs, states = tf.nn.dynamic_rnn(cell_wrapped, inputs=inputs, dtype=tf.float32)
    outputs = tf.reshape(outputs, [int(outputs.get_shape()[0]), int(inputs.get_shape()[1])])
    w = tf.Variable(tf.truncated_normal([int(inputs.get_shape()[1]), self.vocab_size]))
    b = tf.Variable(tf.zeros([self.vocab_size]))
    logits = tf.nn.bias_add(tf.matmul(outputs, w), b)
    return logits
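For reference, a minimal sketch (assuming TensorFlow 1.x and `tf.contrib.rnn`) of the same stacked-cell construction with one fresh `BasicLSTMCell` per layer; repeating a single cell object as in `[cell]*self.n_layers` makes the layers share variables and can trigger reuse errors in later 1.x releases:

import tensorflow as tf
from tensorflow.contrib import rnn

n_layers, n_units, batch_size = 2, 128, 32
# one new cell object per layer instead of repeating the same instance
multi_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(num_units=n_units) for _ in range(n_layers)])
inputs = tf.placeholder(tf.float32, [batch_size, None, n_units])   # [batch, time, features]
initial_state = multi_cell.zero_state(batch_size, tf.float32)
outputs, final_state = tf.nn.dynamic_rnn(multi_cell, inputs, initial_state=initial_state)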
Python MultiRNNCell() usage examples.
The snippet above is from rnn_model_no_state.py (project: tensorflow_novelist-master, author: charlesXu86).
def test_build_nn(build_nn):
    with tf.Graph().as_default():
        test_input_data_shape = [128, 5]
        test_input_data = tf.placeholder(tf.int32, test_input_data_shape)
        test_rnn_size = 256
        test_rnn_layer_size = 2
        test_vocab_size = 27
        test_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(test_rnn_size)] * test_rnn_layer_size)

        logits, final_state = build_nn(test_cell, test_rnn_size, test_input_data, test_vocab_size)

        # Check name
        assert hasattr(final_state, 'name'), \
            'Final state doesn\'t have the "name" attribute. Are you using build_rnn?'
        assert final_state.name == 'final_state:0', \
            'Final state doesn\'t have the correct name. Found the name {}. Are you using build_rnn?'.format(final_state.name)

        # Check Shape
        assert logits.get_shape().as_list() == test_input_data_shape + [test_vocab_size], \
            'Outputs has wrong shape. Found shape {}'.format(logits.get_shape())
        assert final_state.get_shape().as_list() == [test_rnn_layer_size, 2, None, test_rnn_size], \
            'Final state wrong shape. Found shape {}'.format(final_state.get_shape())

    _print_success_message()
def build_lstm_inner(H, lstm_input):
    '''
    build lstm decoder
    '''
    lstm_cell = rnn_cell.BasicLSTMCell(H['lstm_size'], forget_bias=0.0, state_is_tuple=False)
    if H['num_lstm_layers'] > 1:
        lstm = rnn_cell.MultiRNNCell([lstm_cell] * H['num_lstm_layers'], state_is_tuple=False)
    else:
        lstm = lstm_cell

    batch_size = H['batch_size'] * H['grid_height'] * H['grid_width']
    state = tf.zeros([batch_size, lstm.state_size])

    outputs = []
    with tf.variable_scope('RNN', initializer=tf.random_uniform_initializer(-0.1, 0.1)):
        for time_step in range(H['rnn_len']):
            if time_step > 0: tf.get_variable_scope().reuse_variables()
            output, state = lstm(lstm_input, state)
            outputs.append(output)
    return outputs
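A hypothetical usage sketch for `build_lstm_inner`; the `H` keys and the 512-dim `lstm_input` below are assumptions inferred only from the lookups inside the function:

H = {'lstm_size': 256, 'num_lstm_layers': 2, 'batch_size': 8,
     'grid_height': 15, 'grid_width': 20, 'rnn_len': 5}
lstm_input = tf.placeholder(tf.float32,
                            [H['batch_size'] * H['grid_height'] * H['grid_width'], 512])
outputs = build_lstm_inner(H, lstm_input)   # list of H['rnn_len'] tensors, one per unrolled step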
def test_get_init_cell(get_init_cell):
    with tf.Graph().as_default():
        test_batch_size_ph = tf.placeholder(tf.int32)
        test_rnn_size = 256

        cell, init_state = get_init_cell(test_batch_size_ph, test_rnn_size)

        # Check type
        assert isinstance(cell, tf.contrib.rnn.MultiRNNCell), \
            'Cell is wrong type. Found {} type'.format(type(cell))

        # Check for name attribute
        assert hasattr(init_state, 'name'), \
            'Initial state doesn\'t have the "name" attribute. Try using `tf.identity` to set the name.'

        # Check name
        assert init_state.name == 'initial_state:0', \
            'Initial state doesn\'t have the correct name. Found the name {}'.format(init_state.name)

    _print_success_message()
def test_build_rnn(build_rnn):
    with tf.Graph().as_default():
        test_rnn_size = 256
        test_rnn_layer_size = 2
        test_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(test_rnn_size)] * test_rnn_layer_size)

        test_inputs = tf.placeholder(tf.float32, [None, None, test_rnn_size])
        outputs, final_state = build_rnn(test_cell, test_inputs)

        # Check name
        assert hasattr(final_state, 'name'), \
            'Final state doesn\'t have the "name" attribute. Try using `tf.identity` to set the name.'
        assert final_state.name == 'final_state:0', \
            'Final state doesn\'t have the correct name. Found the name {}'.format(final_state.name)

        # Check shape
        assert outputs.get_shape().as_list() == [None, None, test_rnn_size], \
            'Outputs has wrong shape. Found shape {}'.format(outputs.get_shape())
        assert final_state.get_shape().as_list() == [test_rnn_layer_size, 2, None, test_rnn_size], \
            'Final state wrong shape. Found shape {}'.format(final_state.get_shape())

    _print_success_message()
def RNN(x, weights, biases):
    # reshape to [1, n_input]
    x = tf.reshape(x, [-1, n_input])

    # Generate a n_input-element sequence of inputs
    # (eg. [had] [a] [general] -> [20] [6] [33])
    x = tf.split(x, n_input, 1)

    # 2-layer LSTM, each layer has n_hidden units.
    # Average Accuracy= 95.20% at 50k iter
    rnn_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(n_hidden), rnn.BasicLSTMCell(n_hidden)])

    # 1-layer LSTM with n_hidden units but with lower accuracy.
    # Average Accuracy= 90.60% 50k iter
    # Uncomment line below to test but comment out the 2-layer rnn.MultiRNNCell above
    # rnn_cell = rnn.BasicLSTMCell(n_hidden)

    # generate prediction
    outputs, states = rnn.static_rnn(rnn_cell, x, dtype=tf.float32)

    # there are n_input outputs but
    # we only want the last output
    return tf.matmul(outputs[-1], weights['out']) + biases['out']
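A hedged usage sketch for this `RNN` function; `n_input`, `n_hidden` and `vocab_size` stand in for the module-level values the snippet relies on:

n_input, n_hidden, vocab_size = 3, 512, 112
x = tf.placeholder(tf.float32, [None, n_input, 1])     # a window of n_input word indices
weights = {'out': tf.Variable(tf.random_normal([n_hidden, vocab_size]))}
biases = {'out': tf.Variable(tf.random_normal([vocab_size]))}
pred = RNN(x, weights, biases)                          # [batch, vocab_size] logits for the next word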
def bidirectional_GRU(inputs, inputs_len, cell=None, cell_fn=tf.contrib.rnn.GRUCell, units=Params.attn_size, layers=1, scope="Bidirectional_GRU", output=0, is_training=True, reuse=None):
    '''
    Bidirectional recurrent neural network with GRU cells.

    Args:
        inputs:     rnn input of shape (batch_size, timestep, dim)
        inputs_len: rnn input_len of shape (batch_size, )
        cell:       rnn cell of type RNN_Cell.
        output:     if 0, output returns rnn output for every timestep,
                    if 1, output returns concatenated state of backward and
                    forward rnn.
    '''
    with tf.variable_scope(scope, reuse=reuse):
        if cell is not None:
            (cell_fw, cell_bw) = cell
        else:
            shapes = inputs.get_shape().as_list()
            if len(shapes) > 3:
                inputs = tf.reshape(inputs, (shapes[0]*shapes[1], shapes[2], -1))
                inputs_len = tf.reshape(inputs_len, (shapes[0]*shapes[1],))

            # if no cells are provided, use standard GRU cell implementation
            if layers > 1:
                cell_fw = MultiRNNCell([apply_dropout(cell_fn(units), size=inputs.shape[-1] if i == 0 else units, is_training=is_training) for i in range(layers)])
                cell_bw = MultiRNNCell([apply_dropout(cell_fn(units), size=inputs.shape[-1] if i == 0 else units, is_training=is_training) for i in range(layers)])
            else:
                cell_fw, cell_bw = [apply_dropout(cell_fn(units), size=inputs.shape[-1], is_training=is_training) for _ in range(2)]

        outputs, states = tf.nn.bidirectional_dynamic_rnn(cell_fw, cell_bw, inputs,
                                                          sequence_length=inputs_len,
                                                          dtype=tf.float32)
        if output == 0:
            return tf.concat(outputs, 2)
        elif output == 1:
            return tf.reshape(tf.concat(states, 1), (Params.batch_size, shapes[1], 2*units))
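A hedged usage sketch, assuming the surrounding project's `Params` and `apply_dropout` helpers are importable; the shapes are illustrative:

inputs = tf.placeholder(tf.float32, [Params.batch_size, 30, 300])   # (batch, timestep, dim)
inputs_len = tf.placeholder(tf.int32, [Params.batch_size])
enc = bidirectional_GRU(inputs, inputs_len, layers=1,
                        scope="passage_encoding", output=0, is_training=True)
# enc has shape (batch, timestep, 2 * Params.attn_size): forward and backward outputs concatenated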
def build_cells(self):
    # encoder cell
    with tf.name_scope('encoder_cell') as scope:
        encoder_cell = rnn.MultiRNNCell([self.RNNCell(num_units=self.hidden_size)
                                         for _ in range(self.encoder_layer_size)])
        encoder_cell = rnn.DropoutWrapper(encoder_cell,
                                          input_keep_prob=self.encoder_input_keep_prob,
                                          output_keep_prob=self.encoder_output_keep_prob)

    # decoder cell
    with tf.name_scope('decoder_cell') as scope:
        decoder_cell = rnn.MultiRNNCell([self.RNNCell(num_units=self.hidden_size)
                                         for _ in range(self.decoder_layer_size)])
        decoder_cell = rnn.DropoutWrapper(decoder_cell,
                                          input_keep_prob=self.decoder_input_keep_prob,
                                          output_keep_prob=self.decoder_output_keep_prob)

    return encoder_cell, decoder_cell
def __build_rnn_cell(self):
    with tf.name_scope('encoder_cell'):
        encoder_cell = rnn.MultiRNNCell([self.RNN(num_units=self.hidden_layer_size)
                                         for _ in range(self.encoder_layer_size)])
        encoder_cell = rnn.DropoutWrapper(
            cell=encoder_cell,
            input_keep_prob=self.encoder_input_keep_prob,
            output_keep_prob=self.encoder_output_keep_prob
        )

    with tf.name_scope('decoder_cell'):
        decoder_cell = rnn.MultiRNNCell([self.RNN(num_units=self.hidden_layer_size)
                                         for _ in range(self.decoder_layer_size)])
        decoder_cell = rnn.DropoutWrapper(
            cell=decoder_cell,
            input_keep_prob=self.decoder_input_keep_prob,
            output_keep_prob=self.decoder_output_keep_prob
        )

    return encoder_cell, decoder_cell
def _build_model(self, batch_size, helper_build_fn, decoder_maxiters=None, alignment_history=False):
    # embed input_data into a one-hot representation
    inputs = tf.one_hot(self.input_data, self._input_size, dtype=self._dtype)
    inputs_len = self.input_lengths

    with tf.name_scope('bidir-encoder'):
        fw_cell = rnn.MultiRNNCell([rnn.BasicRNNCell(self._enc_rnn_size) for i in range(3)], state_is_tuple=True)
        bw_cell = rnn.MultiRNNCell([rnn.BasicRNNCell(self._enc_rnn_size) for i in range(3)], state_is_tuple=True)
        fw_cell_zero = fw_cell.zero_state(batch_size, self._dtype)
        bw_cell_zero = bw_cell.zero_state(batch_size, self._dtype)

        enc_out, _ = tf.nn.bidirectional_dynamic_rnn(fw_cell, bw_cell, inputs,
                                                     sequence_length=inputs_len,
                                                     initial_state_fw=fw_cell_zero,
                                                     initial_state_bw=bw_cell_zero)

    with tf.name_scope('attn-decoder'):
        dec_cell_in = rnn.GRUCell(self._dec_rnn_size)
        attn_values = tf.concat(enc_out, 2)
        attn_mech = seq2seq.BahdanauAttention(self._enc_rnn_size * 2, attn_values, inputs_len)
        dec_cell_attn = rnn.GRUCell(self._enc_rnn_size * 2)
        dec_cell_attn = seq2seq.AttentionWrapper(dec_cell_attn,
                                                 attn_mech,
                                                 self._enc_rnn_size * 2,
                                                 alignment_history=alignment_history)
        dec_cell_out = rnn.GRUCell(self._output_size)
        dec_cell = rnn.MultiRNNCell([dec_cell_in, dec_cell_attn, dec_cell_out],
                                    state_is_tuple=True)

        dec = seq2seq.BasicDecoder(dec_cell, helper_build_fn(),
                                   dec_cell.zero_state(batch_size, self._dtype))

        dec_out, dec_state = seq2seq.dynamic_decode(dec, output_time_major=False,
                                                    maximum_iterations=decoder_maxiters, impute_finished=True)

    self.outputs = dec_out.rnn_output
    self.output_ids = dec_out.sample_id
    self.final_state = dec_state
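For context, `helper_build_fn` must return a `tf.contrib.seq2seq` helper; a hedged sketch of a teacher-forcing setup (names and shapes here are illustrative):

dec_inputs = tf.placeholder(tf.float32, [None, None, 64])   # [batch, time, feature] decoder inputs
dec_lengths = tf.placeholder(tf.int32, [None])               # per-example decoder lengths

def make_train_helper():
    return seq2seq.TrainingHelper(dec_inputs, dec_lengths)

model._build_model(batch_size=32, helper_build_fn=make_train_helper)   # model: instance of the surrounding class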
def buildRNN(self, x, scope):
    print(x)
    x = tf.transpose(x, [1, 0, 2])
    #print(x)
    x = tf.reshape(x, [-1, self.nfeatures])
    #print(x)
    x = tf.split(x, self.n_steps, 0)
    print(x)
    #lstm_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(self.n_hidden, forget_bias=1.0) for _ in range(self.n_layers)], state_is_tuple=True)
    #outputs, states = tf.nn.dynamic_rnn(lstm_cell, x, dtype=tf.float64)

    with tf.name_scope("fw"+scope), tf.variable_scope("fw"+scope):
        fw_cell_array = []
        print(tf.get_variable_scope().name)
        for _ in range(self.n_layers):
            fw_cell = rnn.BasicLSTMCell(self.n_hidden, forget_bias=1.0, state_is_tuple=True)
            #fw_cell = rnn.DropoutWrapper(fw_cell, output_keep_prob=self.dropout)
            fw_cell_array.append(fw_cell)
        fw_cell = rnn.MultiRNNCell(fw_cell_array, state_is_tuple=True)

    with tf.name_scope("bw"+scope), tf.variable_scope("bw"+scope):
        bw_cell_array = []
        print(tf.get_variable_scope().name)
        for _ in range(self.n_layers):
            bw_cell = rnn.BasicLSTMCell(self.n_hidden, forget_bias=1.0, state_is_tuple=True)
            #bw_cell = rnn.DropoutWrapper(bw_cell, output_keep_prob=self.dropout)
            bw_cell_array.append(bw_cell)
        bw_cell = rnn.MultiRNNCell(bw_cell_array, state_is_tuple=True)

    outputs, _, _ = tf.contrib.rnn.static_bidirectional_rnn(fw_cell, bw_cell, x, dtype=tf.float64)
    #outputs, = tf.nn.bidirectional_dynamic_rnn(fw_cell, bw_cell, x, dtype=tf.float64)
    print(outputs)
    print(outputs[-1])
    return outputs[-1]
def attention_encoder(x, length,
                      num_blocks=3,
                      name=None, reuse=None):
    with tf.variable_scope(name, "attention-encoder", values=[x, length],
                           reuse=reuse) as scope:
        # get shapes
        batch_size = x.get_shape().as_list()[0]
        if batch_size is None:
            batch_size = tf.shape(x)[0]
        dims = int(x.get_shape()[-1])

        # encode data
        fw_cell = rnn.MultiRNNCell([
            rnn.BasicRNNCell(dims, reuse=scope.reuse) for i in range(num_blocks)
        ], state_is_tuple=True)
        bw_cell = rnn.MultiRNNCell([
            rnn.BasicRNNCell(dims, reuse=scope.reuse) for i in range(num_blocks)
        ], state_is_tuple=True)

        enc_out, _ = tf.nn.bidirectional_dynamic_rnn(
            fw_cell, bw_cell,
            x,
            sequence_length=length,
            initial_state_fw=fw_cell.zero_state(batch_size, tf.float32),
            initial_state_bw=bw_cell.zero_state(batch_size, tf.float32)
        )
        enc_out = tf.concat(enc_out, 2)

    return enc_out
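A minimal usage sketch for `attention_encoder`; the shapes are illustrative:

x = tf.placeholder(tf.float32, [None, 40, 64])    # (batch, time, dims)
length = tf.placeholder(tf.int32, [None])          # true sequence lengths
enc_out = attention_encoder(x, length, num_blocks=2, name="encoder")
# enc_out has shape (batch, time, 2 * dims): forward and backward outputs concatenated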
def _set_train_model(self):
    """
    define train graph
    :return:
    """
    # Create the internal multi-layer cell for our RNN.
    if use_lstm:
        single_cell1 = LSTMCell(self.enc_hidden_size)
        single_cell2 = LSTMCell(self.dec_hidden_size)
    else:
        single_cell1 = GRUCell(self.enc_hidden_size)
        single_cell2 = GRUCell(self.dec_hidden_size)
    enc_cell = MultiRNNCell([single_cell1 for _ in range(self.enc_num_layers)])
    dec_cell = MultiRNNCell([single_cell2 for _ in range(self.dec_num_layers)])

    self.encoder_cell = enc_cell
    self.decoder_cell = dec_cell

    self._make_graph(forward_only)
    self.saver = tf.train.Saver(tf.global_variables())
The following snippet is from lstm_predictior.py (project: LSTM-Time-Series-Analysis-using-Tensorflow, author: pusj).
def lstm_model(time_steps, rnn_layers, dense_layers=None, learning_rate=0.01, optimizer='Adagrad', learning_rate_decay_fn=None):  # [Ftrl, Adam, Adagrad, Momentum, SGD, RMSProp]
    print(time_steps)
    #exit(0)
    """
    Creates a deep model based on:
        * stacked lstm cells
        * an optional dense layers
    :param num_units: the size of the cells.
    :param rnn_layers: list of int or dict
        * list of int: the steps used to instantiate the `BasicLSTMCell` cell
        * list of dict: [{steps: int, keep_prob: int}, ...]
    :param dense_layers: list of nodes for each layer
    :return: the model definition
    """

    def lstm_cells(layers):
        print('-------------------------sdsdsdsdssd---------------------------------------------', layers)
        if isinstance(layers[0], dict):
            return [rnn.DropoutWrapper(rnn.BasicLSTMCell(layer['num_units'], state_is_tuple=True), layer['keep_prob'])
                    if layer.get('keep_prob')
                    else rnn.BasicLSTMCell(layer['num_units'], state_is_tuple=True)
                    for layer in layers]
        return [rnn.BasicLSTMCell(steps, state_is_tuple=True) for steps in layers]

    def dnn_layers(input_layers, layers):
        if layers and isinstance(layers, dict):
            return tflayers.stack(input_layers, tflayers.fully_connected,
                                  layers['layers'],
                                  activation=layers.get('activation'),
                                  dropout=layers.get('dropout'))
        elif layers:
            return tflayers.stack(input_layers, tflayers.fully_connected, layers)
        else:
            return input_layers

    def _lstm_model(X, y):
        stacked_lstm = rnn.MultiRNNCell(lstm_cells(rnn_layers), state_is_tuple=True)
        x_ = tf.unstack(X, num=time_steps, axis=1)
        output, layers = rnn.static_rnn(stacked_lstm, x_, dtype=dtypes.float32)
        output = dnn_layers(output[-1], dense_layers)
        prediction, loss = tflearn.models.linear_regression(output, y)
        train_op = tf.contrib.layers.optimize_loss(
            loss, tf.contrib.framework.get_global_step(), optimizer=optimizer,
            learning_rate=tf.train.exponential_decay(learning_rate, tf.contrib.framework.get_global_step(), decay_steps=1000, decay_rate=0.9, staircase=False, name=None))
        print('learning_rate', learning_rate)
        return prediction, loss, train_op

    # https://www.tensorflow.org/versions/r0.10/api_docs/python/train/decaying_the_learning_rate
    return _lstm_model
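A hedged sketch of a call to `lstm_model`; note the inner `lstm_cells` reads `num_units` and `keep_prob` from each dict, not the `steps` key mentioned in the docstring:

model_fn = lstm_model(time_steps=10,
                      rnn_layers=[{'num_units': 64, 'keep_prob': 0.9},
                                  {'num_units': 64}],
                      dense_layers=[32, 16],
                      learning_rate=0.01,
                      optimizer='Adagrad')
# model_fn(X, y) builds the graph and returns (prediction, loss, train_op),
# e.g. for use with the old tf.contrib.learn estimators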
def _create_cells(self) -> List[MultiRNNCell]:
    """
    Creates the multilayer-RNN cells required by the architecture of this RNN.

    Returns
    -------
    list of MultiRNNCell
        A list of MultiRNNCells containing one entry if the RNN is unidirectional, and two identical entries if the
        RNN is bidirectional
    """
    cells = [[self._create_rnn_cell()
              for _ in range(self.num_layers)]
             for _ in range(2 if self.bidirectional else 1)]
    return [MultiRNNCell(x) for x in cells]
def create_model(session, restore_only=False):
    # with bidirectional encoder, decoder state size should be
    # 2x encoder state size
    is_training = tf.placeholder(dtype=tf.bool, name='is_training')
    encoder_cell = LSTMCell(64)
    encoder_cell = MultiRNNCell([encoder_cell]*5)
    decoder_cell = LSTMCell(128)
    decoder_cell = MultiRNNCell([decoder_cell]*5)
    model = Seq2SeqModel(encoder_cell=encoder_cell,
                         decoder_cell=decoder_cell,
                         vocab_size=wiki.vocab_size,
                         embedding_size=300,
                         attention=True,
                         bidirectional=True,
                         is_training=is_training,
                         device=args.device,
                         debug=False)

    saver = tf.train.Saver(tf.global_variables(), keep_checkpoint_every_n_hours=1)
    checkpoint = tf.train.get_checkpoint_state(checkpoint_dir)
    if checkpoint:
        print("Reading model parameters from %s" % checkpoint.model_checkpoint_path)
        saver.restore(session, checkpoint.model_checkpoint_path)
    elif restore_only:
        raise FileNotFoundError("Cannot restore model")
    else:
        print("Created model with fresh parameters")
        session.run(tf.global_variables_initializer())
    tf.get_default_graph().finalize()
    return model, saver
def build_cell(units, cell_type='lstm', num_layers=1):
    if num_layers > 1:
        cell = rnn.MultiRNNCell([
            build_cell(units, cell_type, 1) for _ in range(num_layers)
        ])
    else:
        if cell_type == "lstm":
            cell = rnn.LSTMCell(units)
        elif cell_type == "gru":
            cell = rnn.GRUCell(units)
        else:
            raise ValueError('Do not support %s' % cell_type)
    return cell
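A short usage sketch for `build_cell`:

# a 3-layer GRU stack and a single LSTM cell
stacked_gru = build_cell(128, cell_type='gru', num_layers=3)
single_lstm = build_cell(256, cell_type='lstm')
outputs, state = tf.nn.dynamic_rnn(stacked_gru,
                                   tf.placeholder(tf.float32, [None, None, 50]),
                                   dtype=tf.float32)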
def bi_lstm_layer(self, inputs):
    if self.hidden_layer_num > 1:
        lstm_fw = rnn.MultiRNNCell([self.lstm_cell() for _ in range(self.hidden_layer_num)])
        lstm_bw = rnn.MultiRNNCell([self.lstm_cell() for _ in range(self.hidden_layer_num)])
    else:
        lstm_fw = self.lstm_cell()
        lstm_bw = self.lstm_cell()
    outputs, _, _ = rnn.static_bidirectional_rnn(lstm_fw, lstm_bw, inputs, sequence_length=self.lengths, dtype=tf.float32)
    features = tf.reshape(outputs, [-1, self.num_hidden * 2])
    return features
def bi_lstm_layer(self, inputs):
    if self.hidden_layer_num > 1:
        lstm_fw = rnn.MultiRNNCell([self.lstm_cell() for _ in range(self.hidden_layer_num)])
        lstm_bw = rnn.MultiRNNCell([self.lstm_cell() for _ in range(self.hidden_layer_num)])
    else:
        lstm_fw = self.lstm_cell()
        lstm_bw = self.lstm_cell()
    outputs, _, _ = rnn.static_bidirectional_rnn(lstm_fw, lstm_bw, inputs, dtype=tf.float32)
    features = tf.reshape(outputs, [-1, self.hidden_neural_size * 2])
    return features
def bilstm_layer(self, inputs):
    if self.hidden_layer_num > 1:
        lstm_fw = rnn.MultiRNNCell([self.lstm_fw() for _ in range(self.hidden_layer_num)])
        lstm_bw = rnn.MultiRNNCell([self.lstm_bw() for _ in range(self.hidden_layer_num)])
    else:
        lstm_fw = self.lstm_fw()
        lstm_bw = self.lstm_bw()
    # outputs, _ = tf.nn.bidirectional_dynamic_rnn(cell_fw=lstm_fw, cell_bw=lstm_bw, inputs=inputs, dtype=tf.float32)
    outputs, _, _ = rnn.static_bidirectional_rnn(lstm_fw, lstm_bw, inputs, dtype=tf.float32)
    # outputs = tf.concat(outputs, 2)
    output = outputs[-1]
    return output
def RNN(layer_in, num_hidden_layers, num_hidden_units, num_inputs_in=155):
    layer_in = tf.reshape(layer_in, [-1, 8 * 8])
    n_features = layer_in.get_shape().as_list()[1]
    num_inputs_in = 155
    num_classes = 155

    # reshape to [1, n_input]
    X = tf.reshape(layer_in, [-1, n_features])

    # Generate a n_input-element sequence of inputs
    # (eg. [had] [a] [general] -> [20] [6] [33])
    X = tf.split(X, n_features, 1)

    # 1-layer LSTM with n_hidden units.
    # rnn_cell = rnn.BasicLSTMCell(num_hidden)
    rnn_cell = rnn.MultiRNNCell([rnn.BasicLSTMCell(num_hidden_units)] * num_hidden_layers)

    # generate prediction
    outputs, states = rnn.static_rnn(rnn_cell, X, dtype=tf.float32)

    # there are n_input outputs but
    # we only want the last output
    weights = {
        'out': tf.Variable(tf.random_normal([num_hidden_units, num_classes]))
    }
    biases = {
        'out': tf.Variable(tf.random_normal([num_classes]))
    }
    return tf.matmul(outputs[-1], weights['out']) + biases['out']
def RNN(X, num_hidden_layers):
    # reshape to [1, n_input]
    std_dev_He = np.sqrt(2 / np.prod(X.get_shape().as_list()[1:]))
    X = tf.reshape(X, [-1, sequence_length * 8 * 8])

    # Generate a n_input-element sequence of inputs
    # (eg. [had] [a] [general] -> [20] [6] [33])
    X = tf.split(X, sequence_length, 1)

    # 1-layer LSTM with n_hidden units.
    # rnn_cell = rnn.BasicLSTMCell(n_hidden)
    with tf.variable_scope('RNN', initializer=tf.random_normal_initializer(mean=0.0, stddev=std_dev_He)):  # alternative: initializer=tf.contrib.layers.xavier_initializer()
        # weights = {
        #     'out': tf.Variable(tf.random_normal([num_hidden, num_classes]))
        # }
        # biases = {
        #     'out': tf.Variable(tf.random_normal([num_classes]))
        # }
        weights = tf.get_variable(
            name='weights',
            shape=[num_hidden, num_classes],  # 1 x 64 filter in, 1 class out
            dtype=tf.float32,
            initializer=tf.contrib.layers.xavier_initializer())
        biases = tf.get_variable(
            name='biases',
            shape=[num_classes],
            dtype=tf.float32,
            initializer=tf.constant_initializer(0.0))

        GRU_cell_layer = [rnn.GRUCell(num_hidden)]
        # LSTM_cell_layer = [rnn.BasicLSTMCell(num_hidden, forget_bias=1)]
        rnn_cell = rnn.MultiRNNCell(GRU_cell_layer * num_hidden_layers)

        # generate prediction
        outputs, states = rnn.static_rnn(rnn_cell, X, dtype=tf.float32)

        # there are n_input outputs but
        # we only want the last output
        # return tf.matmul(outputs[-1], weights['out']) + biases['out']
        return tf.matmul(outputs[-1], weights) + biases
def cell_create(self, scope_name):
    with tf.variable_scope(scope_name):
        if self.cell_type == 'tanh':
            cells = rnn.MultiRNNCell([rnn.BasicRNNCell(self.n_hidden[i]) for i in range(self.n_layers)], state_is_tuple=True)
        elif self.cell_type == 'LSTM':
            cells = rnn.MultiRNNCell([rnn.BasicLSTMCell(self.n_hidden[i]) for i in range(self.n_layers)], state_is_tuple=True)
        elif self.cell_type == 'GRU':
            cells = rnn.MultiRNNCell([rnn.GRUCell(self.n_hidden[i]) for i in range(self.n_layers)], state_is_tuple=True)
        elif self.cell_type == 'LSTMP':
            cells = rnn.MultiRNNCell([rnn.LSTMCell(self.n_hidden[i]) for i in range(self.n_layers)], state_is_tuple=True)
        cells = rnn.DropoutWrapper(cells, input_keep_prob=self.dropout_ph, output_keep_prob=self.dropout_ph)
    return cells
def _build_cell(self, m, n_stack=1, wrappers=[]):
    if n_stack == 1:
        cell = self.c(m)
    else:
        cell = rnn.MultiRNNCell([self.c(m) for _ in range(n_stack)])
    # Apply wrappers; use functools.partial to bind other arguments
    for wrapper in wrappers:
        cell = wrapper(cell)
    return cell
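As the comment notes, extra wrapper arguments can be bound with `functools.partial`; a hedged example, assuming `self.c` is a cell constructor such as `rnn.LSTMCell`:

from functools import partial

# each entry is called as wrapper(cell) after the stack is built
wrappers = [partial(rnn.DropoutWrapper, output_keep_prob=0.8)]
cell = model._build_cell(256, n_stack=3, wrappers=wrappers)   # model: instance of the surrounding class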
The following snippet is from state_saving_rnn_estimator.py (project: DeepLearning_VirtualReality_BigData_Project, author: rashmitripathi).
def lstm_cell(num_units, num_layers):
    """Constructs a `MultiRNNCell` with num_layers `BasicLSTMCell`s.

    Args:
        num_units: The number of units in the `RNNCell`.
        num_layers: The number of layers in the RNN.

    Returns:
        An initialized `MultiRNNCell`.
    """
    return rnn_cell.MultiRNNCell([
        rnn_cell.BasicLSTMCell(
            num_units=num_units, state_is_tuple=True) for _ in range(num_layers)
    ])
The following snippet is from dynamic_rnn_estimator.py (project: DeepLearning_VirtualReality_BigData_Project, author: rashmitripathi).
def _to_rnn_cell(cell_or_type, num_units, num_layers):
    """Constructs and returns an `RNNCell`.

    Args:
        cell_or_type: Either a string identifying the `RNNCell` type, a subclass of
            `RNNCell` or an instance of an `RNNCell`.
        num_units: The number of units in the `RNNCell`.
        num_layers: The number of layers in the RNN.

    Returns:
        An initialized `RNNCell`.

    Raises:
        ValueError: `cell_or_type` is an invalid `RNNCell` name.
        TypeError: `cell_or_type` is not a string or a subclass of `RNNCell`.
    """
    if isinstance(cell_or_type, contrib_rnn.RNNCell):
        return cell_or_type
    if isinstance(cell_or_type, str):
        cell_or_type = _CELL_TYPES.get(cell_or_type)
        if cell_or_type is None:
            raise ValueError('The supported cell types are {}; got {}'.format(
                list(_CELL_TYPES.keys()), cell_or_type))
    if not issubclass(cell_or_type, contrib_rnn.RNNCell):
        raise TypeError(
            'cell_or_type must be a subclass of RNNCell or one of {}.'.format(
                list(_CELL_TYPES.keys())))
    single_cell = lambda: cell_or_type(num_units=num_units)
    if num_layers > 1:
        cell = contrib_rnn.MultiRNNCell(
            [single_cell() for _ in range(num_layers)], state_is_tuple=True)
    else:
        cell = single_cell()
    return cell
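A hedged usage sketch; `_CELL_TYPES` is the module's name-to-class mapping (not shown in this excerpt), so the string form assumes a registered key such as 'lstm':

# from a cell class
cell_a = _to_rnn_cell(contrib_rnn.GRUCell, num_units=64, num_layers=2)
# from an already constructed cell (returned unchanged)
cell_b = _to_rnn_cell(contrib_rnn.BasicLSTMCell(32), num_units=64, num_layers=2)
# from a registered name, assuming 'lstm' is a key in _CELL_TYPES
cell_c = _to_rnn_cell('lstm', num_units=64, num_layers=3)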
def rnn_model(self):
    # BasicLSTMCell is the basic LSTM cell; forget_bias defaults to 1.0 and it has no peep-hole connections.
    # The implementation lives in rnn/python/ops/core_rnn_cell_impl.py.
    cell = rnn.BasicLSTMCell(num_units=self.n_units)
    # MultiRNNCell stacks several cells into a multi-layer RNN.
    # state_is_tuple=True is recommended so each stacked LSTM layer keeps its own (c, h) state tuple.
    multi_cell = rnn.MultiRNNCell([cell]*self.n_layers)
    # we only need one output per step, so wrap the stacked cell to project each step down to a single value (the next word index)
    # OutputProjectionWrapper adds a linear projection of size output_size on top of rnn_cell
    cell_wrapped = rnn.OutputProjectionWrapper(multi_cell, output_size=1)
    # get input embed
    # tf.random_uniform(shape, minval, maxval, dtype, seed, name) samples a tensor of the given shape uniformly from [minval, maxval)
    embedding = tf.Variable(initial_value=tf.random_uniform([self.vocab_size, self.n_units], -1.0, 1.0))
    # tf.nn.embedding_lookup(embedding, input_ids) gathers the embedding rows for the given ids,
    # e.g. input_ids=[1, 3, 5] returns rows 1, 3 and 5 of the embedding matrix
    inputs = tf.nn.embedding_lookup(embedding, self.inputs)
    # inputs has shape [batch_size, time_steps, n_units] after the embedding lookup
    # pass an explicit initial state into dynamic_rnn; without it the results were noticeably worse
    if self.labels is not None:
        # zero_state builds an all-zero initial state for the given batch size
        initial_state = cell_wrapped.zero_state(int(inputs.get_shape()[0]), tf.float32)
    else:
        initial_state = cell_wrapped.zero_state(1, tf.float32)
    # dynamic_rnn unrolls the cell over the time dimension at run time,
    # so sequence lengths may differ from batch to batch (unlike the static rnn)
    outputs, states = tf.nn.dynamic_rnn(cell_wrapped, inputs=inputs, dtype=tf.float32, initial_state=initial_state)
    outputs = tf.reshape(outputs, [int(outputs.get_shape()[0]), int(inputs.get_shape()[1])])
    # truncated_normal samples from a normal distribution truncated at two standard deviations
    w = tf.Variable(tf.truncated_normal([int(inputs.get_shape()[1]), self.vocab_size]))
    b = tf.Variable(tf.zeros([self.vocab_size]))
    logits = tf.nn.bias_add(tf.matmul(outputs, w), b)
    return logits, states
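A hedged sketch of how the returned logits might be wired into a training op, assuming `self.labels` holds one next-word index per example:

logits, states = model.rnn_model()   # model: instance of the surrounding class
loss = tf.reduce_mean(
    tf.nn.sparse_softmax_cross_entropy_with_logits(labels=model.labels, logits=logits))
train_op = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(loss)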
def HAN_model_1(session, restore_only=False):
    """Hierarchical Attention Network"""
    import tensorflow as tf
    try:
        from tensorflow.contrib.rnn import GRUCell, MultiRNNCell, DropoutWrapper
    except ImportError:
        MultiRNNCell = tf.nn.rnn_cell.MultiRNNCell
        GRUCell = tf.nn.rnn_cell.GRUCell
    from bn_lstm import BNLSTMCell
    from HAN_model import HANClassifierModel

    is_training = tf.placeholder(dtype=tf.bool, name='is_training')

    cell = BNLSTMCell(80, is_training)  # h-h batchnorm LSTMCell
    # cell = GRUCell(30)
    cell = MultiRNNCell([cell]*5)

    model = HANClassifierModel(
        vocab_size=vocab_size,
        embedding_size=200,
        classes=classes,
        word_cell=cell,
        sentence_cell=cell,
        word_output_size=100,
        sentence_output_size=100,
        device=args.device,
        learning_rate=args.lr,
        max_grad_norm=args.max_grad_norm,
        dropout_keep_proba=0.5,
        is_training=is_training,
    )

    saver = tf.train.Saver(tf.global_variables())
    checkpoint = tf.train.get_checkpoint_state(checkpoint_dir)
    if checkpoint:
        print("Reading model parameters from %s" % checkpoint.model_checkpoint_path)
        saver.restore(session, checkpoint.model_checkpoint_path)
    elif restore_only:
        raise FileNotFoundError("Cannot restore model")
    else:
        print("Created model with fresh parameters")
        session.run(tf.global_variables_initializer())
    # tf.get_default_graph().finalize()
    return model, saver
def __init__(self, data, model='lstm', infer=False):
    self.rnn_size = 128
    self.n_layers = 2

    if infer:
        self.batch_size = 1
    else:
        self.batch_size = data.batch_size

    if model == 'rnn':
        cell_rnn = rnn.BasicRNNCell
    elif model == 'gru':
        cell_rnn = rnn.GRUCell
    elif model == 'lstm':
        cell_rnn = rnn.BasicLSTMCell
    cell = cell_rnn(self.rnn_size, state_is_tuple=False)
    self.cell = rnn.MultiRNNCell([cell] * self.n_layers, state_is_tuple=False)

    self.x_tf = tf.placeholder(tf.int32, [self.batch_size, None])
    self.y_tf = tf.placeholder(tf.int32, [self.batch_size, None])

    self.initial_state = self.cell.zero_state(self.batch_size, tf.float32)

    with tf.variable_scope('rnnlm'):
        softmax_w = tf.get_variable("softmax_w", [self.rnn_size, data.words_size])
        softmax_b = tf.get_variable("softmax_b", [data.words_size])
        with tf.device("/cpu:0"):
            embedding = tf.get_variable(
                "embedding", [data.words_size, self.rnn_size])
            inputs = tf.nn.embedding_lookup(embedding, self.x_tf)

    outputs, final_state = tf.nn.dynamic_rnn(
        self.cell, inputs, initial_state=self.initial_state, scope='rnnlm')

    self.output = tf.reshape(outputs, [-1, self.rnn_size])
    self.logits = tf.matmul(self.output, softmax_w) + softmax_b
    self.probs = tf.nn.softmax(self.logits)
    self.final_state = final_state

    pred = tf.reshape(self.y_tf, [-1])
    # seq2seq
    loss = seq2seq.sequence_loss_by_example([self.logits],
                                            [pred],
                                            [tf.ones_like(pred, dtype=tf.float32)])
    self.cost = tf.reduce_mean(loss)
    self.learning_rate = tf.Variable(0.0, trainable=False)
    tvars = tf.trainable_variables()
    grads, _ = tf.clip_by_global_norm(tf.gradients(self.cost, tvars), 5)
    optimizer = tf.train.AdamOptimizer(self.learning_rate)
    self.train_op = optimizer.apply_gradients(zip(grads, tvars))