def create_graph(filename, result_extract, verbose, path_write_png='/tmp/analysis_result.png', dangerous_score=5):
    """Build a DOT description of an analysis result and write it as a PNG.

    The root node ``R_0`` is labelled with the base name of ``filename``, the
    global risk score, its coefficient and the root file type. It is coloured
    red when the global risk score is at least ``dangerous_score`` and green
    otherwise. When ``result_extract`` carries Yara data, an extra blue
    ``R_0_info`` node is attached to the root with a dotted edge. The rest of
    the graph body is produced by ``json2dot``.

    filename         -- path whose base name labels the root node
    result_extract   -- mapping read for the keys 'GlobalRiskScore',
                        'GlobalRiskScoreCoef', 'RootFileType' and 'Yara'
    verbose          -- when true, the generated DOT text is printed
    path_write_png   -- destination path of the PNG file
    dangerous_score  -- score threshold from which the root node turns red
    """
    risk_score = result_extract[u'GlobalRiskScore']
    node_color = "red" if risk_score >= dangerous_score else "green"

    # Root node: "<basename>|<score>|Coef:<coef>" above the root file type.
    root_label = os.path.basename(filename) + '|' + str(risk_score) + '|' + 'Coef:' + str(result_extract[u'GlobalRiskScoreCoef'])
    root_type = result_extract[u'RootFileType'].encode('utf8')
    dot_content = 'digraph Analysis {\nratio=auto;\nnodesep="2.5 equally";\nranksep="2.5 equally";\n'
    dot_content += 'R_0 [shape=record, label="{{' + root_label + '}|' + root_type + '}", color=' + node_color + '];\n'

    yara_hits = result_extract[u'Yara']
    if yara_hits:
        # One entry per line, with quote characters stripped from the text.
        yara_text = str(yara_hits).replace('}, {', '},\n{').replace('"', '').replace("'", '').encode('utf8')
        dot_content += 'R_0_info [label="' + yara_text + '", color=blue];\n'
        dot_content += 'R_0 -- R_0_info [style=dotted];\n'

    dot_content += json2dot(result_extract, dangerous_score, 'R_0', 'R_0') + '}'

    if verbose:
        print(dot_content)

    # Convert DOT to PNG; exactly one parsed graph is expected.
    [graph] = pydot.graph_from_dot_data(dot_content)
    graph.write_png(path_write_png)
