python类BNode()的实例源码

rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __addAction(self, action, statement):
        element = self.trans.createElement(action)
        for item in statement:
            if isinstance(item, Literal):
                literal = self.trans.createElement("literal")
                if item.datatype is not None: literal.setAttribute("datatype", str(item.datatype))
                if item.language is not None: literal.setAttribute("xml:lang", str(item.language))
                literal.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(literal)
            elif isinstance(item, URIRef):
                uri = self.trans.createElement("uri")
                uri.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(uri)
            elif isinstance(item, BNode):
                bnode = self.trans.createElement("bnode")
                bnode.appendChild(self.trans.createTextNode(str(item)))
                element.appendChild(bnode)
            else:
                raise Exception("Unknown element: " + item)
        self.trans.childNodes[0].appendChild(element)
rdfobject.py 文件源码 项目:programming-the-semantic-web 作者: utecht 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __setattr__(self, name, values):
        self._objectGraph._load(self.uri)
        unwrappedValues = []
        for value in values:
            # unwrap rdfobjects:
            if isinstance(value, RDFObject):
                unwrappedValues.append(value.uri)
            # pass through rdflib objects:
            elif isinstance(value, URIRef) or isinstance(value, BNode) or isinstance(value, Literal):
                unwrappedValues.append(value)
            # wrap literals:
            else:
                unwrappedValues.append(Literal(value))
        # look for a property mapping for this name:
        prop = self._getProp(name)
        if name.startswith("r_"):
            self._objectGraph._setSubjects(unwrappedValues, prop, self.uri)
        else:
            self._objectGraph._setObjects(self.uri, prop, unwrappedValues)
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _object_value_int(self, subject, predicate):
        '''
        Given a subject and a predicate, returns the value of the object as an
        integer

        Both subject and predicate must be rdflib URIRef or BNode objects

        If the value can not be parsed as intger, returns None
        '''
        object_value = self._object_value(subject, predicate)
        if object_value:
            try:
                return int(object_value)
            except ValueError:
                pass
        return None
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _contact_details(self, subject, predicate):
        '''
        Returns a dict with details about a vcard expression

        Both subject and predicate must be rdflib URIRef or BNode objects

        Returns keys for uri, name and email with the values set to
        None if they could not be found
        '''

        contact = {}

        for agent in self.g.objects(subject, predicate):

            contact['uri'] = (unicode(agent) if isinstance(agent,
                              rdflib.term.URIRef) else None)

            contact['name'] = self._object_value(agent, VCARD.fn)

            contact['email'] = self._object_value(agent, VCARD.hasEmail)

        return contact
test_euro_dcatap_profile_serialize.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_publisher_no_uri(self):
        dataset = {
            'id': '4b6fe9ca-dc77-4cec-92a4-55c6624a5bd6',
            'name': 'test-dataset',
            'extras': [
                {'key': 'publisher_name', 'value': 'Example Publisher'},
            ]
        }
        extras = self._extras(dataset)

        s = RDFSerializer()
        g = s.g

        dataset_ref = s.graph_from_dataset(dataset)

        publisher = self._triple(g, dataset_ref, DCT.publisher, None)[2]
        assert publisher
        assert_true(isinstance(publisher, BNode))

        assert self._triple(g, publisher, RDF.type, FOAF.Organization)
        assert self._triple(g, publisher, FOAF.name, extras['publisher_name'])
test_euro_dcatap_profile_serialize.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_spatial_bad_geojson_no_wkt(self):
        dataset = {
            'id': '4b6fe9ca-dc77-4cec-92a4-55c6624a5bd6',
            'name': 'test-dataset',
            'extras': [
                {'key': 'spatial', 'value': '{"key": "NotGeoJSON"}'},

            ]
        }
        extras = self._extras(dataset)

        s = RDFSerializer()
        g = s.g

        dataset_ref = s.graph_from_dataset(dataset)

        spatial = self._triple(g, dataset_ref, DCT.spatial, None)[2]
        assert spatial
        assert_true(isinstance(spatial, BNode))
        # Geometry in GeoJSON
        assert self._triple(g, spatial, LOCN.geometry, extras['spatial'], GEOJSON_IMT)

        # Geometry in WKT
        eq_(len([t for t in g.triples((spatial, LOCN.geometry, None))]), 1)
