def _build_policy(env, predictor, epsilon):
    """Build an epsilon-greedy policy from a predictor's Q-value estimates.

    Each state is presented to ``predictor.predict`` as a one-hot row keyed by
    ``str(state_index)``.  The prediction is indexed as
    ``q_values[action][state]``; the best-scoring action of every state is
    computed once and cached.

    Args:
        env: Object exposing ``num_states`` and an indexable ``ACTIONS``.
        predictor: Object whose ``predict`` accepts the one-hot feature dict.
        epsilon: Threshold compared against a uniform draw; below it a random
            action is returned instead of the cached greedy one.

    Returns:
        A callable ``policy(state)`` that returns an element of ``env.ACTIONS``.
    """
    n_states = env.num_states
    one_hot = np.eye(n_states)
    features = {str(s): one_hot[s] for s in range(n_states)}
    q_values = predictor.predict(features)

    # Greedy action per state, resolved once so the policy is a cheap lookup.
    greedy_actions = []
    for s in range(n_states):
        scores = [q_values[action][s] for action in env.ACTIONS]
        greedy_actions.append(env.ACTIONS[np.argmax(scores)])

    def policy(state) -> str:
        explore = np.random.random() < epsilon
        if explore:
            return np.random.choice(env.ACTIONS)
        return greedy_actions[state]

    return policy
# Example source code for Python's argmax() (python类argmax()的实例源码)
def choose_action(d, c, q_table):
    """Pick an action for state ``(d, c)`` with a decaying epsilon-greedy rule.

    Reads the module-level ``epsilon`` and ``n_actions``.  A uniformly random
    action index is returned when the exploration draw falls below ``epsilon``
    or when the Q-values stored for the state sum to zero; otherwise the index
    of the largest Q-value is returned.  Every random move multiplies
    ``epsilon`` by 0.9 while it is still above 0.1.
    """
    global epsilon
    q_row = q_table[d][c][:]

    # Greedy only when we are not exploring AND the state has recorded data.
    exploit = not (np.random.uniform() < epsilon) and not (np.sum(q_row) == 0)
    if exploit:
        return q_row.argmax()

    picked = np.random.randint(n_actions)
    # decrease random moves over time to a minimum of 10%
    if epsilon > 0.1:
        epsilon *= 0.9
    return picked
def choose_action(d, c, q_table):
    """Return an action index for state ``(d, c)`` (epsilon-greedy with decay).

    Uses the module-level ``epsilon`` and ``n_actions``.  The move is random
    if a uniform draw is below ``epsilon`` or if the state's Q-values sum to
    zero; in that case ``epsilon`` is also scaled by 0.9 unless it has already
    dropped to 0.1 or lower.  Otherwise the argmax of the state's Q-values is
    returned.
    """
    global epsilon
    values = q_table[d][c][:]

    go_random = np.random.uniform() < epsilon
    if not go_random:
        # no data recorded for this state yet
        go_random = np.sum(values) == 0

    if go_random:
        choice = np.random.randint(n_actions)
        # decrease random moves over time to a minimum of 10%
        if epsilon > 0.1:
            epsilon *= 0.9
    else:
        choice = values.argmax()
    return choice
def get_predicted_antecedents(self, antecedents, antecedent_scores):
    """
    Forms a list of predicted antecedent labels.

    For every row the best-scoring column of ``antecedent_scores`` is taken
    and shifted down by one.  A shifted index below zero (i.e. column 0 won)
    yields ``-1``; otherwise the entry ``antecedents[row, shifted_index]`` is
    used.

    Args:
        antecedents: 2-D indexable of candidate antecedents, addressed as
            ``antecedents[row, column]``.
        antecedent_scores: [num_mentions, max_ant + 1] score matrix.

    Returns: a list with one predicted antecedent (or -1) per row.
    """
    best_columns = np.argmax(antecedent_scores, axis=1) - 1
    return [
        -1 if column < 0 else antecedents[row, column]
        for row, column in enumerate(best_columns)
    ]