# Example source code for Python's graph_from_dot_data()
def decision_tree(X, y, regression, max_depth=3):
    """Fit a depth-limited decision tree on (X, y) and return it as an image.

    X          -- training features; must expose ``.columns``, which supplies
                  the feature names shown in the rendered tree
    y          -- training targets passed to ``fit``
    regression -- when true a DecisionTreeRegressor is fitted, otherwise a
                  DecisionTreeClassifier
    max_depth  -- maximum depth handed to the estimator

    Returns an ``IPython.display.Image`` built from the PNG rendering of the
    fitted tree. As a side effect the IPython figure size is set to
    (12.5, 6).
    """
    from sklearn.tree import export_graphviz
    try:
        from sklearn.externals.six import StringIO
    except ImportError:
        # sklearn.externals.six is not shipped by newer scikit-learn releases.
        from io import StringIO
    from IPython.core.pylabtools import figsize
    from IPython.display import Image
    figsize(12.5, 6)
    import pydot

    estimator_cls = DecisionTreeRegressor if regression else DecisionTreeClassifier
    clf = estimator_cls(max_depth=max_depth)
    clf.fit(X, y)

    dot_data = StringIO()
    export_graphviz(clf, out_file=dot_data, feature_names=list(X.columns),
                    filled=True, rounded=True)

    graph = pydot.graph_from_dot_data(dot_data.getvalue())
    # Newer pydot returns a list of graphs, older versions a single graph.
    if isinstance(graph, (list, tuple)):
        graph = graph[0]
    return Image(graph.create_png())
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def read_hypergraph(string):
    """
    Read a hypergraph from a string in dot format. Nodes and edges specified in the input will be
    added to the current hypergraph.

    Dot nodes are classified through their 'hyper_node_type' attribute
    ('hypernode' or 'hyperedge'); nodes with any other value are ignored.

    @type  string: string
    @param string: Input string in dot format specifying a graph.

    @rtype:  hypergraph
    @return: Hypergraph
    """
    hgr = hypergraph()
    dotG = pydot.graph_from_dot_data(string)
    # Newer pydot returns a list of graphs, older versions a single graph.
    if isinstance(dotG, (list, tuple)):
        dotG = dotG[0]

    # Read the hypernode nodes...
    # Note 1: We need to assume that all of the nodes are listed since we need to know if they
    #         are a hyperedge or a normal node
    # Note 2: We should read in all of the nodes before putting in the links
    for each_node in dotG.get_nodes():
        node_type = each_node.get('hyper_node_type')
        if node_type == 'hypernode':
            hgr.add_node(each_node.get_name())
        elif node_type == 'hyperedge':
            hgr.add_hyperedge(each_node.get_name())

    # Now read in the links to connect the hyperedges
    for each_link in dotG.get_edges():
        source = each_link.get_source()
        destination = each_link.get_destination()
        if hgr.has_node(source):
            hgr.link(source, destination)
        elif hgr.has_node(destination):
            hgr.link(destination, source)
        # Otherwise neither endpoint is a known hypernode: the edge is skipped
        # rather than re-linking the pair left over from a previous edge.

    return hgr
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def parse_elf(workspace, file):
    """Render a graph for ``file`` with radare2 and record the PNG's path.

    Opens ``file.filepath`` through r2pipe, runs the commands "aa", "afl" and
    "agC", and parses the output of "agC" as DOT data. The first parsed graph
    is written as a PNG to ``<workspace>/graphs/<file.hash>`` (the directory
    is created when missing). The path is then stored in ``file.graph_file``
    and ``file.save()`` is called.

    workspace -- base directory under which the "graphs" folder lives
    file      -- object providing ``filepath``, ``hash``, ``graph_file`` and
                 ``save()``
    """
    r2 = r2pipe.open(file.filepath)
    try:
        r2.cmd("aa")
        r2.cmd("afl")
        result = r2.cmd("agC")
    finally:
        # Always shut the radare2 session down so the pipe is not leaked.
        r2.quit()

    output_dir = os.path.join(workspace, "graphs")
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    out_file = os.path.join(output_dir, file.hash)

    graph = pydot.graph_from_dot_data(result)
    graph[0].write_png(out_file)

    file.graph_file = out_file
    file.save()
    print("%s parsed" % file.filepath)
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def method2format(output, _format="png", mx=None, raw=None):
    """
    Export method to a specific file format

    @param output : output filename
    @param _format : format type (png, jpg ...) (default : png)
    @param mx : specify the MethodAnalysis object
    @param raw : pre-built dot data (indexed by 'name', 'nodes' and 'edges');
                 when falsy the data is computed with method2dot(mx)
    """
    try:
        import pydot
    except ImportError:
        error("module pydot not found")
        # Nothing can be rendered without pydot, whether or not error() raises.
        return

    data = raw if raw else method2dot(mx)

    # hashlib only accepts bytes, so a text filename is encoded first.
    cluster_key = output if isinstance(output, bytes) else output.encode("utf-8")
    cluster_id = hashlib.md5(cluster_key).hexdigest()

    buff = "".join([
        "digraph {\n",
        "graph [rankdir=TB]\n",
        "node [shape=plaintext]\n",
        # subgraphs cluster
        "subgraph cluster_%s {\nlabel=\"%s\"\n" % (cluster_id, data['name']),
        data['nodes'],
        "}\n",
        # subgraphs edges
        data['edges'],
        "}\n",
    ])

    graphs = pydot.graph_from_dot_data(buff)
    if not graphs:
        return
    # Newer pydot returns a list of graphs, older versions a single graph.
    if not isinstance(graphs, (list, tuple)):
        graphs = [graphs]
    writer_name = "write_" + _format.lower()
    for graph in graphs:
        getattr(graph, writer_name)(output)