test_euro_dcatap_profile_serialize.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_spatial_bad_json_no_wkt(self):
        dataset = {
            'id': '4b6fe9ca-dc77-4cec-92a4-55c6624a5bd6',
            'name': 'test-dataset',
            'extras': [
                {'key': 'spatial', 'value': 'NotJSON'},

            ]
        }
        extras = self._extras(dataset)

        s = RDFSerializer()
        g = s.g

        dataset_ref = s.graph_from_dataset(dataset)

        spatial = self._triple(g, dataset_ref, DCT.spatial, None)[2]
        assert spatial
        assert_true(isinstance(spatial, BNode))
        # Geometry in GeoJSON
        assert self._triple(g, spatial, LOCN.geometry, extras['spatial'], GEOJSON_IMT)

        # Geometry in WKT
        eq_(len([t for t in g.triples((spatial, LOCN.geometry, None))]), 1)
test_euro_dcatap_profile_parse.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_dataset_license_from_distribution_by_title(self):
        # license_id retrieved from dct:title of dcat:license object
        g = Graph()

        dataset = URIRef("http://example.org/datasets/1")
        g.add((dataset, RDF.type, DCAT.Dataset))

        distribution = URIRef("http://example.org/datasets/1/ds/1")
        g.add((distribution, RDF.type, DCAT.Distribution))
        g.add((dataset, DCAT.distribution, distribution))
        license = BNode()
        g.add((distribution, DCT.license, license))
        g.add((license, DCT.title, Literal("Creative Commons Attribution")))

        p = RDFParser(profiles=['euro_dcat_ap'])

        p.g = g

        dataset = [d for d in p.datasets()][0]
        eq_(dataset['license_id'], 'cc-by')
profiles.py 文件源码 项目:dati-ckan-docker 作者: italia 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _add_agent(self, _dict, ref, basekey, _type):
        ''' Stores the Agent in this format:
                <dct:publisher rdf:resource="http://dati.gov.it/resource/Amministrazione/r_liguri"/>
                    <dcatapit:Agent rdf:about="http://dati.gov.it/resource/Amministrazione/r_liguri">
                        <rdf:type rdf:resource="&foaf;Agent"/>
                        <dct:identifier>r_liguri</dct:identifier>
                        <foaf:name>Regione Liguria</foaf:name>
                    </dcatapit:Agent>

            Returns the ref to the agent node
        '''

        agent_name = self._get_dict_value(_dict, basekey + '_name', 'N/A')
        agent_id = self._get_dict_value(_dict, basekey + '_identifier','N/A')

        agent = BNode()

        self.g.add((agent, RDF['type'], DCATAPIT.Agent))
        self.g.add((agent, RDF['type'], FOAF.Agent))
        self.g.add((ref, _type, agent))

        self.g.add((agent, FOAF.name, Literal(agent_name)))
        self.g.add((agent, DCT.identifier, Literal(agent_id)))

        return agent
CompareRDF.py 文件源码 项目:SNOMEDToOWL 作者: hsolbrig 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def complete_definition(subj: Identifier,
                        source_graph: Graph,
                        target_graph: Optional[SNOMEDGraph] = None) -> SNOMEDGraph:
    """
    Add a full definition for the supplied subject, following any object bnodes, to target_graph
    :param subj: URI or BNode for subject
    :param source_graph: Graph containing defininition
    :param target_graph: Graph to carry definition
    :return: target_graph
    """
    if not target_graph:
        target_graph = SNOMEDGraph()
    for p, o in source_graph.predicate_objects(subj):
        target_graph.add((subj, p, o))
        if isinstance(o, BNode):
            complete_definition(o, source_graph, target_graph)
    return target_graph
