def get_do_metadata():
    """Extract label/description metadata for live Disease Ontology terms.

    Parses the DO OWL file, selects every non-deprecated term in the
    'disease_ontology' OBO namespace together with its rdfs:label and
    (optional) IAO:0000115 textual definition, and returns a dict keyed
    by CURIE (via ``purl_to_curie``) mapping to the per-term record.
    """
    graph = Graph()
    graph.parse(DO_OWL_PATH)
    # The OBO namespace value is stored as an explicitly typed xsd:string.
    namespace_value = Literal('disease_ontology',
                              datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
    SELECT * WHERE {
        ?id oboInOwl:hasOBONamespace ?disease_ontology .
        ?id rdfs:label ?label .
        OPTIONAL {?id obo:IAO_0000115 ?descr}
        FILTER NOT EXISTS {?id owl:deprecated ?dep}
    }
    """
    rows = graph.query(query, initBindings={'disease_ontology': namespace_value})
    records = [{str(key): str(value) for key, value in binding.items()}
               for binding in rows.bindings]
    frame = pd.DataFrame(records)
    # One record per term; replace NaN (missing optional ?descr) with "".
    frame.drop_duplicates(subset=['id'], inplace=True)
    frame.fillna("", inplace=True)
    return {purl_to_curie(record['id']): record
            for record in frame.to_dict("records")}
Example source code for instances of the Python class URIRef()
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Emit RDF triples on *fragment* for every tweet in *result_set*.

    Each (s, p, o) entry of ``reduced_mapping.mapping`` carries a subject
    template of the form ``prefix{<jsonpath>}``.  Objects containing a
    ``$.`` marker are treated as JSONPath expressions (one literal triple
    per match); any other object is emitted verbatim as a constant term.
    """
    for tweet in result_set:
        for s, p, o in reduced_mapping.mapping:
            # Split the subject template into URI prefix and JSONPath part.
            subject_parts = s.split('{')
            prefix = subject_parts[0]
            subject_path = parse(subject_parts[1].split('}')[0])
            subject_values = [match.value for match in subject_path.find(tweet)]
            if '$.' in o:
                # Object is itself a JSONPath expression.
                object_path = parse(o.split('{')[0].split('}')[0])
                for match in object_path.find(tweet):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (prefix, subject_values[0])),
                        p, Literal(match.value))
            else:
                # Object is a constant mapping term.
                fragment.add_data_triple(
                    URIRef("%s%s" % (prefix, subject_values[0])), p, o)
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
    """Emit RDF triples on *fragment* for every repo in *result_set*.

    Each (s, p, o) entry of ``reduced_mapping.mapping`` carries a subject
    template of the form ``prefix{<jsonpath>}``.  Objects containing a
    ``$.`` marker are treated as JSONPath expressions (one literal triple
    per match); any other object is emitted verbatim as a constant term.
    """
    for repo in result_set:
        for s, p, o in reduced_mapping.mapping:
            # Split the subject template into URI prefix and JSONPath part.
            subject_parts = s.split('{')
            prefix = subject_parts[0]
            subject_path = parse(subject_parts[1].split('}')[0])
            subject_values = [match.value for match in subject_path.find(repo)]
            if '$.' in o:
                # Object is itself a JSONPath expression.
                object_path = parse(o.split('{')[0].split('}')[0])
                for match in object_path.find(repo):
                    fragment.add_data_triple(
                        URIRef("%s%s" % (prefix, subject_values[0])),
                        p, Literal(match.value))
            else:
                # Object is a constant mapping term.
                fragment.add_data_triple(
                    URIRef("%s%s" % (prefix, subject_values[0])), p, o)