def classifyTree(Xtr, ytr, Xte, yte, splitCriterion="gini", maxDepth=0, visualizeTree=False):
    """ Classifies data using CART

    Xtr, ytr       -- training samples and their labels
    Xte, yte       -- test samples and their ground-truth labels
    splitCriterion -- split criterion passed to the decision tree
    maxDepth       -- maximum tree depth; a non-positive value means no limit
    visualizeTree  -- when true, the learned tree is also saved as a PDF

    Returns (accuracyRate, timing, probabilities, predicted). If an error is
    encountered it is reported through prettyPrint and the values gathered
    up to that point (initially 0.0, 0.0, [], []) are returned.
    """
    # Bound before the try block so the final return never hits an unbound name.
    accuracyRate, probabilities, timing, predicted = 0.0, [], 0.0, []
    try:
        # scikit-learn rejects max_depth=0; map non-positive depths to "unlimited".
        depthLimit = maxDepth if maxDepth and maxDepth > 0 else None
        # Perform classification
        cartClassifier = tree.DecisionTreeClassifier(criterion=splitCriterion, max_depth=depthLimit)
        startTime = time.time()
        prettyPrint("Training a CART tree for classification using \"%s\" and maximum depth of %s" % (splitCriterion, maxDepth), "debug")
        cartClassifier.fit(numpy.array(Xtr), numpy.array(ytr))
        prettyPrint("Submitting the test samples", "debug")
        predicted = cartClassifier.predict(Xte)
        endTime = time.time()
        # Compare the predicted and ground truth labels
        accuracyRate = round(metrics.accuracy_score(predicted, yte), 2)
        # Also append the probability estimates
        probabilities.append(cartClassifier.predict_proba(Xte))
        timing = endTime - startTime  # Keep track of performance
        if visualizeTree:
            # Visualize the tree
            dot_data = StringIO()
            tree.export_graphviz(cartClassifier, out_file=dot_data)
            graph = pydot.graph_from_dot_data(dot_data.getvalue())
            # Newer pydot returns a list of graphs, older versions a single graph.
            if isinstance(graph, (list, tuple)):
                graph = graph[0]
            # Compute the name once so the log matches the file actually written.
            pdfName = "tree_%s.pdf" % getTimestamp()
            prettyPrint("Saving learned CART to \"%s\"" % pdfName, "debug")
            graph.write_pdf(pdfName)
    except Exception as e:
        prettyPrint("Error encountered in \"classifyTree\": %s" % e, "error")

    return accuracyRate, timing, probabilities, predicted
def visualize_tree(clf, outname, headers):
    """Export a fitted decision tree to a PDF file.

    clf     -- fitted tree estimator handed to ``tree.export_graphviz``
    outname -- path of the PDF file to write
    headers -- iterable of feature names shown in the rendered tree
    """
    try:
        from sklearn.externals.six import StringIO
    except ImportError:
        # sklearn.externals.six is not shipped by newer scikit-learn releases.
        from io import StringIO
    import pydot

    dot_data = StringIO()
    tree.export_graphviz(clf, out_file=dot_data, feature_names=list(headers))

    dot_text = dot_data.getvalue()
    # Only byte strings need the latin1 -> utf8 re-encoding; text strings have
    # no .decode() on Python 3 and are passed through unchanged.
    if isinstance(dot_text, bytes):
        dot_text = dot_text.decode('latin1').encode('utf8')

    graph = pydot.graph_from_dot_data(dot_text)
    # Newer pydot returns a list of graphs, older versions a single graph.
    if isinstance(graph, (list, tuple)):
        graph = graph[0]
    graph.write_pdf(outname)