SNOMEDToOWL.py 文件源码 项目:SNOMEDToOWL 作者: hsolbrig 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_property_definition(self, concept: RF2Files.Concept, concept_uri: URIRef) -> None:
        """
        Add a property definition
        :param concept: Concept entry for the given property
        :param concept_uri: Concept URI
        :return:
        """
        parents = [parent for parent in self._relationships.parents(concept.id)
                   if concept.id != Concept_model_attribute_sctid]
        if len(parents) > 1 and concept.definitionStatusId == Defined_sctid:
            target, collection = intersection(self)
            [collection.append(as_uri(parent)) for parent in parents]
            self.add_t((concept_uri, OWL.equivalentProperty, target), self._stats.num_properties)
        else:
            [self.add_t((concept_uri, RDFS.subPropertyOf, as_uri(parent)), self._stats.num_properties)
             for parent in parents]

        # add an owl:propertyChain assertion for $subject if is in the RIGHT_ID
        if concept.id in self._context.RIGHT_ID:
            node = BNode()
            self.add_t((node, RDFS.subPropertyOf, concept_uri), None)
            coll = BNode()
            Collection(self, coll, [concept_uri, as_uri(self._context.RIGHT_ID[concept.id])])
            self.add_t((node, OWL.propertyChain, coll), self._stats.num_propchains)
test_mods_ingester.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def setUp(self):
        self.ingester = mods.MODSIngester(source=SAMPLE_MODS)
        self.entity = self.ingester.__generate_uri__()
        self.cc = rdflib.URIRef("http://coloradocollege.edu/")
        bc_org = getattr(NS_MGR.kds, "bf-Organization")
        self.ingester.rules_graph.add((
            bc_org,
            NS_MGR.rdf.type,
            NS_MGR.kds.PropertyLinker))
        self.held_by = rdflib.BNode()
        self.ingester.rules_graph.add((
            bc_org,
            NS_MGR.kds.destPropUri,
            self.held_by))
        self.ingester.rules_graph.add((
            self.held_by,
            NS_MGR.bf.heldBy,
            self.cc))
        self.ingester.rules_graph.add((
            bc_org,
            NS_MGR.kds.destClassUri,
            NS_MGR.bf.Item))
test_bibcat_funcs.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def setUp(self):
        self.graph = rdflib.Graph()
        self.entity_one = rdflib.URIRef("https://bibcat.org/test-entity")
        self.graph.add((self.entity_one, 
                        rdflib.RDF.type, 
                        rdflib.RDFS.Resource))
        self.graph.add((self.entity_one, 
                        rdflib.RDFS.label, 
                        rdflib.Literal("Test Entity One", lang="en")))
        self.entity_two = rdflib.URIRef("https://bibcat.org/test-entity-two")
        self.graph.add((self.entity_two, 
                        rdflib.RDF.type, 
                        rdflib.RDFS.Resource))
        self.graph.add((self.entity_two, 
                        rdflib.RDFS.label, 
                        rdflib.Literal("Test Entity Two", lang="en")))
        title_bnode = rdflib.BNode()
        self.graph.add((self.entity_two, BF.title, title_bnode))
        self.graph.add((title_bnode, rdflib.RDF.type, BF.Title))
        self.graph.add((title_bnode, BF.subTitle, rdflib.Literal("Subtitle ")))
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __handle_parents__(self, **kwargs):
        """Internal method handles parentTriplesMaps

        Keyword args:

        -------------

            parent_map: SimpleNamespace of ParentTriplesMap
            subject: rdflib.URIRef or rdflib.BNode
            predicate: rdflib.URIRef
        """
        parent_map = kwargs.pop("parent_map")
        subject = kwargs.pop('subject')
        predicate = kwargs.pop('predicate')
        parent_objects = self.execute(
            self.triple_maps[str(parent_map)],
            **kwargs)
        for parent_obj in parent_objects:
            if parent_obj == subject:
                continue
            self.output.add((
                subject,
                predicate,
                parent_obj))