def multiclass_classifier(X_train, Y_train, X_val, Y_val, X_test, Y_test, nb_epoch=200, batch_size=10, seed=7):
    """Train a softmax network and report how it performs on the test split.

    The network is built by ``softmax_network`` from the number of input
    columns and label columns, trained with learning-rate reduction and early
    stopping driven by ``val_loss``, then evaluated on the test data.  A
    confusion matrix and a classification report are printed.

    Args:
        X_train, Y_train: Training features and label matrix.
        X_val, Y_val: Validation data passed to ``fit``.
        X_test, Y_test: Held-out data used for the final evaluation.
        nb_epoch: Maximum number of training epochs.
        batch_size: Mini-batch size used during training.
        seed: Accepted for interface compatibility; not used in this function.

    Returns:
        The second element of ``clf.test_on_batch(X_test, Y_test)``
        (presumably accuracy -- depends on the metrics the model was compiled
        with; verify in ``softmax_network``).
    """
    clf = softmax_network(X_train.shape[1], Y_train.shape[1])
    clf.fit(X_train, Y_train,
            epochs=nb_epoch,
            batch_size=batch_size,
            shuffle=True,
            validation_data=(X_val, Y_val),
            callbacks=[
                ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=3, min_lr=0.01),
                EarlyStopping(monitor='val_loss', min_delta=1e-5, patience=5, verbose=0, mode='auto'),
            ]
            )
    acc = clf.test_on_batch(X_test, Y_test)[1]
    # confusion matrix and precision-recall
    # Label matrices are collapsed to class indices via argmax over axis 1.
    true = np.argmax(Y_test, axis=1)
    pred = np.argmax(clf.predict(X_test), axis=1)
    # Single-argument print(...) behaves the same on Python 2 and Python 3.
    print(confusion_matrix(true, pred))
    print(classification_report(true, pred))
    return acc
def adjust_prediction(self, probability, image):
    """Refine a foreground-probability map with a two-label dense CRF.

    The unary term comes from the stacked ``(1 - probability, probability)``
    pair.  A Gaussian pairwise term is always added; a bilateral term built
    from ``image`` is added only when ``self.schan`` is truthy.  Inference
    runs for ``self.iter`` iterations.

    Args:
        probability: Array of foreground probabilities; its shape defines the
            CRF layout (three spatial scale factors are supplied, so it is
            presumably 3-D -- confirm with the caller).
        image: Reference image handed to ``create_pairwise_bilateral`` with
            ``chdim=3``.

    Returns:
        ``float32`` array shaped like ``probability`` holding the arg-max
        label (0 or 1) of the CRF result.
    """
    crf = dcrf.DenseCRF(np.prod(probability.shape), 2)

    # Background / foreground probabilities stacked along a new label axis.
    two_label_prob = np.stack((1 - probability, probability), axis=0)
    crf.setUnaryEnergy(unary_from_softmax(two_label_prob))

    # per dimension scale factors
    spatial_scales = [self.sdims] * 3
    gaussian_term = create_pairwise_gaussian(sdims=spatial_scales, shape=probability.shape)
    crf.addPairwiseEnergy(gaussian_term, compat=2)

    if self.schan:
        # per channel scale factors
        channel_scales = [self.schan] * 6
        bilateral_term = create_pairwise_bilateral(sdims=spatial_scales, schan=channel_scales, img=image, chdim=3)
        crf.addPairwiseEnergy(bilateral_term, compat=2)

    marginals = crf.inference(self.iter)
    labels = np.argmax(marginals, axis=0)
    return labels.reshape(probability.shape).astype(np.float32)
