def get_do_metadata():
    """Extract label and description metadata from the Disease Ontology OWL file.

    Parses the DO OWL file, selects non-deprecated terms in the
    ``disease_ontology`` namespace, and returns them keyed by CURIE.

    Returns:
        dict mapping DO CURIE -> {'id': purl, 'label': ..., 'descr': ...}
    """
    graph = Graph()
    graph.parse(DO_OWL_PATH)
    namespace_literal = Literal(
        'disease_ontology',
        datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
    SELECT * WHERE {
        ?id oboInOwl:hasOBONamespace ?disease_ontology .
        ?id rdfs:label ?label .
        OPTIONAL {?id obo:IAO_0000115 ?descr}
        FILTER NOT EXISTS {?id owl:deprecated ?dep}
    }
    """
    rows = graph.query(query, initBindings={'disease_ontology': namespace_literal})
    records = [
        {str(key): str(value) for key, value in binding.items()}
        for binding in rows.bindings
    ]
    frame = pd.DataFrame(records)
    frame.drop_duplicates(subset=['id'], inplace=True)
    frame.fillna("", inplace=True)
    # Key each record by its CURIE form for fast lookup by callers.
    return {purl_to_curie(record['id']): record
            for record in frame.to_dict("records")}
Example source code for instances of the Python Graph() class
def test_graph_parse(self):
    """Verify the repository response parses into an rdflib Graph for
    every supported RDF Content-Type."""
    # collected graphs (kept for parity with the original test scaffolding)
    graphs = []
    # each Content-Type the server can serialize RDF as
    content_types = (
        'application/ld+json',
        'application/n-triples',
        'application/rdf+xml',
        'text/n3',
        'text/plain',
        'text/turtle',
    )
    for content_type in content_types:
        logger.debug("testing parsing of Content-Type: %s" % content_type)
        resource = repo.get_resource(
            '%s/foo' % testing_container_uri, response_format=content_type)
        # the response body must have been parsed into an rdflib Graph
        assert type(resource.rdf.graph) == rdflib.graph.Graph
# create child container foo/bar (basic container)
def __readGraphIriFile(self, graphfile):
    """Search for a graph URI in a graph file and return it.

    Args:
        graphfile: String containing the path of a graph file
    Returns:
        graphuri: String with the graph URI, or None when the file is
            missing or its first line is not an absolute URI.
    """
    try:
        with open(graphfile, 'r') as f:
            graphuri = f.readline().strip()
    except FileNotFoundError:
        logger.debug("File not found {}".format(graphfile))
        return None

    # urlparse() accepts nearly any string without raising, so the former
    # try/except around it could never take the "invalid URI" branch.
    # Check for a URI scheme (e.g. "http") on the parsed result instead.
    if urlparse(graphuri).scheme:
        logger.debug("Graph URI {} found in {}".format(graphuri, graphfile))
    else:
        graphuri = None
        logger.debug("No graph URI found in {}".format(graphfile))
    return graphuri
def getgraphfromfile(self):
    """Return a ConjunctiveGraph generated from the referenced file.

    Returns:
        A ConjunctiveGraph; empty if the file could not be parsed.
    """
    graph = ConjunctiveGraph()
    try:
        graph.parse(self.path, format='nquads', publicID='http://localhost:5000/')
        # logging uses lazy %-style formatting; the original print-style
        # argument list ('Success: File', self.path, 'parsed') would make
        # the record fail to render at emit time.
        logger.debug('Success: File %s parsed', self.path)
    except KeyError:
        # Given file contains non-valid RDF data; fall through and return
        # the (empty) graph.
        # NOTE(review): only KeyError is caught — rdflib can raise other
        # exceptions on malformed input; confirm the intended scope.
        pass
    return graph
def __init__(self, namespace=None, prefixes='', newqueries=None):
    """Set up the graph and register extra namespaces, prefixes and queries.

    Args:
        namespace: dict of prefix -> URI mappings merged into self.namespace.
        prefixes: SPARQL prefix declaration text appended to self.prefixes.
        newqueries: dict of query id -> SPARQL text. SELECT queries are
            pre-compiled with rdflib's prepareQuery (which only supports
            SELECT); DELETE/INSERT queries are stored verbatim.
    """
    self.graph = rdflib.Graph()
    # Default to fresh empty dicts instead of mutable default arguments.
    self.namespace.update(namespace or {})
    self.prefixes += prefixes
    # Run all given "SELECT" queries through the prepareQuery function.
    # (.items() / print() replace the former Python-2-only iteritems()
    # and print statements, matching the Python 3 code elsewhere.)
    for (query_id, query) in (newqueries or {}).items():
        leader = query.strip()[0:6]
        if leader == 'SELECT':  # prepareQuery only works on SELECT ...
            self.queries[query_id] = rdflib.plugins.sparql.prepareQuery(
                query, initNs=self.namespace)
            print("Adding SELECT query")
        elif leader == 'DELETE' or leader == 'INSERT':
            self.queries[query_id] = query
            print("Adding DEL/INS query")
def notification(self, iri, **kwargs):
    """Retrieve a single LDN notification and decode it into a Python object.

    JSON-LD responses are decoded directly; any other RDF serialization is
    parsed into a graph and re-serialized as JSON-LD first.
    """
    headers = kwargs.pop("headers", dict())
    if 'accept' not in headers:
        headers['accept'] = kwargs.pop("accept", self.accept_headers)
    response = requests.get(iri, headers=headers, **kwargs)
    response.raise_for_status()
    mime_type = self.content_type_to_mime_type(response.headers['content-type'])
    if mime_type == self.JSON_LD:
        return response.json()
    graph = Graph().parse(data=response.text, format=mime_type)
    return json.loads(str(graph.serialize(format="json-ld"), 'utf-8'))
def __init__(self, filename='sc.config'):
    """Initialize the configuration holder.

    Parameters
    ----------
    :param filename: string
        Name of the configuration file to use. Defaults to 'sc.config'.
    Returns
    -------
    :returns: none
    """
    self.graph = rdflib.Graph()
    self.filename = filename
    # Use SC_HOME when it is already set; otherwise fall back to
    # ~/.sc/ and export it so later lookups see the same value.
    if not os.environ.get('SC_HOME'):
        os.environ['SC_HOME'] = os.environ['HOME'] + '/.sc/'
    self.config_path = os.getenv('SC_HOME')
def __init__(self, graph, compatibility_mode=False):
    """Store the source graph and parsing options.

    Args:
        graph: an rdflib.Graph instance.
        compatibility_mode: when True, some fields are modified to maintain
            compatibility with previous versions of the ckanext-dcat parsers
            (e.g. adding the `dcat_` prefix or storing comma separated lists
            instead of JSON dumps).
    """
    self.g = graph
    self.compatibility_mode = compatibility_mode
    # Lazily-built cache mapping license URL/title -> ID, used by _license().
    self._licenceregister_cache = None
def test_publisher_ref(self):
    """A dct:publisher given by rdf:resource resolves to that URI."""
    data = '''<?xml version="1.0" encoding="utf-8" ?>
<rdf:RDF
xmlns:dct="http://purl.org/dc/terms/"
xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">
<rdfs:SomeClass rdf:about="http://example.org">
<dct:publisher rdf:resource="http://orgs.vocab.org/some-org" />
</rdfs:SomeClass>
</rdf:RDF>
'''
    graph = Graph()
    graph.parse(data=data)
    profile = RDFProfile(graph)
    publisher = profile._publisher(URIRef('http://example.org'), DCT.publisher)
    eq_(publisher['uri'], 'http://orgs.vocab.org/some-org')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 17, favorites: 0, likes: 0, comments: 0)
def test_dataset_license_from_distribution_by_uri(self):
    """license_id is derived from the URI of the dcat:license object."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (ds, DCAT.distribution, dist),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCT.license,
         URIRef("http://www.opendefinition.org/licenses/cc-by")),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    dataset = next(iter(parser.datasets()))
    eq_(dataset['license_id'], 'cc-by')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 25, favorites: 0, likes: 0, comments: 0)
