def calculate_account_data(prov_document: provmodel.ProvDocument) -> list:
"""
Transforms a ProvDocument into a list of tuples including:
ProvAgent, list of ProvRelations from agent,
list of ProvElements associated to ProvAgent,
list of Namespaces
:param prov_document: Document to transform
:type prov_document:
:return: List of tuples(ProvAgent, list(), list(), list())
:rtype: list
"""
namespaces = prov_document.get_registered_namespaces()
g = provgraph.prov_to_graph(prov_document=prov_document)
sorted_nodes = topological_sort(g, reverse=True)
agents = list(filter(lambda elem: isinstance(elem, provmodel.ProvAgent), sorted_nodes))
elements = list(filter(lambda elem: not isinstance(elem, provmodel.ProvAgent), sorted_nodes))
# Check on compatibility
if not is_directed_acyclic_graph(g):
raise Exception("Provenance graph is not acyclic")
if isolates(g):
raise Exception("Provenance not compatible with role-based concept. Has isolated Elements")
for element in elements:
if provmodel.ProvAgent not in [type(n) for n in g.neighbors(element)]:
raise Exception(
"Provenance not compatible with role-based concept. Element {} has not relation to any agent".format(
element))
accounts = []
for agent in agents:
# find out-going relations from agent
agent_relations = []
for u, v in g.out_edges(agent):
# Todo check if filter does not left out some info
agent_relations.append(g.get_edge_data(u, v)[0]['relation'])
agent_elements = {}
i = 0
for element in elements:
element_relations = []
if g.has_edge(element, agent):
for u, v in set(g.out_edges(element)):
for relation in g[u][v].values():
element_relations.append(relation['relation'])
agent_elements[i] = {element: element_relations}
i += 1
accounts.append((agent, agent_relations, agent_elements, namespaces))
return accounts
评论列表
文章目录