python类URIRef()的实例源码

mesh_changes.py 文件源码 项目:scheduled-bots 作者: SuLab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def get_do_metadata():
    # from the do owl file, get do labels, descriptions
    g = Graph()
    g.parse(DO_OWL_PATH)

    disease_ontology = Literal('disease_ontology', datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
        SELECT * WHERE {
            ?id oboInOwl:hasOBONamespace ?disease_ontology .
            ?id rdfs:label ?label .
            OPTIONAL {?id obo:IAO_0000115 ?descr}
            FILTER NOT EXISTS {?id owl:deprecated ?dep}
        }
        """
    rows = g.query(query, initBindings={'disease_ontology': disease_ontology})
    res = [{str(k): str(v) for k, v in binding.items()} for binding in rows.bindings]
    df = pd.DataFrame(res)
    df.drop_duplicates(subset=['id'], inplace=True)
    df.fillna("", inplace=True)
    do = df.to_dict("records")
    do = {purl_to_curie(x['id']): x for x in do}
    return do
mapper_twitter_xr2rml.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
        for tweet in result_set:
            for s, p, o in reduced_mapping.mapping:
                subject = s
                obj = o
                splited_subject = subject.split('{')
                subject_prefix = splited_subject[0]
                subject_jsonpath = parse(splited_subject[1].split('}')[0])
                subject_values = [match.value for match in subject_jsonpath.find(tweet)]
                if '$.' in obj:
                    object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                    object_values = [match.value for match in object_jsonpath.find(tweet)]
                    for object_value in object_values:
                        fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, Literal(object_value))
                else:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, obj)
mapper_github_xr2rml.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
        for repo in result_set:
            for s, p, o in reduced_mapping.mapping:
                subject = s
                obj = o
                splited_subject = subject.split('{')
                subject_prefix = splited_subject[0]
                subject_jsonpath = parse(splited_subject[1].split('}')[0])
                subject_values = [match.value for match in subject_jsonpath.find(repo)]
                if '$.' in obj:
                    object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                    object_values = [match.value for match in object_jsonpath.find(repo)]
                    for object_value in object_values:
                        fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, Literal(object_value))
                else:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, obj)
xr2rml_mapper.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _preprocess_mapping(self):
        resources = []
        for s in self.mapping.subjects():
            subject = None
            if isinstance(s, URIRef) and s not in resources:
                resources.append(s)
                for node in self.mapping.objects(subject=s, predicate=rr.subjectMap):
                    for template in self.mapping.objects(subject=node, predicate=rr.template):
                        subject = template
                        for type_class in self.mapping.objects(subject=node, predicate=rr['class']):
                            self.preprocessed_mapping.add((subject, RDF.type, type_class))
                for node in self.mapping.objects(subject=s, predicate=rr.predicateObjectMap):
                    predicate = None
                    for predicate_object in self.mapping.objects(subject=node, predicate=rr.predicate):
                        predicate = predicate_object
                        for object_map in self.mapping.objects(subject=node, predicate=rr.objectMap):
                            for reference in self.mapping.objects(subject=object_map, predicate=xrr.reference):
                                self.preprocessed_mapping.add((subject, predicate, reference))
                for node in self.mapping.objects(subject=s, predicate=xrr.logicalSource):
                    subject_prefix = subject.split('{')[0]
                    self.logical_sources[subject_prefix] = {}
                    for query in self.mapping.objects(subject=node, predicate=xrr.query):
                        self.logical_sources[subject_prefix]['query'] = query
                    for iterator in self.mapping.objects(subject=node, predicate=rml.iterator):
                        self.logical_sources[subject_prefix]['iterator'] = iterator
core.py 文件源码 项目:QuitStore 作者: AKSW 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def getgraphcontent(self, graphuri):
        """Get the serialized content of a named graph.

        Args:
            graphuri: The URI of a named graph.
        Returns:
            content: A list of strings where each string is a quad.
        """
        data = []
        context = self.store.get_context(URIRef(graphuri))
        triplestring = context.serialize(format='nt').decode('UTF-8')

        # Since we have triples here, we transform them to quads by adding the graphuri
        # TODO This might cause problems if ' .\n' will be part of a literal.
        #   Maybe a regex would be a better solution
        triplestring = triplestring.replace(' .\n', ' <' + graphuri + '> .\n')

        data = triplestring.splitlines()
        data.remove('')

        return data
test_graphregistry.py 文件源码 项目:smartcontainers 作者: crcresearch 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_create_graph():
    """Create new graphFactory Object"""
    from sc import graphManager

    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()

    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)

    tstregistry.build_graph()
    print tstregistry.get_turtle()
    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label,  Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph
    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def __addAction(self, action, statement):
        element = self.trans.createElement(action)
        for item in statement:
            if isinstance(item, Literal):
                literal = self.trans.createElement("literal")
                if item.datatype is not None: literal.setAttribute("datatype", str(item.datatype))
                if item.language is not None: literal.setAttribute("xml:lang", str(item.language))
                literal.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(literal)
            elif isinstance(item, URIRef):
                uri = self.trans.createElement("uri")
                uri.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(uri)
            elif isinstance(item, BNode):
                bnode = self.trans.createElement("bnode")
                bnode.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(bnode)
            else:
                raise Exception("Unknown element: " + item)
        self.trans.childNodes[0].appendChild(element)
rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __setattr__(self, name, values):
        self._objectGraph._load(self.uri)
        unwrappedValues = []
        for value in values:
            # unwrap rdfobjects:
            if isinstance(value, RDFObject):
                unwrappedValues.append(value.uri)
            # pass through rdflib objects:
            elif isinstance(value, URIRef) or isinstance(value, BNode) or isinstance(value, Literal):
                unwrappedValues.append(value)
            # wrap literals:
            else:
                unwrappedValues.append(Literal(value))
        # look for a property mapping for this name:
        prop = self._getProp(name)
        if name.startswith("r_"):
            self._objectGraph._setSubjects(unwrappedValues, prop, self.uri)
        else:
            self._objectGraph._setObjects(self.uri, prop, unwrappedValues)
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _object_value_int(self, subject, predicate):
        '''
        Given a subject and a predicate, returns the value of the object as an
        integer

        Both subject and predicate must be rdflib URIRef or BNode objects

        If the value can not be parsed as intger, returns None
        '''
        object_value = self._object_value(subject, predicate)
        if object_value:
            try:
                return int(object_value)
            except ValueError:
                pass
        return None
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _contact_details(self, subject, predicate):
        '''
        Returns a dict with details about a vcard expression

        Both subject and predicate must be rdflib URIRef or BNode objects

        Returns keys for uri, name and email with the values set to
        None if they could not be found
        '''

        contact = {}

        for agent in self.g.objects(subject, predicate):

            contact['uri'] = (unicode(agent) if isinstance(agent,
                              rdflib.term.URIRef) else None)

            contact['name'] = self._object_value(agent, VCARD.fn)

            contact['email'] = self._object_value(agent, VCARD.hasEmail)

        return contact
processors.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def graph_from_catalog(self, catalog_dict=None):
        '''
        Creates a graph for the catalog (CKAN site) using the loaded profiles

        The class RDFLib graph (accessible via `serializer.g`) will be updated
        by the loaded profiles.

        Returns the reference to the catalog, which will be an rdflib URIRef.
        '''

        catalog_ref = URIRef(catalog_uri())

        for profile_class in self._profiles:
            profile = profile_class(self.g, self.compatibility_mode)
            profile.graph_from_catalog(catalog_dict, catalog_ref)

        return catalog_ref
test_base_profile.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_object_list(self):

        p = RDFProfile(_default_graph())

        p.g.add((URIRef('http://example.org/datasets/1'),
                 DCAT.keyword,
                 Literal('space')))
        p.g.add((URIRef('http://example.org/datasets/1'),
                 DCAT.keyword,
                 Literal('moon')))

        value = p._object_value_list(URIRef('http://example.org/datasets/1'),
                                     DCAT.keyword)

        assert isinstance(value, list)
        assert isinstance(value[0], unicode)
        eq_(len(value), 2)
        eq_(sorted(value), ['moon', 'space'])
test_base_profile.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_publisher_ref(self):

        data = '''<?xml version="1.0" encoding="utf-8" ?>
        <rdf:RDF
         xmlns:dct="http://purl.org/dc/terms/"
         xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#"
         xmlns:rdfs="http://www.w3.org/2000/01/rdf-schema#">
        <rdfs:SomeClass rdf:about="http://example.org">
          <dct:publisher rdf:resource="http://orgs.vocab.org/some-org" />
        </rdfs:SomeClass>
        </rdf:RDF>
        '''

        g = Graph()

        g.parse(data=data)

        p = RDFProfile(g)

        publisher = p._publisher(URIRef('http://example.org'), DCT.publisher)

        eq_(publisher['uri'], 'http://orgs.vocab.org/some-org')
test_euro_dcatap_profile_serialize.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_graph_from_catalog_dict(self):

        catalog_dict = {
            'title': 'My Catalog',
            'description': 'An Open Data Catalog',
            'homepage': 'http://example.com',
            'language': 'de',
        }

        s = RDFSerializer()
        g = s.g

        catalog = s.graph_from_catalog(catalog_dict)

        eq_(unicode(catalog), utils.catalog_uri())

        # Basic fields
        assert self._triple(g, catalog, RDF.type, DCAT.Catalog)
        assert self._triple(g, catalog, DCT.title, catalog_dict['title'])
        assert self._triple(g, catalog, DCT.description, catalog_dict['description'])
        assert self._triple(g, catalog, FOAF.homepage, URIRef(catalog_dict['homepage']))
        assert self._triple(g, catalog, DCT.language, catalog_dict['language'])
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_dataset_license_from_distribution_by_uri(self):
        # license_id retrieved from the URI of dcat:license object
        g = Graph()

        dataset = URIRef("http://example.org/datasets/1")
        g.add((dataset, RDF.type, DCAT.Dataset))

        distribution = URIRef("http://example.org/datasets/1/ds/1")
        g.add((dataset, DCAT.distribution, distribution))
        g.add((distribution, RDF.type, DCAT.Distribution))
        g.add((distribution, DCT.license,
               URIRef("http://www.opendefinition.org/licenses/cc-by")))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        dataset = [d for d in p.datasets()][0]
        eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_dataset_license_from_distribution_by_title(self):
        # license_id retrieved from dct:title of dcat:license object
        g = Graph()

        dataset = URIRef("http://example.org/datasets/1")
        g.add((dataset, RDF.type, DCAT.Dataset))

        distribution = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution, RDF.type, DCAT.Distribution))
        g.add((dataset, DCAT.distribution, distribution))
        license = BNode()
        g.add((distribution, DCT.license, license))
        g.add((license, DCT.title, Literal("Creative Commons Attribution")))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        dataset = [d for d in p.datasets()][0]
        eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_distribution_access_url(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['url'], u'http://access.url.org')
        assert 'download_url' not in resource
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_distribution_both_access_and_download_url(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
        g.add((distribution1_1, DCAT.downloadURL, Literal('http://download.url.org')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['url'], u'http://access.url.org')
        eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_and_format(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((distribution1_1, DCT['format'], Literal('CSV')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_distribution_format_format_only(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCT['format'], Literal('CSV')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'CSV')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_only(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]
        if toolkit.check_ckan_version(min_version='2.3'):
            eq_(resource['format'], u'CSV')
            eq_(resource['mimetype'], u'text/csv')
        else:
            eq_(resource['format'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_only_normalize_false(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/csv')
        eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_distribution_format_unknown_imt(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/unknown-imt')
        eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_normalized(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/unknown-imt')
        eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_distribution_format_format_normalized(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((distribution1_1, DCT['format'], Literal('Comma Separated Values')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        if toolkit.check_ckan_version(min_version='2.3'):
            eq_(resource['format'], u'CSV')
            eq_(resource['mimetype'], u'text/csv')
        else:
            eq_(resource['format'], u'Comma Separated Values')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_spatial_rdfs_label(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        spatial_uri = URIRef('http://geonames/Newark')
        g.add((dataset, DCT.spatial, spatial_uri))

        g.add((spatial_uri, RDF.type, DCT.Location))
        g.add((spatial_uri, RDFS.label, Literal('Newark')))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])

        eq_(extras['spatial_text'], 'Newark')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_spatial_wkt_only(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        spatial_uri = URIRef('http://geonames/Newark')
        g.add((dataset, DCT.spatial, spatial_uri))

        g.add((spatial_uri, RDF.type, DCT.Location))
        g.add((spatial_uri,
               LOCN.geometry,
               Literal('POINT (67 89)', datatype=GSP.wktLiteral)))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])
        # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
        eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_spatial_uri_only(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        spatial_uri = URIRef('http://geonames/Newark')
        g.add((dataset, DCT.spatial, spatial_uri))
        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])

        eq_(extras['spatial_uri'], 'http://geonames/Newark')
        assert_true('spatial_text' not in extras)
        assert_true('spatial' not in extras)
SNOMEDToOWL.py 文件源码 项目:SNOMEDToOWL 作者: hsolbrig 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def add_property_definition(self, concept: RF2Files.Concept, concept_uri: URIRef) -> None:
        """
        Add a property definition
        :param concept: Concept entry for the given property
        :param concept_uri: Concept URI
        :return:
        """
        parents = [parent for parent in self._relationships.parents(concept.id)
                   if concept.id != Concept_model_attribute_sctid]
        if len(parents) > 1 and concept.definitionStatusId == Defined_sctid:
            target, collection = intersection(self)
            [collection.append(as_uri(parent)) for parent in parents]
            self.add_t((concept_uri, OWL.equivalentProperty, target), self._stats.num_properties)
        else:
            [self.add_t((concept_uri, RDFS.subPropertyOf, as_uri(parent)), self._stats.num_properties)
             for parent in parents]

        # add an owl:propertyChain assertion for $subject if is in the RIGHT_ID
        if concept.id in self._context.RIGHT_ID:
            node = BNode()
            self.add_t((node, RDFS.subPropertyOf, concept_uri), None)
            coll = BNode()
            Collection(self, coll, [concept_uri, as_uri(self._context.RIGHT_ID[concept.id])])
            self.add_t((node, OWL.propertyChain, coll), self._stats.num_propchains)
test_url_special_chars.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def verify_virtual_columns(sub, g, orig_value_str, encoded_value_str):
    v1_triples = list(g.triples((sub, PRE_NS['v1p{}'.format(encoded_value_str)], None)))
    assert len(v1_triples) == 1
    assert "v1p{}".format(encoded_value_str) in str(v1_triples[0][1])
    assert orig_value_str == str(v1_triples[0][2])
    v2_triples = list(g.triples((sub, PRE_NS['v2p{}'.format(encoded_value_str)], None)))
    assert len(v2_triples) == 1
    assert "v2p{}".format(encoded_value_str) in str(v2_triples[0][1])
    assert 'v2v{}'.format(encoded_value_str) in str(v2_triples[0][2])

    # Standalone virtual column
    standalone_sub = URIRef('http://www.example.org/v3s{}'.format(encoded_value_str))
    v3_triples = list(g.triples((standalone_sub, None, None)))
    assert len(v3_triples) == 1
    assert "v3p{}".format(encoded_value_str) in str(v3_triples[0][1])
    assert 'v3v{}'.format(encoded_value_str) in str(v3_triples[0][2])


问题


面经


文章

微信
公众号

扫码关注公众号