def _preprocess_mapping(self):
    """Flatten the R2RML/xR2RML mapping graph into simple triples.

    For every URIRef subject in ``self.mapping`` this extracts:
    - its rr:subjectMap template (becomes the local ``subject``) and any
      rr:class assertions (added as rdf:type triples),
    - its rr:predicateObjectMap pairs (predicate + xrr:reference),
    - its xrr:logicalSource (query + rml:iterator), keyed in
      ``self.logical_sources`` by the subject template's URI prefix.

    NOTE(review): ``subject`` is set inside the rr:subjectMap loop and then
    reused by the later loops; if a resource has no rr:template it stays
    None and the later ``subject.split('{')`` would fail — presumably the
    mapping data always provides a template.  TODO confirm.
    """
    resources = []
    for s in self.mapping.subjects():
        subject = None
        if isinstance(s, URIRef) and s not in resources:
            # Process each distinct URIRef resource exactly once.
            resources.append(s)
            for node in self.mapping.objects(subject=s, predicate=rr.subjectMap):
                for template in self.mapping.objects(subject=node, predicate=rr.template):
                    subject = template
                for type_class in self.mapping.objects(subject=node, predicate=rr['class']):
                    self.preprocessed_mapping.add((subject, RDF.type, type_class))
            for node in self.mapping.objects(subject=s, predicate=rr.predicateObjectMap):
                predicate = None
                for predicate_object in self.mapping.objects(subject=node, predicate=rr.predicate):
                    predicate = predicate_object
                for object_map in self.mapping.objects(subject=node, predicate=rr.objectMap):
                    for reference in self.mapping.objects(subject=object_map, predicate=xrr.reference):
                        self.preprocessed_mapping.add((subject, predicate, reference))
            for node in self.mapping.objects(subject=s, predicate=xrr.logicalSource):
                # Logical sources are indexed by the subject template's prefix.
                subject_prefix = subject.split('{')[0]
                self.logical_sources[subject_prefix] = {}
                for query in self.mapping.objects(subject=node, predicate=xrr.query):
                    self.logical_sources[subject_prefix]['query'] = query
                for iterator in self.mapping.objects(subject=node, predicate=rml.iterator):
                    self.logical_sources[subject_prefix]['iterator'] = iterator
def getgraphcontent(self, graphuri):
    """Get the serialized content of a named graph.

    Args:
        graphuri: The URI of a named graph.
    Returns:
        content: A list of strings where each string is a quad.
    """
    context = self.store.get_context(URIRef(graphuri))
    triplestring = context.serialize(format='nt').decode('UTF-8')
    # Since we have triples here, we transform them to quads by adding the graphuri
    # TODO This might cause problems if ' .\n' will be part of a literal.
    # Maybe a regex would be a better solution
    triplestring = triplestring.replace(' .\n', ' <' + graphuri + '> .\n')
    # Filter out empty lines instead of list.remove(''), which raised
    # ValueError whenever the serialization produced no blank line
    # (e.g. for an empty graph).
    data = [line for line in triplestring.splitlines() if line != '']
    return data
def test_create_graph():
    """Create new graphFactory Object and verify registry assertions."""
    from sc import graphManager
    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()
    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)
    tstregistry.build_graph()
    # print as a function works on both Python 2 and 3 for a single
    # argument; the original Py2-only print statement broke Py3 parsing.
    print(tstregistry.get_turtle())
    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label, Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph
    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
def __addAction(self, action, statement):
    """Append an <action> element (e.g. insert/delete) describing *statement*.

    Each term of the statement is serialized as a child element of the
    action element: <literal> (with optional datatype / xml:lang
    attributes), <uri>, or <bnode>.  The action element is appended to the
    document root held in ``self.trans``.

    Raises:
        Exception: if a term is not a Literal, URIRef or BNode.
    """
    element = self.trans.createElement(action)
    for item in statement:
        if isinstance(item, Literal):
            literal = self.trans.createElement("literal")
            if item.datatype is not None:
                literal.setAttribute("datatype", str(item.datatype))
            if item.language is not None:
                literal.setAttribute("xml:lang", str(item.language))
            literal.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(literal)
        elif isinstance(item, URIRef):
            uri = self.trans.createElement("uri")
            uri.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(uri)
        elif isinstance(item, BNode):
            bnode = self.trans.createElement("bnode")
            bnode.appendChild(self.trans.createTextNode(str(item)))
            element.appendChild(bnode)
        else:
            # Bug fix: concatenating a non-string term raised TypeError
            # instead of the intended Exception; coerce to str first.
            raise Exception("Unknown element: " + str(item))
    self.trans.childNodes[0].appendChild(element)
