def getAUC(self, test_tasks):
    """Fit one SVC per task and return the task-averaged ROC curve.

    For every task index in ``range(self.n_tasks)`` a new ``SVC`` is built
    with the kernel returned by ``self.constructKernelFunction``, fitted on
    that task's slice of ``self.train_tasks`` and scored on the matching
    slice of ``test_tasks``.  Each task's ROC curve is resampled onto a
    fixed 100-point false-positive-rate grid and the curves are averaged.

    Args:
        test_tasks: held-out data, passed through to
            ``self.extractTaskData``.

    Returns:
        Tuple ``(mean_auc, mean_fpr, mean_tpr)``.

    Side effects:
        Replaces ``self.classifiers[t]`` for every task ``t``.
    """
    fpr_grid = np.linspace(0, 1, 100)
    tpr_sum = 0.0
    for task in range(self.n_tasks):
        train_x, train_y = self.extractTaskData(self.train_tasks, task)
        test_x, test_y = self.extractTaskData(test_tasks, task)
        task_kernel = self.constructKernelFunction(task)
        self.classifiers[task] = SVC(
            C=self.C,
            kernel=task_kernel,
            probability=True,
            max_iter=self.max_iter_internal,
            tol=self.tolerance,
        )
        fitted = self.classifiers[task].fit(train_x, train_y)
        probabilities = fitted.predict_proba(test_x)
        # Column 1 holds the scores passed to roc_curve.
        fpr, tpr, _ = roc_curve(test_y, probabilities[:, 1])
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0  # pin the curve to the origin
    tpr_sum /= self.n_tasks
    tpr_sum[-1] = 1.0  # pin the curve to (1, 1)
    return auc(fpr_grid, tpr_sum), fpr_grid, tpr_sum
# Example source code using Python's interp()
def addFold(self, fold_id, true_labels, predicted_proba, predicted_scores):
    """Add one fold's ROC curve to the running state and draw it.

    Returns immediately when ``true_labels`` is empty.  Otherwise the ROC
    curve is computed from ``predicted_proba`` when
    ``self.probabilist_model`` is truthy and from ``predicted_scores``
    otherwise.  The fold's TPR, resampled onto ``self.mean_fpr``, is added
    to ``self.mean_tpr``; ``self.thresholds`` is replaced (not accumulated)
    by this fold's resampled thresholds; the curve is plotted on
    ``self.ax1``.

    Args:
        fold_id: fold number, used only in the legend label.
        true_labels: ground-truth labels of the fold.
        predicted_proba: scores used for probabilistic models.
        predicted_scores: scores used for all other models.
    """
    if not len(true_labels):
        return

    scores = predicted_proba if self.probabilist_model else predicted_scores
    fpr, tpr, fold_thresholds = roc_curve(true_labels, scores)

    self.mean_tpr += interp(self.mean_fpr, fpr, tpr)
    self.thresholds = interp(self.mean_fpr, fpr, fold_thresholds)
    # Pin the end points of the resampled curves.
    self.mean_tpr[0] = 0.0
    self.thresholds[0] = 1.0
    self.thresholds[-1] = 0.0

    fold_auc = auc(fpr, tpr)
    if self.num_folds > 1:
        line_style = dict(
            lw=1,
            label='ROC fold %d (area = %0.2f)' % (fold_id, fold_auc))
    else:
        # With a single fold the curve is drawn thicker and in a fixed colour.
        line_style = dict(
            lw=3,
            color=colors_tools.getLabelColor('all'),
            label='ROC (area = %0.2f)' % (fold_auc))
    self.ax1.plot(fpr, tpr, **line_style)
def solve_nonlinear(self, params, unknowns, resids):
    """Look up Cp and Ct on the precalculated curve and correct them for yaw.

    All inputs are read from ``self.params`` and the results are written to
    ``self.unknowns['Cp_out']`` and ``self.unknowns['Ct_out']``; the
    ``params``, ``unknowns`` and ``resids`` arguments are not used here.
    """
    direction_id = self.direction_id
    yaw_key = 'yaw%i' % direction_id
    velocity_key = 'wtVelocity%i' % direction_id
    speed_key = 'gen_params:windSpeedToCPCT_wind_speed'
    pP = self.params['gen_params:pP']

    # Yaw-scaled wind speed used to enter the CP-CT curve.
    wind_speed_ax = (np.cos(self.params[yaw_key]*np.pi/180.0)**(pP/3.0)
                     * self.params[velocity_key])

    # Clamp to the tabulated range before interpolating.
    wind_speed_ax = np.maximum(wind_speed_ax, self.params[speed_key][0])
    wind_speed_ax = np.minimum(wind_speed_ax, self.params[speed_key][-1])

    self.unknowns['Cp_out'] = interp(
        wind_speed_ax,
        self.params[speed_key],
        self.params['gen_params:windSpeedToCPCT_CP'])
    self.unknowns['Ct_out'] = interp(
        wind_speed_ax,
        self.params[speed_key],
        self.params['gen_params:windSpeedToCPCT_CT'])

    # Normalize on incoming wind speed to correct coefficients for yaw.
    self.unknowns['Cp_out'] = (
        self.unknowns['Cp_out']
        * np.cos(self.params[yaw_key]*np.pi/180.0)**pP)
    self.unknowns['Ct_out'] = (
        self.unknowns['Ct_out']
        * np.cos(self.params[yaw_key]*np.pi/180.0)**2)
