def setUp(self):
    """Prepare a MODS ingester whose rules graph links bf-Organization.

    Creates the ingester from ``SAMPLE_MODS``, generates an entity URI,
    and registers a ``kds:PropertyLinker`` rule for ``bf-Organization``
    whose destination property is a blank node carrying
    ``bf:heldBy <http://coloradocollege.edu/>`` and whose destination
    class is ``bf:Item``.
    """
    self.ingester = mods.MODSIngester(source=SAMPLE_MODS)
    self.entity = self.ingester.__generate_uri__()
    self.cc = rdflib.URIRef("http://coloradocollege.edu/")
    # The name contains a hyphen, so plain attribute access is impossible
    bc_org = getattr(NS_MGR.kds, "bf-Organization")
    rules = self.ingester.rules_graph
    rules.add((bc_org, NS_MGR.rdf.type, NS_MGR.kds.PropertyLinker))
    self.held_by = rdflib.BNode()
    linker_triples = (
        (bc_org, NS_MGR.kds.destPropUri, self.held_by),
        (self.held_by, NS_MGR.bf.heldBy, self.cc),
        (bc_org, NS_MGR.kds.destClassUri, NS_MGR.bf.Item),
    )
    for triple in linker_triples:
        rules.add(triple)
python类URIRef()的实例源码
def setUp(self):
    """Build a graph with one entity that has two ``bf:title`` blank nodes.

    The first blank node carries a simple ``bf:mainTitle`` literal. The
    second is typed ``bf:Topic``, has an ``rdfs:label`` and points through
    ``schema:name`` to a further blank node holding an ``rdf:value``.
    """
    graph = rdflib.Graph()
    entity = rdflib.URIRef("https://bibcat.org/test-entity")
    simple_title = rdflib.BNode()
    top_title = rdflib.BNode()
    nested_name = rdflib.BNode()

    self.graph = graph
    self.entity = entity
    self.simple_title_bnode = simple_title
    self.top_title_bnode = top_title

    fixture_triples = [
        (entity, rdflib.RDF.type, BF.Title),
        (entity, BF.title, simple_title),
        (simple_title, BF.mainTitle, rdflib.Literal("This is a test")),
        (entity, BF.title, top_title),
        (top_title, rdflib.RDF.type, BF.Topic),
        (top_title,
         rdflib.RDFS.label,
         rdflib.Literal("This is a title and a name")),
        (top_title, SCHEMA.name, nested_name),
        (nested_name, rdflib.RDF.value, rdflib.Literal("This is a name")),
    ]
    for triple in fixture_triples:
        graph.add(triple)
def setUp(self):
    """Build a graph holding two labelled ``rdfs:Resource`` entities.

    Both entities get an English ``rdfs:label``. The second entity also
    gets a ``bf:title`` blank node typed ``bf:Title`` with a
    ``bf:subTitle`` literal.
    """
    graph = rdflib.Graph()
    first = rdflib.URIRef("https://bibcat.org/test-entity")
    second = rdflib.URIRef("https://bibcat.org/test-entity-two")
    self.graph = graph
    self.entity_one = first
    self.entity_two = second

    labelled = ((first, "Test Entity One"), (second, "Test Entity Two"))
    for iri, label in labelled:
        graph.add((iri, rdflib.RDF.type, rdflib.RDFS.Resource))
        graph.add((iri, rdflib.RDFS.label, rdflib.Literal(label, lang="en")))

    subtitle_holder = rdflib.BNode()
    graph.add((second, BF.title, subtitle_holder))
    graph.add((subtitle_holder, rdflib.RDF.type, BF.Title))
    # The trailing space in the literal is part of the fixture
    graph.add((subtitle_holder, BF.subTitle, rdflib.Literal("Subtitle ")))
def __generate_object_term__(self, datatype, value):
    """Build the RDF object term for ``value``.

    Args:
    -----
        datatype: None, or rdflib.URIRef
        value: Varies depending on ingester

    Returns:
        rdflib.URIRef when ``datatype`` equals ``xsd:anyURI``; a typed
        rdflib.Literal for any other truthy datatype; otherwise a plain
        rdflib.Literal.
    """
    if datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(value)
    if datatype:
        return rdflib.Literal(value, datatype=datatype)
    return rdflib.Literal(value)
