python类fromstring()的实例源码

__init__.py 文件源码 项目:kcli 作者: karmab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def remove_cloudinit(self, name):
        """Delete the media behind a VM's cdrom disks and redefine the VM.

        The domain is looked up by name on ``self.conn`` and its XML
        description parsed. For every ``disk`` element whose ``device``
        attribute is ``cdrom``, the storage volume found at the ``file``
        attribute of its ``source`` child is deleted and that ``source``
        child is dropped from the XML. Processing stops at the first cdrom
        that has no ``source`` child. The edited XML is then passed to
        ``conn.defineXML``.

        :param name: name of the domain to look up
        :return: a ``{'result': 'failure', ...}`` dict when the lookup or the
            XML parsing fails; ``None`` otherwise
        """
        conn = self.conn
        try:
            vm = conn.lookupByName(name)
            xml = vm.XMLDesc(0)
            root = ET.fromstring(xml)
        except Exception:
            print("VM %s not found" % name)
            return {'result': 'failure', 'reason': "VM %s not found" % name}
        # Element.iter() replaces getiterator(), which no longer exists in
        # Python 3.9+. The list() snapshot keeps the walk independent of the
        # child removals performed below.
        for element in list(root.iter('disk')):
            if element.get('device') != 'cdrom':
                continue
            source = element.find('source')
            # The None check must come before any attribute access on source.
            if source is None:
                break
            volume = conn.storageVolLookupByPath(source.get('file'))
            volume.delete(0)
            element.remove(source)
        newxml = ET.tostring(root)
        conn.defineXML(newxml)
__init__.py 文件源码 项目:kcli 作者: karmab 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def vm_ports(self, name):
        """List the bridge or network name behind each interface of a VM.

        For every ``interface`` element in the domain XML, the ``bridge``
        attribute of its ``source`` child is taken when the interface
        ``type`` is ``bridge``; otherwise the ``network`` attribute is taken.
        An interface without a ``source`` child, or without the relevant
        attribute, contributes ``None``.

        :param name: name of the domain to look up on ``self.conn``
        :return: list with one entry per interface; an empty list when the
            domain cannot be looked up
        """
        conn = self.conn
        networks = []
        try:
            vm = conn.lookupByName(name)
        except Exception:
            common.pprint("VM %s not found" % name, color='red')
            return networks
        root = ET.fromstring(vm.XMLDesc(0))
        # Element.iter() replaces getiterator(), removed in Python 3.9.
        for element in root.iter('interface'):
            attribute = 'bridge' if element.get('type') == 'bridge' else 'network'
            source = element.find('source')
            # Interfaces may lack a <source> child; record None for them
            # instead of failing on the attribute lookup.
            networks.append(source.get(attribute) if source is not None else None)
        return networks
__init__.py 文件源码 项目:kcli 作者: karmab 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _get_bridge(self, name):
        """Resolve ``name`` to a bridge name.

        If ``name`` matches the name of one of the connection's interfaces it
        is returned unchanged. Otherwise ``name`` is looked up as a network
        and the ``name`` attribute of the first ``bridge`` element in the
        network XML is returned.

        :param name: interface name or network name
        :return: the bridge name, or ``None`` when the network lookup fails,
            the network XML has no ``bridge`` element, or that element has no
            ``name`` attribute
        """
        conn = self.conn
        bridges = [interface.name() for interface in conn.listAllInterfaces()]
        if name in bridges:
            return name
        try:
            net = conn.networkLookupByName(name)
        except Exception:
            return None
        root = ET.fromstring(net.XMLDesc(0))
        # Element.iter() replaces getiterator() (removed in Python 3.9); take
        # only the first match and report "no bridge" uniformly as None.
        bridge = next(iter(root.iter('bridge')), None)
        if bridge is None:
            return None
        return bridge.attrib.get('name')
base.py 文件源码 项目:PySIGNFe 作者: thiagopena 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _le_xml(self, arquivo):
        """Load XML into ``self._xml`` from a string, a file path or a tree.

        A non-string argument is first serialised with ``etree.tounicode``.
        The resulting text is passed through ``por_acentos`` when it contains
        ``NAMESPACE_NFSE``. Text containing ``<`` is treated as the XML
        itself; any other text is treated as a path and the file is read.
        In both cases ``tira_abertura`` is applied before parsing.

        :param arquivo: XML text, a file path, or an object accepted by
            ``etree.tounicode``; ``None`` is rejected
        :return: ``False`` when ``arquivo`` is ``None``, ``True`` once
            ``self._xml`` has been set
        """
        if arquivo is None:
            return False

        # After this conversion ``arquivo`` is always a string, so no separate
        # non-string branch is needed further down.
        if not isinstance(arquivo, basestring):
            arquivo = etree.tounicode(arquivo)

        if NAMESPACE_NFSE in arquivo:
            arquivo = por_acentos(arquivo)

        if u'<' in arquivo:
            texto = arquivo
        else:
            # The context manager closes the file even when reading fails.
            with open(arquivo) as arq:
                texto = arq.read()

        self._xml = etree.fromstring(tira_abertura(texto))
        return True
