def _get_minibatch_feed_dict(self, target_q_values,
                             non_terminal_minibatch, terminal_minibatch):
    """
    Build the feed_dict that train_op consumes.

    The non-terminal transitions are walked first, followed by the terminal
    ones. Each transition is a 5-tuple whose first three fields are
    (state, action, reward); the last two fields are ignored here. For every
    transition the expected q-value is
    ``reward + reward_discount * target_q``, where target_q comes from
    ``target_q_values`` for non-terminal transitions and is 0 for terminal
    ones.

    @param target_q_values: one target-network q-value per non-terminal item
    @param non_terminal_minibatch: transitions that have a successor state
    @param terminal_minibatch: transitions that ended an episode
    @return: feed_dict to be used for train_op
    """
    assert len(target_q_values) == len(non_terminal_minibatch)

    # Non-terminal items come first so they line up with target_q_values;
    # zip_longest then pads the trailing terminal items with a q-value of 0.
    transitions = itertools.chain(non_terminal_minibatch, terminal_minibatch)
    paired = zip_longest(transitions, target_q_values, fillvalue=0)

    feed_states, feed_targets, feed_actions = [], [], []
    for (state, action, reward, _, _), bootstrap_q in paired:
        feed_states.append(state)
        feed_targets.append(reward + self.config.reward_discount * bootstrap_q)
        feed_actions.append(utils.one_hot(action, self.env.action_space.n))

    feed_dict = {
        self.network.x_placeholder: feed_states,
        self.network.q_placeholder: feed_targets,
        self.network.action_placeholder: feed_actions,
    }
    return feed_dict
python类one_hot()的实例源码
def gen_onehot_keys(input_size, batch_size, targets):
    """Encode every target with ``one_hot`` and stack the results vertically.

    Each target ``t`` is encoded as ``one_hot(input_size, [t])``; the encoded
    pieces are combined with ``np.vstack`` in the order of ``targets``.
    ``batch_size`` is accepted but not used by this function.
    """
    encoded_rows = []
    for target in targets:
        encoded_rows.append(one_hot(input_size, [target]))
    return np.vstack(encoded_rows)
def __init__(self, source):
    """Load a word2vec model and a text, then build sliding-window datasets.

    The text returned by ``load2(source)`` is split on single spaces. Each
    distinct word receives an integer code (starting at 1) and its word2vec
    vector. Windows of ``sd_len`` consecutive words are then paired with the
    word that follows them, and four arrays are stored on the instance:

    * ``self.x_d`` - word vectors of each window
    * ``self.y_d`` - ``one_hot`` encoding of the code of the following word
    * ``self.v_d`` - word vector of the following word
    * ``self.i_d`` - integer codes of each window

    NOTE(review): ``word_coding``, ``coded_word``, ``vec_values``,
    ``coded_vector`` and ``sd_len`` are not defined in this block; presumably
    they are module-level names - confirm.

    @param source: passed unchanged to ``load2`` to obtain the text
    """
    print("Loading in word2vec model")
    self.vmodel = gs.models.Word2Vec.load('vectors.bin')
    print("Loading in text")
    text = load2(source)
    parsed_words = text.split(" ")
    code_num = 1
    print("Creating word -> vector dictionary...")
    for word in parsed_words:
        if word not in word_coding:
            word_coding[word] = code_num
            coded_word[code_num] = word
            code_num += 1
        # Every word used below needs an entry in vec_values.
        vec_values[word] = self.vmodel[word]
        coded_vector.append(word_coding[word])
    print('Number of distinct words: ', len(word_coding))
    sd_size = len(coded_vector) // sd_len

    # Four independent accumulators. The previous chained assignment bound
    # every name to one shared list, and the names did not match the ones
    # the loop appended to.
    window_vectors, target_one_hots, target_vectors, window_codes = [], [], [], []
    # Both ranges stop one short so that `end` (the index of the word after
    # the window) stays inside coded_vector / parsed_words.
    for idx in range(sd_size - 1):
        for iidx in range(sd_len - 1):
            start = idx * sd_len + iidx
            end = start + sd_len
            window_codes.append(coded_vector[start:end])
            window_vectors.append(
                [vec_values[my_word] for my_word in parsed_words[start:end]])
            target_one_hots.append(one_hot(coded_vector[end], word_coding))
            target_vectors.append(vec_values[parsed_words[end]])

    self.x_d = np.asarray(window_vectors)
    self.y_d = np.asarray(target_one_hots)
    self.v_d = np.asarray(target_vectors)
    self.i_d = np.asarray(window_codes)
    print('shapes: ' + str(self.x_d.shape))