def _score_macro_average(self, n_classes):
    """
    Compute the macro-averaged ROC curve and its AUC across classes.

    Reads the per-class curves ``self.fpr[i]`` / ``self.tpr[i]`` for ``i`` in
    ``range(n_classes)`` and stores the result under the ``MACRO`` key of
    ``self.fpr``, ``self.tpr`` and ``self.roc_auc``.
    """
    class_ids = range(n_classes)

    # Sorted union of every false-positive rate seen in any class.
    fpr_grid = np.unique(np.concatenate([self.fpr[c] for c in class_ids]))

    # Sum each class curve resampled on the shared grid, then average.
    tpr_total = np.zeros_like(fpr_grid)
    for c in class_ids:
        tpr_total += interp(fpr_grid, self.fpr[c], self.tpr[c])
    tpr_total /= n_classes

    self.fpr[MACRO] = fpr_grid
    self.tpr[MACRO] = tpr_total
    self.roc_auc[MACRO] = auc(self.fpr[MACRO], self.tpr[MACRO])
##########################################################################
## Quick method for ROCAUC
##########################################################################
def plot_allkfolds_ROC(timestamp, cv, fpr_arr, tpr_arr):
    """Plot each fold's ROC curve and their mean, and save the figure.

    Args:
        timestamp: prefix of the output file, saved as ``<timestamp>_roc.png``.
        cv: iterable yielding one ``(train, test)`` pair per fold; only the
            number of pairs is used.
        fpr_arr: per-fold false-positive rates, indexed by fold number.
        tpr_arr: per-fold true-positive rates, indexed by fold number.

    Returns:
        The mean of the per-fold AUC values.
    """
    sns.set(style="white", palette="muted", color_codes=True)
    mean_tpr = 0.0
    all_roc_auc = []
    bins_roc = np.linspace(0, 1, 300)
    # Folds are counted while iterating so that ``cv`` may be any iterable,
    # including a generator that has no ``len()``.
    n_folds = 0
    with plt.style.context(('seaborn-muted')):
        fig, ax = plt.subplots(figsize=(10, 8))
        for i, (_train, _test) in enumerate(cv):
            n_folds += 1
            mean_tpr += interp(bins_roc, fpr_arr[i], tpr_arr[i])
            mean_tpr[0] = 0.0
            # The original also summed the same interpolated TPR into an
            # unused ``mean_fpr``; that dead computation is removed.
            roc_auc = metrics.auc(fpr_arr[i], tpr_arr[i])
            all_roc_auc.append(roc_auc)
            ax.plot(fpr_arr[i], tpr_arr[i], lw=1,
                    label='KFold %d (AUC = %0.2f)' % (i, roc_auc))
        ax.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Random')
        mean_tpr /= n_folds
        mean_tpr[-1] = 1.0
        mean_auc = np.mean(all_roc_auc)
        ax.plot(bins_roc, mean_tpr, 'k--',
                label='Mean ROC (AUC = %0.2f)' % mean_auc, lw=2)
        ax.set_xlim([-0.05, 1.05])
        ax.set_ylim([-0.05, 1.05])
        ax.set_xlabel('False Positive Rate')
        ax.set_ylabel('True Positive Rate')
        ax.set_title('Receiver Operating Characteristic')
        ax.legend(loc="lower right")
        plt.savefig('{}_roc.png'.format(timestamp))
        plt.close('all')
    return mean_auc
def print_metrics(clf):
    """Cross-validate ``clf`` over 5 stratified folds and save the ROC plot.

    Uses the module-level ``features`` and ``labels``.  Each fold's ROC
    curve and the mean curve are drawn on the current pyplot figure, which
    is written to ``auc_sent.png``.

    Args:
        clf: estimator providing ``fit`` and ``predict_proba``.
    """
    folds = cross_validation.StratifiedKFold(labels, n_folds=5)
    fpr_grid = np.linspace(0, 1, 100)
    tpr_sum = 0.0

    for fold_no, (train_idx, test_idx) in enumerate(folds):
        fitted = clf.fit(features[train_idx], labels[train_idx])
        probas = fitted.predict_proba(features[test_idx])
        fpr, tpr, _ = metrics.roc_curve(labels[test_idx], probas[:, 1])
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        fold_auc = metrics.auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (fold_no, fold_auc))

    # Diagonal reference line.
    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')

    tpr_sum /= len(folds)
    tpr_sum[-1] = 1.0
    mean_auc = metrics.auc(fpr_grid, tpr_sum)
    plt.plot(fpr_grid, tpr_sum, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)

    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic')
    plt.legend(loc="lower right")
    plt.savefig('auc_sent.png')