processor.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def generate_term(self, **kwargs):
        """Method generates a rdflib.Term based on kwargs"""
        term_map = kwargs.pop('term_map')
        if hasattr(term_map, "termType") and\
            term_map.termType == NS_MGR.rr.BlankNode:
            return rdflib.BNode()
        if not hasattr(term_map, 'datatype'):
            term_map.datatype = NS_MGR.xsd.anyURI
        if hasattr(term_map, "template") and term_map.template is not None:
            template_vars = kwargs
            template_vars.update(self.constants)
            # Call any functions to generate values
            for key, value in template_vars.items():
                if hasattr(value, "__call__"):
                    template_vars[key] = value()
            raw_value = term_map.template.format(**template_vars)
            if term_map.datatype == NS_MGR.xsd.anyURI:
                return rdflib.URIRef(raw_value)
            return rdflib.Literal(raw_value,
                                  datatype=term_map.datatype)
        if term_map.reference is not None:
            # Each child will have different mechanisms for referencing the
            # source based
            return self.__generate_reference__(term_map, **kwargs)
ingester.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def add_admin_metadata(self, entity):
        """Takes a graph and adds the AdminMetadata for the entity

        Args:
            entity (rdflib.URIRef): URI of the entity
        """
        generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
        generation_process = rdflib.BNode()
        self.graph.add((generation_process,
                        rdflib.RDF.type,
                        NS_MGR.bf.GenerationProcess))
        self.graph.add((generation_process,
                        NS_MGR.bf.generationDate,
                        rdflib.Literal(
                            datetime.datetime.utcnow().isoformat())))
        self.graph.add((generation_process,
                        rdflib.RDF.value,
                        rdflib.Literal(generate_msg.format(__version__),
                                       lang="en")))
        #! Should add bibcat's current git MD5 commit
        self.graph.add(
            (entity,
             NS_MGR.bf.generationProcess,
             generation_process)
        )
ingester.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def new_existing_bnode(self, bf_property, rule):
        """Returns existing blank node or a new if it doesn't exist

        Args:
            bf_property (str): RDF property URI
            rule (rdflib.URIRef): RDF subject of the map rule

        Returns:
            rdflib.BNode: Existing or New blank node
        """
        blank_node = None
        for row in self.rules_graph.query(HAS_MULTI_NODES.format(rule)):
            if str(row[0]).lower().startswith("true"):
                return rdflib.BNode()
        for subject in self.graph.query(GET_BLANK_NODE.format(bf_property)):
            # set to first and exist loop
            blank_node = subject[0]
            break
        if not blank_node:
            blank_node = rdflib.BNode()
        return blank_node
ingester.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_admin_metadata(self, entity):
        """Takes a graph and adds the AdminMetadata for the entity

        Args:
            entity (rdflib.URIRef): URI of the entity
        """
        generate_msg = "Generated by BIBCAT version {} from KnowledgeLinks.io"
        generation_process = rdflib.BNode()
        self.graph.add((generation_process,
                        rdflib.RDF.type,
                        NS_MGR.bf.GenerationProcess))
        self.graph.add((generation_process,
                        NS_MGR.bf.generationDate,
                        rdflib.Literal(
                            datetime.datetime.utcnow().isoformat())))
        self.graph.add((generation_process,
                        rdflib.RDF.value,
                        rdflib.Literal(generate_msg.format(__version__),
                                       lang="en")))
        #! Should add bibcat's current git MD5 commit
        self.graph.add(
            (entity,
             NS_MGR.bf.generationProcess,
             generation_process)
        )