base.py 文件源码 项目:PySIGNFe 作者: thiagopena 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def validar(self):
        """Validate ``self.xml`` against the configured XSD schema.

        Schema messages of level 1 are appended to ``self.alertas`` and
        messages of level 2 to ``self.erros``, each with the NF-e namespace
        prefix removed.

        :return: the schema's ``error_log``
        """
        namespace_nfe = '{http://www.portalfiscal.inf.br/nfe}'
        caminho = self.caminho_esquema + self.arquivo_esquema

        # It is important to remove the encoding declaration here to avoid
        # unicode-to-ascii conversion errors.
        conteudo = tira_abertura(self.xml).encode(u'utf-8')

        esquema = etree.XMLSchema(etree.parse(caminho))
        documento = etree.fromstring(conteudo)

        if esquema.validate(documento):
            return esquema.error_log

        for entrada in esquema.error_log:
            if entrada.level == 1:
                destino = self.alertas
            elif entrada.level == 2:
                destino = self.erros
            else:
                continue
            destino.append(entrada.message.replace(namespace_nfe, ''))

        return esquema.error_log
utils.py 文件源码 项目:eitbapi 作者: erral 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def xml_to_dict(data):
    """Convert a two-level XML document into a dict keyed by item title.

    Each child of the root becomes one entry. Its own children are mapped by
    tag to a dict holding the child's text under ``'text'`` plus the child's
    attributes. The entry is stored under the text of its ``title`` child.
    Input that fails to parse yields an empty dict.
    """
    try:
        items = list(ET.fromstring(data))
    except ET.ParseError:
        items = []

    result = {}
    for item in items:
        fields = {
            child.tag: dict({'text': child.text}, **child.attrib)
            for child in item
        }
        result[fields['title']['text']] = fields

    return result
