python类Literal()的实例源码

mesh_changes.py 文件源码 项目:scheduled-bots 作者: SuLab 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def get_do_metadata():
    # from the do owl file, get do labels, descriptions
    g = Graph()
    g.parse(DO_OWL_PATH)

    disease_ontology = Literal('disease_ontology', datatype=URIRef('http://www.w3.org/2001/XMLSchema#string'))
    query = """
        SELECT * WHERE {
            ?id oboInOwl:hasOBONamespace ?disease_ontology .
            ?id rdfs:label ?label .
            OPTIONAL {?id obo:IAO_0000115 ?descr}
            FILTER NOT EXISTS {?id owl:deprecated ?dep}
        }
        """
    rows = g.query(query, initBindings={'disease_ontology': disease_ontology})
    res = [{str(k): str(v) for k, v in binding.items()} for binding in rows.bindings]
    df = pd.DataFrame(res)
    df.drop_duplicates(subset=['id'], inplace=True)
    df.fillna("", inplace=True)
    do = df.to_dict("records")
    do = {purl_to_curie(x['id']): x for x in do}
    return do
mapper_twitter_xr2rml.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
        for tweet in result_set:
            for s, p, o in reduced_mapping.mapping:
                subject = s
                obj = o
                splited_subject = subject.split('{')
                subject_prefix = splited_subject[0]
                subject_jsonpath = parse(splited_subject[1].split('}')[0])
                subject_values = [match.value for match in subject_jsonpath.find(tweet)]
                if '$.' in obj:
                    object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                    object_values = [match.value for match in object_jsonpath.find(tweet)]
                    for object_value in object_values:
                        fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, Literal(object_value))
                else:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, obj)
mapper_github_xr2rml.py 文件源码 项目:odmtp-tpf 作者: benjimor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def result_set_2_rdf(self, result_set, reduced_mapping, fragment):
        for repo in result_set:
            for s, p, o in reduced_mapping.mapping:
                subject = s
                obj = o
                splited_subject = subject.split('{')
                subject_prefix = splited_subject[0]
                subject_jsonpath = parse(splited_subject[1].split('}')[0])
                subject_values = [match.value for match in subject_jsonpath.find(repo)]
                if '$.' in obj:
                    object_jsonpath = parse(obj.split('{')[0].split('}')[0])
                    object_values = [match.value for match in object_jsonpath.find(repo)]
                    for object_value in object_values:
                        fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, Literal(object_value))
                else:
                    fragment.add_data_triple(URIRef("%s%s" % (subject_prefix, subject_values[0])), p, obj)
test_graphregistry.py 文件源码 项目:smartcontainers 作者: crcresearch 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_create_graph():
    """Create new graphFactory Object"""
    from sc import graphManager

    PROV = Namespace("http://www.w3.org/ns/prov#")
    tstregistry = graphManager.VocabularyRegistry()

    vocab1 = Vocabulary1()
    tstregistry.register(vocab1)
    vocab2 = Vocabulary2()
    tstregistry.register(vocab2)

    tstregistry.build_graph()
    print tstregistry.get_turtle()
    # Check assertions in global graph store
    assert (URIRef("http://orcid.org/000-0003-4901-6059"),
            RDF.type, PROV.Person) in tstregistry.global_graph
    assert (URIRef(uuidurn),
            RDFS.label,  Literal(
                "Docker: https://www.docker.com/")) in tstregistry.global_graph
    # Check Serialization
    jsongraph = json.loads(tstregistry.get_json_ld())
    assert '@context' in jsongraph
rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __addAction(self, action, statement):
        element = self.trans.createElement(action)
        for item in statement:
            if isinstance(item, Literal):
                literal = self.trans.createElement("literal")
                if item.datatype is not None: literal.setAttribute("datatype", str(item.datatype))
                if item.language is not None: literal.setAttribute("xml:lang", str(item.language))
                literal.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(literal)
            elif isinstance(item, URIRef):
                uri = self.trans.createElement("uri")
                uri.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(uri)
            elif isinstance(item, BNode):
                bnode = self.trans.createElement("bnode")
                bnode.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(bnode)
            else:
                raise Exception("Unknown element: " + item)
        self.trans.childNodes[0].appendChild(element)
rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def __setattr__(self, name, values):
        self._objectGraph._load(self.uri)
        unwrappedValues = []
        for value in values:
            # unwrap rdfobjects:
            if isinstance(value, RDFObject):
                unwrappedValues.append(value.uri)
            # pass through rdflib objects:
            elif isinstance(value, URIRef) or isinstance(value, BNode) or isinstance(value, Literal):
                unwrappedValues.append(value)
            # wrap literals:
            else:
                unwrappedValues.append(Literal(value))
        # look for a property mapping for this name:
        prop = self._getProp(name)
        if name.startswith("r_"):
            self._objectGraph._setSubjects(unwrappedValues, prop, self.uri)
        else:
            self._objectGraph._setObjects(self.uri, prop, unwrappedValues)
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _add_date_triple(self, subject, predicate, value, _type=Literal):
        '''
        Adds a new triple with a date object

        Dates are parsed using dateutil, and if the date obtained is correct,
        added to the graph as an XSD.dateTime value.

        If there are parsing errors, the literal string value is added.
        '''
        if not value:
            return
        try:
            default_datetime = datetime.datetime(1, 1, 1, 0, 0, 0)
            _date = parse_date(value, default=default_datetime)

            self.g.add((subject, predicate, _type(_date.isoformat(),
                                                  datatype=XSD.dateTime)))
        except ValueError:
            self.g.add((subject, predicate, _type(value)))