def evaluate_performance(ladder, valid_loader, e, agg_cost_scaled, agg_supervised_cost_scaled,
                         agg_unsupervised_cost_scaled, args):
    """Print the epoch's aggregated costs and the validation accuracy.

    Every ``(data, target)`` batch from ``valid_loader`` is run through
    ``ladder.forward_encoders_clean``; the arg-max over axis 1 of the output
    is compared with the targets to count correct predictions.

    Args:
        ladder: Model exposing ``forward_encoders_clean(data)``.
        valid_loader: Iterable of ``(data, target)`` batches.
        e: Zero-based epoch index (printed as ``e + 1``).
        agg_cost_scaled: Total cost to report.
        agg_supervised_cost_scaled: Supervised cost to report.
        agg_unsupervised_cost_scaled: Unsupervised cost to report.
        args: Object with a boolean ``cuda`` attribute.

    Returns:
        None.  The summary line is written with ``print``.  When the loader
        yields no samples the accuracy is reported as ``nan`` instead of
        raising ``ZeroDivisionError``.
    """
    correct = 0.
    total = 0.
    for data, target in valid_loader:
        if args.cuda:
            data = data.cuda()
        data, target = Variable(data), Variable(target)
        output = ladder.forward_encoders_clean(data)
        # TODO: Do away with the below hack for GPU tensors.
        if args.cuda:
            output = output.cpu()
            target = target.cpu()
        output = output.data.numpy()
        preds = np.argmax(output, axis=1)
        target = target.data.numpy()
        correct += np.sum(target == preds)
        total += target.shape[0]
    # Guard the empty-loader case: 0. / 0. would raise for Python floats.
    accuracy = correct / total if total else float("nan")
    print("Epoch:", e + 1, "\t",
          "Total Cost:", "{:.4f}".format(agg_cost_scaled), "\t",
          "Supervised Cost:", "{:.4f}".format(agg_supervised_cost_scaled), "\t",
          "Unsupervised Cost:", "{:.4f}".format(agg_unsupervised_cost_scaled), "\t",
          "Validation Accuracy:", accuracy)
def extract_digits(self, image):
    """
    Extract digits from a binary image representing a sudoku

    The image is analysed in four orientations (it is rotated by 90 degrees
    after each pass).  In every pass each connected component is assigned to
    the grid cell whose entry in ``CENTROIDS`` is closest to the centre of
    the component's bounding box, and is classified with
    ``self.classifier.classify(morph(roi))``.  ``prediction[0, 0]`` is used
    as the confidence: if two components land in the same cell the more
    confident prediction wins.  The orientation with the largest summed
    confidence is returned.

    :param image: binary image/sudoku
    :return: 9x9 object array for the best orientation; a cell holds the
             classifier's prediction, or 0 when no component was assigned
    """
    prob = np.zeros(4, dtype=np.float32)
    digits = np.zeros((4, 9, 9), dtype=object)
    # Explicit occupancy mask: the previous ``cell is 0`` test relied on
    # CPython's small-integer identity, which is not a language guarantee.
    filled = np.zeros((4, 9, 9), dtype=bool)
    for i in range(4):
        labeled, _ = label(image, structure=CROSS)
        for obj in find_objects(labeled):
            roi = image[obj]
            # center of bounding box (floor division keeps integer
            # coordinates on both Python 2 and Python 3)
            cy = (obj[0].stop + obj[0].start) // 2
            cx = (obj[1].stop + obj[1].start) // 2
            dists = cdist([[cy, cx]], CENTROIDS, 'euclidean')
            pos = np.argmin(dists)
            # ``pos / 9`` would be a float on Python 3 and cannot be used as
            # an array index, hence ``//``.
            cy, cx = pos % 9, pos // 9
            # 28x28 image, center relative to sudoku
            prediction = self.classifier.classify(morph(roi))
            if not filled[i, cy, cx]:
                # Newly found digit
                digits[i, cy, cx] = prediction
                filled[i, cy, cx] = True
                prob[i] += prediction[0, 0]
            elif prediction[0, 0] > digits[i, cy, cx][0, 0]:
                # Overlapping! (noise), choose the most probable prediction
                prob[i] -= digits[i, cy, cx][0, 0]
                digits[i, cy, cx] = prediction
                prob[i] += prediction[0, 0]
        image = np.rot90(image)
    logging.info(prob)
    return digits[np.argmax(prob)]