def __setattr__(self, name, values):
    """Set RDF property *name* of this object to the sequence *values*.

    Values are normalized to rdflib terms: RDFObject instances contribute
    their ``uri``; URIRef/BNode/Literal pass through unchanged; anything
    else is wrapped in a Literal.  Names starting with "r_" are treated as
    reverse properties (this object becomes the triple's object);
    presumably all internal attributes bypass this via the underscore
    conventions used by ``_getProp`` — TODO confirm how plain attribute
    assignment avoids recursing through this method.
    """
    self._objectGraph._load(self.uri)
    unwrappedValues = []
    for value in values:
        # unwrap rdfobjects:
        if isinstance(value, RDFObject):
            unwrappedValues.append(value.uri)
        # pass through rdflib objects:
        elif isinstance(value, URIRef) or isinstance(value, BNode) or isinstance(value, Literal):
            unwrappedValues.append(value)
        # wrap literals:
        else:
            unwrappedValues.append(Literal(value))
    # look for a property mapping for this name:
    prop = self._getProp(name)
    if name.startswith("r_"):
        # reverse property: values become subjects pointing at self.uri
        self._objectGraph._setSubjects(unwrappedValues, prop, self.uri)
    else:
        self._objectGraph._setObjects(self.uri, prop, unwrappedValues)
def _object_value_int(self, subject, predicate):
    '''
    Given a subject and a predicate, returns the value of the object as an
    integer

    Both subject and predicate must be rdflib URIRef or BNode objects

    If the value can not be parsed as integer, returns None
    '''
    object_value = self._object_value(subject, predicate)
    if object_value:
        try:
            return int(object_value)
        except (ValueError, TypeError):
            # TypeError covers non-string, non-numeric values (e.g. a
            # list), which int() rejects with TypeError, not ValueError.
            pass
    return None
def _contact_details(self, subject, predicate):
    '''
    Returns a dict with details about a vcard expression

    Both subject and predicate must be rdflib URIRef or BNode objects

    Returns keys for uri, name and email with the values set to
    None if they could not be found
    '''
    contact = {}
    # If several agents match, the last one wins (same as a plain loop
    # that keeps overwriting the dict entries).
    for agent in self.g.objects(subject, predicate):
        if isinstance(agent, rdflib.term.URIRef):
            contact['uri'] = unicode(agent)
        else:
            # Blank-node agents have no usable URI.
            contact['uri'] = None
        contact['name'] = self._object_value(agent, VCARD.fn)
        contact['email'] = self._object_value(agent, VCARD.hasEmail)
    return contact
def graph_from_catalog(self, catalog_dict=None):
    '''
    Creates a graph for the catalog (CKAN site) using the loaded profiles

    The class RDFLib graph (accessible via `serializer.g`) will be updated
    by the loaded profiles.

    Returns the reference to the catalog, which will be an rdflib URIRef.
    '''
    catalog_ref = URIRef(catalog_uri())
    # Every registered profile contributes its own catalog triples to self.g.
    for profile_class in self._profiles:
        profile_class(self.g, self.compatibility_mode).graph_from_catalog(
            catalog_dict, catalog_ref)
    return catalog_ref
def test_object_list(self):
    """_object_value_list collects every dcat:keyword literal as unicode."""
    profile = RDFProfile(_default_graph())
    dataset_ref = URIRef('http://example.org/datasets/1')
    for keyword in ('space', 'moon'):
        profile.g.add((dataset_ref, DCAT.keyword, Literal(keyword)))
    value = profile._object_value_list(dataset_ref, DCAT.keyword)
    assert isinstance(value, list)
    assert isinstance(value[0], unicode)
    eq_(len(value), 2)
    eq_(sorted(value), ['moon', 'space'])
def test_publisher_ref(self):
    """A dct:publisher given as rdf:resource is returned under 'uri'."""
    data = '''<?xml version="1.0" encoding="utf-8" ?>
    <rdf:RDF
     xmlns:dct="http://purl.org/dc/terms/"
     xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
     xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">
     <rdfs:SomeClass rdf:about="http://example.org">
      <dct:publisher rdf:resource="http://orgs.vocab.org/some-org" />
     </rdfs:SomeClass>
    </rdf:RDF>
    '''
    graph = Graph()
    graph.parse(data=data)
    publisher = RDFProfile(graph)._publisher(
        URIRef('http://example.org'), DCT.publisher)
    eq_(publisher['uri'], 'http://orgs.vocab.org/some-org')