test_handle_host.py 文件源码 项目:masakari-monitors 作者: openstack 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def test_check_host_status_by_cibadmin(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
        """Check the cibadmin status path when the quorum mock returns 1.

        The ``mock_*`` arguments are presumably injected by patch decorators
        outside this view (TODO confirm). The test asserts a return value of
        0 and that each mocked collaborator is called exactly once.
        """
        mock_get_cib_xml.return_value = STATUS_TAG_XML
        mock_set_cib_xml.return_value = None
        mock_have_quorum.return_value = 1
        status_tag = ElementTree.fromstring(STATUS_TAG_XML)
        # list(element) yields the direct children; Element.getchildren()
        # no longer exists in Python 3.9+.
        node_state_tag_list = list(status_tag)
        mock_get_node_state_tag_list.return_value = node_state_tag_list
        mock_check_if_status_changed.return_value = None

        obj = handle_host.HandleHost()
        ret = obj._check_host_status_by_cibadmin()

        self.assertEqual(0, ret)
        mock_get_cib_xml.assert_called_once_with()
        mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
        mock_have_quorum.assert_called_once_with()
        mock_get_node_state_tag_list.assert_called_once_with()
        mock_check_if_status_changed.assert_called_once_with(
            node_state_tag_list)
test_handle_host.py 文件源码 项目:masakari-monitors 作者: openstack 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def test_check_host_status_by_cibadmin_no_quorum(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
        """Check the cibadmin status path when the quorum mock returns 0.

        The ``mock_*`` arguments are presumably injected by patch decorators
        outside this view (TODO confirm). The test asserts a return value of
        0 and that each mocked collaborator is called exactly once.
        """
        mock_get_cib_xml.return_value = STATUS_TAG_XML
        mock_set_cib_xml.return_value = None
        mock_have_quorum.return_value = 0
        status_tag = ElementTree.fromstring(STATUS_TAG_XML)
        # list(element) yields the direct children; Element.getchildren()
        # no longer exists in Python 3.9+.
        node_state_tag_list = list(status_tag)
        mock_get_node_state_tag_list.return_value = node_state_tag_list
        mock_check_if_status_changed.return_value = None

        obj = handle_host.HandleHost()
        ret = obj._check_host_status_by_cibadmin()

        self.assertEqual(0, ret)
        mock_get_cib_xml.assert_called_once_with()
        mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
        mock_have_quorum.assert_called_once_with()
        mock_get_node_state_tag_list.assert_called_once_with()
        mock_check_if_status_changed.assert_called_once_with(
            node_state_tag_list)
dexdump_test.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def testParseClassNode(self):
    """_ParseClassNode reports method names and the superclass of a class."""
    example_xml_string = (
        '<class name="Class1" extends="java.lang.Object">'
        '<method name="method1">'
        '</method>'
        '<method name="method2">'
        '</method>'
        '</class>')

    actual = dexdump._ParseClassNode(
        ElementTree.fromstring(example_xml_string))

    expected = {
      'methods': ['method1', 'method2'],
      'superclass': 'java.lang.Object',
    }
    # assertEqual is the supported spelling; the assertEquals alias is
    # deprecated and absent from Python 3.12+.
    self.assertEqual(expected, actual)
xml_Utils.py 文件源码 项目:warriorframework 作者: warriorframework 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def del_tags_from_xml(xml, tag_list=None):
    """
        It deletes the tags either by their names or xpath

        Arguments:
            1.xml: It takes xml file path or xml string as input
            2.tag_list: It contains list of tags which needs to be removed.
              An entry containing 'xpath=' is used as an XPath (the text
              after the first 'xpath='); any other entry is searched for as
              './/<entry>'. Defaults to no tags.
        Returns:
            It returns xml string
    """
    if os.path.exists(xml):
        root = ElementTree.parse(xml).getroot()
    else:
        root = ElementTree.fromstring(xml)
    for tag in tag_list or ():
        if 'xpath=' in tag:
            # str.strip('xpath=') would strip any of the characters
            # x, p, a, t, h, = from BOTH ends, mangling expressions such as
            # './/path'. Cut off the literal marker instead.
            xpath = tag.split('xpath=', 1)[1]
        else:
            xpath = ".//{0}".format(tag)
        req_tags = getChildElementsListWithSpecificXpath(root, xpath)
        recursive_delete_among_children(root, req_tags)

    return ElementTree.tostring(root, encoding='utf-8', method='xml')
proxy.py 文件源码 项目:adbus 作者: ccxtechnologies 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _update_interfaces(self):
        """Rebuild ``self._interfaces`` and extend ``self._nodes`` from the introspection XML.

        ``self._interfaces`` is reset and then filled with one ``Interface``
        per ``interface`` element, keyed by the element's ``name`` attribute.
        The ``name`` of every ``node`` element that has one is appended to
        ``self._nodes``; nodes without a name are skipped.
        """
        self._interfaces = {}

        parsed = etree.fromstring(self._introspect_xml)

        for iface_elem in parsed.iter('interface'):
            iface_name = iface_elem.attrib['name']
            on_change = self._changed_coroutines.get(iface_name, None)
            self._interfaces[iface_name] = Interface(
                    self, self._service, self._address, self._path,
                    iface_elem, self._camel_convert, self._timeout_ms,
                    on_change
            )

        for node_elem in parsed.iter('node'):
            node_name = node_elem.attrib.get('name')
            if node_name is not None:
                self._nodes.append(node_name)
__init__.py 文件源码 项目:tap-adwords 作者: singer-io 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def do_discover_reports(sdk_client):
    """Build one stream entry for each verified report type in the XSD.

    The report-definition XSD for ``VERSION`` is fetched and parsed. The
    ``value`` attributes found under the element named
    ``ReportDefinition.ReportType`` are filtered against ``VERIFIED_REPORTS``.
    A schema and metadata are created for each remaining report name.

    :param sdk_client: passed through to ``create_schema_for_report``
    :return: list of dicts with ``stream``, ``tap_stream_id``, ``metadata``
        and ``schema`` keys
    """
    xsd_url = 'https://adwords.google.com/api/adwords/reportdownload/{}/reportDefinition.xsd'.format(VERSION) #pylint: disable=line-too-long
    schema_root = ET.fromstring(request_xsd(xsd_url))
    # find() returns the first child of the ReportType element; list() then
    # collects that child's own children (presumably the enumeration values).
    report_type_nodes = list(schema_root.find(".//*[@name='ReportDefinition.ReportType']/*"))

    verified_names = []
    for node in report_type_nodes:
        if node.attrib['value'] in VERIFIED_REPORTS:
            verified_names.append(node.attrib['value'])

    LOGGER.info("Starting report discovery")
    discovered = []
    for report_name in verified_names:
        schema, mdata = create_schema_for_report(report_name, sdk_client)
        entry = {
            'stream': report_name,
            'tap_stream_id': report_name,
            'metadata' : metadata.to_list(mdata),
            'schema': schema,
        }
        discovered.append(entry)

    LOGGER.info("Report discovery complete")
    return discovered
vMConUtils.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def mountIso(self,instance,dev, image):
        """Attach an ISO image to the cdrom disk whose target device is ``dev``.

        A ``source`` element pointing at ``image`` is appended to the first
        cdrom ``disk`` that has a ``target`` child with ``dev`` equal to
        ``dev``. When ``instance.state()[0]`` is 1 (VIR_DOMAIN_RUNNING in
        libvirt) the disk is also attached to the instance and the XML
        fetched with flag 1 is redefined. For every other state the edited
        XML tree is redefined, so the function no longer fails on an
        unassigned variable for states other than 1 and 5.

        :param instance: domain object providing ``state()`` and ``attachDevice()``
        :param dev: target device name of the cdrom (value of ``target/@dev``)
        :param image: path stored in the new ``source`` element's ``file`` attribute
        :return: the result of ``self.defineXML``, an error string when
            libvirt raises, or ``None`` when no matching cdrom exists
        """
        tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0))
        for disk in tree.findall('devices/disk'):
            if disk.get('device') != 'cdrom':
                continue
            # findall() returns a list, so appending to ``disk`` below does
            # not disturb this membership test.
            if not any(elm.get('dev') == dev for elm in disk.findall('target')):
                continue
            src_media = ElementTree.Element('source')
            src_media.set('file', image)
            disk.append(src_media)
            if instance.state()[0] == 1:
                try:
                    instance.attachDevice(ElementTree.tostring(disk))
                except libvirt.libvirtError as e:
                    return '??????????{result}'.format(result=e.get_error_message())
                xmldom = self.getInsXMLDesc(instance,1)
            else:
                xmldom = ElementTree.tostring(tree)
            try:
                return self.defineXML(xmldom)
            except libvirt.libvirtError as e:
                return '??????????{result}'.format(result=e.get_error_message())