__init__.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def create_rdf_list(graph, nodes):
    """Creates a RDF List with the ordering based on the nodes.
    Returns a blank node that functions in the object role for adding
    a triple.

    Args:
        graph(rdflib.Graph|rdflib.ConjuctiveGraph): Source graph
        nodes(list): Python list of nodes
    """
    if len(nodes) < 1:
        return rdflib.RDF.nil
    ordered_bnode = rdflib.BNode()
    graph.add((ordered_bnode, rdflib.RDF.first, nodes[0]))
    graph.add((ordered_bnode,
               rdflib.RDF.rest,
               create_rdf_list(graph, nodes[1:])))
    return ordered_bnode
work.py 文件源码 项目:bibcat 作者: KnowledgeLinks 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __add_work_title__(self, work_graph, work_uri, instance_uri):
        """Method takes a new work graph and instance uri, queries for
        bf:InstanceTitle of instance uri and adds values to work graph

        Args:
            work_graph(rdflib.Graph): RDF Graph of new BF Work
            instance_uri(rdflib.URIRef): URI of BF Instance
        """
        instance_key = str(instance_uri)
        if instance_key in self.processed and\
        "title" in self.processed[instance_key]:
            work_title_bnode = rdflib.BNode()
            work_graph.add((work_uri,  NS_MGR.bf.title, work_title_bnode))
            work_graph.add((work_title_bnode, 
                            NS_MGR.rdf.type, 
                            NS_MGR.bf.WorkTitle))
            for row in self.processed[instance_key]["title"]:
                main_title, subtitle = row["mainTitle"], row["subtitle"]
                work_graph.add((work_title_bnode,
                                NS_MGR.bf.mainTitle,
                                rdflib.Literal(main_title)))
                if subtitle:
                    work_graph.add((work_title_bnode,
                                    NS_MGR.bf.subtitle,
                                    rdflib.Literal(subtitle)))
TopbraidDiff.py 文件源码 项目:QuitDiff 作者: AKSW 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def serialize(self, add, delete):
        diff = Namespace("http://topbraid.org/diff#")

        g = ConjunctiveGraph()

        namespace_manager = NamespaceManager(g)
        namespace_manager.bind('diff', diff, override=False)
        namespace_manager.bind('owl', OWL, override=False)

        graphUris = set(delete.keys()) | set(add.keys())

        for graphUri in graphUris:
            if (graphUri in delete.keys() and len(delete[graphUri]) > 0) or (graphUri in add.keys() and len(add[graphUri]) > 0):
                changeset = Namespace("urn:diff:" + str(uuid.uuid1()))
                graphTerm = changeset.term("")
                if str(graphUri) != 'http://quitdiff.default/':
                    g.add((graphTerm, OWL.imports, graphUri, graphTerm))
                g.add((graphTerm, RDF.type, OWL.Ontology, graphTerm))
                g.add((graphTerm, OWL.imports, diff.term(""), graphTerm))
                if graphUri in delete.keys() and len(delete[graphUri]) > 0:
                    i = 0
                    for triple in delete[graphUri]:
                        deleteStatementName = BNode()
                        g.add((deleteStatementName, RDF.type, diff.DeletedTripleDiff, graphTerm))
                        g.add((deleteStatementName, RDF.subject, triple[0], graphTerm))
                        g.add((deleteStatementName, RDF.predicate, triple[1], graphTerm))
                        g.add((deleteStatementName, RDF.object, triple[2], graphTerm))
                        i += 1
                if graphUri in add.keys() and len(add[graphUri]) > 0:
                    i = 0
                    for triple in add[graphUri]:
                        insertGraphName = BNode()
                        g.add((insertGraphName, RDF.type, diff.AddedTripleDiff, graphTerm))
                        g.add((insertGraphName, RDF.subject, triple[0], graphTerm))
                        g.add((insertGraphName, RDF.predicate, triple[1], graphTerm))
                        g.add((insertGraphName, RDF.object, triple[2], graphTerm))
                        i += 1

        return g.serialize(format="trig").decode("utf-8")
