def remove_cloudinit(self, name):
    """Delete the volumes backing a VM's CD-ROM drives and redefine the VM.

    Looks up the domain ``name`` on ``self.conn`` and walks every
    ``<disk device='cdrom'>`` element of its XML description. For each drive
    that has a ``<source file=...>`` child, the storage volume at that path
    is deleted and the ``<source>`` element is removed. The domain is then
    redefined from the modified XML.

    Args:
        name: Name of the domain to look up.

    Returns:
        ``None`` on success, or a dict with ``'result': 'failure'`` and a
        ``'reason'`` message when the domain cannot be looked up or its XML
        cannot be parsed.
    """
    conn = self.conn
    try:
        vm = conn.lookupByName(name)
        root = ET.fromstring(vm.XMLDesc(0))
    except Exception:
        reason = "VM %s not found" % name
        print(reason)
        return {'result': 'failure', 'reason': reason}
    # Element.iter() replaces getiterator(), which was removed in Python 3.9.
    # Iterate over a snapshot because <source> children are removed below.
    for element in list(root.iter('disk')):
        if element.get('device') != 'cdrom':
            continue
        source = element.find('source')
        # An empty drive has no <source>; check before dereferencing it.
        if source is None:
            continue
        path = source.get('file')
        if path is None:
            # Source is not file-backed, so there is no volume path to delete.
            continue
        volume = conn.storageVolLookupByPath(path)
        volume.delete(0)
        element.remove(source)
    newxml = ET.tostring(root)
    # On Python 3 tostring() returns bytes; hand defineXML a text string.
    if not isinstance(newxml, str):
        newxml = newxml.decode('utf-8')
    conn.defineXML(newxml)
# Example source code for Python's fromstring()
def vm_ports(self, name):
    """List the bridge or network name behind each of a VM's interfaces.

    For every ``<interface>`` element in the domain XML, the ``bridge``
    attribute of its ``<source>`` child is collected when the interface
    ``type`` is ``'bridge'``; otherwise the ``network`` attribute is used.

    Args:
        name: Name of the domain to look up on ``self.conn``.

    Returns:
        A list with one entry per interface that has a ``<source>`` child.
        The list is empty when the domain cannot be looked up.
    """
    conn = self.conn
    networks = []
    try:
        vm = conn.lookupByName(name)
    except Exception:
        common.pprint("VM %s not found" % name, color='red')
        return networks
    root = ET.fromstring(vm.XMLDesc(0))
    # Element.iter() replaces getiterator(), which was removed in Python 3.9.
    for element in root.iter('interface'):
        source = element.find('source')
        if source is None:
            # No <source> child to read a bridge/network name from.
            continue
        attribute = 'bridge' if element.get('type') == 'bridge' else 'network'
        networks.append(source.get(attribute))
    return networks
def _get_bridge(self, name):
    """Resolve ``name`` to a bridge name.

    If ``name`` matches one of the host interfaces reported by
    ``self.conn.listAllInterfaces()`` it is returned unchanged. Otherwise
    ``name`` is looked up as a network and the ``name`` attribute of the
    first ``<bridge>`` element in its XML is returned.

    Args:
        name: Interface name or network name.

    Returns:
        The bridge name, or ``None`` when the network lookup fails or the
        network XML contains no ``<bridge>`` element.
    """
    conn = self.conn
    if name in [interface.name() for interface in conn.listAllInterfaces()]:
        return name
    try:
        net = conn.networkLookupByName(name)
    except Exception:
        return None
    root = ET.fromstring(net.XMLDesc(0))
    # Element.iter() replaces getiterator(), which was removed in Python 3.9.
    # Take the first match explicitly so "no bridge" is always None rather
    # than an empty list or an always-truthy iterator.
    bridge = next(root.iter('bridge'), None)
    if bridge is None:
        return None
    return bridge.attrib.get('name')