def __handle_parents__(self, **kwargs):
    """Link ``subject`` to every object produced by a parent triples map.

    The three keyword arguments below are consumed here; all remaining
    keyword arguments are forwarded unchanged to ``self.execute``.

    Keyword args:
    -------------
        parent_map: SimpleNamespace of ParentTriplesMap
        subject: rdflib.URIRef or rdflib.BNode
        predicate: rdflib.URIRef
    """
    parent_map = kwargs.pop("parent_map")
    subject = kwargs.pop('subject')
    predicate = kwargs.pop('predicate')
    triple_map = self.triple_maps[str(parent_map)]
    for candidate in self.execute(triple_map, **kwargs):
        # Never emit a triple that points the subject at itself
        if candidate != subject:
            self.output.add((subject, predicate, candidate))
def __generate_reference__(self, triple_map, **kwargs):
    """Generate an RDF term from the source value a triple map references.

    Args:
        triple_map(SimpleNamespace): Triple Map; its ``reference`` is used
            as the key into ``self.source`` and its optional ``datatype``
            selects the kind of term

    Returns:
        None when the source has no value, or an empty one, for the
        reference. Otherwise an rdflib.URIRef for ``xsd:anyURI``, a typed
        rdflib.Literal for any other datatype, or a plain rdflib.Literal
        when the map has no ``datatype`` attribute.
    """
    raw_value = self.source.get(str(triple_map.reference))
    if raw_value is None or len(raw_value) < 1:
        return None
    if not hasattr(triple_map, "datatype"):
        return rdflib.Literal(raw_value)
    if triple_map.datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(raw_value)
    return rdflib.Literal(raw_value, datatype=triple_map.datatype)
def __reference_handler__(self, **kwargs):
    """Internal method for handling rr:reference in triples map.

    Evaluates the map's JSONPath reference against ``obj`` and adds one
    ``(subject, predicate, Literal(match))`` triple to ``self.output``
    for every match.

    Keyword Args:
    -------------
        predicate_obj_map: SimpleNamespace
        obj: dict
        subject: rdflib.URIRef

    Returns:
        An empty list when the map has no reference; otherwise None
        (the triples are written to ``self.output``).
    """
    subjects = []
    pred_obj_map = kwargs.get("predicate_obj_map")
    obj = kwargs.get("obj")
    subject = kwargs.get("subject")
    if pred_obj_map.reference is None:
        return subjects
    predicate = pred_obj_map.predicate
    # Was misspelled ``refernce``, which raised AttributeError for every
    # map that actually had a reference.
    ref_exp = jsonpath_ng.parse(str(pred_obj_map.reference))
    # A parsed JSONPath expression is evaluated with ``find``; it is not
    # callable.
    for match in ref_exp.find(obj):
        self.output.add((subject, predicate, rdflib.Literal(match.value)))
def __generate_reference__(self, triple_map, **kwargs):
    """Return the first URI-looking text found by the map's XPath reference.

    Applies ``triple_map.reference`` as an XPath expression to the
    current DOM context, using ``self.xml_ns`` as the namespace map.

    Args:
    -----
        triple_map: SimpleNamespace
        element: etree.Element (keyword argument), the DOM context

    Returns:
        rdflib.URIRef for the first matched element whose stripped text
        starts with "http", or None when no element qualifies.
    """
    element = kwargs.get("element")
    found_elements = element.xpath(
        triple_map.reference,
        namespaces=self.xml_ns)
    for elem in found_elements:
        # ``text`` is None for an element without text content; treat it
        # as empty instead of failing on ``None.strip()``.
        raw_text = (elem.text or "").strip()
        #! Quick and dirty test for valid URI
        if raw_text.startswith("http"):
            return rdflib.URIRef(raw_text)
    return None