ChangesetDiff.py 文件源码 项目:QuitDiff 作者: AKSW 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def serialize(self, add, delete):

        changeset = Namespace("http://purl.org/vocab/changeset/schema#")

        g = ConjunctiveGraph()

        namespace_manager = NamespaceManager(g)
        namespace_manager.bind('changeset', changeset, override=False)

        graphUris = set(delete.keys()) | set(add.keys())

        for graphUri in graphUris:
            if (graphUri in delete.keys() and len(delete[graphUri]) > 0) or (graphUri in add.keys() and len(add[graphUri]) > 0):
                diff = Namespace("urn:changeset:" + str(uuid.uuid1()))
                graphTerm = diff.term("")
                g.add((graphTerm, RDF.type, changeset.ChangeSet))
                if str(graphUri) != 'http://quitdiff.default/':
                    g.add((graphTerm, changeset.subjectOfChange, graphUri))
                if graphUri in delete.keys() and len(delete[graphUri]) > 0:
                    i = 0
                    for triple in delete[graphUri]:
                        deleteStatementName = BNode()
                        g.add((graphTerm, changeset.removal, deleteStatementName))
                        g.add((deleteStatementName, RDF.type, RDF.Statement))
                        g.add((deleteStatementName, RDF.subject, triple[0]))
                        g.add((deleteStatementName, RDF.predicate, triple[1]))
                        g.add((deleteStatementName, RDF.object, triple[2]))
                        i += 1
                if graphUri in add.keys() and len(add[graphUri]) > 0:
                    i = 0
                    for triple in add[graphUri]:
                        insertGraphName = BNode()
                        g.add((graphTerm, changeset.addition, insertGraphName))
                        g.add((insertGraphName, RDF.type, RDF.Statement))
                        g.add((insertGraphName, RDF.subject, triple[0]))
                        g.add((insertGraphName, RDF.predicate, triple[1]))
                        g.add((insertGraphName, RDF.object, triple[2]))
                        i += 1

        return g.serialize(format="turtle").decode("utf-8")
utils.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def generate_RDF_collection( graph, vals ) :
    """
    Generate an RDF List from vals, returns the head of the list
    @param graph: RDF graph
    @type graph: RDFLib Graph
    @param vals: array of RDF Resources
    @return: head of the List (an RDF Resource)
    """
    # generate an RDF List, returns the head
    # list has all the elements in RDF format already
    heads = [ BNode() for r in vals ] + [ ns_rdf["nil"] ]
    for i in range(0, len(vals)) :
        graph.add( (heads[i], ns_rdf["first"], vals[i]) )
        graph.add( (heads[i], ns_rdf["rest"],  heads[i+1]) )
    return heads[0]

#################################################################################
xmlresults.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parseTerm(element):
    """rdflib object (Literal, URIRef, BNode) for the given
    elementtree element"""
    tag, text = element.tag, element.text
    if tag == RESULTS_NS_ET + 'literal':
        if text is None:
            text = ''
        datatype = None
        lang = None
        if element.get('datatype', None):
            datatype = URIRef(element.get('datatype'))
        elif element.get("{%s}lang" % XML_NAMESPACE, None):
            lang = element.get("{%s}lang" % XML_NAMESPACE)

        ret = Literal(text, datatype=datatype, lang=lang)

        return ret
    elif tag == RESULTS_NS_ET + 'uri':
        return URIRef(text)
    elif tag == RESULTS_NS_ET + 'bnode':
        return BNode(text)
    else:
        raise TypeError("unknown binding type %r" % element)
jsonresults.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def parseJsonTerm(d):
    """rdflib object (Literal, URIRef, BNode) for the given json-format dict.

    input is like:
      { 'type': 'uri', 'value': 'http://famegame.com/2006/01/username' }
      { 'type': 'literal', 'value': 'drewp' }
    """

    t = d['type']
    if t == 'uri':
        return URIRef(d['value'])
    elif t == 'literal':
        if 'xml:lang' in d:
            return Literal(d['value'], lang=d['xml:lang'])
        return Literal(d['value'])
    elif t == 'typed-literal':
        return Literal(d['value'], datatype=URIRef(d['datatype']))
    elif t == 'bnode':
        return BNode(d['value'])
    else:
        raise NotImplementedError("json term type %r" % t)