def _le_xml(self, arquivo):
    """Load ``arquivo`` into ``self._xml``.

    ``arquivo`` may be an element/tree object (serialized first with
    ``etree.tounicode``), a string containing XML markup, or a string with
    no ``<`` in it, which is treated as a path to a file to read.

    Args:
        arquivo: XML source as described above, or ``None``.

    Returns:
        ``False`` when ``arquivo`` is ``None``; ``True`` once ``self._xml``
        has been set.
    """
    if arquivo is None:
        return False

    if not isinstance(arquivo, basestring):
        arquivo = etree.tounicode(arquivo)

    if arquivo is not None:
        if isinstance(arquivo, basestring):
            if NAMESPACE_NFSE in arquivo:
                arquivo = por_acentos(arquivo)

            if u'<' in arquivo:
                self._xml = etree.fromstring(tira_abertura(arquivo))
            else:
                # No markup in the string: treat it as a file path. The
                # context manager closes the file even if reading fails.
                with open(arquivo) as arq:
                    txt = arq.read()
                self._xml = etree.fromstring(tira_abertura(txt))
        else:
            # NOTE(review): looks unreachable after the tounicode()
            # conversion above; kept to preserve behaviour. Confirm.
            self._xml = etree.parse(arquivo)
        return True

    return False
def validar(self):
    """Validate ``self.xml`` against the configured XSD and collect messages.

    The schema file path is ``self.caminho_esquema + self.arquivo_esquema``.
    When validation fails, each error-log entry with ``level == 1`` is
    appended to ``self.alertas`` and each entry with ``level == 2`` to
    ``self.erros``, with the NF-e namespace prefix removed from the message.

    Returns:
        The schema's ``error_log``.
    """
    caminho = self.caminho_esquema + self.arquivo_esquema

    # It is important to remove the encoding declaration here
    # to avoid unicode-to-ascii conversion errors
    conteudo = tira_abertura(self.xml).encode(u'utf-8')

    esquema = etree.XMLSchema(etree.parse(caminho))
    documento = etree.fromstring(conteudo)
    if esquema.validate(documento):
        return esquema.error_log

    prefixo = '{http://www.portalfiscal.inf.br/nfe}'
    for entrada in esquema.error_log:
        if entrada.level == 1:
            self.alertas.append(entrada.message.replace(prefixo, ''))
        elif entrada.level == 2:
            self.erros.append(entrada.message.replace(prefixo, ''))
    return esquema.error_log
def xml_to_dict(data):
    """Convert a two-level XML document into a dict keyed by item title.

    Each child of the root becomes one entry. Within an entry, every
    sub-element is stored under its tag as a dict holding its ``text`` plus
    its attributes. The entry is keyed by the text of its ``title``
    sub-element.

    Args:
        data: XML document as a string.

    Returns:
        The resulting dict; empty when ``data`` cannot be parsed.

    Raises:
        KeyError: If an item has no ``title`` sub-element.
    """
    try:
        entries = ET.fromstring(data)
    except ET.ParseError:
        entries = []

    result = {}
    for entry in entries:
        fields = {}
        for child in entry:
            info = {'text': child.text}
            info.update(child.attrib)
            fields[child.tag] = info
        title = fields['title']['text']
        result[title] = fields
    return result
