def get_do_metadata():
    """Extract label/description metadata for Disease Ontology terms.

    Parses the DO OWL file and returns a dict keyed by DO CURIE; each
    value is a record with 'id', 'label' and (possibly empty) 'descr'.
    Deprecated terms are excluded.
    """
    graph = Graph()
    graph.parse(DO_OWL_PATH)
    # Bind ?disease_ontology so the query only matches DO-namespace terms.
    namespace_value = Literal(
        'disease_ontology',
        datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    # NOTE(review): assumes the oboInOwl/obo prefixes are available to the
    # query (bound via the parsed OWL file's namespaces) — confirm.
    query = """
    SELECT * WHERE {
    ?id oboInOwl:hasOBONamespace ?disease_ontology .
    ?id rdfs:label ?label .
    OPTIONAL {?id obo:IAO_0000115 ?descr}
    FILTER NOT EXISTS {?id owl:deprecated ?dep}
    }
    """
    results = graph.query(query,
                          initBindings={'disease_ontology': namespace_value})
    records = [{str(var): str(term) for var, term in row.items()}
               for row in results.bindings]
    frame = pd.DataFrame(records)
    frame.drop_duplicates(subset=['id'], inplace=True)
    frame.fillna("", inplace=True)
    return {purl_to_curie(rec['id']): rec for rec in frame.to_dict("records")}
Example source code using the Python class Literal()
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Translate each tweet in *result_set* into RDF triples on *fragment*.

    Every (s, p, o) entry of reduced_mapping.mapping is a template: the
    subject is a URI prefix followed by a '{jsonpath}' expression resolved
    against the tweet; the object is either a JSONPath expression (when it
    contains '$.'), producing one Literal per match, or a ready RDF term
    used verbatim.
    """
    for tweet in result_set:
        for s, p, o in reduced_mapping.mapping:
            # Split "prefix{$.path}" into the URI prefix and its JSONPath.
            subject_parts = s.split('{')
            prefix = subject_parts[0]
            subject_matches = [
                m.value
                for m in parse(subject_parts[1].split('}')[0]).find(tweet)]
            if '$.' in o:
                # NOTE(review): the object path uses split('{')[0] — this
                # assumes object templates carry no '{...}' wrapper; confirm
                # against the mapping format.
                object_path = parse(o.split('{')[0].split('}')[0])
                for match in object_path.find(tweet):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (prefix, subject_matches[0])),
                        p, Literal(match.value))
            else:
                fragment.add_data_triple(
                    URIRef("%s%s" % (prefix, subject_matches[0])), p, o)
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Translate each repository in *result_set* into RDF triples.

    Same template scheme as the tweet variant: each (s, p, o) mapping entry
    yields a subject built from a URI prefix plus a '{jsonpath}' lookup into
    the repo dict, and an object that is either JSONPath-derived Literals
    (template contains '$.') or a ready RDF term used verbatim.
    """
    for repo in result_set:
        for s, p, o in reduced_mapping.mapping:
            # Split "prefix{$.path}" into the URI prefix and its JSONPath.
            subject_parts = s.split('{')
            prefix = subject_parts[0]
            subject_matches = [
                m.value
                for m in parse(subject_parts[1].split('}')[0]).find(repo)]
            if '$.' in o:
                # NOTE(review): the object path uses split('{')[0] — this
                # assumes object templates carry no '{...}' wrapper; confirm
                # against the mapping format.
                object_path = parse(o.split('{')[0].split('}')[0])
                for match in object_path.find(repo):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (prefix, subject_matches[0])),
                        p, Literal(match.value))
            else:
                fragment.add_data_triple(
                    URIRef("%s%s" % (prefix, subject_matches[0])), p, o)
def test_create_graph():
    """Create a new graphFactory object and verify the global graph.

    Registers two vocabularies, builds the graph, then checks the expected
    triples and the JSON-LD serialization.
    """
    from sc import graphManager
    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()
    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)
    tstregistry.build_graph()
    # Fix: 'print x' is a Python 2 print statement and a SyntaxError on
    # Python 3; the call form behaves identically on both.
    print(tstregistry.get_turtle())
    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label, Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph
    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
def __addAction(self, action, statement):
    """Append an <action> DOM element describing *statement* to the
    transaction document.

    *statement* is an (s, p, o)-style iterable of RDF terms; each term is
    serialized as a <literal>, <uri> or <bnode> child element.

    Raises:
        Exception: if a term is not a Literal, URIRef or BNode.
    """
    element = self.trans.createElement(action)
    for item in statement:
        if isinstance(item, Literal):
            literal = self.trans.createElement("literal")
            if item.datatype is not None:
                literal.setAttribute("datatype", str(item.datatype))
            if item.language is not None:
                literal.setAttribute("xml:lang", str(item.language))
            literal.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(literal)
        elif isinstance(item, URIRef):
            uri = self.trans.createElement("uri")
            uri.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(uri)
        elif isinstance(item, BNode):
            bnode = self.trans.createElement("bnode")
            bnode.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(bnode)
        else:
            # Fix: concatenating a non-string term with '+' raised a
            # TypeError that masked the real problem; %r always formats.
            raise Exception("Unknown element: %r" % (item,))
    self.trans.childNodes[0].appendChild(element)