def main():
    """ Test an RNN trained for TIMIT phoneme recognition.

    Loads the test split, rebuilds the classification model, restores the
    checkpoint ``model.ckpt`` from ``args.results_dir``, evaluates the frame
    error rate on one batch, prints it and writes it to ``test_result.txt``
    in the same directory.
    """
    args, _, layer_kwargs = parse_args()

    _, _, test_inputs, test_labels = timitphonemerec.load_split(args.data_dir, val=False,
                                                                mfcc=True, normalize=True)

    # Input seqs have shape [length, INPUT_SIZE]. Label seqs are int8 arrays with shape [length],
    # but need to have shape [length, 1] for the batch generator.
    test_labels = [seq[:, np.newaxis] for seq in test_labels]

    test_batches = utils.full_bptt_batch_generator(test_inputs, test_labels, TEST_BATCH_SIZE,
                                                   num_epochs=1, shuffle=False)

    model = models.RNNClassificationModel(args.layer_type, INPUT_SIZE, TARGET_SIZE, args.num_hidden_units,
                                          args.activation_type, **layer_kwargs)

    def _error_rate(valid_predictions, valid_targets):
        """Fraction of rows whose argmax prediction differs from the argmax target."""
        incorrect_mask = tf.logical_not(tf.equal(tf.argmax(valid_predictions, 1), tf.argmax(valid_targets, 1)))
        return tf.reduce_mean(tf.cast(incorrect_mask, tf.float32))
    model.error_rate = _error_rate(model.valid_predictions, model.valid_targets)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = False

    # The context manager closes the session (and releases its resources)
    # even if restoring or evaluating raises.
    with tf.Session(config=config) as sess:
        saver = tf.train.Saver()
        saver.restore(sess, os.path.join(args.results_dir, 'model.ckpt'))

        # Only the first batch from the generator is evaluated.
        # NOTE(review): presumably TEST_BATCH_SIZE covers the whole test set - confirm.
        batch_inputs, batch_labels = next(test_batches)
        batch_targets = utils.one_hot(np.squeeze(batch_labels, 2), TARGET_SIZE)

        error_rate = sess.run(
            model.error_rate,
            feed_dict={model.inputs: batch_inputs,
                       model.targets: batch_targets}
        )

    print('%f' % error_rate)
    with open(os.path.join(args.results_dir, 'test_result.txt'), 'w') as f:
        print('%f' % error_rate, file=f)
def main():
    """ Test an RNN for sequential (possibly permuted) MNIST recognition.

    Loads the test split, rebuilds the classification model, restores the
    checkpoint ``model.ckpt`` from ``args.results_dir``, averages the
    per-batch error rates over all test batches, prints the result and
    writes it to ``test_result.txt`` in the same directory.
    """
    args, _, layer_kwargs = parse_args()

    outs = mnist.load_split(args.data_dir, val=False, permute=args.permute, normalize=True, seed=0)
    _, _, test_images, test_labels = outs

    # Flatten the images.
    test_inputs = test_images.reshape([len(test_images), -1, INPUT_SIZE])

    # Align sequence-level labels with the appropriate time steps by padding with NaNs,
    # and to do so, first convert the labels to floats.
    # np.float64 replaces the removed alias np.float (same type as builtin float).
    length = test_inputs.shape[1]
    test_labels = np.pad(test_labels.reshape([-1, 1, 1]).astype(np.float64),
                         [[0, 0], [length - 1, 0], [0, 0]],
                         mode='constant', constant_values=np.nan)

    test_batches = utils.full_bptt_batch_generator(test_inputs, test_labels, TEST_BATCH_SIZE, num_epochs=1,
                                                   shuffle=False)

    model = models.RNNClassificationModel(args.layer_type, INPUT_SIZE, TARGET_SIZE, args.num_hidden_units,
                                          args.activation_type, **layer_kwargs)

    def _error_rate(valid_predictions, valid_targets):
        """Fraction of rows whose argmax prediction differs from the argmax target."""
        incorrect_mask = tf.logical_not(tf.equal(tf.argmax(valid_predictions, 1), tf.argmax(valid_targets, 1)))
        return tf.reduce_mean(tf.cast(incorrect_mask, tf.float32))
    model.error_rate = _error_rate(model.valid_predictions, model.valid_targets)

    config = tf.ConfigProto()
    config.gpu_options.allow_growth = False

    error_rates = []
    # The context manager closes the session (and releases its resources)
    # even if restoring or evaluating raises.
    with tf.Session(config=config) as sess:
        saver = tf.train.Saver()
        saver.restore(sess, os.path.join(args.results_dir, 'model.ckpt'))

        for batch_inputs, batch_labels in test_batches:
            batch_targets = utils.one_hot(np.squeeze(batch_labels, 2), TARGET_SIZE)
            batch_error_rate = sess.run(
                model.error_rate,
                feed_dict={model.inputs: batch_inputs,
                           model.targets: batch_targets}
            )
            error_rates.append(batch_error_rate)

    # Unweighted mean of the per-batch rates.
    # NOTE(review): this equals the overall rate only if all batches have the same size - confirm.
    error_rate = np.mean(error_rates, dtype=np.float64)
    print('%f' % error_rate)
    with open(os.path.join(args.results_dir, 'test_result.txt'), 'w') as f:
        print('%f' % error_rate, file=f)