def __get_object__(binding):
    """Extract an rdflib term from a binding.

    Args:
        binding: binding row; either an rdflib node, which is returned
            unchanged, or an iterable exposing ``items()`` whose values
            are rdflib terms, ``{'type': ..., 'value': ...}`` dicts,
            tuples or strings

    Returns:
        The first rdflib.URIRef or rdflib.Literal that can be derived
        from the binding, or None when nothing qualifies.
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10;
    # ``collections.abc`` is the supported location.
    from collections.abc import Iterable

    if isinstance(binding, rdflib.term.Node):
        return binding
    if not isinstance(binding, Iterable):
        return None
    for key, row in binding.items():
        if isinstance(row, (rdflib.URIRef, rdflib.Literal)):
            return row
        if isinstance(row, dict):
            # A dict without a 'type' entry is treated as a literal
            # rather than failing on ``None.startswith``.
            if (row.get('type') or '').startswith('uri'):
                return rdflib.URIRef(row.get('value'))
            return rdflib.Literal(row.get('value'))
        if isinstance(row, tuple):
            # Tuples are only echoed to stdout; no term is produced.
            print(row)
        elif isinstance(row, str):
            if row.startswith("literal") or "xml:lang" in key:
                continue
            return rdflib.Literal(row)
    return None
def new_existing_bnode(self, bf_property, rule):
    """Return the blank node to use for ``bf_property``.

    A fresh blank node is returned as soon as the ``HAS_MULTI_NODES``
    query for ``rule`` yields a row whose first value starts with
    "true" (case-insensitive). Otherwise the first result of the
    ``GET_BLANK_NODE`` query on ``self.graph`` is reused, with a fresh
    blank node as the fallback.

    Args:
        bf_property (str): RDF property URI
        rule (rdflib.URIRef): RDF subject of the map rule
    Returns:
        rdflib.BNode: Existing or New blank node
    """
    multi_node_rows = self.rules_graph.query(HAS_MULTI_NODES.format(rule))
    for row in multi_node_rows:
        flag = str(row[0]).lower()
        if flag.startswith("true"):
            return rdflib.BNode()
    # Only the first matching row matters
    matches = iter(self.graph.query(GET_BLANK_NODE.format(bf_property)))
    first_match = next(matches, None)
    existing = first_match[0] if first_match is not None else None
    return existing if existing else rdflib.BNode()
def populate_entity(self, bf_class, existing_uri=None):
    """Create (or reuse) an entity of ``bf_class`` and populate it.

    The entity IRI is the first truthy value among ``existing_uri``, the
    custom IRI pattern for the class, and a default generated IRI. The
    entity is typed in ``self.graph``; linked classes, direct properties,
    ordered linked classes and admin metadata are added in that order,
    and the RDF types are cleaned afterwards.

    Args:
        bf_class(rdflib.URIRef): Namespace URI
        existing_uri: IRI to reuse instead of minting one, defaults to None
    Returns:
        rdflib.URIRef: URI of new entity
    """
    entity_uri = (
        existing_uri
        or self.__pattern_uri__(bf_class)
        or self.__generate_uri__()
    )
    self.graph.add((entity_uri, rdflib.RDF.type, bf_class))
    populate_steps = (
        self.update_linked_classes,
        self.update_direct_properties,
        self.update_ordered_linked_classes,
    )
    for step in populate_steps:
        step(bf_class, entity_uri)
    self.add_admin_metadata(entity_uri)
    self.clean_rdf_types()
    return entity_uri
def update_direct_properties(self, entity_class, entity):
    """Apply every direct-property rule of ``entity_class`` to ``entity``.

    Runs the ``GET_DIRECT_PROPS`` query against the rules graph and hands
    each resulting (destination property, rule) pair to
    ``__handle_pattern__``.

    Args:
        entity_class (url): URL of the entity's class
        entity (rdflib.URIRef): RDFlib Entity
    """
    rule_rows = self.rules_graph.query(GET_DIRECT_PROPS.format(entity_class))
    for row in rule_rows:
        dest_prop, rule = row
        self.__handle_pattern__(
            entity=entity,
            rule=rule,
            destination_property=dest_prop)
def add_admin_metadata(self, entity):
    """Attach a ``bf:GenerationProcess`` blank node to ``entity``.

    The process node records the current UTC time in ISO format and an
    English message naming the BIBCAT version.

    Args:
        entity (rdflib.URIRef): URI of the entity
    """
    process = rdflib.BNode()
    stamp = rdflib.Literal(datetime.datetime.utcnow().isoformat())
    message = rdflib.Literal(
        "Generated by BIBCAT version {} from KnowledgeLinks.io".format(
            __version__),
        lang="en")
    #! Should add bibcat's current git MD5 commit
    metadata_triples = (
        (process, rdflib.RDF.type, NS_MGR.bf.GenerationProcess),
        (process, NS_MGR.bf.generationDate, stamp),
        (process, rdflib.RDF.value, message),
        (entity, NS_MGR.bf.generationProcess, process),
    )
    for triple in metadata_triples:
        self.graph.add(triple)
def new_existing_bnode(self, bf_property, rule):
    """Return the blank node to use for ``bf_property``.

    A fresh blank node is returned as soon as the ``HAS_MULTI_NODES``
    query for ``rule`` yields a row whose first value starts with
    "true" (case-insensitive). Otherwise the first result of the
    ``GET_BLANK_NODE`` query on ``self.graph`` is reused, with a fresh
    blank node as the fallback.

    Args:
        bf_property (str): RDF property URI
        rule (rdflib.URIRef): RDF subject of the map rule
    Returns:
        rdflib.BNode: Existing or New blank node
    """
    multi_node_rows = self.rules_graph.query(HAS_MULTI_NODES.format(rule))
    for row in multi_node_rows:
        flag = str(row[0]).lower()
        if flag.startswith("true"):
            return rdflib.BNode()
    # Only the first matching row matters
    matches = iter(self.graph.query(GET_BLANK_NODE.format(bf_property)))
    first_match = next(matches, None)
    existing = first_match[0] if first_match is not None else None
    return existing if existing else rdflib.BNode()
def populate_entity(self, bf_class, existing_uri=None):
    """Create (or reuse) an entity of ``bf_class`` and populate it.

    The entity IRI is the first truthy value among ``existing_uri``, the
    custom IRI pattern for the class, and a default generated IRI. The
    entity is typed in ``self.graph``; linked classes, direct properties,
    ordered linked classes and admin metadata are added in that order,
    and the RDF types are cleaned afterwards.

    Args:
        bf_class(rdflib.URIRef): Namespace URI
        existing_uri: IRI to reuse instead of minting one, defaults to None
    Returns:
        rdflib.URIRef: URI of new entity
    """
    entity_uri = (
        existing_uri
        or self.__pattern_uri__(bf_class)
        or self.__generate_uri__()
    )
    self.graph.add((entity_uri, rdflib.RDF.type, bf_class))
    populate_steps = (
        self.update_linked_classes,
        self.update_direct_properties,
        self.update_ordered_linked_classes,
    )
    for step in populate_steps:
        step(bf_class, entity_uri)
    self.add_admin_metadata(entity_uri)
    self.clean_rdf_types()
    return entity_uri
def transform(self, source=None, instance_uri=None, item_uri=None):
"""Creates a BIBFRAME Instance and Item in ``self.graph`` and links them.
Optionally replaces ``self.source``, assigns a graph obtained from
``new_graph()``, populates a ``bf:Instance`` and a ``bf:Item`` through
``populate_entity`` and adds an ``Item bf:itemOf Instance`` triple.
Args:
source: New source, could be URL, XML, or CSV row; when None the
current ``self.source`` is left unchanged
instance_uri(rdflib.URIRef): Existing Instance URI, defaults to None
item_uri(rdflib.URIRef): Existing Item URI, defaults to None
Returns:
tuple: BIBFRAME Instance and Item, in that order
"""
# NOTE(review): indentation is missing in this copy, so it cannot be told
# here whether the graph reset below is conditional on ``source`` or runs
# on every call -- confirm against the upstream file before relying on it.
if source is not None:
self.source = source
self.graph = new_graph()
# The Instance is populated before the Item; both share ``self.graph``.
bf_instance = self.populate_entity(NS_MGR.bf.Instance, instance_uri)
bf_item = self.populate_entity(NS_MGR.bf.Item, item_uri)
self.graph.add((bf_item, NS_MGR.bf.itemOf, bf_instance))
return bf_instance, bf_item
def __link_subject__(self, term, subject_iri):
"""Looks up a term in the LOC subject authorities and relinks subjects.
The term is sent to ``__build_lc_url__`` with the LOC subjects base URL
and the JSON response is handed to ``__process_loc_results__``. When an
IRI comes back, every entity that has ``subject_iri`` as a ``bf:subject``
is also given the LOC IRI as ``bf:subject`` and ``subject_iri`` is passed
to ``bibcat.delete_iri``.
Args:
term(str): Term
subject_iri(rdflib.URIRef): Subject IRI
Returns:
tuple: (LOC IRI, title) on a match, otherwise (None, None)
"""
subject_result = self.__build_lc_url__(
term,
"http://id.loc.gov/authorities/subjects")
lsch_iri, title = self.__process_loc_results__(
subject_result.json(),
term)
if lsch_iri is None:
return None, None
# Matching entities are collected first and updated in a second loop, so
# the graph is not modified while ``subjects()`` is being iterated.
entities = []
for row in self.graph.subjects(predicate=BF.subject,
object=subject_iri):
entities.append(row)
for entity in entities:
self.graph.add((entity, BF.subject, lsch_iri))
bibcat.delete_iri(self.graph, subject_iri)
return lsch_iri, title
# NOTE(review): indentation is missing in this copy. If the return above is
# at function level, the return below is unreachable -- confirm against the
# upstream file.
return None, None
def __top_result__(query_result, type_=None, class_=None):
    """Return the top entry of a JSON query result as an IRI or a graph.

    Args:
    ----
        query_result(dict): Query result; ``totalResultsCount`` and the
            first entry of ``geonames`` are read
        type_(str): When it starts with "rdf" a graph is returned instead
            of the bare IRI, defaults to None
        class_: RDF class used to type the place in the returned graph,
            defaults to None (``rdfs:Resource`` is used)

    Returns:
        None when the result count is not positive. Otherwise the place
        rdflib.URIRef built from ``IRI_BASE`` and the ``geonameId``, or,
        for an "rdf" ``type_``, an rdflib.Graph typing and labelling that
        IRI.
    """
    if not query_result.get("totalResultsCount", 0) > 0:
        return None
    top_result = query_result.get("geonames")[0]
    # NOTE: the top hit is echoed to stdout, as before
    print(top_result)
    place_iri = rdflib.URIRef(
        "{}{}/".format(IRI_BASE, top_result.get("geonameId")))
    wants_graph = type_ is not None and type_.startswith("rdf")
    if not wants_graph:
        return place_iri
    output = rdflib.Graph()
    rdf_type = rdflib.RDFS.Resource if class_ is None else class_
    label = rdflib.Literal(top_result.get("name"))
    output.add((place_iri, rdflib.RDF.type, rdf_type))
    output.add((place_iri, rdflib.RDFS.label, label))
    return output
def replace_iri(graph, old_iri, new_iri):
    """Replaces old IRI with a new IRI in the graph.

    Every triple that has ``old_iri`` as its subject or as its object is
    re-added with ``new_iri`` in that position and the original triple is
    removed. The graph is modified in place.

    Args:
    ----
        graph: rdflib.Graph
        old_iri: rdflib.URIRef, Old IRI
        new_iri: rdflib.URIRef, New IRI
    """
    if old_iri == new_iri:
        # Otherwise deletes all occurrences of the iri in the
        # graph
        return
    # Snapshot the matches before touching the graph: the lookups are
    # lazy, and adding/removing triples while one is being consumed can
    # skip entries or fail with a "changed size during iteration" error.
    outgoing = list(graph.predicate_objects(subject=old_iri))
    for pred, obj in outgoing:
        graph.add((new_iri, pred, obj))
        graph.remove((old_iri, pred, obj))
    # Taken after the first pass so that a self-referencing triple, now
    # (new_iri, pred, old_iri), is picked up and rewritten too.
    incoming = list(graph.subject_predicates(object=old_iri))
    for subj, pred in incoming:
        graph.add((subj, pred, new_iri))
        graph.remove((subj, pred, old_iri))
def __add_creators__(self, work_graph, work_uri, instance_uri):
    """Copy the creators recorded for an instance onto a work.

    For every code in ``self.creator_codes`` that is recorded for the
    instance in ``self.processed``, each agent is added to ``work_graph``
    as ``(work_uri, relators:<code>, agent)``. Nothing happens when the
    instance has not been processed.

    Args:
        work_graph(rdflib.Graph): RDF Graph of new BF Work
        work_uri: URI of the BF Work that receives the creators
        instance_uri(rdflib.URIRef): URI of BF Instance
    """
    instance_key = str(instance_uri)
    if instance_key not in self.processed:
        return
    agents_by_code = self.processed[instance_key]
    for code in self.creator_codes:
        if code in agents_by_code:
            relator = getattr(NS_MGR.relators, code)
            for agent_uri in agents_by_code[code]:
                work_graph.add((work_uri, relator, agent_uri))