vMConUtils.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def getNetUsage(self,instance):
        """Measure per-interface traffic of an instance over one second.

        When ``instance.state()[0]`` is 1 the interface target devices are
        read from the instance XML, ``interfaceStats`` is sampled once for
        every device, and after a single one-second sleep sampled again.
        The differences of fields 0 and 4 (presumably received and
        transmitted bytes -- confirm against the libvirt docs) are multiplied
        by 8. For any other state every device returned by
        ``self.get_net_device`` is reported with zero usage.

        :param instance: domain object providing ``state()`` and ``interfaceStats()``
        :return: list of ``{'dev': index, 'rx': ..., 'tx': ...}`` dicts
        """
        dev_usage = []
        if instance.state()[0] == 1:
            # The XML is only needed (and only fetched) for this branch.
            tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
            devices = [target.get("dev")
                       for target in tree.findall("devices/interface/target")]
            if devices:
                # One stats call per device per sample keeps rx and tx from
                # the same snapshot; one shared sleep covers all devices.
                before = [instance.interfaceStats(dev) for dev in devices]
                time.sleep(1)
                after = [instance.interfaceStats(dev) for dev in devices]
                for i, (ago, now) in enumerate(zip(before, after)):
                    dev_usage.append({'dev': i,
                                      'rx': (now[0] - ago[0]) * 8,
                                      'tx': (now[4] - ago[4]) * 8})
        else:
            for i, dev in enumerate(self.get_net_device(instance)):
                dev_usage.append({'dev': i, 'rx': 0, 'tx': 0})
        return dev_usage
vMConUtils.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def delInstanceCdrom(self,instance,cdrom):
        '''Remove the cdrom disk whose target device is ``cdrom`` and redefine the instance.

        Every ``disk`` child of ``devices`` with ``device="cdrom"`` and a
        ``target`` child whose ``dev`` equals ``cdrom`` is removed from the
        XML tree, and the resulting XML is passed to ``self.defineXML``.

        :param instance: domain object providing ``XMLDesc()``
        :param cdrom: target device name to remove (value of ``target/@dev``)
        :return: the result of ``self.defineXML``, or an error string when
            libvirt raises
        '''
        root = ElementTree.fromstring(instance.XMLDesc(0))
        for devices in root.findall('./devices'):
            # Iterate over a snapshot and remove from the parent element
            # itself: removing from a getchildren() result may only change a
            # copy, and getchildren() no longer exists in Python 3.9+.
            for dev in list(devices):
                if dev.tag != 'disk' or dev.get('device') != 'cdrom':
                    continue
                if any(target.get('dev') == cdrom for target in dev.findall('target')):
                    devices.remove(dev)
        diskXml = ElementTree.tostring(root)
        try:
            return self.defineXML(diskXml)
        except libvirt.libvirtError as e:
            return '??????????????{result}'.format(result=e.get_error_message())