def read(string):
    """
    Read a graph from a string in Dot language and return it. Nodes and edges specified in the
    input will be added to the current graph.

    Dot graphs of type "hypergraph" are delegated to read_hypergraph; any type
    other than "graph", "digraph" or "hypergraph" raises InvalidGraphType.

    @type  string: string
    @param string: Input string in Dot format specifying a graph.

    @rtype:  graph
    @return: Graph
    """
    dotG = pydot.graph_from_dot_data(string)
    # Newer pydot returns a list of graphs, older versions a single graph.
    if isinstance(dotG, (list, tuple)):
        dotG = dotG[0]

    graph_type = dotG.get_type()
    if graph_type == "graph":
        G = graph()
    elif graph_type == "digraph":
        G = digraph()
    elif graph_type == "hypergraph":
        return read_hypergraph(string)
    else:
        raise InvalidGraphType

    # Read nodes...
    # Note: nodes that are not explicitly listed are added below, when the
    #       edges referring to them are read.
    for each_node in dotG.get_nodes():
        node_name = each_node.get_name()
        G.add_node(node_name)
        for attribute in each_node.get_attributes().items():
            G.add_node_attribute(node_name, attribute)

    # Read edges...
    for each_edge in dotG.get_edges():
        source = each_edge.get_source()
        destination = each_edge.get_destination()

        # Make sure both endpoints exist before adding the edge
        for endpoint in (source, destination):
            if not G.has_node(endpoint):
                G.add_node(endpoint)

        attributes = each_edge.get_attributes()
        edge = (source, destination)

        # Weight defaults to 1 and label to '' when the edge does not carry them
        G.add_edge(edge, wt=attributes.get('weight', 1), label=attributes.get('label', ''))

        # Every other attribute is stored as a generic edge attribute
        for each_attr_key, each_attr_val in attributes.items():
            if each_attr_key not in ('weight', 'label'):
                G.add_edge_attribute(edge, (each_attr_key, each_attr_val))

    return G
def makePrediction(para, rawData, totalNumRows, labels):
    """Train a decision tree on the head of the data and score it on the tail.

    The first floor(totalNumRows * para['trainingSetPercent']) rows of
    rawData/labels are used for training; the remaining rows are the test
    set. Label 1 is counted as "failure" and label 0 as "success". Accuracy,
    precision, recall and F-measure are printed.

    para         -- mapping providing 'trainingSetPercent'
    rawData      -- sequence of feature rows
    totalNumRows -- number of rows used to size the training set
    labels       -- sequence of labels aligned with rawData

    Returns (predictFailure, testFailure, sameFailureNum, precision, recall,
    F_measure). When no failure is predicted correctly, precision, recall and
    F_measure are all 0.0.
    """
    traingSetSize = int(math.floor(totalNumRows * para['trainingSetPercent']))
    print('%d instances are selected as training dataset!' % traingSetSize)

    trainX = np.array(rawData[0:traingSetSize])
    trainY = np.array(labels[0:traingSetSize])
    clf = tree.DecisionTreeClassifier()
    clf = clf.fit(trainX, trainY)

    testingX = rawData[traingSetSize:]
    testingY = labels[traingSetSize:]
    prediction = list(clf.predict(testingX))
    if len(prediction) != len(testingY):
        print ('prediction and testingY have different length and SOMEWHERE WRONG!')

    # Count overall agreement and correctly predicted failures (label 1).
    sameLabelNum = 0
    sameFailureNum = 0
    for predictedLabel, trueLabel in zip(prediction, testingY):
        if predictedLabel == trueLabel:
            sameLabelNum += 1
            if predictedLabel == 1:
                sameFailureNum += 1

    accuracy = float(sameLabelNum) / len(testingY)
    print ('accuracy is %.5f:' % accuracy)

    predictSuccess = sum(1 for item in prediction if item == 0)
    predictFailure = sum(1 for item in prediction if item == 1)
    testSuccess = sum(1 for tt in testingY if tt == 0)
    testFailure = sum(1 for tt in testingY if tt == 1)
    print(predictSuccess,predictFailure,testSuccess,testFailure,sameFailureNum)

    # Defaults keep the return statement valid when no failure was predicted
    # correctly (the metrics were previously unbound in that case).
    precision, recall, F_measure = 0.0, 0.0, 0.0
    if sameFailureNum == 0:
        print ('precision is 0 and recall is 0')
    else:
        precision = float(sameFailureNum) / predictFailure
        print('precision is %.5f' % precision)
        recall = float(sameFailureNum) / testFailure
        print('recall is %.5f' % recall)
        F_measure = 2 * precision * recall / (precision + recall)
        print('F_measure is %.5f' % F_measure)

    return predictFailure, testFailure, sameFailureNum, precision, recall, F_measure