# Source file: imdb_success_predictor.py
# Project: Movie-Success-Predictor
# Author: Blueteak
# Project source
# File source
# Views: 37
# Favorites: 0
# Likes: 0
# Comments: 0
def test_classifier(clf, X, Y, loc):
    """Cross-validate ``clf`` over 5 stratified folds, plot and report ROC.

    Each fold's ROC curve and the mean curve are drawn on the current pyplot
    figure.  The figure is saved to ``plots/<loc>/<ClassName>.png``, shown,
    and cleared.  The class name, the per-fold AUC list and their mean are
    printed.

    Args:
        clf: estimator providing ``fit`` and ``predict_proba``.
        X: feature array, indexed by the fold index arrays.
        Y: label array, indexed by the fold index arrays.
        loc: sub-directory of ``plots/`` that receives the image.
    """
    folds = StratifiedKFold(Y, 5)
    mean_tpr = 0.0
    mean_fpr = numpy.linspace(0, 1, 100)
    aucs = []
    for i, (train, test) in enumerate(folds):
        clf.fit(X[train], Y[train])
        prediction = clf.predict_proba(X[test])
        aucs.append(roc_auc_score(Y[test], prediction[:, 1]))
        false_positive_rate, true_positive_rate, thresholds = roc_curve(
            Y[test], prediction[:, 1])
        mean_tpr += interp(mean_fpr, false_positive_rate, true_positive_rate)
        mean_tpr[0] = 0.0
        roc_auc = auc(false_positive_rate, true_positive_rate)
        plt.plot(false_positive_rate, true_positive_rate, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (i, roc_auc))
    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Luck')
    mean_tpr /= len(folds)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.title('Receiver Operating Characteristic')
    plt.xlim([0, 1])
    plt.ylim([0, 1])
    plt.ylabel('True Positive Rate')
    plt.xlabel('False Positive Rate')
    plt.legend(loc='lower right')
    # Save BEFORE show(): once a blocking show() returns the figure may
    # already be gone, and savefig() would then write an empty image.
    plt.savefig('plots/'+loc+'/'+clf.__class__.__name__+'.png')
    plt.show()
    plt.clf()
    # A single pre-formatted argument prints the same text under Python 2
    # and Python 3.
    print("%s %s %s" % (clf.__class__.__name__, aucs, numpy.mean(aucs)))
def save_plots(roc_auc, fpr, tpr, nb_classes, path):
    """Add the macro-average ROC curve and save the multi-class ROC plot.

    Args:
        roc_auc: mapping with an AUC per class index ``0..nb_classes-1`` and
            under ``"micro"``; gains a ``"macro"`` entry.
        fpr: mapping of false-positive rates with the same keys; gains
            ``"macro"``.
        tpr: mapping of true-positive rates with the same keys; gains
            ``"macro"``.
        nb_classes: number of per-class curves.
        path: destination passed to ``plt.savefig``.
    """
    class_ids = range(nb_classes)

    # Shared grid: sorted union of all per-class false-positive rates.
    fpr_grid = np.unique(np.concatenate([fpr[c] for c in class_ids]))

    # Resample every class curve on the grid and average.
    macro_tpr = np.zeros_like(fpr_grid)
    for c in class_ids:
        macro_tpr += interp(fpr_grid, fpr[c], tpr[c])
    macro_tpr /= nb_classes

    fpr["macro"] = fpr_grid
    tpr["macro"] = macro_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    plt.figure()
    for average in ("micro", "macro"):
        caption = average + '-average ROC curve (area = {0:0.2f})'
        plt.plot(fpr[average], tpr[average],
                 label=caption.format(roc_auc[average]),
                 linewidth=2)
    for c in class_ids:
        plt.plot(fpr[c], tpr[c],
                 label='ROC curve of class {0} (area = {1:0.2f})'.format(
                     c, roc_auc[c]))
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Some extension of Receiver operating characteristic to multi-class')
    plt.legend(loc="lower right")
    plt.savefig(path)
    plt.close()
def plot_roc_per_class(model, test_data, test_truth, labels, title):
    """Plot one ROC curve per class and save the figure as a JPEG.

    The per-class curves come from ``get_fpr_tpr_roc``.  A macro average is
    computed and stored in the returned mappings under ``"macro"``, but it is
    not drawn.  The figure is written to ``./per_class_roc_<title>.jpg``.

    Args:
        model, test_data, test_truth: passed through to ``get_fpr_tpr_roc``.
        labels: mapping whose keys name the classes; curves are paired with
            the keys by iteration order.
        title: used in the plot title and in the output file name.
    """
    fpr, tpr, roc_auc = get_fpr_tpr_roc(model, test_data, test_truth, labels)

    n_classes = len(labels)
    class_ids = range(n_classes)

    # Macro average: resample every class on the union of FPR values.
    fpr_grid = np.unique(np.concatenate([fpr[c] for c in class_ids]))
    macro_tpr = np.zeros_like(fpr_grid)
    for c in class_ids:
        macro_tpr += interp(fpr_grid, fpr[c], tpr[c])
    macro_tpr /= n_classes
    fpr["macro"] = fpr_grid
    tpr["macro"] = macro_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    lw = 2
    plt.figure(figsize=(20, 16))
    palette = cycle(['aqua', 'darkorange', 'cornflowerblue', 'green',
                     'pink', 'magenta', 'grey', 'purple'])
    for idx, (key, color) in enumerate(zip(labels.keys(), palette)):
        plt.plot(fpr[idx], tpr[idx], color=color, lw=lw,
                 label='ROC curve of class '+str(key))
    plt.plot([0, 1], [0, 1], 'k--', lw=lw)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC:'+ str(labels) + '\n' + title)
    plt.legend(loc="lower right")
    plt.savefig("./per_class_roc_"+title+".jpg")
