def _recurse_tree(tree, lst, mdlp, node_id=0, depth=0, min_val=-np.inf, max_val=np.inf):
    left_child = tree.children_left[node_id]
    right_child = tree.children_right[node_id]
    if left_child == sklearn.tree._tree.TREE_LEAF:
        lst.append(((min_val, max_val), tree.value[node_id].flatten().tolist()))
        return
    else:
        if mdlp and _check_mdlp_stop(tree, node_id):
            lst.append(((min_val, max_val), tree.value[node_id].flatten().tolist()))
            return
        _recurse_tree(tree, lst, mdlp, left_child, depth=depth + 1,
                      min_val=min_val, max_val=tree.threshold[node_id])
    if right_child == sklearn.tree._tree.TREE_LEAF:
        lst.append(((min_val, max_val), tree.value[node_id].flatten().tolist()))
        return
    else:
        if mdlp and _check_mdlp_stop(tree, node_id):
            lst.append(((min_val, max_val), tree.value[node_id].flatten().tolist()))
            return
        _recurse_tree(tree, lst, mdlp, right_child, depth=depth + 1,
                      min_val=tree.threshold[node_id], max_val=max_val)
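A minimal usage sketch (assumed, not part of the original source): _recurse_tree expects the low-level tree_ object of a fitted estimator and fills lst with (interval, class_counts) pairs for a single continuous feature.

import numpy as np
import sklearn.tree

rng = np.random.RandomState(0)
X = rng.rand(100, 1)
y = (X[:, 0] > 0.5).astype(int)
dt = sklearn.tree.DecisionTreeClassifier(max_depth=2).fit(X, y)

intervals = []
_recurse_tree(dt.tree_, intervals, mdlp=False)
for (lo, hi), counts in intervals:
    print((lo, hi), counts)  # e.g. ((-inf, 0.497...), [50.0, 0.0])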
def _get_variables_for_entropy_calculation(tree, node_id):
    left_child = tree.children_left[node_id]
    right_child = tree.children_right[node_id]
    full_set_values = tree.value[node_id].flatten()
    left_set_values = tree.value[left_child].flatten()
    right_set_values = tree.value[right_child].flatten()
    # Remove zero counts so the entropy terms below stay well defined.
    full_set_without_zero_counts = full_set_values[full_set_values > 0]
    full_set_tree_classes = full_set_without_zero_counts.size
    left_set_without_zero_counts = left_set_values[left_set_values > 0]
    left_set_tree_classes = left_set_without_zero_counts.size
    right_set_without_zero_counts = right_set_values[right_set_values > 0]
    right_set_tree_classes = right_set_without_zero_counts.size
    return (full_set_without_zero_counts, full_set_tree_classes,
            left_set_without_zero_counts, left_set_tree_classes,
            right_set_without_zero_counts, right_set_tree_classes)
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def classify(observation, tree):
    if tree.results is not None:
        return tree.results
    v = observation[tree.col]
    if isinstance(v, (int, float)):
        branch = tree.tb if v >= tree.value else tree.fb
    else:
        branch = tree.tb if v == tree.value else tree.fb
    return classify(observation, branch)
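classify expects nodes with fields col, value, results, tb and fb. The node class itself is not shown in this excerpt; a minimal reconstruction for illustration:

class decisionnode:
    # Reconstructed sketch; the original class definition is not in this excerpt.
    def __init__(self, col=-1, value=None, results=None, tb=None, fb=None):
        self.col = col          # index of the column tested at this node
        self.value = value      # threshold (numeric) or category (otherwise)
        self.results = results  # leaf: dict mapping label -> count; None for internal nodes
        self.tb = tb            # branch followed when the test is true
        self.fb = fb            # branch followed when the test is false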
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def prune(tree, mingain):
    # If the branches aren't leaves, then prune them
    if tree.tb.results is None:
        prune(tree.tb, mingain)
    if tree.fb.results is None:
        prune(tree.fb, mingain)
    # If both subbranches are now leaves, see if they should be merged
    if tree.tb.results is not None and tree.fb.results is not None:
        # Build a combined dataset
        tb, fb = [], []
        for v, c in tree.tb.results.items():
            tb += [[v]] * c
        for v, c in tree.fb.results.items():
            fb += [[v]] * c
        # Test the reduction in entropy
        delta = entropy(tb + fb) - (entropy(tb) + entropy(fb)) / 2
        if delta < mingain:
            # Merge the branches
            tree.tb, tree.fb = None, None
            tree.results = uniquecounts(tb + fb)
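prune calls entropy and uniquecounts, which are not part of this excerpt. A plausible reconstruction matching their use above (rows are single-element lists of labels; results maps each label to its count):

import math

def uniquecounts(rows):
    # Count how often each label (the last column of a row) occurs.
    results = {}
    for row in rows:
        r = row[len(row) - 1]
        results[r] = results.get(r, 0) + 1
    return results

def entropy(rows):
    # Shannon entropy (base 2) of the label distribution over rows.
    results = uniquecounts(rows)
    ent = 0.0
    for r in results:
        p = float(results[r]) / len(rows)
        ent -= p * math.log(p, 2)
    return ent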
def __init__(self, feature_names=None, max_depth=3, fill_na=-1, return_numeric=True, return_array=False, decimal=2, **kwds):
    '''
    Decision-tree based discretizer.
    feature_names: names of the features to discretize; by default all features are used.
    max_depth: maximum depth of the underlying decision tree.
    kwds: extra keyword arguments passed to sklearn.tree.DecisionTreeClassifier.
    '''
    BaseDiscretizer.__init__(self, feature_names=feature_names, fill_na=fill_na,
                             return_numeric=return_numeric, return_array=return_array, decimal=decimal)
    self.max_depth = max_depth
    self.kwds = kwds
def fit(self, X, y=None):
    '''
    Fit cut points for each feature in feature_names.
    X: feature data, DataFrame or Series.
    y: labels, Series.
    '''
    if y is None:
        raise ValueError('y must not be None')
    dt = sklearn.tree.DecisionTreeClassifier(criterion='entropy', max_depth=self.max_depth, **self.kwds)
    if len(X.shape) == 1:
        dt.fit(np.asarray(X).reshape(-1, 1), y)
        cuts = getTreeSplits(dt)
        if cuts is None:
            # The tree produced no splits; fall back to the median as a single cut point.
            cuts = np.array([np.median(X)])
    else:
        cuts = dict()
        if self.feature_names is None:
            try:
                feature_names = list(X.columns)
            except AttributeError:
                feature_names = list(range(X.shape[1]))
        else:
            feature_names = self.feature_names
        for feature in feature_names:
            try:
                x = X[:, feature]
            except Exception:
                x = X[feature]
            x = np.asarray(x).reshape(-1, 1)
            dt.fit(x, y)
            cut = getTreeSplits(dt)
            if cut is None:
                cut = np.array([np.median(x)])
            cuts[feature] = cut.copy()
    self.cuts = copy.deepcopy(cuts)
    return self
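The enclosing class is not shown in this excerpt; a hedged usage sketch, with TreeDiscretizer standing in for whatever the class is actually called (and assuming BaseDiscretizer's defaults suffice):

import numpy as np
import pandas as pd

# Hypothetical: TreeDiscretizer is a stand-in name for the class these methods belong to.
rng = np.random.RandomState(0)
X = pd.DataFrame({'age': rng.randint(18, 80, size=200)})
y = pd.Series((X['age'] > 45).astype(int))

disc = TreeDiscretizer(max_depth=2)
disc.fit(X, y)
print(disc.cuts)  # per-feature split thresholds, e.g. {'age': array([45.5])}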
def getTreeSplits(dt):
    '''
    Return the sorted split thresholds of a fitted decision tree.
    dt: a fitted sklearn.tree.DecisionTreeClassifier.
    Returns None when the tree contains no internal nodes, i.e. no splits.
    '''
    cut = dt.tree_.threshold[np.where(dt.tree_.children_left > -1)]
    if cut.shape[0] == 0:
        return None
    return np.sort(cut)
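A quick sketch of how getTreeSplits behaves (assumed usage, not from the original file):

import numpy as np
import sklearn.tree

x = np.array([1.0, 2.0, 3.0, 10.0, 11.0, 12.0]).reshape(-1, 1)
y = np.array([0, 0, 0, 1, 1, 1])
dt = sklearn.tree.DecisionTreeClassifier(criterion='entropy', max_depth=2).fit(x, y)
print(getTreeSplits(dt))  # e.g. array([6.5]): the single threshold separating the classes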
def visualize_tree(tree, feature_names):
    # Requires `from sklearn.tree import export_graphviz`, `import subprocess`,
    # and the Graphviz `dot` executable on the PATH.
    with open("dt.dot", 'w') as f:
        export_graphviz(tree, out_file=f, feature_names=feature_names)
    command = ["dot", "-Tpng", "dt.dot", "-o", "dt.png"]
    subprocess.check_call(command)
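A usage sketch (assumed, not from the original project); it needs scikit-learn and the Graphviz dot binary installed:

import subprocess
from sklearn.datasets import load_iris
from sklearn.tree import DecisionTreeClassifier, export_graphviz

iris = load_iris()
clf = DecisionTreeClassifier(max_depth=2).fit(iris.data, iris.target)
visualize_tree(clf, iris.feature_names)  # writes dt.dot, then renders dt.png via dot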
def predictKFoldRandomForest(X, y, estimators=10, criterion="gini", maxdepth=None, selectKBest=0, kfold=10):
    """
    Classifies the data using a random forest and k-fold CV
    :param X: The matrix of feature vectors
    :type X: list
    :param y: The vector containing labels corresponding to the feature vectors
    :type y: list
    :param estimators: The number of random trees to use in classification
    :type estimators: int
    :param criterion: The splitting criterion employed by the decision trees
    :type criterion: str
    :param maxdepth: The maximum depth a tree is allowed to grow
    :type maxdepth: int
    :param selectKBest: The number of best features to select
    :type selectKBest: int
    :param kfold: The number of folds to use in K-fold CV
    :type kfold: int
    :return: A list of predicted labels across the k folds
    """
    try:
        # Prepare data
        X, y = numpy.array(X), numpy.array(y)
        # Define classifier
        clf = ensemble.RandomForestClassifier(n_estimators=estimators, criterion=criterion, max_depth=maxdepth)
        X_new = SelectKBest(chi2, k=selectKBest).fit_transform(X, y) if selectKBest > 0 else X
        predicted = cross_val_predict(clf, X_new, y, cv=kfold).tolist()
    except Exception as e:
        prettyPrintError(e)
        return []
    return predicted
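The function relies on module-level imports from the surrounding project. A plausible reconstruction under current scikit-learn module paths (prettyPrintError and prettyPrint remain project-specific helpers and are assumed):

import numpy
from sklearn import ensemble
from sklearn.feature_selection import SelectKBest, chi2
from sklearn.model_selection import cross_val_predict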
def apprend_arbre(train, labels, depth=10, min_samples_leaf=2, min_samples_split=2):
    tree = DecisionTreeClassifier(max_depth=depth, min_samples_leaf=min_samples_leaf,
                                  min_samples_split=min_samples_split)
    tree.fit(train, labels)
    return tree
def affiche_arbre(tree):
    long = 10
    sep1 = "|" + "-" * (long - 1)
    sepl = "|" + " " * (long - 1)
    sepr = " " * long

    def aux(node, sep):
        if tree.tree_.children_left[node] < 0:
            ls = "(%s)" % (", ".join("%s: %d" % (tree.classes_[i], int(x))
                                     for i, x in enumerate(tree.tree_.value[node].flat)))
            return sep + sep1 + "%s\n" % (ls,)
        return (sep + sep1 + "X%d<=%0.2f\n" + "%s" + sep + sep1 + "X%d>%0.2f\n" + "%s") % \
            (tree.tree_.feature[node], tree.tree_.threshold[node],
             aux(tree.tree_.children_left[node], sep + sepl),
             tree.tree_.feature[node], tree.tree_.threshold[node],
             aux(tree.tree_.children_right[node], sep + sepr))
    return aux(0, "")
def genere_dot(tree, fn):
    with open(fn, "w") as f:
        export_graphviz(tree, f, class_names=tree.classes_,
                        feature_names=getattr(tree, "feature_names", None),
                        filled=True, rounded=True)
    print('Use "dot -Tpdf %s -o %s.pdf" to generate pdf' % (fn, fn[:-4]))
def __init__(self, tree, dic, get_features):
    super(DTreeStrategy, self).__init__("Tree Strategy")
    self.dic = dic
    self.tree = tree
    self.get_features = get_features
def compute_strategy(self, state, id_team, id_player):
    label = self.tree.predict([self.get_features(state, id_team, id_player)])[0]
    if label not in self.dic:
        logger.error("Error: strategy %s not found" % (label,))
        return SoccerAction()
    return self.dic[label].compute_strategy(state, id_team, id_player)
def test_boston(self):
    # Note: load_boston was removed in scikit-learn 1.2; this test targets older versions.
    from sklearn.tree import DecisionTreeRegressor as DecisionTreeRegressorSklearn
    model = DecisionTreeRegressor(max_n_splits=3)
    model_sklearn = DecisionTreeRegressorSklearn()
    dataset = load_boston()
    mse = []
    mse_sklearn = []
    for fold in range(5):
        X_train, X_test, y_train, y_test = train_test_split(
            dataset.data, dataset.target, test_size=0.33)
        model.fit(X_train, y_train)
        y = model.predict(X_test)
        mse.append(mean_squared_error(y, y_test))
        model_sklearn.fit(X_train, y_train)
        y = model_sklearn.predict(X_test)
        mse_sklearn.append(mean_squared_error(y, y_test))
    mean_mse = np.mean(mse)
    mean_mse_sklearn = np.mean(mse_sklearn)
    print(mean_mse, mean_mse_sklearn)
    # Check that our model's MSE is within 20% of sklearn's
    self.assertTrue(np.abs(mean_mse - mean_mse_sklearn) / mean_mse_sklearn < 0.2)
def test_boston(self):
    from sklearn.tree import DecisionTreeRegressor as DecisionTreeRegressorSklearn
    model = DecisionTreeRegressor(tree_type='oblivious', max_n_splits=3)
    model_sklearn = DecisionTreeRegressorSklearn()
    dataset = load_boston()
    mse = []
    mse_sklearn = []
    for fold in range(5):
        X_train, X_test, y_train, y_test = train_test_split(
            dataset.data, dataset.target, test_size=0.33)
        model.fit(X_train, y_train)
        y = model.predict(X_test)
        mse.append(mean_squared_error(y, y_test))
        model_sklearn.fit(X_train, y_train)
        y = model_sklearn.predict(X_test)
        mse_sklearn.append(mean_squared_error(y, y_test))
    mean_mse = np.mean(mse)
    mean_mse_sklearn = np.mean(mse_sklearn)
    print(mean_mse, mean_mse_sklearn)
    # Check that our model's MSE is within 50% of sklearn's
    self.assertTrue(np.abs(mean_mse - mean_mse_sklearn) / mean_mse_sklearn < 0.5)
# def test_check_estimators(self):
# """
# Tests that models adhere to scikit-learn Estimator interface.
# """
# check_estimator(DecisionTreeClassifier)
def __predict(trees, shrinkage, feature_vectors, output):
    # Accumulate raw predictions from every tree, then scale the sum by the shrinkage factor.
    for tree in trees:
        output += tree.predict(feature_vectors, check_input=False)
    output *= shrinkage
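A driving sketch (assumed): with trees fitted elsewhere, __predict accumulates their raw predictions into output in place. If __predict actually lives inside a class, it would be reached via its name-mangled form; it is shown module-level here for illustration.

import numpy as np
from sklearn.tree import DecisionTreeRegressor

rng = np.random.RandomState(0)
X = rng.rand(50, 3)
y = X.sum(axis=1)
trees = [DecisionTreeRegressor(max_depth=2).fit(X, y) for _ in range(3)]

output = np.zeros(len(X))
__predict(trees, 0.1, X, output)  # output now holds 0.1 * the sum of the trees' predictions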
def feature_importances(self):
    '''
    Return the feature importances.
    '''
    if len(self.estimators) == 0:
        raise ValueError('the model has not been trained yet')
    importances = Parallel(n_jobs=self.n_jobs, backend="threading")(
        delayed(getattr, check_pickle=False)(
            tree, 'feature_importances_'
        )
        for tree in self.estimators
    )
    return sum(importances) / self.n_estimators
def feature_importances(self):
    '''
    Return the feature importances.
    '''
    if self.trained is False:
        raise ValueError('the model has not been trained yet')
    importances = Parallel(n_jobs=self.n_jobs, backend="threading")(
        delayed(getattr, check_pickle=False)(tree, 'feature_importances_')
        for tree in self.estimators
    )
    return sum(importances) / self.n_estimators
def _check_mdlp_stop(tree, node_id):
    """
    The MDLP implementation follows the paper of
    U. S. Fayyad and K. B. Irani, Multi-Interval Discretization of
    Continuous-Valued Attributes for Classification Learning, JPL TRS 1992
    http://hdl.handle.net/2014/35171
    """
    # Stop splitting when Gain(A, T; S) < [log2(N - 1) + delta(A, T; S)] / N.
    num_samples = tree.value[node_id].flatten().sum()
    gain = _calculate_gain(tree, node_id)
    delta = _calculate_noise_delta(tree, node_id)
    return gain < (delta + np.log2(num_samples - 1)) / num_samples
def _calculate_gain(tree, node_id):
    S, nS, S1, nS1, S2, nS2 = _get_variables_for_entropy_calculation(tree, node_id)
    return _calculate_entropy(S) \
        - S1.sum() / S.sum() * _calculate_entropy(S1) \
        - S2.sum() / S.sum() * _calculate_entropy(S2)
def _calculate_noise_delta(tree, node_id):
    S, nS, S1, nS1, S2, nS2 = _get_variables_for_entropy_calculation(tree, node_id)
    return np.log2(np.power(3, nS) - 2) \
        - (nS * _calculate_entropy(S)
           - nS1 * _calculate_entropy(S1)
           - nS2 * _calculate_entropy(S2))
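_calculate_entropy is referenced above but not included in this excerpt. A plausible reconstruction, consistent with its use on arrays of strictly positive class counts:

import numpy as np

def _calculate_entropy(class_counts):
    # Shannon entropy (base 2) of a distribution given as positive class counts.
    probabilities = class_counts / class_counts.sum()
    return -np.sum(probabilities * np.log2(probabilities))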
def decision_tree_classifier(all_feature_data):
    input_data = np.asarray(all_feature_data[0])
    label = np.asarray(all_feature_data[1])
    data = input_data[:, :]
    # data = sklearn.preprocessing.normalize(data, axis=0)
    # clf = DecisionTreeClassifier(criterion="gini",
    #                              splitter="best",
    #                              max_features=None,
    #                              max_depth=5,
    #                              min_samples_leaf=1,
    #                              min_samples_split=2,
    #                              class_weight=None)
    clf = DecisionTreeClassifier()
    fit_clf = clf.fit(data, label)
    result = fit_clf.predict(data)
    accuracy = float(np.sum(result == label)) / len(label)
    print("Training accuracy is " + str(accuracy))
    with open("cityscapes.dot", 'w') as f:
        tree.export_graphviz(clf, out_file=f)
    # dot_data = StringIO()
    # tree.export_graphviz(clf, out_file=dot_data)
    # graph = pydotplus.graph_from_dot_data(dot_data.getvalue())
    # graph.write_pdf("cityscapes.pdf")
    # scores = cross_val_score(clf, data, label, cv=10)
    # print("Cross validation score is " + str(scores.mean()))
    return fit_clf
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def printtree(tree, indent=''):
    # Is this a leaf node?
    if tree.results is not None:
        print(str(tree.results))
    else:
        # Print the criteria
        print(str(tree.col) + ':' + str(tree.value) + '? ')
        # Print the branches
        print(indent + 'T->', end=' ')
        printtree(tree.tb, indent + ' ')
        print(indent + 'F->', end=' ')
        printtree(tree.fb, indent + ' ')
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def getwidth(tree):
    if tree.tb is None and tree.fb is None:
        return 1
    return getwidth(tree.tb) + getwidth(tree.fb)
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def getdepth(tree):
    if tree.tb is None and tree.fb is None:
        return 0
    return max(getdepth(tree.tb), getdepth(tree.fb)) + 1
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def drawtree(tree, jpeg='tree.jpg'):
    # Requires Pillow (`from PIL import Image, ImageDraw`) and the drawnode
    # helper defined elsewhere in the source file.
    w = getwidth(tree) * 100
    h = getdepth(tree) * 100 + 120
    img = Image.new('RGB', (w, h), (255, 255, 255))
    draw = ImageDraw.Draw(img)
    drawnode(draw, tree, w / 2, 20)
    img.save(jpeg)
Source: decision_tree_manual_classifier.py (project: SLIC_cityscapes, author: wpqmanu)
def mdclassify(observation, tree):
    if tree.results is not None:
        return tree.results
    v = observation[tree.col]
    if v is None:
        # Missing value: follow both branches and weight each result
        # by the relative size of its branch.
        tr, fr = mdclassify(observation, tree.tb), mdclassify(observation, tree.fb)
        tcount = sum(tr.values())
        fcount = sum(fr.values())
        tw = float(tcount) / (tcount + fcount)
        fw = float(fcount) / (tcount + fcount)
        result = {}
        for k, v in tr.items():
            result[k] = v * tw
        for k, v in fr.items():
            result[k] = v * fw
        return result
    if isinstance(v, (int, float)):
        branch = tree.tb if v >= tree.value else tree.fb
    else:
        branch = tree.tb if v == tree.value else tree.fb
    return mdclassify(observation, branch)
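A hedged example, reusing the decisionnode sketch from earlier: an observation with None in the tested column blends both branches, weighted by leaf sizes.

leaf_t = decisionnode(results={'yes': 3})
leaf_f = decisionnode(results={'no': 1})
root = decisionnode(col=0, value=5, tb=leaf_t, fb=leaf_f)

print(mdclassify([7], root))     # {'yes': 3}
print(mdclassify([None], root))  # {'yes': 2.25, 'no': 0.25}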
def predictAndTestRandomForest(X, y, Xtest, ytest, estimators=10, criterion="gini", maxdepth=None, selectKBest=0):
    """
    Trains a random forest using the training data and tests it using the test data
    :param X: The matrix of training feature vectors
    :type X: list
    :param y: The labels corresponding to the training feature vectors
    :type y: list
    :param Xtest: The matrix of test feature vectors
    :type Xtest: list
    :param ytest: The labels corresponding to the test feature vectors
    :type ytest: list
    :param estimators: The number of random trees to use in classification
    :type estimators: int
    :param criterion: The splitting criterion employed by the decision trees
    :type criterion: str
    :param maxdepth: The maximum depth a tree is allowed to grow
    :type maxdepth: int
    :param selectKBest: The number of best features to select
    :type selectKBest: int
    :return: The predicted labels for the training set and for the test set
    """
    try:
        predicted, predicted_test = [], []
        # Define classifier
        clf = ensemble.RandomForestClassifier(n_estimators=estimators, criterion=criterion, max_depth=maxdepth)
        X, y, Xtest, ytest = numpy.array(X), numpy.array(y), numpy.array(Xtest), numpy.array(ytest)
        # Select K best features if enabled. Note: fitting SelectKBest separately on
        # the test set can pick a different feature subset than on the training set.
        prettyPrint("Selecting %s best features from feature vectors" % selectKBest)
        X_new = SelectKBest(chi2, k=selectKBest).fit_transform(X, y) if selectKBest > 0 else X
        Xtest_new = SelectKBest(chi2, k=selectKBest).fit_transform(Xtest, ytest) if selectKBest > 0 else Xtest
        # Fit model
        prettyPrint("Fitting model")
        clf.fit(X_new, y)
        # Validate and test model
        prettyPrint("Validating model using training data")
        predicted = clf.predict(X_new)
        prettyPrint("Testing model")
        predicted_test = clf.predict(Xtest_new)
    except Exception as e:
        prettyPrintError(e)
        return [], []
    return predicted, predicted_test