def test_check_host_status_by_cibadmin(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
    """``_check_host_status_by_cibadmin`` returns 0 when quorum is reported.

    Every collaborator is replaced by a mock argument (presumably injected
    by patch decorators outside this view). The test checks the return value
    and that each collaborator is called exactly once with the expected
    arguments.
    """
    mock_get_cib_xml.return_value = STATUS_TAG_XML
    mock_set_cib_xml.return_value = None
    mock_have_quorum.return_value = 1
    status_tag = ElementTree.fromstring(STATUS_TAG_XML)
    # list(element) replaces Element.getchildren(), removed in Python 3.9.
    node_state_tag_list = list(status_tag)
    mock_get_node_state_tag_list.return_value = node_state_tag_list
    mock_check_if_status_changed.return_value = None

    obj = handle_host.HandleHost()
    ret = obj._check_host_status_by_cibadmin()

    self.assertEqual(0, ret)
    mock_get_cib_xml.assert_called_once_with()
    mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
    mock_have_quorum.assert_called_once_with()
    mock_get_node_state_tag_list.assert_called_once_with()
    mock_check_if_status_changed.assert_called_once_with(
        node_state_tag_list)
def test_check_host_status_by_cibadmin_no_quorum(
        self, mock_get_cib_xml, mock_set_cib_xml, mock_have_quorum,
        mock_get_node_state_tag_list, mock_check_if_status_changed):
    """``_check_host_status_by_cibadmin`` returns 0 when ``have_quorum`` is 0.

    Every collaborator is replaced by a mock argument (presumably injected
    by patch decorators outside this view). The test checks the return value
    and that each collaborator is called exactly once with the expected
    arguments.
    """
    mock_get_cib_xml.return_value = STATUS_TAG_XML
    mock_set_cib_xml.return_value = None
    mock_have_quorum.return_value = 0
    status_tag = ElementTree.fromstring(STATUS_TAG_XML)
    # list(element) replaces Element.getchildren(), removed in Python 3.9.
    node_state_tag_list = list(status_tag)
    mock_get_node_state_tag_list.return_value = node_state_tag_list
    mock_check_if_status_changed.return_value = None

    obj = handle_host.HandleHost()
    ret = obj._check_host_status_by_cibadmin()

    self.assertEqual(0, ret)
    mock_get_cib_xml.assert_called_once_with()
    mock_set_cib_xml.assert_called_once_with(STATUS_TAG_XML)
    mock_have_quorum.assert_called_once_with()
    mock_get_node_state_tag_list.assert_called_once_with()
    mock_check_if_status_changed.assert_called_once_with(
        node_state_tag_list)
def testParseClassNode(self):
    """``_ParseClassNode`` reports the method names and the superclass."""
    example_xml_string = (
        '<class name="Class1" extends="java.lang.Object">'
        '<method name="method1">'
        '</method>'
        '<method name="method2">'
        '</method>'
        '</class>')

    actual = dexdump._ParseClassNode(
        ElementTree.fromstring(example_xml_string))

    expected = {
        'methods': ['method1', 'method2'],
        'superclass': 'java.lang.Object',
    }
    # assertEquals is a deprecated alias that was removed in Python 3.12.
    self.assertEqual(expected, actual)
def del_tags_from_xml(xml, tag_list=None):
    """
    Delete tags from an XML document, selected by tag name or by XPath.

    Arguments:
        1.xml: path to an XML file, or an XML string
        2.tag_list: tags to remove. An entry containing ``xpath=`` is used
          as an XPath expression (a leading ``xpath=`` prefix is dropped);
          any other entry is treated as a tag name and searched for as
          ``.//<name>``. ``None`` or an empty list removes nothing.
    Returns:
        The resulting XML serialized with ``encoding='utf-8'``
    """
    if os.path.exists(xml):
        root = ElementTree.parse(xml).getroot()
    else:
        root = ElementTree.fromstring(xml)

    prefix = 'xpath='
    for tag in tag_list or ():
        if prefix in tag:
            # str.strip('xpath=') removes any of the characters x, p, a, t,
            # h, = from BOTH ends, which mangles expressions such as
            # 'xpath=.//path'. Drop only the literal prefix instead.
            xpath = tag[len(prefix):] if tag.startswith(prefix) else tag
        else:
            xpath = ".//{0}".format(tag)
        req_tags = getChildElementsListWithSpecificXpath(root, xpath)
        recursive_delete_among_children(root, req_tags)

    return ElementTree.tostring(root, encoding='utf-8', method='xml')
def _update_interfaces(self):
    """Rebuild ``self._interfaces`` and extend ``self._nodes`` from the introspection XML.

    ``self._introspect_xml`` is parsed. One ``Interface`` is created per
    ``<interface>`` element and stored under the element's ``name``
    attribute. The ``name`` of every ``<node>`` element that has one is
    appended to ``self._nodes``.
    """
    self._interfaces = {}
    introspection = etree.fromstring(self._introspect_xml)

    for iface_elem in introspection.iter('interface'):
        iface_name = iface_elem.attrib['name']
        changed_coroutine = self._changed_coroutines.get(iface_name, None)
        self._interfaces[iface_name] = Interface(
            self, self._service, self._address, self._path, iface_elem,
            self._camel_convert, self._timeout_ms, changed_coroutine,
        )

    for node_elem in introspection.iter('node'):
        # Nodes without a name attribute are skipped.
        if 'name' in node_elem.attrib:
            self._nodes.append(node_elem.attrib['name'])
def do_discover_reports(sdk_client):
    """Build one stream entry per verified report type listed in the report XSD.

    The report-definition XSD for ``VERSION`` is fetched with
    ``request_xsd``. The ``value`` attributes under the
    ``ReportDefinition.ReportType`` node are filtered down to those in
    ``VERIFIED_REPORTS``, and a schema plus metadata is created for each.

    Args:
        sdk_client: Passed through to ``create_schema_for_report``.

    Returns:
        A list of dicts with the keys ``stream``, ``tap_stream_id``,
        ``metadata`` and ``schema``.
    """
    xsd_url = 'https://adwords.google.com/api/adwords/reportdownload/{}/reportDefinition.xsd'.format(VERSION)  # pylint: disable=line-too-long
    definition = ET.fromstring(request_xsd(xsd_url))
    report_types = list(definition.find(".//*[@name='ReportDefinition.ReportType']/*"))

    verified_names = []
    for node in report_types:
        if node.attrib['value'] in VERIFIED_REPORTS:
            verified_names.append(node.attrib['value'])

    discovered = []
    LOGGER.info("Starting report discovery")
    for report_name in verified_names:
        schema, mdata = create_schema_for_report(report_name, sdk_client)
        entry = {
            'stream': report_name,
            'tap_stream_id': report_name,
            'metadata': metadata.to_list(mdata),
            'schema': schema,
        }
        discovered.append(entry)
    LOGGER.info("Report discovery complete")
    return discovered
def mountIso(self, instance, dev, image):
    """Insert an ISO image into one of the instance's CD-ROM drives.

    Finds the ``<disk device='cdrom'>`` whose ``<target dev=...>`` equals
    ``dev`` and appends ``<source file=image>`` to it. When the instance
    state code is 1, the modified disk is attached to the instance and the
    XML from ``self.getInsXMLDesc(instance, 1)`` is defined. For any other
    state the modified tree itself is defined.

    Args:
        instance: Domain object providing ``state()`` and ``attachDevice()``.
        dev: Target device name of the CD-ROM drive (e.g. ``'hdc'``).
        image: Path of the ISO image to insert.

    Returns:
        The result of ``self.defineXML`` on success, a message string built
        from the libvirt error when attaching or defining fails, or ``None``
        when no CD-ROM drive matches ``dev``.
    """
    tree = ElementTree.fromstring(self.getInsXMLDesc(instance, 0))
    for disk in tree.findall('devices/disk'):
        if disk.get('device') != 'cdrom':
            continue
        for elm in disk:
            if elm.tag != 'target' or elm.get('dev') != dev:
                continue
            src_media = ElementTree.Element('source')
            src_media.set('file', image)
            disk.append(src_media)
            if instance.state()[0] == 1:
                try:
                    instance.attachDevice(ElementTree.tostring(disk))
                except libvirt.libvirtError as e:
                    return '??????????{result}'.format(result=e.get_error_message())
                xmldom = self.getInsXMLDesc(instance, 1)
            else:
                # Previously only state 5 assigned xmldom, so any other
                # state raised UnboundLocalError; define the edited tree.
                xmldom = ElementTree.tostring(tree)
            try:
                return self.defineXML(xmldom)
            except libvirt.libvirtError as e:
                return '??????????{result}'.format(result=e.get_error_message())
    return None
def getNetUsage(self, instance):
    """Sample per-interface traffic of an instance over a one-second window.

    When the instance state code is 1, the target device of every
    ``<interface>`` in the domain XML is sampled with
    ``instance.interfaceStats`` before and after a one-second sleep. The
    differences of fields 0 and 4 are multiplied by 8 (presumably rx/tx byte
    counters converted to bits; confirm against the libvirt binding). For
    any other state, a zeroed entry is returned for each device from
    ``self.get_net_device(instance)``.

    Args:
        instance: Domain object providing ``state()`` and ``interfaceStats()``.

    Returns:
        A list of ``{'dev': index, 'rx': ..., 'tx': ...}`` dicts.
    """
    if instance.state()[0] != 1:
        return [{'dev': i, 'rx': 0, 'tx': 0}
                for i, _ in enumerate(self.get_net_device(instance))]

    # The XML is fetched and parsed once (it used to be done twice).
    tree = ElementTree.fromstring(self.getInsXMLDesc(instance, flag=1))
    devices = [target.get("dev")
               for target in tree.findall("devices/interface/target")]
    if not devices:
        return []

    # Sample every device, wait once, then sample again. All devices share
    # one interval instead of sleeping a second per device, and rx/tx of a
    # sample come from the same interfaceStats() call.
    before = [instance.interfaceStats(dev) for dev in devices]
    time.sleep(1)
    after = [instance.interfaceStats(dev) for dev in devices]

    dev_usage = []
    for i, (ago, now) in enumerate(zip(before, after)):
        dev_usage.append({
            'dev': i,
            'rx': (now[0] - ago[0]) * 8,
            'tx': (now[4] - ago[4]) * 8,
        })
    return dev_usage
def delInstanceCdrom(self, instance, cdrom):
    """Remove a CD-ROM drive from the instance definition.

    Every ``<disk device='cdrom'>`` under ``<devices>`` that has a
    ``<target dev=...>`` equal to ``cdrom`` is removed from the domain XML,
    which is then passed to ``self.defineXML``.

    Args:
        instance: Domain object providing ``XMLDesc()``.
        cdrom: Target device name of the drive to remove (e.g. ``'hdc'``).

    Returns:
        The result of ``self.defineXML``, or a message string built from
        the libvirt error when defining fails.
    """
    root = ElementTree.fromstring(instance.XMLDesc(0))
    for devices in root.findall('./devices'):
        # Iterate over a snapshot and remove through the parent element.
        # Removing from the list returned by getchildren() leaves the tree
        # untouched where that list is a copy, and getchildren() itself
        # was removed in Python 3.9.
        for dev in list(devices):
            if dev.tag != 'disk' or dev.get('device') != 'cdrom':
                continue
            if any(child.tag == 'target' and child.get('dev') == cdrom
                   for child in dev):
                devices.remove(dev)
    diskXml = ElementTree.tostring(root)
    try:
        return self.defineXML(diskXml)
    except libvirt.libvirtError as e:
        return '??????????????{result}'.format(result=e.get_error_message())
def addInstanceInterface(self, instance, brName):
    """Attach a new network interface on network ``brName`` to the instance.

    The network is looked up with ``self.getNetwork``. Its ``<virtualport
    type=...>`` (defaulting to ``'brctl'`` when the element is absent) and
    its ``<forward mode=...>`` are passed to ``Const.CreateNetcard`` to
    build the interface XML, which is attached with flags ``3``.

    Args:
        instance: Domain object providing ``attachDeviceFlags()``.
        brName: Name of the network to attach to.

    Returns:
        ``False`` when the network is not found; otherwise the result of
        ``attachDeviceFlags``, or a message string built from the libvirt
        error when attaching fails.
    """
    netk = self.getNetwork(brName)
    if not netk:
        return False

    tree = ElementTree.fromstring(netk.XMLDesc(0))
    # Explicit check instead of a bare except around find(...).get(...).
    virtualport = tree.find('virtualport')
    mode = virtualport.get('type') if virtualport is not None else 'brctl'
    # NOTE(review): a network without <forward> still raises AttributeError
    # here, as before; the right fallback depends on Const.CreateNetcard.
    model = tree.find('forward').get('mode')
    interXml = Const.CreateNetcard(
        nkt_br=brName,
        ntk_name=brName + '-' + CommTools.radString(length=4),
        data={'type': model, 'mode': mode},
    )
    try:
        # Flags value 3 is kept exactly as in the original call.
        return instance.attachDeviceFlags(interXml, 3)
    except libvirt.libvirtError as e:
        return '??????????????{result}'.format(result=e.get_error_message())
def setInterfaceBandwidth(self, instance, port, bandwidth):
    """Set inbound and outbound bandwidth limits on one interface.

    For every ``<interface>`` under ``<devices>`` that has a
    ``<target dev=...>`` equal to ``port``, a ``<bandwidth>`` element is
    written whose ``<inbound>`` and ``<outbound>`` children carry
    ``average`` and ``peak`` of ``int(bandwidth) * 1024`` and ``burst`` of
    ``1024``. The domain is then redefined with ``self.defineXML``.

    Args:
        instance: Domain object providing ``XMLDesc()``.
        port: Target device name of the interface (e.g. ``'vnet0'``).
        bandwidth: Limit as an int or numeric string. Units are not shown
            here; presumably MB/s scaled to libvirt's KB/s, confirm.

    Returns:
        ``{"status": "success", 'data': None}`` when ``defineXML`` returns
        a truthy value, ``{"status": "faild", 'data': <exception>}`` when
        building the XML fails, and ``None`` when ``defineXML`` returns a
        falsy value.
    """
    domXml = instance.XMLDesc(0)
    root = ElementTree.fromstring(domXml)
    try:
        # './/devices/*' is what the original './/devices/' expanded to.
        for dev in root.findall('.//devices/*'):
            if dev.tag != 'interface':
                continue
            if not any(child.tag == 'target' and child.get('dev') == port
                       for child in dev):
                continue
            rate = str(int(bandwidth) * 1024)
            # Drop any previous limit so repeated calls replace it instead
            # of stacking several <bandwidth> elements on the interface.
            for stale in dev.findall('bandwidth'):
                dev.remove(stale)
            bwXml = ElementTree.SubElement(dev, 'bandwidth')
            for direction in ('inbound', 'outbound'):
                limit = ElementTree.SubElement(bwXml, direction)
                limit.set('average', rate)
                limit.set('peak', rate)
                limit.set('burst', '1024')
        domXml = ElementTree.tostring(root)
    except Exception as e:
        return {"status": "faild", 'data': e}
    if self.defineXML(domXml):
        return {"status": "success", 'data': None}
    return None
def setUpClass(cls):
    """Parse the temporal XML fixture and store its root on ``cls.parsexml``."""
    cls.parsexml = ET.fromstring("""
<temporal>
<fluent_changes>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="0" new_value="off"/>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3890" new_value="on" old_value="off"/>
<fluent_change energy="60.281" fluent="PHONE_ACTIVE" frame="3990" new_value="off" old_value="on"/>
</fluent_changes>
<actions>
<event action="makecall_START" energy="60.281" frame="3718"/>
<event action="makecall_END" energy="60.281" frame="3767"/>
</actions>
</temporal>""")