def train_test(X, Y, choice):
    """Run 20 random 80/20 splits of the selected pipeline and average ROC.

    Args:
        X: features passed to ``train_test_split``.
        Y: labels passed to ``train_test_split``.
        choice: 1..5 selects ``build_model_mlp``, ``build_model_rbm``,
            ``build_model_rf``, ``build_model_svc`` or
            ``build_model_latent``; any other value leaves the estimator
            list empty.

    Returns:
        Tuple ``(mean_tpr, auc, auc_std)``: the mean TPR on a 30-point FPR
        grid, and the mean and standard deviation of the per-run AUC.
    """
    estimators = []
    if choice == 1:
        estimators = build_model_mlp()
    elif choice == 2:
        estimators = build_model_rbm()
    elif choice == 3:
        estimators = build_model_rf()
    elif choice == 4:
        estimators = build_model_svc()
    elif choice == 5:
        estimators = build_model_latent()
    pipeline = Pipeline(estimators)

    fpr_grid = np.linspace(0., 1., 30)
    tpr_sum = 0.0
    run_aucs = []
    num_of_exp = 20
    for run in range(1, num_of_exp + 1):
        # One pre-formatted argument prints identically on Python 2 and 3.
        print("?%d???,%d/%d." % (run, run, num_of_exp))
        split = train_test_split(X, Y,
                                 test_size=0.2,
                                 random_state=np.random.randint(1, 100))
        x_train, x_test, y_train, y_test = split
        pipeline.fit(x_train, y_train)
        y_pred = pipeline.predict_proba(x_test)[:, 1]
        fpr, tpr, _ = metrics.roc_curve(y_test, y_pred)
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        run_aucs.append(metrics.roc_auc_score(y_test, y_pred))

    tpr_sum /= num_of_exp
    tpr_sum[-1] = 1.0
    auc_array = np.array(run_aucs)
    return tpr_sum, auc_array.mean(), auc_array.std()
def unsupervised_method(attr, label):
    """Score a single attribute as a classifier via its ROC curve.

    Args:
        attr: scores passed to ``metrics.roc_curve`` as the prediction.
        label: ground-truth labels.

    Returns:
        Tuple ``(tpr_on_grid, roc_auc)``: the TPR resampled on a 30-point
        FPR grid, and the AUC of the raw curve.
    """
    fpr_, tpr_, _ = metrics.roc_curve(label, attr)
    fpr_grid = np.linspace(0., 1., 30)
    # The 0.0 start value mirrors the accumulator used by the multi-run
    # functions.
    tpr_on_grid = 0.0 + interp(fpr_grid, fpr_, tpr_)
    return tpr_on_grid, metrics.auc(fpr_, tpr_)
def train_test(X, Y, ratio):
    """Evaluate the MLP pipeline with shuffled 10-fold stratified CV.

    Args:
        X: feature array, indexed by the fold index arrays.
        Y: label array, indexed by the fold index arrays.
        ratio: not used by this variant.

    Returns:
        Tuple ``(mean_tpr, auc, auc_std)``: the mean TPR on a 30-point FPR
        grid, and the mean and standard deviation of the per-fold AUC.
    """
    pipeline = Pipeline(build_model_mlp())
    tpr_sum = 0.0
    fpr_grid = np.linspace(0., 1., 30)
    fold_aucs = []
    folds = StratifiedKFold(Y, n_folds=10,
                            shuffle=True,
                            random_state=np.random.randint(1, 100))
    for fold_no, (train_idx, test_idx) in enumerate(folds):
        # One pre-formatted argument prints identically on Python 2 and 3.
        print("?%d?." % fold_no)
        x_train, y_train = X[train_idx], Y[train_idx]
        x_test, y_test = X[test_idx], Y[test_idx]
        pipeline.fit(x_train, y_train)
        y_pred = pipeline.predict_proba(x_test)[:, 1]
        fpr, tpr, _ = metrics.roc_curve(y_test, y_pred)
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        fold_aucs.append(metrics.roc_auc_score(y_test, y_pred))

    tpr_sum /= len(folds)
    tpr_sum[-1] = 1.0
    auc_array = np.array(fold_aucs)
    return tpr_sum, auc_array.mean(), auc_array.std()
def train_test(X, Y, ratio):
    """Evaluate the MLP pipeline over 20 random train/test splits.

    Args:
        X: features passed to ``train_test_split``.
        Y: labels passed to ``train_test_split``.
        ratio: ``test_size`` of every split.

    Returns:
        Tuple ``(mean_tpr, auc, auc_std)``: the mean TPR on a 30-point FPR
        grid, and the mean and standard deviation of the per-run AUC.
    """
    pipeline = Pipeline(build_model_mlp())
    tpr_sum = 0.0
    fpr_grid = np.linspace(0., 1., 30)
    run_aucs = []
    num_of_exp = 20
    for run in range(1, num_of_exp + 1):
        # One pre-formatted argument prints identically on Python 2 and 3.
        print("?%d???,%d/%d." % (run, run, num_of_exp))
        # A new random seed is drawn for every repetition.
        seed = np.random.randint(1, 100)
        x_train, x_test, y_train, y_test = train_test_split(
            X, Y, test_size=ratio, random_state=seed)
        pipeline.fit(x_train, y_train)
        y_pred = pipeline.predict_proba(x_test)[:, 1]
        fpr, tpr, _ = metrics.roc_curve(y_test, y_pred)
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        run_aucs.append(metrics.roc_auc_score(y_test, y_pred))

    tpr_sum /= num_of_exp
    tpr_sum[-1] = 1.0
    auc_array = np.array(run_aucs)
    return tpr_sum, auc_array.mean(), auc_array.std()