def classifyTreeKFold(X, y, kFold=2, splitCriterion="gini", maxDepth=0, visualizeTree=False):
    """ Classifies data using CART and K-Fold cross validation

    X, y           -- indexable samples and their labels
    kFold          -- number of folds
    splitCriterion -- split criterion passed to the decision tree
    maxDepth       -- maximum tree depth; a non-positive value means no limit
    visualizeTree  -- when true, each fold's tree is saved as
                      "tritonTree_<fold>.pdf"

    Returns (accuracyRates, probabilities, timings, groundTruthLabels,
    predictedLabels), each holding one entry per fold. If an error is
    encountered it is reported through prettyPrint and five empty lists are
    returned, so callers can unpack the result the same way in both cases.
    """
    try:
        groundTruthLabels, predictedLabels = [], []
        accuracyRates, probabilities, timings = [], [], []
        # scikit-learn rejects max_depth=0; map non-positive depths to "unlimited".
        depthLimit = maxDepth if maxDepth and maxDepth > 0 else None

        kFoldValidator = KFold(n=len(X), n_folds=kFold, shuffle=False)
        for currentFold, (trainingIndices, testIndices) in enumerate(kFoldValidator, 1):
            # Prepare the training and testing datasets for this fold
            trainingDataset = [X[trIndex] for trIndex in trainingIndices]
            trainingLabels = [y[trIndex] for trIndex in trainingIndices]
            testDataset = [X[teIndex] for teIndex in testIndices]
            testLabels = [y[teIndex] for teIndex in testIndices]

            # Perform classification
            startTime = time.time()
            cartClassifier = tree.DecisionTreeClassifier(criterion=splitCriterion, max_depth=depthLimit)
            prettyPrint("Training a CART tree for classification using \"%s\" and maximum depth of %s" % (splitCriterion, maxDepth), "debug")
            cartClassifier.fit(numpy.array(trainingDataset), numpy.array(trainingLabels))
            prettyPrint("Submitting the test samples", "debug")
            predicted = cartClassifier.predict(testDataset)
            endTime = time.time()

            # Add that to the groundTruthLabels and predictedLabels matrices
            groundTruthLabels.append(testLabels)
            predictedLabels.append(predicted)
            # Compare the predicted and ground truth and append result to list
            accuracyRates.append(round(metrics.accuracy_score(predicted, testLabels), 2))
            # Also append the probability estimates
            probabilities.append(cartClassifier.predict_proba(testDataset))
            timings.append(endTime - startTime)  # Keep track of performance

            if visualizeTree:
                # Visualize the tree
                dot_data = StringIO()
                tree.export_graphviz(cartClassifier, out_file=dot_data)
                graph = pydot.graph_from_dot_data(dot_data.getvalue())
                # Newer pydot returns a list of graphs, older versions a single graph.
                if isinstance(graph, (list, tuple)):
                    graph = graph[0]
                prettyPrint("Saving learned CART to \"tritonTree_%s.pdf\"" % currentFold, "debug")
                graph.write_pdf("tritonTree_%s.pdf" % currentFold)
    except Exception as e:
        prettyPrint("Error encountered in \"classifyTreeKFold\": %s" % e, "error")
        return [], [], [], [], []

    return accuracyRates, probabilities, timings, groundTruthLabels, predictedLabels