def __setattr__(self, name, values):
    """Store *values* as statements for property *name* on this object.

    RDFObject values are unwrapped to their URIs, native rdflib terms pass
    through untouched, and anything else is wrapped in a Literal.  Names
    prefixed with "r_" are written as reverse (subject-side) statements.
    """
    self._objectGraph._load(self.uri)
    resolved = []
    for value in values:
        if isinstance(value, RDFObject):
            # Unwrap wrapped RDF objects to their underlying URI.
            resolved.append(value.uri)
        elif isinstance(value, (URIRef, BNode, Literal)):
            # Native rdflib terms pass straight through.
            resolved.append(value)
        else:
            # Everything else becomes a literal.
            resolved.append(Literal(value))
    # Resolve the property mapping for this attribute name.
    prop = self._getProp(name)
    if name.startswith("r_"):
        self._objectGraph._setSubjects(resolved, prop, self.uri)
    else:
        self._objectGraph._setObjects(self.uri, prop, resolved)
def _add_date_triple(self, subject, predicate, value, _type=Literal):
    """Add a triple whose object is a date value.

    The value is parsed with dateutil; when parsing succeeds the date is
    stored as an XSD.dateTime literal, otherwise the raw string value is
    stored instead.  Empty/falsy values add nothing.
    """
    if not value:
        return
    try:
        base = datetime.datetime(1, 1, 1, 0, 0, 0)
        parsed = parse_date(value, default=base)
        self.g.add((subject, predicate,
                    _type(parsed.isoformat(), datatype=XSD.dateTime)))
    except ValueError:
        # Unparseable date: fall back to the literal string value.
        self.g.add((subject, predicate, _type(value)))
def test_object_list(self):
    """_object_value_list returns every dcat:keyword literal as a list."""
    profile = RDFProfile(_default_graph())
    dataset_ref = URIRef('http://example.org/datasets/1')
    for keyword in ('space', 'moon'):
        profile.g.add((dataset_ref, DCAT.keyword, Literal(keyword)))
    value = profile._object_value_list(dataset_ref, DCAT.keyword)
    assert isinstance(value, list)
    assert isinstance(value[0], unicode)  # Python 2 codebase
    eq_(len(value), 2)
    eq_(sorted(value), ['moon', 'space'])
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 19
Bookmarks: 0
Likes: 0
Comments: 0
def test_dataset_license_from_distribution_by_title(self):
    """license_id is retrieved from dct:title of the dcat:license node."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    license_node = BNode()
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dataset_ref, DCAT.distribution, dist_ref),
            (dist_ref, DCT.license, license_node),
            (license_node, DCT.title,
             Literal("Creative Commons Attribution"))):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    dataset = list(parser.datasets())[0]
    eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 25
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_access_url(self):
    """dcat:accessURL maps to the resource 'url'; no download_url is set."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.accessURL, Literal('http://access.url.org')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    assert 'download_url' not in resource
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 29
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_download_url(self):
    """dcat:downloadURL fills both 'url' and 'download_url'."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.downloadURL, Literal('http://download.url.org')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://download.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 21
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_both_access_and_download_url(self):
    """With both URLs present, accessURL wins 'url' and downloadURL fills
    'download_url'."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.accessURL, Literal('http://access.url.org')),
            (dist_ref, DCAT.downloadURL, Literal('http://download.url.org')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 23
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_imt_and_format(self):
    """dcat:mediaType maps to 'mimetype' and dct:format to 'format'."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dist_ref, DCT['format'], Literal('CSV')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
    eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 19
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_imt_only(self):
    """With only dcat:mediaType, 'format' is normalized on CKAN >= 2.3."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        # Newer CKAN normalizes the media type to a format name.
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'text/csv')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 19
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_imt_only_normalize_false(self):
    """With normalization off, dcat:mediaType fills both 'format' and
    'mimetype' verbatim."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/csv')
    eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 20
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_format_only_normalize_false(self):
    """With only dct:format present, 'format' is set and no mimetype."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCT['format'], Literal('CSV')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
    assert 'mimetype' not in resource
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 34
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_unknown_imt(self):
    """An unknown media type is passed through to both fields unchanged."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/unknown-imt')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 24
Bookmarks: 0
Likes: 0
Comments: 0
def test_distribution_format_imt_normalized(self):
    """Normalization of an unknown media type leaves it unchanged in both
    'format' and 'mimetype'."""
    g = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/unknown-imt')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 26
Bookmarks: 0
Likes: 0
Comments: 0
def test_spatial_rdfs_label(self):
    """The 'spatial_text' extra comes from the Location's rdfs:label."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    spatial_uri = URIRef('http://geonames/Newark')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, spatial_uri),
            (spatial_uri, RDF.type, DCT.Location),
            (spatial_uri, RDFS.label, Literal('Newark'))):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_text'], 'Newark')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 25