def train_test(X, Y, ratio):
    """Evaluate a 12-tree random forest over 20 random train/test splits.

    Args:
        X: features passed to ``train_test_split``.
        Y: labels passed to ``train_test_split``.
        ratio: ``test_size`` of every split.

    Returns:
        Tuple ``(mean_tpr, auc, auc_std)``: the mean TPR on a 30-point FPR
        grid, and the mean and standard deviation of the per-run AUC.
    """
    forest = RandomForestClassifier(n_jobs=-1, n_estimators=12)
    tpr_sum = 0.0
    fpr_grid = np.linspace(0., 1., 30)
    run_aucs = []
    num_of_exp = 20

    run = 1
    while run <= num_of_exp:
        # One pre-formatted argument prints identically on Python 2 and 3.
        print("?%d???,%d/%d." % (run, run, num_of_exp))
        x_train, x_test, y_train, y_test = train_test_split(
            X, Y,
            test_size=ratio,
            random_state=np.random.randint(1, 100))
        forest.fit(x_train, y_train)
        y_pred = forest.predict_proba(x_test)[:, 1]
        fpr, tpr, _ = metrics.roc_curve(y_test, y_pred)
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        run_aucs.append(metrics.roc_auc_score(y_test, y_pred))
        run += 1

    tpr_sum /= num_of_exp
    tpr_sum[-1] = 1.0
    auc_array = np.array(run_aucs)
    return tpr_sum, auc_array.mean(), auc_array.std()
def weighted_median(points, weights):
    """Return the weighted median of ``points``.

    Each point is placed at the midpoint of its weight interval on the
    cumulative-weight axis, normalised by the total weight, and the value at
    the 0.5 level is linearly interpolated.  Below the first midpoint or
    above the last, the first or last point is returned.

    The original interpolated at 0.5 on the unnormalised cumulative weights,
    which is only a median when the weights already sum to 1.  Results for
    such normalised weights are unchanged, up to floating-point rounding.

    Args:
        points: one-dimensional sequence or array of values.
        weights: non-negative weights, same length as ``points``.

    Returns:
        The interpolated weighted median as a float.

    Raises:
        ValueError: if the input is empty or the weights sum to zero.
    """
    # Local import: NumPy replaces the deprecated ``scipy`` aliases
    # (``sp.argsort``, ``sp.cumsum``, ``sp.interp``) used before.
    import numpy as np

    points = np.asarray(points)
    weights = np.asarray(weights, dtype=float)
    if points.size == 0:
        raise ValueError("weighted_median() requires at least one point")

    order = np.argsort(points)
    points = points[order]
    weights = weights[order]

    cumulative = np.cumsum(weights)
    total = cumulative[-1]
    if total == 0:
        raise ValueError("weighted_median() requires a non-zero total weight")

    # Midpoint of every weight interval, as a fraction of the total weight.
    midpoints = (cumulative - .5 * weights) / total
    return np.interp(.5, midpoints, points)
def calc_macro_roc(fpr, tpr):
    """Compute the macro-average ROC curve over several curves.

    Args:
        fpr: sequence of per-curve false-positive-rate sequences.
        tpr: sequence of per-curve true-positive-rate sequences, aligned
            with ``fpr`` by index.

    Returns:
        Tuple ``(all_fpr, mean_tpr, mean_auc)``: ``all_fpr`` is the sorted
        list of every FPR value from every curve (duplicates kept),
        ``mean_tpr`` the averaged TPR on that grid, and ``mean_auc`` the
        area under the summed curve divided by the number of curves.
    """
    n_curves = len(tpr)

    # Shared grid: every FPR value from every curve, sorted.
    all_fpr = sorted(itertools.chain.from_iterable(fpr))

    # Sum of all curves resampled on the shared grid.
    tpr_sum = np.zeros_like(all_fpr)
    for idx, curve_tpr in enumerate(tpr):
        tpr_sum += interp(all_fpr, fpr[idx], curve_tpr)

    mean_tpr = tpr_sum / n_curves
    mean_auc = auc(all_fpr, tpr_sum) / n_curves
    return all_fpr, mean_tpr, mean_auc