test_euro_dcatap_profile_serialize.py (file source)
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Views: 22
Bookmarks: 0
Likes: 0
Comments: 0
def test_graph_from_catalog_dict(self):
    """Catalog metadata fields end up as the expected DCAT triples."""
    catalog_dict = {
        'title': 'My Catalog',
        'description': 'An Open Data Catalog',
        'homepage': 'http://example.com',
        'language': 'de',
    }
    serializer = RDFSerializer()
    graph = serializer.g
    catalog = serializer.graph_from_catalog(catalog_dict)
    eq_(unicode(catalog), utils.catalog_uri())
    # Basic fields
    expected = [
        (RDF.type, DCAT.Catalog),
        (DCT.title, catalog_dict['title']),
        (DCT.description, catalog_dict['description']),
        (FOAF.homepage, URIRef(catalog_dict['homepage'])),
        (DCT.language, catalog_dict['language']),
    ]
    for pred, obj in expected:
        assert self._triple(graph, catalog, pred, obj)
test_euro_dcatap_profile_parse.py (file source)
Project: dati-ckan-docker
Author: italia
Project source code
File source code
Views: 16
Bookmarks: 0
Likes: 0
Comments: 0
def test_dataset_license_from_distribution_by_uri(self):
    """license_id is derived from the URI of the dcat:license object."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCAT.distribution, dist_ref),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCT.license,
             URIRef("http://www.opendefinition.org/licenses/cc-by"))):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    dataset = list(parser.datasets())[0]
    eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_dataset_license_from_distribution_by_title(self):
    """license_id is derived from dct:title of the dcat:license object."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    license_node = BNode()
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dataset_ref, DCAT.distribution, dist_ref),
            (dist_ref, DCT.license, license_node),
            (license_node, DCT.title, Literal("Creative Commons Attribution"))):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    dataset = list(parser.datasets())[0]
    eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_distribution_access_url(self):
    """A dcat:accessURL alone maps to 'url'; no 'download_url' is set."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.accessURL, Literal('http://access.url.org')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    assert 'download_url' not in resource
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def test_distribution_both_access_and_download_url(self):
    """accessURL maps to 'url' and downloadURL to 'download_url'."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.accessURL, Literal('http://access.url.org')),
            (dist_ref, DCAT.downloadURL, Literal('http://download.url.org')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['url'], u'http://access.url.org')
    eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_distribution_format_imt_and_format(self):
    """With both present, dct:format maps to 'format', mediaType to 'mimetype'."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dist_ref, DCT['format'], Literal('CSV')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
    eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_distribution_format_format_only(self):
    """dct:format alone maps straight to the resource 'format'."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCT['format'], Literal('CSV')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'CSV')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_distribution_format_imt_only(self):
    """dcat:mediaType alone; normalization depends on the CKAN version."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        # CKAN >= 2.3 normalizes the IMT into a short format + mimetype.
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_distribution_format_imt_only_normalize_false(self):
    """Without normalization the raw IMT lands in both format and mimetype."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/csv')
    eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_distribution_format_unknown_imt(self):
    """An unrecognized IMT is kept verbatim in both format and mimetype."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/unknown-imt')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_distribution_format_imt_normalized(self):
    """Normalization of an unknown IMT still yields the IMT string itself."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/unknown-imt')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    eq_(resource['format'], u'text/unknown-imt')
    eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def test_distribution_format_format_normalized(self):
    """A descriptive dct:format is normalized on CKAN >= 2.3 only."""
    graph = Graph()
    dataset_ref = URIRef("http://example.org/datasets/1")
    dist_ref = URIRef("http://example.org/datasets/1/ds/1")
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dist_ref, RDF.type, DCAT.Distribution),
            (dist_ref, DCAT.mediaType, Literal('text/csv')),
            (dist_ref, DCT['format'], Literal('Comma Separated Values')),
            (dataset_ref, DCAT.distribution, dist_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    resource = list(parser.datasets())[0]['resources'][0]
    if toolkit.check_ckan_version(min_version='2.3'):
        # CKAN >= 2.3 resolves the descriptive name to the short format.
        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
    else:
        eq_(resource['format'], u'Comma Separated Values')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_spatial_rdfs_label(self):
    """rdfs:label on the dct:spatial resource populates 'spatial_text'."""
    graph = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    spatial_ref = URIRef('http://geonames/Newark')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, spatial_ref),
            (spatial_ref, RDF.type, DCT.Location),
            (spatial_ref, RDFS.label, Literal('Newark'))):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_text'], 'Newark')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def test_spatial_wkt_only(self):
    """A locn:geometry WKT literal is converted to GeoJSON in 'spatial'."""
    graph = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    spatial_ref = URIRef('http://geonames/Newark')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, spatial_ref),
            (spatial_ref, RDF.type, DCT.Location),
            (spatial_ref, LOCN.geometry,
             Literal('POINT (67 89)', datatype=GSP.wktLiteral))):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(list(parser.datasets())[0])
    # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
    eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