def extract_digits(self, image):
    """
    Extract digits from a binary image representing a sudoku

    Four orientations of the image are processed (a 90-degree rotation is
    applied after each pass).  Each connected component is mapped to the grid
    cell whose ``CENTROIDS`` entry is nearest to the centre of its bounding
    box and classified via ``self.classifier.classify(morph(roi))``, with
    ``prediction[0, 0]`` acting as the confidence.  When several components
    fall into one cell only the most confident prediction is kept.  The
    orientation whose confidences sum highest is returned.

    :param image: binary image/sudoku
    :return: 9x9 object array for the best orientation; a cell holds the
             classifier's prediction, or 0 when no component was assigned
    """
    prob = np.zeros(4, dtype=np.float32)
    digits = np.zeros((4, 9, 9), dtype=object)
    # Track occupied cells explicitly rather than comparing with ``is 0``,
    # which only works because CPython happens to cache small integers.
    filled = np.zeros((4, 9, 9), dtype=bool)
    for i in range(4):
        labeled, _ = label(image, structure=CROSS)
        for obj in find_objects(labeled):
            roi = image[obj]
            # center of bounding box (integer on Python 2 and 3 alike)
            cy = (obj[0].stop + obj[0].start) // 2
            cx = (obj[1].stop + obj[1].start) // 2
            dists = cdist([[cy, cx]], CENTROIDS, 'euclidean')
            pos = np.argmin(dists)
            # Floor division: a float from ``pos / 9`` is not a valid index
            # on Python 3.
            cy, cx = pos % 9, pos // 9
            # 28x28 image, center relative to sudoku
            prediction = self.classifier.classify(morph(roi))
            if not filled[i, cy, cx]:
                # Newly found digit
                digits[i, cy, cx] = prediction
                filled[i, cy, cx] = True
                prob[i] += prediction[0, 0]
            elif prediction[0, 0] > digits[i, cy, cx][0, 0]:
                # Overlapping! (noise), choose the most probable prediction
                prob[i] -= digits[i, cy, cx][0, 0]
                digits[i, cy, cx] = prediction
                prob[i] += prediction[0, 0]
        image = np.rot90(image)
    logging.info(prob)
    return digits[np.argmax(prob)]
def predict(self, data):
    """Classify every name in ``data`` and return the predicted classes.

    Each name is converted with ``self.__parse_name``, wrapped as a batch of
    one, and passed to ``self.__model.predict``.  The arg-max of the model
    output is translated through ``self.__indexes_classes``.

    Args:
        data: Iterable of names.

    Returns:
        List with one class entry per input name, in input order.
    """
    def classify(name):
        features = np.array([self.__parse_name(name)])
        scores = self.__model.predict([features])
        return self.__indexes_classes[np.argmax(scores)]

    return [classify(name) for name in data]
def binarize_predictions(array, task='binary.classification'):
    ''' Turn predictions into decisions {0,1}.

    For ``task == 'multiclass.classification'`` with more than one column,
    exactly one entry per row is set to 1: the position of the row's largest
    score (first occurrence on ties, as given by ``np.argmax``).  In every
    other case each score is thresholded at 0.5.

    Returns a new float array shaped like ``array``; the input is not
    modified.'''
    decisions = np.zeros(array.shape)
    pick_row_winner = (task == 'multiclass.classification') and not (array.shape[1] == 1)
    if not pick_row_winner:
        decisions[array >= 0.5] = 1
        return decisions
    for row_index in range(array.shape[0]):
        winner = np.argmax(array[row_index, :])
        decisions[row_index, winner] = 1
    return decisions