def test_dataset_license_from_distribution_by_title(self):
    """license_id is derived from the dct:title of the dcat:license node."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    license_node = BNode()
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (ds, DCAT.distribution, dist),
        (dist, DCT.license, license_node),
        (license_node, DCT.title, Literal("Creative Commons Attribution")),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    dataset = next(iter(parser.datasets()))
    eq_(dataset['license_id'], 'cc-by')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 22, favorites: 0, likes: 0, comments: 0)
def test_distribution_access_url(self):
    """dcat:accessURL maps to resource['url']; no download_url is set."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.accessURL, Literal('http://access.url.org')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    assert 'download_url' not in resource
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 19, favorites: 0, likes: 0, comments: 0)
def test_distribution_download_url(self):
    """dcat:downloadURL populates both 'url' and 'download_url'."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.downloadURL, Literal('http://download.url.org')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['url'], u'http://download.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 25, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_imt_and_format(self):
    """With both mediaType and dct:format present, each maps to its field."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.mediaType, Literal('text/csv')),
        (dist, DCT['format'], Literal('CSV')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['format'], u'CSV')
    eq_(resource['mimetype'], u'text/csv')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 19, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_format_only(self):
    """A lone dct:format maps straight to resource['format']."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCT['format'], Literal('CSV')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['format'], u'CSV')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 22, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_imt_only(self):
    """A lone dcat:mediaType; normalization depends on the CKAN version."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.mediaType, Literal('text/csv')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        # CKAN >= 2.3 normalizes the IANA media type to a format name
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'text/csv')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 20, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_imt_only_normalize_false(self):
    """Without normalization, the media type fills both fields verbatim."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.mediaType, Literal('text/csv')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['format'], u'text/csv')
    eq_(resource['mimetype'], u'text/csv')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 18, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_format_only_normalize_false(self):
    """A lone dct:format sets 'format' and leaves 'mimetype' unset."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCT['format'], Literal('CSV')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['format'], u'CSV')
    assert 'mimetype' not in resource
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 21, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_imt_normalized(self):
    """An unknown media type is kept as-is in both fields."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.mediaType, Literal('text/unknown-imt')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 18, favorites: 0, likes: 0, comments: 0)
def test_distribution_format_format_normalized(self):
    """mediaType wins for normalization on CKAN >= 2.3; dct:format otherwise."""
    graph = Graph()
    ds = URIRef("http://example.org/datasets/1")
    dist = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (dist, RDF.type, DCAT.Distribution),
        (dist, DCAT.mediaType, Literal('text/csv')),
        (dist, DCT['format'], Literal('Comma Separated Values')),
        (ds, DCAT.distribution, dist),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = next(iter(parser.datasets()))['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'Comma Separated Values')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 23, favorites: 0, likes: 0, comments: 0)
def test_spatial_rdfs_label(self):
    """The rdfs:label of a dct:spatial resource becomes spatial_text."""
    graph = Graph()
    ds = URIRef('http://example.org/datasets/1')
    location = URIRef('http://geonames/Newark')
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (ds, DCT.spatial, location),
        (location, RDF.type, DCT.Location),
        (location, RDFS.label, Literal('Newark')),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(next(iter(parser.datasets())))
    eq_(extras['spatial_text'], 'Newark')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 24, favorites: 0, likes: 0, comments: 0)
def test_spatial_wkt_only(self):
    """A locn:geometry WKT literal is converted to GeoJSON in 'spatial'."""
    graph = Graph()
    ds = URIRef('http://example.org/datasets/1')
    location = URIRef('http://geonames/Newark')
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (ds, DCT.spatial, location),
        (location, RDF.type, DCT.Location),
        (location, LOCN.geometry,
         Literal('POINT (67 89)', datatype=GSP.wktLiteral)),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(next(iter(parser.datasets())))
    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
Source file: test_euro_dcatap_profile_parse.py — project: dati-ckan-docker — author: italia (views: 25, favorites: 0, likes: 0, comments: 0)
def test_spatial_literal_only(self):
    """A plain literal dct:spatial only yields spatial_text."""
    graph = Graph()
    ds = URIRef('http://example.org/datasets/1')
    for triple in (
        (ds, RDF.type, DCAT.Dataset),
        (ds, DCT.spatial, Literal('Newark')),
    ):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(next(iter(parser.datasets())))
    eq_(extras['spatial_text'], 'Newark')
    assert_true('spatial_uri' not in extras)
    assert_true('spatial' not in extras)
def sentics_api(self, concept, parsed_graph=None):
    """Return the sentics of a concept.

    If a parsed graph is passed, the RDF is not loaded again.

    Returns:
        dict with 'pleasantness', 'attention', 'sensitivity' and
        'aptitude' values for the concept.
    """
    concept_sentics_uri = self.concept_base_uri + concept + "/sentics"
    if parsed_graph is None:
        graph = rdflib.Graph()
        parsed_graph = graph.parse(concept_sentics_uri, format="xml")
    # NOTE(review): _output fetches the resource itself; the parsed graph
    # above appears unused beyond caching — confirm against callers.
    result, _ = self._output(concept_sentics_uri)
    # The serialized values arrive in a fixed order: attention,
    # sensitivity, aptitude, pleasantness.
    return {
        "pleasantness": result[3],
        "attention": result[0],
        "sensitivity": result[1],
        "aptitude": result[2],
    }
def polarity(self, concept, parsed_graph=None):
    """Return the polarity of a concept.

    If a parsed graph is passed, the RDF is not loaded again.
    """
    concept_polarity_uri = self.concept_base_uri + concept + "/polarity"
    if parsed_graph is None:
        try:
            graph = rdflib.Graph()
            parsed_graph = graph.parse(concept_polarity_uri, format="xml")
            result, _ = self._output(concept_polarity_uri)
            return result[0]
        except Exception:
            # Download/parse failure: treat the concept as neutral.
            return 0
    # NOTE(review): when parsed_graph is supplied nothing is returned
    # (implicitly None) — this mirrors the original control flow.
def _output(self, url):
    """Download *url* and split its RDF objects into literal and
    non-literal values, avoiding the w3.org redirect error.

    Returns:
        (literal_values, other_values) as two lists of Python objects.
    """
    response = requests.get(url)
    # Rewrite bare w3.org references so namespace resolution succeeds.
    payload = response.text.replace('w3.org', 'www.w3.org')
    graph = rdflib.Graph()
    parsed_graph = graph.parse(data=payload, format="xml")
    literal_values = []
    other_values = []
    for _subj, _pred, obj in parsed_graph:
        # exact type check kept from the original (not isinstance)
        if type(obj) == rdflib.term.Literal:
            literal_values.append(obj.toPython())
        else:
            other_values.append(obj.toPython())
    return literal_values, other_values
def setUp(self):
    """Build a small BIBFRAME title graph used by the title tests."""
    self.graph = rdflib.Graph()
    self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
    self.simple_title_bnode = rdflib.BNode()
    self.top_title_bnode = rdflib.BNode()
    name_bnode = rdflib.BNode()
    for triple in (
        (self.entity, rdflib.RDF.type, BF.Title),
        (self.entity, BF.title, self.simple_title_bnode),
        (self.simple_title_bnode, BF.mainTitle,
         rdflib.Literal("This is a test")),
        (self.entity, BF.title, self.top_title_bnode),
        (self.top_title_bnode, rdflib.RDF.type, BF.Topic),
        (self.top_title_bnode, rdflib.RDFS.label,
         rdflib.Literal("This is a title and a name")),
        (self.top_title_bnode, SCHEMA.name, name_bnode),
        (name_bnode, rdflib.RDF.value, rdflib.Literal("This is a name")),
    ):
        self.graph.add(triple)
def setUp(self):
    """Create two labelled test entities; the second carries a subtitle."""
    self.graph = rdflib.Graph()
    self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
    self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
    title_node = rdflib.BNode()
    for triple in (
        (self.entity_one, rdflib.RDF.type, rdflib.RDFS.Resource),
        (self.entity_one, rdflib.RDFS.label,
         rdflib.Literal("Test Entity One", lang="en")),
        (self.entity_two, rdflib.RDF.type, rdflib.RDFS.Resource),
        (self.entity_two, rdflib.RDFS.label,
         rdflib.Literal("Test Entity Two", lang="en")),
        (self.entity_two, BF.title, title_node),
        (title_node, rdflib.RDF.type, BF.Title),
        (title_node, BF.subTitle, rdflib.Literal("Subtitle ")),
    ):
        self.graph.add(triple)
def create_rdf_list(graph, nodes):
    """Build an RDF collection (rdf:first/rdf:rest chain) from *nodes*.

    The chain is added to *graph* recursively; the returned blank node
    functions in the object role for adding a triple.

    Args:
        graph(rdflib.Graph|rdflib.ConjuctiveGraph): Source graph
        nodes(list): Python list of nodes
    Returns:
        The head blank node, or rdflib.RDF.nil for an empty list.
    """
    if not nodes:
        return rdflib.RDF.nil
    head = rdflib.BNode()
    graph.add((head, rdflib.RDF.first, nodes[0]))
    # Link the remainder of the list recursively.
    graph.add((head, rdflib.RDF.rest, create_rdf_list(graph, nodes[1:])))
    return head
def replace_iri(graph, old_iri, new_iri):
    """Rewrite every occurrence of *old_iri* in *graph* to *new_iri*.

    Args:
    ----
        graph: rdflib.Graph
        old_iri: rdflib.URIRef, Old IRI
        new_iri: rdflib.URIRef, New IRI
    """
    # No-op guard: replacing an IRI with itself would otherwise delete
    # every occurrence of that IRI from the graph below.
    if old_iri == new_iri:
        return
    # Re-point triples where the IRI appears as subject ...
    for pred, obj in graph.predicate_objects(subject=old_iri):
        graph.add((new_iri, pred, obj))
        graph.remove((old_iri, pred, obj))
    # ... and triples where it appears as object.
    for subj, pred in graph.subject_predicates(object=old_iri):
        graph.add((subj, pred, new_iri))
        graph.remove((subj, pred, old_iri))