vMConUtils.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def addInstanceInterface(self,instance,brName):
        """Attach a new network interface on network ``brName`` to an instance.

        The network is looked up with ``self.getNetwork``. The ``type`` of its
        ``virtualport`` element (``'brctl'`` when there is none) and the
        ``mode`` of its ``forward`` element are passed to
        ``Const.CreateNetcard`` to build the interface XML, which is then
        attached to the instance.

        :param instance: domain object providing ``attachDeviceFlags()``
        :param brName: network name, also used as the bridge name and as the
            prefix of the generated interface name
        :return: ``False`` when the network is not found, otherwise the
            result of ``attachDeviceFlags`` or an error string when libvirt
            raises
        """
        netk = self.getNetwork(brName)
        if not netk:
            return False
        tree = ElementTree.fromstring(netk.XMLDesc(0))
        # Networks without a <virtualport> element fall back to 'brctl'.
        virtualport = tree.find('virtualport')
        mode = virtualport.get('type') if virtualport is not None else 'brctl'
        # NOTE(review): a network without a <forward> element still raises
        # AttributeError here, exactly as before -- confirm whether a default
        # is wanted.
        model = tree.find('forward').get('mode')
        interXml = Const.CreateNetcard(nkt_br=brName, ntk_name=brName +'-'+CommTools.radString(length=4), data={'type':model,'mode':mode})
        try:
            # flags=3 combines VIR_DOMAIN_AFFECT_LIVE (1) and
            # VIR_DOMAIN_AFFECT_CONFIG (2).
            return instance.attachDeviceFlags(interXml,3)
        except libvirt.libvirtError as e:
            return '??????????????{result}'.format(result=e.get_error_message())
vMConUtils.py 文件源码 项目:VManagePlatform 作者: welliamcao 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def setInterfaceBandwidth(self,instance,port,bandwidth):
        '''Set inbound and outbound bandwidth limits on one interface of an instance.

        For every ``interface`` under ``devices`` that has a ``target`` child
        with ``dev`` equal to ``port``, any existing ``bandwidth`` child is
        replaced by a new one. Its ``inbound`` and ``outbound`` children carry
        ``average`` and ``peak`` of ``int(bandwidth) * 1024`` and ``burst``
        of ``1024``. The edited XML is passed to ``self.defineXML``.

        :param instance: domain object providing ``XMLDesc()``
        :param port: target device name of the interface (``target/@dev``)
        :param bandwidth: value convertible with ``int()``
        :return: ``{"status": "success", 'data': None}`` when ``defineXML``
            returns a truthy value, ``{"status": "faild", 'data': exc}`` when
            editing the XML raises, and ``None`` when ``defineXML`` returns a
            falsy value
        '''
        domXml = instance.XMLDesc(0)
        root = ElementTree.fromstring(domXml)
        try:
            for dev in root.findall('.//devices/interface'):
                if not any(target.get('dev') == port for target in dev.findall('target')):
                    continue
                rate = str(int(bandwidth)*1024)
                # Drop any previous limit so repeated calls replace the
                # setting instead of stacking <bandwidth> elements.
                for stale in dev.findall('bandwidth'):
                    dev.remove(stale)
                bwXml = ElementTree.SubElement(dev,'bandwidth')
                for direction in ('inbound', 'outbound'):
                    limit = ElementTree.SubElement(bwXml, direction)
                    limit.set('average', rate)
                    limit.set('peak', rate)
                    limit.set('burst', '1024')
            domXml = ElementTree.tostring(root)
        except Exception as e:
            return {"status":"faild",'data':e}
        if self.defineXML(domXml):return {"status":"success",'data':None}
database_unittests.py 文件源码 项目:Causality 作者: vcla 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def setUpClass(cls):
        """Parse the temporal fixture XML once and keep the root on ``cls.parsexml``."""
        fixture = """
<temporal>
        <fluent_changes>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="0" new_value="off"/>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3890" new_value="on" old_value="off"/>
                <fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3990" new_value="off" old_value="on"/>
        </fluent_changes>
        <actions>
                <event action="makecall_START" energy="60.281" frame="3718"/>
                <event action="makecall_END" energy="60.281" frame="3767"/>
        </actions>
</temporal>"""
        parsed_root = ET.fromstring(fixture)
        cls.parsexml = parsed_root


问题


面经


文章

微信
公众号

扫码关注公众号