def run_epoch_doc(docs, labels, tags, tm, pad_id, cf):
    """Run the supervised document classifier over ``docs`` and print accuracy.

    The documents are processed in ``ceil(len(docs) / cf.batch_size)``
    batches obtained from ``get_batch_doc``.  For each batch the class
    probabilities ``tm.sup_probs`` are evaluated with the module-level
    ``sess`` and the arg-max over axis 1 is compared with the labels.  Only
    the first ``num_docs`` rows of a batch are counted (``num_docs`` is
    returned by ``get_batch_doc``).

    Args:
        docs: Documents to classify.
        labels: Labels aligned with ``docs``.
        tags: Document tags forwarded to ``get_batch_doc``.
        tm: Model exposing ``sup_probs``, ``doc``, ``label``, ``sup_mask``
            and ``tag``.
        pad_id: Padding id forwarded to ``get_batch_doc``.
        cf: Config providing ``batch_size``, ``doc_len`` and ``tag_len``.

    Returns:
        None.  The mean accuracy is printed.
    """
    batches = int(math.ceil(float(len(docs))/cf.batch_size))
    accs = []
    # ``range`` and single-argument ``print(...)`` run on Python 2 and 3.
    for b in range(batches):
        d, y, m, t, num_docs = get_batch_doc(docs, labels, tags, b, cf.doc_len, cf.tag_len, cf.batch_size, pad_id)
        prob = sess.run(tm.sup_probs, {tm.doc:d, tm.label:y, tm.sup_mask: m, tm.tag: t})
        pred = np.argmax(prob, axis=1)
        accs.extend(pred[:num_docs] == y[:num_docs])
    print("\ntest classification accuracy = %.3f" % np.mean(accs))