def computeFpFn(self):
    """Build per-family detection-rate and false-alarm-rate tables.

    The threshold grid is every value of ``self.all_predictions`` rounded to
    three decimals, plus 0.0 and 1.0, sorted and de-duplicated.  For each
    family, the fraction of its predictions above each threshold is
    computed and interpolated onto that grid.

    Results are stored as DataFrames indexed by threshold, with one column
    per family plus a ``'Mean'`` column averaging the families:

    * ``self.detection_rates`` from ``self.malicious_families``
    * ``self.false_alarm_rates`` from ``self.benign_families``

    Both attributes map a family name to its list of predictions.
    """
    thresholds = sorted(set(
        [float(round(Decimal(x), 3)) for x in self.all_predictions]
        + [0.0, 1.0]))

    def family_rates(families):
        """Return the rate table for one ``{family: predictions}`` mapping."""
        # list(): in Python 3, dict.keys() is a view that cannot be
        # concatenated with a list.
        names = list(families.keys())
        rates = pd.DataFrame(
            np.zeros((len(thresholds), len(names) + 1)),
            index=thresholds,
            columns=names + ['Mean'])
        for family in names:
            predictions = sorted(families[family])
            num_predictions = len(predictions)
            predictions_u = sorted(set(predictions))
            # Family-specific thresholds: midpoints between consecutive
            # distinct predictions, bracketed by 0.0 and 1.0.
            f_thresholds = (
                [0.0]
                + [(t + s) / 2
                   for s, t in zip(predictions_u[:-1], predictions_u[1:])]
                + [1.0])
            # perf[k] counts the predictions at or below f_thresholds[k].
            perf = np.zeros(len(f_thresholds))
            index = 0
            for p in predictions:
                while f_thresholds[index] < p:
                    index += 1
                perf[index:] += 1
            # Float division: an integer array divided by an int floors
            # under Python 2.
            perf = 1 - perf / float(num_predictions)
            rates[family] = interp(thresholds, f_thresholds, perf)
        rates['Mean'] = rates.loc[:, names].mean(axis=1)
        return rates

    ## Detection Rates
    self.detection_rates = family_rates(self.malicious_families)
    ## False Alarm Rates
    self.false_alarm_rates = family_rates(self.benign_families)
def linearize(self, params, unknowns, resids):  # standard central differencing
    """Return the Jacobian of Cp_out/Ct_out by central finite differences.

    Yaw and wind speed are each perturbed by ``+h`` and ``-h``.  Every
    perturbed point goes through the same steps: yaw-scaled wind speed,
    clamping to the tabulated range, interpolation on the CP-CT curve, and
    yaw correction.  All inputs are read from ``self.params``.

    Returns:
        Dict keyed by ``(output, input)`` name pairs.  Each value is an
        ``nTurbines x nTurbines`` matrix with the derivative on the diagonal.
    """
    # Step size for finite differencing.
    h = 1e-6
    direction_id = self.direction_id
    yaw_key = 'yaw%i' % direction_id
    velocity_key = 'wtVelocity%i' % direction_id
    speed_key = 'gen_params:windSpeedToCPCT_wind_speed'

    def coefficients(yaw, velocity):
        """Evaluate the yaw-corrected (CP, CT) at one (yaw, velocity) point."""
        axial = (np.cos(yaw*np.pi/180.0)**(self.params['gen_params:pP']/3.0)
                 * velocity)
        # Clamp to the range of the precalculated CP-CT curve.
        axial = np.maximum(axial, self.params[speed_key][0])
        axial = np.minimum(axial, self.params[speed_key][-1])
        cp = interp(axial, self.params[speed_key],
                    self.params['gen_params:windSpeedToCPCT_CP'])
        ct = interp(axial, self.params[speed_key],
                    self.params['gen_params:windSpeedToCPCT_CT'])
        # Normalize on incoming wind speed to correct coefficients for yaw.
        cp = cp * np.cos((yaw)*np.pi/180.0)**self.params['gen_params:pP']
        ct = ct * np.cos((yaw)*np.pi/180.0)**2
        return cp, ct

    # Upper and lower function values for each perturbed input.
    CP_high_yaw, CT_high_yaw = coefficients(
        self.params[yaw_key]+h, self.params[velocity_key])
    CP_low_yaw, CT_low_yaw = coefficients(
        self.params[yaw_key]-h, self.params[velocity_key])
    CP_high_wind, CT_high_wind = coefficients(
        self.params[yaw_key], self.params[velocity_key]+h)
    CP_low_wind, CT_low_wind = coefficients(
        self.params[yaw_key], self.params[velocity_key]-h)

    # Central differences arranged as diagonal sub-matrices of the Jacobian.
    identity = np.eye(self.nTurbines)
    J = {}
    J['Cp_out', yaw_key] = identity*(CP_high_yaw-CP_low_yaw)/(2.0*h)
    J['Cp_out', velocity_key] = identity*(CP_high_wind-CP_low_wind)/(2.0*h)
    J['Ct_out', yaw_key] = identity*(CT_high_yaw-CT_low_yaw)/(2.0*h)
    J['Ct_out', velocity_key] = identity*(CT_high_wind-CT_low_wind)/(2.0*h)
    return J
def solve_nonlinear(self, params, unknowns, resids):
    """Evaluate Cp/Ct from Akima splines of the lookup tables and apply yaw.

    The lookup tables and the wind velocity are read from the ``params``
    argument; ``pP`` and the yaw angle are read from ``self.params``.  The
    yaw-corrected coefficients are written to ``self.unknowns['Cp_out']``
    and ``self.unknowns['Ct_out']``.  Their derivatives with respect to yaw
    and velocity are stored on ``self`` as ``dCp_out_dyaw``,
    ``dCp_out_dvel``, ``dCt_out_dyaw`` and ``dCt_out_dvel``.
    """
    direction_id = self.direction_id
    pP = self.params['gen_params:pP']
    yaw = self.params['yaw%i' % direction_id]
    velocity_key = 'wtVelocity%i' % direction_id

    cp_table = params['gen_params:windSpeedToCPCT_CP']
    ct_table = params['gen_params:windSpeedToCPCT_CT']
    windspeeds = params['gen_params:windSpeedToCPCT_wind_speed']

    cp_spline = Akima(windspeeds, cp_table)
    ct_spline = Akima(windspeeds, ct_table)

    # The first two values returned by interp() are used as the spline
    # value and its derivative with respect to velocity.
    CP, dCPdvel, _, _ = cp_spline.interp(params[velocity_key])
    CT, dCTdvel, _, _ = ct_spline.interp(params[velocity_key])

    cos_yaw = np.cos(yaw*np.pi/180.)
    sin_yaw = np.sin(yaw*np.pi/180.)

    Cp_out = CP*cos_yaw**pP
    Ct_out = CT*cos_yaw**2.

    # Derivatives of the yaw-corrected coefficients.
    self.dCp_out_dyaw = (-sin_yaw)*(np.pi/180.)*pP*CP*cos_yaw**(pP-1.)
    self.dCp_out_dvel = dCPdvel*cos_yaw**pP
    self.dCt_out_dyaw = (-sin_yaw)*(np.pi/180.)*2.*CT*cos_yaw
    self.dCt_out_dvel = dCTdvel*cos_yaw**2.

    self.unknowns['Cp_out'] = Cp_out
    self.unknowns['Ct_out'] = Ct_out