test_euro_dcatap_profile_parse.py 文件源码
项目:dati-ckan-docker
作者: italia
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
def test_spatial_uri_only(self):
    """A bare dct:spatial URI yields only 'spatial_uri', nothing else."""
    graph = Graph()
    dataset_ref = URIRef('http://example.org/datasets/1')
    spatial_ref = URIRef('http://geonames/Newark')
    for triple in (
            (dataset_ref, RDF.type, DCAT.Dataset),
            (dataset_ref, DCT.spatial, spatial_ref)):
        graph.add(triple)
    parser = RDFParser(profiles=['euro_dcat_ap'])
    parser.g = graph
    extras = self._extras(list(parser.datasets())[0])
    eq_(extras['spatial_uri'], 'http://geonames/Newark')
    assert_true('spatial_text' not in extras)
    assert_true('spatial' not in extras)
def add_property_definition(self, concept: RF2Files.Concept, concept_uri: URIRef) -> None:
    """
    Add a property definition

    A concept with multiple parents and a "Defined" definition status is
    emitted as owl:equivalentProperty over the intersection of its parents;
    otherwise one rdfs:subPropertyOf triple per parent.  Concepts listed in
    the context's RIGHT_ID additionally get an owl:propertyChain assertion.

    NOTE(review): the list-comprehension filter below tests
    ``concept.id != Concept_model_attribute_sctid`` — a loop-invariant
    condition that either keeps all parents or none.  It looks like it was
    meant to be ``parent != Concept_model_attribute_sctid``; confirm before
    changing.

    :param concept: Concept entry for the given property
    :param concept_uri: Concept URI
    :return:
    """
    parents = [parent for parent in self._relationships.parents(concept.id)
               if concept.id != Concept_model_attribute_sctid]
    if len(parents) > 1 and concept.definitionStatusId == Defined_sctid:
        # Fully defined: equivalent to the intersection of all parents.
        target, collection = intersection(self)
        [collection.append(as_uri(parent)) for parent in parents]
        self.add_t((concept_uri, OWL.equivalentProperty, target), self._stats.num_properties)
    else:
        # Primitive (or single parent): plain subPropertyOf assertions.
        [self.add_t((concept_uri, RDFS.subPropertyOf, as_uri(parent)), self._stats.num_properties)
         for parent in parents]
    # add an owl:propertyChain assertion for $subject if is in the RIGHT_ID
    if concept.id in self._context.RIGHT_ID:
        node = BNode()
        self.add_t((node, RDFS.subPropertyOf, concept_uri), None)
        coll = BNode()
        Collection(self, coll, [concept_uri, as_uri(self._context.RIGHT_ID[concept.id])])
        self.add_t((node, OWL.propertyChain, coll), self._stats.num_propchains)
def verify_virtual_columns(sub, g, orig_value_str, encoded_value_str):
    """Assert the expected v1/v2/v3 virtual-column triples exist in *g*.

    v1 keeps the original value as object, v2 uses an encoded object
    value, and v3 is a standalone virtual column with its own subject.
    """
    def _only(matching):
        # Exactly one matching triple must exist; return it.
        found = list(matching)
        assert len(found) == 1
        return found[0]

    v1 = _only(g.triples((sub, PRE_NS['v1p{}'.format(encoded_value_str)], None)))
    assert "v1p{}".format(encoded_value_str) in str(v1[1])
    assert orig_value_str == str(v1[2])

    v2 = _only(g.triples((sub, PRE_NS['v2p{}'.format(encoded_value_str)], None)))
    assert "v2p{}".format(encoded_value_str) in str(v2[1])
    assert 'v2v{}'.format(encoded_value_str) in str(v2[2])

    # Standalone virtual column
    standalone_sub = URIRef('http://www.example.org/v3s{}'.format(encoded_value_str))
    v3 = _only(g.triples((standalone_sub, None, None)))
    assert "v3p{}".format(encoded_value_str) in str(v3[1])
    assert 'v3v{}'.format(encoded_value_str) in str(v3[2])