jsonresults.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def termToJSON(self, term):
    if isinstance(term, URIRef):
        return {'type': 'uri', 'value': unicode(term)}
    elif isinstance(term, Literal):
        if term.datatype is not None:
            return {'type': 'typed-literal',
                    'value': unicode(term),
                    'datatype': unicode(term.datatype)}
        else:
            r = {'type': 'literal',
                 'value': unicode(term)}
            if term.language is not None:
                r['xml:lang'] = term.language
            return r

    elif isinstance(term, BNode):
        return {'type': 'bnode', 'value': str(term)}
    elif term is None:
        return None
    else:
        raise ResultException(
            'Unknown term type: %s (%s)' % (term, type(term)))
sparqlstore.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _node_from_result(node):
    """
    Helper function that casts XML node in SPARQL results
    to appropriate rdflib term
    """
    if node.tag == '{%s}bnode' % SPARQL_NS:
        return BNode(node.text)
    elif node.tag == '{%s}uri' % SPARQL_NS:
        return URIRef(node.text)
    elif node.tag == '{%s}literal' % SPARQL_NS:
        value = node.text if node.text is not None else ''
        if 'datatype' in node.attrib:
            dt = URIRef(node.attrib['datatype'])
            return Literal(value, datatype=dt)
        elif '{http://www.w3.org/XML/1998/namespace}lang' in node.attrib:
            return Literal(value, lang=node.attrib[
                "{http://www.w3.org/XML/1998/namespace}lang"])
        else:
            return Literal(value)
    else:
        raise Exception('Unknown answer type')
custom_eval.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def customEval(ctx, part):
    """
    Rewrite triple patterns to get super-classes
    """

    if part.name == 'BGP':

        # rewrite triples
        triples = []
        for t in part.triples:
            if t[1] == rdflib.RDF.type:
                bnode = rdflib.BNode()
                triples.append((t[0], t[1], bnode))
                triples.append((bnode, inferredSubClass, t[2]))
            else:
                triples.append(t)

        # delegate to normal evalBGP
        return evalBGP(ctx, triples)

    raise NotImplementedError()
earl.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def add_test(test, res, info=None):
    a = BNode()
    report.add((a, RDF.type, EARL.Assertion))
    report.add((a, EARL.assertedBy, me))
    report.add((a, EARL.test, test))
    report.add((a, EARL.subject, rdflib))

    report.add((a, DC.date, now))

    r = BNode()
    report.add((a, EARL.result, r))
    report.add((r, RDF.type, EARL.TestResult))

    report.add((r, EARL.outcome, EARL[res]))
    if info:
        report.add((r, EARL.info, Literal(info)))
test_turtle_serialize.py 文件源码 项目:Meiji 作者: GiovanniBalestrieri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testTurtleBoolList():
    subject = URIRef("http://localhost/user")
    predicate = URIRef("http://localhost/vocab#hasList")
    g1 = Graph()
    list_item1 = BNode()
    list_item2 = BNode()
    list_item3 = BNode()
    g1.add((subject, predicate, list_item1))
    g1.add((list_item1, RDF.first, Literal(True)))
    g1.add((list_item1, RDF.rest, list_item2))
    g1.add((list_item2, RDF.first, Literal(False)))
    g1.add((list_item2, RDF.rest, list_item3))
    g1.add((list_item3, RDF.first, Literal(True)))
    g1.add((list_item3, RDF.rest, RDF.nil))

    ttl_dump = g1.serialize(format="turtle")
    g2 = Graph()
    g2.parse(data=ttl_dump, format="turtle")

    list_id = g2.value(subject, predicate)
    bool_list = [i.toPython() for i in Collection(g2, list_id)]
    assert bool_list == [True, False, True]


问题


面经


文章

微信
公众号

扫码关注公众号