def gen_sent_on_doc(docs, tags, idxvocab, vocabxid, start_symbol, end_symbol, cf):
    """Write representative topics/words and generated sentences per document.

    Relies on module-level state: ``tm``, ``sess``, ``topn``, ``args``,
    ``initializer``, ``LM``, ``pad_symbol``, ``gen_temps`` and ``gen_num``.
    The source text is read from ``args.input_doc`` (tabs become newlines)
    and the report is written to ``args.gen_sent_on_doc``.  For each document
    the report contains its text, the topics and words returned by
    ``get_topics_on_doc``, one sentence generated with temperature 0 and
    ``gen_num`` sentences for every temperature in ``gen_temps``.

    Both files are opened in ``with`` blocks so they are closed (and the
    output flushed) even if generation fails part-way.

    Args:
        docs: Documents to process.
        tags: Document tags forwarded to ``get_batch_doc``.
        idxvocab: Index-to-word lookup.
        vocabxid: Word-to-index lookup.
        start_symbol: Start-of-sentence token (key of ``vocabxid``).
        end_symbol: End-of-sentence token (key of ``vocabxid``).
        cf: Config providing ``doc_len``, ``tag_len`` and ``lm_sent_len``.
    """
    topics, _ = tm.get_topics(sess, topn=topn)
    topics = [ " ".join([idxvocab[w] for w in t]) for t in topics ]
    with codecs.open(args.input_doc, "r", "utf-8") as doc_file:
        doc_text = [ item.replace("\t", "\n") for item in doc_file.readlines() ]
    with codecs.open(args.gen_sent_on_doc, "w", "utf-8") as output:
        with tf.variable_scope("model", reuse=True, initializer=initializer):
            mgen = LM(is_training=False, vocab_size=len(idxvocab), batch_size=1, num_steps=1, config=cf, \
                reuse_conv_variables=True)
        for d in range(len(docs)):
            output.write("\n" + "="*100 + "\n")
            output.write("Doc " + str(d) +":\n")
            output.write(doc_text[d])
            doc, _, _, t, _ = get_batch_doc(docs, None, tags, d, cf.doc_len, cf.tag_len, 1, vocabxid[pad_symbol])
            best_topics, best_words = mgen.get_topics_on_doc(sess, doc, t, topn)
            output.write("\nRepresentative topics:\n")
            output.write("\n".join([ ("[%.3f] %s: %s" % (item[1],str(item[0]).zfill(3),topics[item[0]])) \
                for item in best_topics ]) + "\n")
            output.write("\nRepresentative words:\n")
            output.write("\n".join([ ("[%.3f] %s" % (item[1], idxvocab[item[0]])) for item in best_words ]) + "\n")
            output.write("\nSentence generation (greedy; argmax):" + "\n")
            s = mgen.generate_on_doc(sess, doc, t, vocabxid[start_symbol], 0, cf.lm_sent_len+10, vocabxid[end_symbol])
            output.write("[0] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
            for temp in gen_temps:
                output.write("\nSentence generation (random; temperature = " + str(temp) + "):\n")
                # ``range`` instead of ``xrange`` so this also runs on Python 3.
                for i in range(gen_num):
                    s = mgen.generate_on_doc(sess, doc, t, vocabxid[start_symbol], temp, cf.lm_sent_len+10, \
                        vocabxid[end_symbol])
                    output.write("[" + str(i) + "] " + " ".join([ idxvocab[item] for item in s ]) + "\n")
######
#main#
######
#load the vocabulary
def sample(self, probs, temperature):
    """Draw an index from ``probs`` after temperature scaling.

    Args:
        probs: NumPy array of probabilities.
        temperature: ``0`` selects the arg-max deterministically.  Any other
            value rescales the distribution as ``p ** (1 / temperature)``
            (renormalised) before a single multinomial draw.

    Returns:
        The selected index.
    """
    if temperature == 0:
        return np.argmax(probs)
    probs = probs.astype(np.float64) #convert to float64 for higher precision
    logits = np.log(probs) / temperature
    # Shift by the maximum before exponentiating.  The result is
    # mathematically unchanged, but without the shift a small temperature
    # makes every exp() underflow to 0 and the normalisation becomes 0/0.
    logits = logits - np.max(logits)
    weights = np.exp(logits)
    probs = weights / math.fsum(weights)
    return np.argmax(np.random.multinomial(1, probs, 1))
#generate a sentence given conv_hidden
def write_predictions(self, inputs):
    """Write the predicted label of every input to ``<prefix>.predictions``.

    ``<prefix>`` is ``self.model_name_prefix``.  The arg-max over axis 1 of
    ``self.model.predict(inputs)`` is shifted by one and mapped back to a
    label through the inverse of ``self.label_map``; one label is written per
    line.

    NOTE(review): the ``+ 1`` presumably means the ids in ``label_map`` start
    at 1 while arg-max indices start at 0 -- confirm against the code that
    builds ``label_map``.

    Args:
        inputs: Data accepted by ``self.model.predict``.
    """
    rev_label_map = {j: i for (i, j) in self.label_map.items()}
    predictions = numpy.argmax(self.model.predict(inputs), axis=1)
    # ``with`` guarantees the file is flushed and closed; the Python 2 only
    # ``print >>file`` form is replaced by an explicit write.
    with open("%s.predictions" % self.model_name_prefix, "w") as test_output_file:
        for prediction in predictions:
            test_output_file.write("%s\n" % (rev_label_map[prediction + 1],))
def test(self, inputs, targets):
if not self.model:
raise RuntimeError, "Model not trained!"
metrics = self.model.evaluate(inputs, targets)
print >>sys.stderr, "Test accuracy: %.4f" % (metrics[1]) # The first metric is loss.
predictions = numpy.argmax(self.model.predict(inputs), axis=1)
rev_label_map = {ind: label for label, ind in self.label_map.items()}
predicted_labels = [rev_label_map[pred] for pred in predictions]
return predicted_labels
def calculate_assignments(self, assignment_weights):
    """Return, for each row of ``assignment_weights``, the index of its largest entry."""
    return np.argmax(assignment_weights, axis=1)
def calculate_roc(thresholds, embeddings1, embeddings2, actual_issame, nrof_folds=10):
    """Compute k-fold TPR/FPR curves and per-fold accuracy for embedding pairs.

    Pairs are compared through the squared Euclidean distance between
    ``embeddings1[i]`` and ``embeddings2[i]``.  For each fold the threshold
    with the highest training accuracy is selected and used for that fold's
    test accuracy; TPR and FPR are evaluated on the test split for every
    threshold.

    Args:
        thresholds: Sequence of distance thresholds to evaluate.
        embeddings1, embeddings2: 2-D arrays of identical shape.
        actual_issame: Ground truth per pair, indexed with the fold indices.
        nrof_folds: Number of folds passed to ``KFold``.

    Returns:
        ``(tpr, fpr, accuracy)``: TPR and FPR averaged over folds (one value
        per threshold) and an array with one accuracy per fold.
    """
    assert embeddings1.shape[0] == embeddings2.shape[0]
    assert embeddings1.shape[1] == embeddings2.shape[1]
    n_pairs = min(len(actual_issame), embeddings1.shape[0])
    n_thresholds = len(thresholds)
    splitter = KFold(n_splits=nrof_folds, shuffle=False)

    tprs = np.zeros((nrof_folds, n_thresholds))
    fprs = np.zeros((nrof_folds, n_thresholds))
    accuracy = np.zeros((nrof_folds))

    dist = np.sum(np.square(np.subtract(embeddings1, embeddings2)), 1)

    def evaluate(threshold, subset):
        # Score one threshold on the pairs selected by ``subset``.
        return calculate_accuracy(threshold, dist[subset], actual_issame[subset])

    for fold, (train_set, test_set) in enumerate(splitter.split(np.arange(n_pairs))):
        # Find the best threshold for the fold
        train_acc = np.zeros((n_thresholds))
        for k, threshold in enumerate(thresholds):
            _, _, train_acc[k] = evaluate(threshold, train_set)
        best = np.argmax(train_acc)

        for k, threshold in enumerate(thresholds):
            tprs[fold, k], fprs[fold, k], _ = evaluate(threshold, test_set)
        _, _, accuracy[fold] = evaluate(thresholds[best], test_set)

    return np.mean(tprs, 0), np.mean(fprs, 0), accuracy
def length_histogram(fqin, name):
    '''
    Plot a histogram of the lengths returned by get_lengths(fqin), save it as
    a PNG to `name`, and return the (lower, upper) edges of the fullest bin.
    Bins are chosen with the 'auto' rule for both the plot and the lookup.
    '''
    logging.info("Creating length histogram to find bin with most reads.")
    read_lengths = get_lengths(fqin)

    # Figure on disk.
    plt.hist(read_lengths, bins='auto')
    plt.savefig(name, format='png', dpi=100)
    plt.close("all")

    # Same binning again, this time to locate the most populated bin.
    counts, edges = np.histogram(read_lengths, bins='auto')
    fullest = np.argmax(counts)
    lower, upper = edges[fullest], edges[fullest + 1]
    return (lower, upper)
def test_estimate_tree(num_edges):
    """Check that ``estimate_tree`` returns a spanning tree of maximal logit sum.

    A complete graph with ``num_edges + 1`` vertices gets random edge logits
    drawn after seeding with 0.  The estimated tree must have ``num_edges``
    edges and touch every vertex.  When the vertex count is below
    ``len(TREE_GENERATORS)``, the result is also compared with the
    best-scoring tree among all spanning trees.
    """
    set_random_seed(0)
    n_vertices = 1 + num_edges
    grid = make_complete_graph(n_vertices)
    edge_logits = np.random.random([grid.shape[1]]) - 0.5
    edges = estimate_tree(grid, edge_logits)

    # Check size.
    assert len(edges) == num_edges
    for vertex in range(n_vertices):
        assert any(vertex in edge for edge in edges)

    # Check optimality.
    edges = tuple(edges)
    if not (n_vertices < len(TREE_GENERATORS)):
        return
    candidates = get_spanning_trees(n_vertices)
    assert edges in candidates
    candidates = list(candidates)
    scores = [
        sum(edge_logits[find_complete_edge(u, v)] for (u, v) in tree)
        for tree in candidates
    ]
    best_tree = candidates[np.argmax(scores)]
    assert edges == best_tree