def plot_roc(X, y, plot_dir, trial, clf_class, **kwargs):
    """Plot per-fold and mean ROC curves for a classifier class.

    ``X`` is standardised, then a new ``clf_class(**kwargs)`` is fitted on
    each of 5 shuffled stratified folds.  The figure is saved to
    ``<plot_dir>ROC_plot_<model><trial>.png``.

    If ``predict_proba`` raises for a fold, a message is printed, the figure
    is closed and the function returns ``None`` without saving anything.

    Args:
        X: feature array.
        y: label array, indexed by the fold index arrays.
        plot_dir: output prefix, concatenated directly with the file name.
        trial: suffix appended to the file name.
        clf_class: classifier class, instantiated once per fold.
        **kwargs: keyword arguments for ``clf_class``.
    """
    scaler = StandardScaler()
    X = scaler.fit_transform(X)
    kf = StratifiedKFold(y, n_folds=5, shuffle=True)
    y_prob = np.zeros((len(y), 2))
    mean_tpr = 0.0
    mean_fpr = np.linspace(0, 1, 100)
    model_nm = str(clf_class).split('.')[-1:][0].split("'")[0]
    plt.figure()
    for i, (train_index, test_index) in enumerate(kf):
        X_train, X_test = X[train_index], X[test_index]
        y_train = y[train_index]
        clf = clf_class(**kwargs)
        clf.fit(X_train, y_train)
        # Predict probabilities, not classes.
        try:
            y_prob[test_index] = clf.predict_proba(X_test)
        except Exception:
            # Still best-effort, but a bare ``except:`` would also swallow
            # KeyboardInterrupt and SystemExit.
            print("%s %s" % (
                "No true-positives calculated / No probability for ",
                model_nm))
            # Release the figure opened above instead of leaking it.
            plt.close()
            return
        fpr, tpr, thresholds = roc_curve(y[test_index], y_prob[test_index, 1])
        mean_tpr += interp(mean_fpr, fpr, tpr)
        mean_tpr[0] = 0.0
        roc_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (i, roc_auc))
    mean_tpr /= len(kf)
    mean_tpr[-1] = 1.0
    mean_auc = auc(mean_fpr, mean_tpr)
    plt.plot(mean_fpr, mean_tpr, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Random')
    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC plot for ' + model_nm)
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(plot_dir + 'ROC_plot_' + model_nm + trial + '.png')
    plt.close()
def plot_roc_curve(X, y, plot_dir, trial, cv, model):
    """Plot per-fold ROC curves, the mean ROC and the mean threshold curve.

    The ``cv`` argument is ignored: it is replaced by a new shuffled 5-fold
    ``StratifiedKFold`` on ``y``.  The scaler is fitted on each fold's
    training part only.  The figure is saved to
    ``<plot_dir><model>_ROC_plot_<trial>.png``.

    Args:
        X: feature array, indexed by the fold index arrays.
        y: label array, indexed by the fold index arrays.
        plot_dir: output prefix, concatenated directly with the file name.
        trial: suffix used in the file name.
        cv: ignored, see above.
        model: estimator providing ``fit`` and ``predict_proba``.
    """
    fpr_grid = np.linspace(0, 1, 100)
    tpr_sum = 0.0
    thresh_sum = 0.0
    model_nm = str(model).split("(")[0]

    cv = StratifiedKFold(y, n_folds=5, shuffle=True)
    scaler = StandardScaler()

    for fold_no, (train, test) in enumerate(cv):
        X_train = scaler.fit_transform(X[train])
        X_test = scaler.transform(X[test])
        fitted = model.fit(X_train, y[train])
        probas = fitted.predict_proba(X_test)

        # ROC curve and area under it for this fold.
        fpr, tpr, thresholds = roc_curve(y[test], probas[:, 1])
        tpr_sum += interp(fpr_grid, fpr, tpr)
        tpr_sum[0] = 0.0
        fold_auc = auc(fpr, tpr)
        plt.plot(fpr, tpr, lw=1,
                 label='ROC fold %d (area = %0.2f)' % (fold_no+1, fold_auc))

        # Clip the end thresholds into [0, 1].  Thresholds are resampled
        # against an evenly spaced axis, not against fpr.
        thresholds[0] = min(1.0, thresholds[0])
        thresholds[-1] = max(0.0, thresholds[-1])
        even_axis = np.linspace(0, 1, len(thresholds))
        thresh_sum += interp(fpr_grid, even_axis, thresholds)

    plt.plot([0, 1], [0, 1], '--', color=(0.6, 0.6, 0.6), label='Random')

    n_folds = len(cv)
    thresh_sum /= n_folds
    tpr_sum /= n_folds
    tpr_sum[-1] = 1.0
    mean_auc = auc(fpr_grid, tpr_sum)
    plt.plot(fpr_grid, tpr_sum, 'k--',
             label='Mean ROC (area = %0.2f)' % mean_auc, lw=2)
    plt.plot(fpr_grid, thresh_sum, 'k--', label='Mean Threshold')

    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC plot for ' + model_nm)
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(plot_dir + model_nm + '_ROC_plot_' + trial + '.png')
    plt.close()
def multiclass_roc(y_score, title=None, n_classes=10):
    """Plot per-class, micro-average and macro-average ROC curves.

    Ground truth comes from the module-level ``y_test``, and class names for
    the legend from the module-level ``crimes`` (indexed by ``i+1``).  The
    figure is saved to ``img/<title>.png`` only when ``title`` is given.

    Args:
        y_score: score array with one column per class.
        title: optional file name stem for the saved image.
        n_classes: number of class columns to evaluate.
    """
    class_ids = range(n_classes)
    fpr, tpr, roc_auc = {}, {}, {}

    # One ROC curve and AUC per class column.
    for c in class_ids:
        fpr[c], tpr[c], _ = roc_curve(y_test[:, c], y_score[:, c])
        roc_auc[c] = auc(fpr[c], tpr[c])

    # Micro average: all class columns flattened into one problem.
    fpr["micro"], tpr["micro"], _ = roc_curve(y_test.ravel(), y_score.ravel())
    roc_auc["micro"] = auc(fpr["micro"], tpr["micro"])

    # Macro average: resample every class on the union of FPR values.
    fpr_grid = np.unique(np.concatenate([fpr[c] for c in class_ids]))
    macro_tpr = np.zeros_like(fpr_grid)
    for c in class_ids:
        macro_tpr += interp(fpr_grid, fpr[c], tpr[c])
    macro_tpr /= n_classes
    fpr["macro"] = fpr_grid
    tpr["macro"] = macro_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    plt.figure(figsize=(15, 15))
    for average in ("micro", "macro"):
        caption = average + '-average ROC curve (area = {0:0.2f})'
        plt.plot(fpr[average], tpr[average],
                 label=caption.format(roc_auc[average]),
                 linewidth=2)
    for c in class_ids:
        plt.plot(fpr[c], tpr[c],
                 label='ROC curve of {0} (area = {1:0.2f})'.format(
                     crimes[c+1], roc_auc[c]))
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Receiver operating characteristic to multi-class')
    plt.legend(loc="lower right")
    if title is not None:
        plt.savefig('img/'+title+'.png')
def plot_roc_curve(y_pred, y_true, n_classes, title='ROC_Curve'):
    """Plot one-vs-rest ROC curves and their macro average, then save.

    Each class index ``i`` is used as ``pos_label`` against the same
    ``y_pred`` scores.  The figure is saved into ``FLAGS.output_dir`` as
    ``<title with spaces replaced by underscores>_ROC.png``.

    Args:
        y_pred: scores passed to ``roc_curve``.
        y_true: ground-truth labels.
        n_classes: number of class indices to evaluate.
        title: stem of the output file name.
    """
    class_ids = range(n_classes)
    fpr, tpr, roc_auc = {}, {}, {}

    # ROC curve and AUC with each class in turn as the positive label.
    for c in class_ids:
        fpr[c], tpr[c], _ = roc_curve(y_true, y_pred, pos_label=c,
                                      drop_intermediate=False)
        roc_auc[c] = auc(fpr[c], tpr[c])

    # Macro average: resample every class on the union of FPR values.
    fpr_grid = np.unique(np.concatenate([fpr[c] for c in class_ids]))
    macro_tpr = np.zeros_like(fpr_grid)
    for c in class_ids:
        macro_tpr += interp(fpr_grid, fpr[c], tpr[c])
    macro_tpr /= n_classes
    fpr["macro"] = fpr_grid
    tpr["macro"] = macro_tpr
    roc_auc["macro"] = auc(fpr["macro"], tpr["macro"])

    fig = plt.figure()
    ax = fig.add_subplot(111)
    macro_label = 'macro-average ROC curve (area = {0:0.2f})'.format(
        roc_auc["macro"])
    plt.plot(fpr["macro"], tpr["macro"], label=macro_label,
             linewidth=3, ls='--', color='green')
    for c in class_ids:
        class_label = 'ROC curve of class {0} (area = {1:0.2f})'.format(
            c, roc_auc[c])
        plt.plot(fpr[c], tpr[c], label=class_label)
    plt.plot([0, 1], [0, 1], 'k--', linewidth=2)
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multi-class Receiver Operating Characteristic')
    # The legend sits outside the axes, so it is passed to savefig as an
    # extra artist to be included in the tight bounding box.
    lgd = ax.legend(bbox_to_anchor=(1.05, 1), loc=2, borderaxespad=0.)
    out_path = pjoin(FLAGS.output_dir, title.replace(' ', '_') + '_ROC.png')
    plt.savefig(out_path, bbox_extra_artists=(lgd,), bbox_inches='tight')
    plt.close()