Bookmarks: 0
Likes: 0
Comments: 0
def test_spatial_wkt_only(self):
    """A locn:geometry WKT literal is converted into a GeoJSON 'spatial'
    extra."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    spatial_uri = URIRef('http://geonames/Newark')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, spatial_uri),
            (spatial_uri, RDF.type, DCT.Location),
            (spatial_uri, LOCN.geometry,
             Literal('POINT (67 89)', datatype=GSP.wktLiteral))):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
test_euro_dcatap_profile_parse.py — file source code
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Reads: 23
Bookmarks: 0
Likes: 0
Comments: 0
def test_spatial_literal_only(self):
    """A plain dct:spatial literal only yields 'spatial_text'."""
    g = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, Literal('Newark'))):
        g.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = g
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_text'], 'Newark')
    assert_true('spatial_uri' not in extras)
    assert_true('spatial' not in extras)
def _add_agent(self, _dict, ref, basekey, _type):
    """Attach a dcatapit/foaf Agent node to *ref* via predicate *_type*.

    The agent's foaf:name and dct:identifier are looked up in *_dict*
    under ``basekey + '_name'`` / ``basekey + '_identifier'``, defaulting
    to 'N/A'.  Produces output of the shape:

        <dct:publisher rdf:resource=".../r_liguri"/>
        <dcatapit:Agent rdf:about=".../r_liguri">
            <rdf:type rdf:resource="&foaf;Agent"/>
            <dct:identifier>r_liguri</dct:identifier>
            <foaf:name>Regione Liguria</foaf:name>
        </dcatapit:Agent>

    Returns:
        The BNode created for the agent.
    """
    name = self._get_dict_value(_dict, basekey + '_name', 'N/A')
    identifier = self._get_dict_value(_dict, basekey + '_identifier', 'N/A')
    node = BNode()
    for triple in (
            (node, RDF['type'], DCATAPIT.Agent),
            (node, RDF['type'], FOAF.Agent),
            (ref, _type, node),
            (node, FOAF.name, Literal(name)),
            (node, DCT.identifier, Literal(identifier))):
        self.g.add(triple)
    return node
def test_null_values_with_multiple_strings():
    """All configured null markers are dropped from the emitted RDF."""
    csvw = CSVW(csv_path="tests/null1.csv",
                metadata_path="tests/null1.multiple.csv-metadata.json")
    g = ConjunctiveGraph()
    g.parse(data=csvw.to_rdf(), format="turtle")
    objects = set(g.objects())
    assert Literal('null_key', datatype=XSD.token) not in objects
    assert Literal('null_sector') not in objects
    assert Literal('null_id', datatype=XSD.token) not in objects
    for row_id in ('10', '11', '12', '13'):
        assert Literal(row_id, datatype=XSD.token) not in objects
    predicates = set(g.predicates())
    assert id_uri not in predicates
    assert Literal('1', datatype=XSD.token) not in objects
def test_date():
    """Each date column is emitted exactly once with datatype xsd:date."""
    with CSVW(csv_path="tests/datatypes.date.csv",
              metadata_path="tests/datatypes.date.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()
        g = ConjunctiveGraph()
        g.parse(data=rdf_output, format="turtle")
        # (property name, expected lexical form) pairs, including
        # timezone-qualified dates.
        expected = (
            ('date1', "2017-01-09"),
            ('date2', "2017-01-10Z"),
            ('date3', "2017-01-11"),
            ('date4', "2002-09-24-06:00"),
            ('date5', "2002-09-24+04:00"),
        )
        for prop, lexical in expected:
            lit = Literal(lexical, datatype=XSD.date)
            assert len(list(g.triples((NS['event/1'], NS[prop], lit)))) == 1
def test_datetime():
    """Each datetime column is emitted exactly once with the right
    datatype (xsd:dateTime, or xsd:dateTimeStamp for the stamp column)."""
    with CSVW(csv_path="tests/datatypes.datetime.csv",
              metadata_path="tests/datatypes.datetime.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()
        g = ConjunctiveGraph()
        g.parse(data=rdf_output, format="turtle")
        expected = (
            ('datetime1', "2002-05-30T09:00:00", XSD.dateTime),
            ('datetime2', "2002-05-30T09:30:10.5", XSD.dateTime),
            ('datetime3', "2002-05-30T09:30:10Z", XSD.dateTime),
            ('datetime4', "2002-05-30T09:30:10-06:00", XSD.dateTime),
            ('datetime5', "2002-05-30T09:30:10+04:00", XSD.dateTime),
            ('datetimestamp', "2004-04-12T13:20:00-05:00", XSD.dateTimeStamp),
        )
        for prop, lexical, datatype in expected:
            lit = Literal(lexical, datatype=datatype)
            assert len(list(g.triples((NS['event/1'], NS[prop], lit)))) == 1
def test_bool_with_format():
    """Formatted boolean cells map to typed true/false literals per row."""
    csvw = CSVW(csv_path="tests/datatypes.bool.csv",
                metadata_path="tests/datatypes.bool.csv-metadata.json")
    g = ConjunctiveGraph()
    g.parse(data=csvw.to_rdf(), format="turtle")
    literals = {True: Literal(True, datatype=XSD.boolean),
                False: Literal(False, datatype=XSD.boolean)}
    # Row 1 is all-true; rows 2 and 3 are all-false.
    expected = (('event/1', True), ('event/2', False), ('event/3', False))
    for event, truth in expected:
        for prop in ('bool1', 'bool2', 'bool3'):
            matches = list(g.triples((NS[event], NS[prop], literals[truth])))
            assert len(matches) == 1
def test_empty_boolean():
    """Empty cells with a boolean default yield False literals; an invalid
    base produces an empty graph."""
    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.bool.csv-metadata.json")
    g = ConjunctiveGraph()
    g.parse(data=csvw.to_rdf(), format="turtle")
    assert len(g) == 2
    assert len(list(g.triples((None, None, Literal(False))))) == 2
    # Same CSV but with an invalid base in the metadata: nothing is emitted.
    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.invalid_base.csv-metadata.json")
    g = ConjunctiveGraph()
    g.parse(data=csvw.to_rdf(), format="turtle")
    assert len(g) == 0
def setUp(self):
    """Build a small BIBFRAME title graph used by the tests."""
    self.graph = rdflib.Graph()
    self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
    # Simple title: entity -> bf:title -> bnode -> bf:mainTitle literal.
    self.simple_title_bnode = rdflib.BNode()
    for triple in (
            (self.entity, rdflib.RDF.type, BF.Title),
            (self.entity, BF.title, self.simple_title_bnode),
            (self.simple_title_bnode, BF.mainTitle,
             rdflib.Literal("This is a test"))):
        self.graph.add(triple)
    # Nested title: a Topic-typed bnode whose schema:name points at a
    # second bnode carrying the rdf:value.
    self.top_title_bnode = rdflib.BNode()
    nested_bnode = rdflib.BNode()
    for triple in (
            (self.entity, BF.title, self.top_title_bnode),
            (self.top_title_bnode, rdflib.RDF.type, BF.Topic),
            (self.top_title_bnode, rdflib.RDFS.label,
             rdflib.Literal("This is a title and a name")),
            (self.top_title_bnode, SCHEMA.name, nested_bnode),
            (nested_bnode, rdflib.RDF.value,
             rdflib.Literal("This is a name"))):
        self.graph.add(triple)
def setUp(self):
    """Create two labelled rdfs:Resource entities; the second also carries
    a bf:Title bnode with a subtitle."""
    self.graph = rdflib.Graph()
    self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
    self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
    for entity, label in ((self.entity_one, "Test Entity One"),
                          (self.entity_two, "Test Entity Two")):
        self.graph.add((entity, rdflib.RDF.type, rdflib.RDFS.Resource))
        self.graph.add((entity, rdflib.RDFS.label,
                        rdflib.Literal(label, lang="en")))
    # Subtitle attached to entity_two via a bf:Title blank node.
    title_bnode = rdflib.BNode()
    self.graph.add((self.entity_two, BF.title, title_bnode))
    self.graph.add((title_bnode, rdflib.RDF.type, BF.Title))
    self.graph.add((title_bnode, BF.subTitle, rdflib.Literal("Subtitle ")))
def __generate_object_term__(self, datatype, value):
    """Internal method takes a datatype (can be None) and returns
    the RDF Object Term.

    Args:
    -----
        datatype: None, or rdflib.URIRef
        value: Varies depending on ingester
    """
    # xsd:anyURI values become URI references rather than literals.
    if datatype == NS_MGR.xsd.anyURI:
        return rdflib.URIRef(value)
    # Any other datatype yields a typed literal; no datatype, a plain one.
    if datatype:
        return rdflib.Literal(value, datatype=datatype)
    return rdflib.Literal(value)