test_base_profile.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_object_list(self):

        p = RDFProfile(_default_graph())

        p.g.add((URIRef('http://example.org/datasets/1'),
                 DCAT.keyword,
                 Literal('space')))
        p.g.add((URIRef('http://example.org/datasets/1'),
                 DCAT.keyword,
                 Literal('moon')))

        value = p._object_value_list(URIRef('http://example.org/datasets/1'),
                                     DCAT.keyword)

        assert isinstance(value, list)
        assert isinstance(value[0], unicode)
        eq_(len(value), 2)
        eq_(sorted(value), ['moon', 'space'])
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_dataset_license_from_distribution_by_title(self):
        # license_id retrieved from dct:title of dcat:license object
        g = Graph()

        dataset = URIRef("http://example.org/datasets/1")
        g.add((dataset, RDF.type, DCAT.Dataset))

        distribution = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution, RDF.type, DCAT.Distribution))
        g.add((dataset, DCAT.distribution, distribution))
        license = BNode()
        g.add((distribution, DCT.license, license))
        g.add((license, DCT.title, Literal("Creative Commons Attribution")))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        dataset = [d for d in p.datasets()][0]
        eq_(dataset['license_id'], 'cc-by')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_distribution_access_url(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['url'], u'http://access.url.org')
        assert 'download_url' not in resource
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_distribution_download_url(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.downloadURL, Literal('http://download.url.org')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['url'], u'http://download.url.org')
        eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_distribution_both_access_and_download_url(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.accessURL, Literal('http://access.url.org')))
        g.add((distribution1_1, DCAT.downloadURL, Literal('http://download.url.org')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['url'], u'http://access.url.org')
        eq_(resource['download_url'], u'http://download.url.org')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_and_format(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((distribution1_1, DCT['format'], Literal('CSV')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'CSV')
        eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_only(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]
        if toolkit.check_ckan_version(min_version='2.3'):
            eq_(resource['format'], u'CSV')
            eq_(resource['mimetype'], u'text/csv')
        else:
            eq_(resource['format'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_only_normalize_false(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/csv')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/csv')
        eq_(resource['mimetype'], u'text/csv')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_distribution_format_format_only_normalize_false(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCT['format'], Literal('CSV')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'CSV')
        assert 'mimetype' not in resource
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_distribution_format_unknown_imt(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/unknown-imt')
        eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_distribution_format_imt_normalized(self):
        g = Graph()

        dataset1 = URIRef("http://example.org/datasets/1")
        g.add((dataset1, RDF.type, DCAT.Dataset))

        distribution1_1 = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution1_1, RDF.type, DCAT.Distribution))
        g.add((distribution1_1, DCAT.mediaType, Literal('text/unknown-imt')))
        g.add((dataset1, DCAT.distribution, distribution1_1))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        resource = datasets[0]['resources'][0]

        eq_(resource['format'], u'text/unknown-imt')
        eq_(resource['mimetype'], u'text/unknown-imt')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_spatial_rdfs_label(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        spatial_uri = URIRef('http://geonames/Newark')
        g.add((dataset, DCT.spatial, spatial_uri))

        g.add((spatial_uri, RDF.type, DCT.Location))
        g.add((spatial_uri, RDFS.label, Literal('Newark')))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])

        eq_(extras['spatial_text'], 'Newark')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_spatial_wkt_only(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        spatial_uri = URIRef('http://geonames/Newark')
        g.add((dataset, DCT.spatial, spatial_uri))

        g.add((spatial_uri, RDF.type, DCT.Location))
        g.add((spatial_uri,
               LOCN.geometry,
               Literal('POINT (67 89)', datatype=GSP.wktLiteral)))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])
        # NOTE: geomet returns floats for coordinates on WKT -> GeoJSON
        eq_(extras['spatial'], '{"type": "Point", "coordinates": [67.0, 89.0]}')
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_spatial_literal_only(self):
        g = Graph()

        dataset = URIRef('http://example.org/datasets/1')
        g.add((dataset, RDF.type, DCAT.Dataset))

        g.add((dataset, DCT.spatial, Literal('Newark')))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        datasets = [d for d in p.datasets()]

        extras = self._extras(datasets[0])

        eq_(extras['spatial_text'], 'Newark')
        assert_true('spatial_uri' not in extras)
        assert_true('spatial' not in extras)
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _add_agent(self, _dict, ref, basekey, _type):
        ''' Stores the Agent in this format:
                <dct:publisher rdf:resource="http://dati.gov.it/resource/Amministrazione/r_liguri"/>
                    <dcatapit:Agent rdf:about="http://dati.gov.it/resource/Amministrazione/r_liguri">
                        <rdf:type rdf:resource="&foaf;Agent"/>
                        <dct:identifier>r_liguri</dct:identifier>
                        <foaf:name>Regione Liguria</foaf:name>
                    </dcatapit:Agent>

            Returns the ref to the agent node
        '''

        agent_name = self._get_dict_value(_dict, basekey + '_name', 'N/A')
        agent_id = self._get_dict_value(_dict, basekey + '_identifier','N/A')

        agent = BNode()

        self.g.add((agent, RDF['type'], DCATAPIT.Agent))
        self.g.add((agent, RDF['type'], FOAF.Agent))
        self.g.add((ref, _type, agent))

        self.g.add((agent, FOAF.name, Literal(agent_name)))
        self.g.add((agent, DCT.identifier, Literal(agent_id)))

        return agent
test_null_values.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_null_values_with_multiple_strings():
    csvw = CSVW(csv_path="tests/null1.csv",
                metadata_path="tests/null1.multiple.csv-metadata.json")
    rdf_contents = csvw.to_rdf()
    g = ConjunctiveGraph()
    g.parse(data=rdf_contents, format="turtle")

    all_objects = {x for x in g.objects()}

    assert Literal('null_key', datatype=XSD.token) not in all_objects
    assert Literal('null_sector') not in all_objects
    assert Literal('null_id', datatype=XSD.token) not in all_objects
    for id in ['10', '11', '12', '13']:
        assert Literal(id, datatype=XSD.token) not in all_objects

    all_preds = {x for x in g.predicates()}
    assert id_uri not in all_preds

    assert Literal('1', datatype=XSD.token) not in all_objects
test_datatypes.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_date():
    with CSVW(csv_path="tests/datatypes.date.csv",
              metadata_path="tests/datatypes.date.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    date1_lit = Literal("2017-01-09", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date1'], date1_lit)))) == 1

    date2_lit = Literal("2017-01-10Z", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date2'], date2_lit)))) == 1

    date3_lit = Literal("2017-01-11", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date3'], date3_lit)))) == 1

    date4_lit = Literal("2002-09-24-06:00", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date4'], date4_lit)))) == 1

    date5_lit = Literal("2002-09-24+04:00", datatype=XSD.date)
    assert len(list(g.triples((NS['event/1'], NS['date5'], date5_lit)))) == 1
test_datatypes.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_datetime():
    with CSVW(csv_path="tests/datatypes.datetime.csv",
              metadata_path="tests/datatypes.datetime.csv-metadata.json") as csvw:
        rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    dt1_lit = Literal("2002-05-30T09:00:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime1'], dt1_lit)))) == 1

    dt2_lit = Literal("2002-05-30T09:30:10.5", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime2'], dt2_lit)))) == 1

    dt3_lit = Literal("2002-05-30T09:30:10Z", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime3'], dt3_lit)))) == 1

    dt4_lit = Literal("2002-05-30T09:30:10-06:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime4'], dt4_lit)))) == 1

    dt5_lit = Literal("2002-05-30T09:30:10+04:00", datatype=XSD.dateTime)
    assert len(list(g.triples((NS['event/1'], NS['datetime5'], dt5_lit)))) == 1

    datestamp = Literal("2004-04-12T13:20:00-05:00", datatype=XSD.dateTimeStamp)
    assert len(list(g.triples((NS['event/1'], NS['datetimestamp'], datestamp)))) == 1
test_datatypes.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_bool_with_format():
    csvw = CSVW(csv_path="tests/datatypes.bool.csv",
                metadata_path="tests/datatypes.bool.csv-metadata.json")
    rdf_output = csvw.to_rdf()
    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    true_lit = Literal(True, datatype=XSD.boolean)
    false_lit = Literal(False, datatype=XSD.boolean)

    assert len(list(g.triples((NS['event/1'], NS['bool1'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/1'], NS['bool2'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/1'], NS['bool3'], true_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool1'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool2'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/2'], NS['bool3'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool1'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool2'], false_lit)))) == 1
    assert len(list(g.triples((NS['event/3'], NS['bool3'], false_lit)))) == 1
test_empty_values.py 文件源码 项目:pycsvw 作者: bloomberg 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_empty_boolean():
    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.bool.csv-metadata.json")
    rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    assert len(g) == 2
    assert len(list(g.triples((None, None, Literal(False))))) == 2

    csvw = CSVW(csv_path="tests/empty.csv",
                metadata_path="tests/empty.invalid_base.csv-metadata.json")
    rdf_output = csvw.to_rdf()

    g = ConjunctiveGraph()
    g.parse(data=rdf_output, format="turtle")

    assert len(g) == 0
test_bibcat_funcs.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def setUp(self):
        self.graph = rdflib.Graph()
        self.entity = rdflib.URIRef("https://bibcat.org/test-entity")
        self.simple_title_bnode = rdflib.BNode()
        self.graph.add((self.entity,
                        rdflib.RDF.type,
                        BF.Title))
        self.graph.add((self.entity, BF.title, self.simple_title_bnode))
        self.graph.add((self.simple_title_bnode, 
                        BF.mainTitle, 
                        rdflib.Literal("This is a test")))
        self.top_title_bnode = rdflib.BNode()
        self.graph.add((self.entity, BF.title, self.top_title_bnode))
        secondary_title_bnode = rdflib.BNode()
        self.graph.add((self.top_title_bnode, rdflib.RDF.type, BF.Topic))
        self.graph.add((self.top_title_bnode, 
                        rdflib.RDFS.label, 
                        rdflib.Literal("This is a title and a name")))
        self.graph.add((self.top_title_bnode, SCHEMA.name, secondary_title_bnode))
        self.graph.add((secondary_title_bnode, 
                        rdflib.RDF.value,
                        rdflib.Literal("This is a name")))
test_bibcat_funcs.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def setUp(self):
        self.graph = rdflib.Graph()
        self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
        self.graph.add((self.entity_one, 
                        rdflib.RDF.type, 
                        rdflib.RDFS.Resource))
        self.graph.add((self.entity_one, 
                        rdflib.RDFS.label, 
                        rdflib.Literal("Test Entity One", lang="en")))
        self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
        self.graph.add((self.entity_two, 
                        rdflib.RDF.type, 
                        rdflib.RDFS.Resource))
        self.graph.add((self.entity_two, 
                        rdflib.RDFS.label, 
                        rdflib.Literal("Test Entity Two", lang="en")))
        title_bnode = rdflib.BNode()
        self.graph.add((self.entity_two, BF.title, title_bnode))
        self.graph.add((title_bnode, rdflib.RDF.type, BF.Title))
        self.graph.add((title_bnode, BF.subTitle, rdflib.Literal("Subtitle ")))
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __generate_object_term__(self, datatype, value):
        """Internal method takes a datatype (can be None) and returns
        the RDF Object Term

        Args:

        -----
            datatype: None, or rdflib.URIRef
            value: Varys depending on ingester
        """
        if datatype == NS_MGR.xsd.anyURI:
            term = rdflib.URIRef(value)
        elif datatype:
            term = rdflib.Literal(value, datatype=datatype)
        else:
            term = rdflib.Literal(value)
        return term


问题


面经


文章

微信
公众号

扫码关注公众号