def generate_term(self, **kwargs):
"""Method generates a rdflib.Term based on kwargs"""
term_map = kwargs.pop('term_map')
if hasattr(term_map, "termType") and\
term_map.termType == NS_MGR.rr.BlankNode:
return rdflib.BNode()
if not hasattr(term_map, 'datatype'):
term_map.datatype = NS_MGR.xsd.anyURI
if hasattr(term_map, "template") and term_map.template is not None:
template_vars = kwargs
template_vars.update(self.constants)
# Call any functions to generate values
for key, value in template_vars.items():
if hasattr(value, "__call__"):
template_vars[key] = value()
raw_value = term_map.template.format(**template_vars)
if term_map.datatype == NS_MGR.xsd.anyURI:
return rdflib.URIRef(raw_value)
return rdflib.Literal(raw_value,
datatype=term_map.datatype)
if term_map.reference is not None:
# Each child will have different mechanisms for referencing the
# source based
return self.__generate_reference__(term_map, **kwargs)
python类Literal()的实例源码
def __generate_reference__(self, triple_map, **kwargs):
"""Generates a RDF entity based on triple map
Args:
triple_map(SimpleNamespace): Triple Map
"""
raw_value = self.source.get(str(triple_map.reference))
if raw_value is None or len(raw_value) < 1:
return
if hasattr(triple_map, "datatype"):
if triple_map.datatype == NS_MGR.xsd.anyURI:
output = rdflib.URIRef(raw_value)
else:
output = rdflib.Literal(
raw_value,
datatype=triple_map.datatype)
else:
output = rdflib.Literal(raw_value)
return output
def __reference_handler__(self, **kwargs):
"""Internal method for handling rr:reference in triples map
Keyword Args:
-------------
predicate_obj_map: SimpleNamespace
obj: dict
subject: rdflib.URIRef
"""
subjects = []
pred_obj_map = kwargs.get("predicate_obj_map")
obj = kwargs.get("obj")
subject = kwargs.get("subject")
if pred_obj_map.reference is None:
return subjects
predicate = pred_obj_map.predicate
ref_exp = jsonpath_ng.parse(str(pred_obj_map.refernce))
found_objects = [r.value for r in ref_exp(obj)]
for row in found_objects:
self.output.add((subject, predicate, rdflib.Literal(row)))
def add_admin_metadata(self, entity):
"""Takes a graph and adds the AdminMetadata for the entity
Args:
entity (rdflib.URIRef): URI of the entity
"""
generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
generation_process = rdflib.BNode()
self.graph.add((generation_process,
rdflib.RDF.type,
NS_MGR.bf.GenerationProcess))
self.graph.add((generation_process,
NS_MGR.bf.generationDate,
rdflib.Literal(
datetime.datetime.utcnow().isoformat())))
self.graph.add((generation_process,
rdflib.RDF.value,
rdflib.Literal(generate_msg.format(__version__),
lang="en")))
#! Should add bibcat's current git MD5 commit
self.graph.add(
(entity,
NS_MGR.bf.generationProcess,
generation_process)
)
def add_admin_metadata(self, entity):
"""Takes a graph and adds the AdminMetadata for the entity
Args:
entity (rdflib.URIRef): URI of the entity
"""
generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
generation_process = rdflib.BNode()
self.graph.add((generation_process,
rdflib.RDF.type,
NS_MGR.bf.GenerationProcess))
self.graph.add((generation_process,
NS_MGR.bf.generationDate,
rdflib.Literal(
datetime.datetime.utcnow().isoformat())))
self.graph.add((generation_process,
rdflib.RDF.value,
rdflib.Literal(generate_msg.format(__version__),
lang="en")))
#! Should add bibcat's current git MD5 commit
self.graph.add(
(entity,
NS_MGR.bf.generationProcess,
generation_process)
)
def __top_result__(query_result, type_=None, class_=None):
"""Internal function takes a JSON query results and returns
the top result as a rdflib.URIRef IRI if more than one.
Args:
----
query_result(dict): Query result
"""
if query_result.get("totalResultsCount", 0) > 0:
print(query_result.get("geonames")[0])
top_result = query_result.get("geonames")[0]
geo_id = top_result.get("geonameId")
place_iri = rdflib.URIRef("{}{}/".format(IRI_BASE, geo_id))
if type_ is not None and type_.startswith("rdf"):
output = rdflib.Graph()
rdf_type = rdflib.RDFS.Resource
if class_ is not None:
rdf_type = class_
output.add((place_iri, rdflib.RDF.type, rdf_type))
output.add((place_iri,
rdflib.RDFS.label,
rdflib.Literal(top_result.get("name"))))
return output
return place_iri
def __add_work_title__(self, work_graph, work_uri, instance_uri):
"""Method takes a new work graph and instance uri, queries for
bf:InstanceTitle of instance uri and adds values to work graph
Args:
work_graph(rdflib.Graph): RDF Graph of new BF Work
instance_uri(rdflib.URIRef): URI of BF Instance
"""
instance_key = str(instance_uri)
if instance_key in self.processed and\
"title" in self.processed[instance_key]:
work_title_bnode = rdflib.BNode()
work_graph.add((work_uri, NS_MGR.bf.title, work_title_bnode))
work_graph.add((work_title_bnode,
NS_MGR.rdf.type,
NS_MGR.bf.WorkTitle))
for row in self.processed[instance_key]["title"]:
main_title, subtitle = row["mainTitle"], row["subtitle"]
work_graph.add((work_title_bnode,
NS_MGR.bf.mainTitle,
rdflib.Literal(main_title)))
if subtitle:
work_graph.add((work_title_bnode,
NS_MGR.bf.subtitle,
rdflib.Literal(subtitle)))
def graph_member(self, ldp_root, c_id, obj=None):
if not obj:
obj = self.member()
node = URIRef(ldp_root+encoder.encode(c_id)+"/member/"+encoder.encode(obj.id))
mappings = URIRef(node+"#mappings")
g = Graph(identifier=node)
g.add((node, RDF.type, RDA.Member))
g.add((node, DCTERMS.identifier, Literal(obj.id)))
g.add((node, RDA.location, Literal(obj.location)))
if hasattr(obj, 'datatype'):
g.add((node, RDA.datatype, Literal(obj.datatype)))
if hasattr(obj, 'ontology'):
g.add((node, RDA.ontology, Literal(obj.ontology)))
if hasattr(obj, 'mappings'):
g.add((node, RDA.mappings, mappings))
mp = obj.mappings
if hasattr(mp, 'role'):
g.add((mappings, RDA.role, URIRef(obj.mappings.role)))
if hasattr(mp, 'index'):
g.add((mappings, RDA.itemIndex, Literal(obj.mappings.index)))
if hasattr(mp, 'dateAdded'):
g.add((mappings, RDA.dateAdded, Literal(obj.mappings.dateAdded)))
return g
def string_to_literal(string, to_uri=True):
"""
Creates a literal or URI object from a given string.
:param string: String to convert from
:param to_uri: Whether to try convert to an URI if not a valid literal is given.
:return: Literal object or URI object (if failed to create a literal and desired) or else input string.
"""
if "^^http://www.w3.org/2001/XMLSchema#" in string:
try:
value, datatype = string.split("^^")
return Literal(value, datatype=datatype)
except:
loginfo("Could not deliteralize string.")
return Literal(string)
else:
loginfo("not a literal")
return URIRef(string) if to_uri else string
def check_value_type(self, value):
"""
Check type of a value that I'm analyzing
:param value to check
:return: value that are casted to a rdflib type (float, string or uri if it's a resource)
"""
# i can have input value like list or like single input, i need to make a filter and get
# unique element of this list
result = value
if self.mapper.is_float(result):
data_type = rdflib.namespace.XSD.float
elif self.mapper.is_int(result):
data_type = rdflib.namespace.XSD.int
else:
# If this string represents a resource
resource = self.check_if_is_resource(result)
# if it is a resource in dbpedia
if resource:
return rdflib.URIRef(resource)
else:
data_type = rdflib.namespace.XSD.string
return rdflib.Literal(result, datatype=data_type)
def _resolve_range(self, range_uri):
"""
Resolve a rdfs.Property rdfs.range value to a type.
"""
range_name = self._extract_name(range_uri)
if range_name in self.namespace:
return self.namespace[range_name]
if is_a_literal(range_uri):
return Literal
elif is_a_property(range_uri):
return RDF_Property
elif is_a_list(range_uri):
return RDF.List
return RDFS_Class
def rdf(request):
uri = request.GET['uri']
g = Graph()
annotations = Annotation.objects.filter(uri=uri)
for annotation in annotations:
if annotation.title:
g.add( ( URIRef(annotation.uri), URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATitle"), Literal(annotation.title) ) )
if annotation.notes:
g.add( ( URIRef(annotation.uri), URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ANotes"), Literal(annotation.notes) ) )
for tag in annotation.tags.all():
g.add( ( URIRef(annotation.uri), URIRef("http://localhost/metawiki/index.php/Special:URIResolver/Property-3ATag"), Literal(tag.prefLabel) ) )
status = HttpResponse(g.serialize( format='xml' ) )
status["Content-Type"] = "application/rdf+xml"
return status
def add_vcard(self, position, name):
"""
:param position: number in author order
:param name: name as string - last, first, middle
:return: rdflib.Graph
"""
g = Graph()
# vcard individual
vci_uri = D['vcard-individual-' + position + '-' + self.localid]
g.add((vci_uri, RDF.type, VCARD.Individual))
# vcard name
vcn_uri = D['vcard-name-' + position + '-' + self.localid]
g.add((vcn_uri, RDF.type, VCARD.Name))
g.add((vcn_uri, RDFS.label, Literal(name)))
# Parse name into first, last, middle
name = HumanName(name)
g.add((vcn_uri, VCARD.givenName, Literal(name.first)))
g.add((vcn_uri, VCARD.familyName, Literal(name.last)))
if name.middle != "":
g.add((vcn_uri, VIVO.middleName, Literal(name.middle)))
# Relate vcard individual to vcard name
g.add((vci_uri, VCARD.hasName, vcn_uri))
return vci_uri, g
def authorship(self):
"""
Add authorship statements and vcards for authors.
:return: rdflib.Graph
"""
g = Graph()
for num, au in enumerate(self.authors()):
position = str(num + 1)
vcard_individual_uri, vcard_stmts = self.add_vcard(position, au)
g += vcard_stmts
# Authorship
aship_uri = D['authorship-' + position + '-' + self.localid]
g.add((aship_uri, RDF.type, VIVO.Authorship))
g.add((aship_uri, VIVO.rank, Literal(int(position))))
# Relate pub and authorship
g.add((aship_uri, VIVO.relates, self.pub_uri))
# Relate vcard and authorship
g.add((aship_uri, VIVO.relates, vcard_individual_uri))
return g
def add_vcard_weblink(self):
"""
Build statements for weblinks in VIVO.
:return: rdflib.Graph
"""
base_url = "http://ws.isiknowledge.com/cps/openurl/service?url_ver=Z39.88-2004&rft_id=info:ut/WOS:{}"
g = Graph()
# vcard individual for pub
vci_uri = D['vcard-individual-pub-' + self.localid]
g.add((vci_uri, RDF.type, VCARD.Individual))
# vcard URL
vcu_uri = D['vcard-url-pub-' + self.localid]
g.add((vcu_uri, RDF.type, VCARD.URL))
g.add((vcu_uri, RDFS.label, Literal(u"Web of Science™")))
g.add((vcu_uri, VCARD.url, Literal(base_url.format(self.ut()))))
# Relate vcard individual to url
g.add((vci_uri, VCARD.hasURL, vcu_uri))
return vci_uri, g
def _vcard_email(self):
g = Graph()
try:
emails = [e for e in self.profile["emails"].split("|")]
except KeyError:
try:
emails = [self.profile['email']]
except KeyError:
emails = []
for email in emails:
vt = Resource(g, self.vcard_email_uri)
vt.set(RDF.type, VCARD.Work)
# Label probably not necessary
vt.set(RDFS.label, Literal(email))
vt.set(VCARD.email, Literal(email))
return g
def org_total_cites(orgs):
g = Graph()
for org_name in orgs:
org_uri = waan_uri(org_name)
#print>>sys.stderr, "Processing", org_name, "total cites"
ln = local_name(org_uri)
tc = load_incites_json_file(org_name, 'cites')
for item in tc:
curi = D['citecount-' + ln + '-' + str(item['year'])]
g.add((curi, RDF.type, WOS.InCitesCitesPerYear))
g.add((curi, RDFS.label, Literal("{} - {}".format(item['year'], item['count']))))
g.add((curi, WOS.number, Literal(item['count'])))
g.add((curi, WOS.year, Literal(item['year'])))
g.add((org_uri, VIVO.relates, curi))
#print g.serialize(format="turtle")
ng = "http://localhost/data/incites-total-cites-year-counts"
backend.sync_updates(ng, g)
return True
def org_top_categories(orgs):
g = Graph()
for org_name in orgs:
#print>>sys.stderr, "Processing", org_name, "top categories"
org_uri = waan_uri(org_name)
ln = local_name(org_uri)
top_cat = load_incites_json_file(org_name, 'categories')
for item in top_cat:
cat = item['category']
category_uri = get_category_uri(cat)
curi = D['topcategory-'] + ln + slugify(cat)
g.add((curi, RDF.type, WOS.InCitesTopCategory))
g.add((curi, RDFS.label, Literal("{} - {}".format(org_name, cat))))
g.add((curi, WOS.number, Literal(item['count'])))
g.add((curi, VIVO.relates, category_uri))
g.add((curi, VIVO.relates, org_uri))
#print g.serialize(format="turtle")
ng = "http://localhost/data/incites-top-categories"
backend.sync_updates(ng, g)
return True
def run(self):
g = Graph()
wos_top = D['wos-topics']
g.add((wos_top, RDF.type, WOS.TopTopic))
g.add((wos_top, RDFS.label, Literal("Web of Science Subject Schemas")))
with open(self.input_file) as inf:
for row in csv.DictReader(inf):
ra = row['Research Area (eASCA)']
category = row['WoS Category (tASCA)']
broad, ra1, ra2 = self.chunk_ras(ra)
broad_uri, cg = self.do_term(broad, clz=WOS.BroadDiscipline)
g.add((broad_uri, SKOS.broader, wos_top))
g += cg
ra1_uri, cg = self.do_term(ra1, broader=broad_uri, clz=WOS.ResearchArea, uri_prefix="wosra")
g += cg
ra2_uri = None
if ra2 is not None:
ra2_uri, cg = self.do_term(ra2, broader=ra1_uri, clz=WOS.ResearchArea, uri_prefix="wosra")
g += cg
cat_uri, cg = self.do_term(category, broader=ra2_uri or ra1_uri, clz=WOS.Category)
g += cg
self.serialize(g)
def add_grant(grant, pub_uri):
"""
Create a funder and grant(s).
"""
g = Graph()
if grant.get("agency") is None:
logger.info("No agency found for {} with ids.".format(pub_uri, ";".join(grant.get("ids", []))))
return g
slug = slugify(grant["agency"])
uri = D['funder-' + slug]
g.add((uri, RDF.type, WOS.Funder))
g.add((uri, RDFS.label, Literal(grant["agency"])))
for gid in grant["ids"]:
label = "{} - {}".format(grant["agency"], gid)
guri = D['grant-'] + slugify(label)
g.add((guri, RDF.type, WOS.Grant))
g.add((guri, RDFS.label, Literal(label)))
g.add((guri, WOS.grantId, Literal(gid)))
g.add((guri, VIVO.relates, uri))
g.add((guri, VIVO.relates, pub_uri))
return g