def _set_boot_device(conn, domain, device):
    """Make ``device`` the only boot entry of a libvirt domain.

    Every existing ``<boot>`` child of the domain's ``<os>`` element is
    removed, a single ``<boot dev=...>`` element is appended, and the
    modified XML is handed to ``conn.defineXML``.

    :param conn: active libvirt connection.
    :param domain: libvirt domain object.
    :param device: value stored in the ``dev`` attribute of the new
        ``<boot>`` element.
    :raises: LibvirtError if failed update domain xml.
    """
    domain_root = ET.fromstring(domain.XMLDesc())
    os_section = domain_root.find('os')

    # Clear boot list (findall returns a list, so removing while looping is safe)
    for stale_boot in os_section.findall('boot'):
        os_section.remove(stale_boot)

    new_boot = ET.SubElement(os_section, 'boot')
    new_boot.set('dev', device)

    try:
        conn.defineXML(ET.tostring(domain_root))
    except libvirt.libvirtError as e:
        raise isd_exc.LibvirtError(err=e)
python类fromstring()的实例源码
def choose_aws_role(assertion):
    """Choose AWS role from SAML assertion.

    The base64-encoded assertion is decoded and parsed.  Every
    ``AttributeValue`` of the AWS ``Role`` attribute is split on commas
    into a ``RoleTuple(principal_arn, role_arn)``.  The roles are printed
    as a 1-based menu and the selection is read from standard input.

    :param assertion: base64-encoded SAML assertion document.
    :returns: the ``RoleTuple`` the user selected.
    :raises ValueError: if the answer is not an integer.
    :raises IndexError: if the answer is outside ``1..len(roles)``.
    """
    aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
    attribute_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'
    attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
    role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])

    root = ET.fromstring(base64.b64decode(assertion))
    roles = [
        role_tuple(*value.text.split(','))
        for attribute in root.iter(attribute_urn)
        if attribute.get('Name') == aws_attribute_role
        for value in attribute.iter(attribute_value_urn)
    ]

    for index, role in enumerate(roles):
        role_name = role.role_arn.split('/')[1]
        print("%d: %s" % (index+1, role_name))

    # input() returns text on Python 3, so convert before doing arithmetic.
    role_choice = int(input('Please select the AWS role: ')) - 1
    # Reject 0 and negatives explicitly; a negative index would otherwise
    # silently pick a role counted from the end of the list.
    if not 0 <= role_choice < len(roles):
        raise IndexError("role selection out of range")
    return roles[role_choice]
def getFileURLs(file_ids):
    '''
    Retrieve the ftp location for a list of file IDs

    The IDs are joined with commas and appended to the MODAPS
    ``getFileUrls`` query URL.  The text of every child of the root
    element of the XML reply is returned.

    @param file_ids: List of file IDs
    @return List of ftp locations
    '''
    # join() leaves the trailing "=" intact when file_ids is empty, which
    # trimming the last character of a concatenated string did not.
    info_url = ('http://modwebsrv.modaps.eosdis.nasa.gov/axis2/services/MODAPSservices/getFileUrls?fileIds='
                + ','.join(str(file_id) for file_id in file_ids))

    response = urlopen(info_url)
    try:
        tree = ET.fromstring(response.read().decode())
    finally:
        # Release the connection even if reading or parsing fails.
        response.close()

    return [child.text for child in tree]
def checkCobertura(config, checkoutSteps, buildSteps, packageSteps, **kwargs):
    """Add the Cobertura publisher to a job config that has unit-test steps.

    If the package name of any checkout, build or package step ends with
    ``"unittests"``, the XML ``config`` is parsed and ``PLUGIN`` is appended
    to its ``publishers`` element, unless a
    ``hudson.plugins.cobertura.CoberturaPublisher`` child already exists.

    :param config: job configuration as an XML string.
    :param checkoutSteps: iterable of steps offering ``getPackage().getName()``.
    :param buildSteps: same kind of iterable as ``checkoutSteps``.
    :param packageSteps: same kind of iterable as ``checkoutSteps``.
    :param kwargs: accepted and ignored.
    :returns: the re-serialised (UTF-8) config when the publisher was added,
        otherwise ``config`` unchanged.
    """
    # Evaluate every step, in the same order as three separate loops would.
    unittest_flags = [
        step.getPackage().getName().endswith("unittests")
        for steps in (checkoutSteps, buildSteps, packageSteps)
        for step in steps
    ]
    if not any(unittest_flags):
        return config

    root = ElementTree.fromstring(config)
    publishers = root.find("publishers")
    if publishers.find("hudson.plugins.cobertura.CoberturaPublisher") is not None:
        return config

    publishers.append(PLUGIN)
    return ElementTree.tostring(root, encoding="UTF-8")
def _scanDir(self, workspace, dir):
    """Record the Subversion state of a checkout directory.

    Runs ``svn info --xml`` and ``svn status`` on ``dir`` with ``workspace``
    as the working directory.  The URL, revision, repository root,
    repository UUID and a dirty flag (non-empty ``svn status`` output) are
    stored in private attributes of the instance.

    :param workspace: directory the ``svn`` commands are executed in.
    :param dir: checkout directory passed to ``svn``.
    :raises BuildError: if ``svn`` exits with an error, cannot be started,
        or returns XML that cannot be parsed.
    """
    self.__dir = dir
    try:
        info = ElementTree.fromstring(subprocess.check_output(
            ["svn", "info", "--xml", dir],
            cwd=workspace, universal_newlines=True))
        self.__url = info.find('entry/url').text
        self.__revision = int(info.find('entry').get('revision'))
        self.__repoRoot = info.find('entry/repository/root').text
        self.__repoUuid = info.find('entry/repository/uuid').text

        status = subprocess.check_output(["svn", "status", dir],
            cwd=workspace, universal_newlines=True)
        self.__dirty = status != ""
    except subprocess.CalledProcessError as e:
        raise BuildError("Svn audit failed: " + str(e))
    except OSError as e:
        # The failing executable is svn; the message used to name git.
        raise BuildError("Error calling svn: " + str(e))
    except ElementTree.ParseError:
        raise BuildError("Invalid XML received from svn")
def testSetGitShallowClone(self):
    """Check that ``scm.git.shallow`` is rendered into the pushed job config.

    With ``scm.git.shallow=42`` every ``CloneOption`` element of the pushed
    config must contain ``shallow == 'true'`` and ``depth == '42'``.  A
    negative depth must make the push fail with a ``BuildError``.
    """
    self.executeBobJenkinsCmd("set-options -o scm.git.shallow=42 myTestJenkins")
    self.executeBobJenkinsCmd("push -q myTestJenkins")
    send = self.jenkinsMock.getServerData()
    config = ElementTree.fromstring(send[0][1])

    for clone in config.iter('hudson.plugins.git.extensions.impl.CloneOption'):
        found = 0
        # Element.getiterator() was removed in Python 3.9; iter() is the
        # equivalent that walks the element and all of its descendants.
        for a in clone.iter():
            if a.tag == 'shallow':
                assert(a.text == 'true')
                found += 1
            if a.tag == 'depth':
                assert(a.text == '42')
                found += 1
        assert(found == 2)

    self.executeBobJenkinsCmd("set-options -o scm.git.shallow=-1 myTestJenkins")
    with self.assertRaises(Exception) as c:
        self.executeBobJenkinsCmd("push -q myTestJenkins")
    assert(type(c.exception) == BuildError)
def testShortDescription(self):
    """Check the ``--shortdescription`` output for job ``dependency-two``.

    After pushing the complex recipes, the ``<li>...</li>`` lines of the
    description sent to ``/createItem?name=dependency-two`` are collected
    and must equal ``{'root/dependency-one/dependency-two'}``.
    """
    self.createComplexRecipes()
    self.executeBobJenkinsCmd("set-options --shortdescription myTestJenkinsComplex")
    self.executeBobJenkinsCmd("push -q myTestJenkinsComplex")
    send = self.jenkinsMock.getServerData()
    result_set = set()
    try:
        for i in send:
            if i[0] == '/createItem?name=dependency-two':
                for items in ElementTree.fromstring(i[1]).iter('description'):
                    for line in list(items.itertext())[0].splitlines():
                        if line.startswith('<li>') and line.endswith('</li>'):
                            result_set.add(line[4:-5])
    except Exception:
        # Still best-effort: the assertion below reports the failure.
        # Catching Exception instead of using a bare except lets
        # KeyboardInterrupt/SystemExit propagate.
        print("Malformed Data Recieved")
    self.assertEqual(result_set, {'root/dependency-one/dependency-two'})
def FromXml(self, xml):
    """Convert a flat XML document into a dict and store it on the instance.

    Each direct child of the root element becomes one entry that maps the
    child's tag to its text.  The dict is saved as ``self.values`` and
    returned.

    :param xml: XML document; must be non-empty.
    :returns: dict of child tag -> child text.
    :raises WxPayException: if ``xml`` is empty.
    """
    if not xml:
        raise WxPayException("xml?????")

    # Import ElementTree, preferring the C implementation where it exists
    try:
        import xml.etree.cElementTree as ET
    except ImportError:
        import xml.etree.ElementTree as ET

    # Parse the xml
    document_root = ET.fromstring(xml)
    self.values = {node.tag: node.text for node in document_root}
    return self.values
# Format the parameters as URL parameters
def _get_defects(self, xml_string):
    '''evaluate the xml string returned by cppcheck (on stderr) and use it to create
    a list of defects.

    Each ``<error>`` element yields a dict with the keys 'id', 'severity',
    'msg' and 'verbose'.  When the error has ``<location>`` children, 'file'
    and 'line' are taken from the last one, and the line number is stored
    as a string reduced by one.

    :param xml_string: XML report produced by cppcheck.
    :returns: list of defect dicts, in document order.
    '''
    defects = []
    for error in ElementTree.fromstring(xml_string).iter('error'):
        defect = {
            'id': error.get('id'),
            'severity': error.get('severity'),
            # Escape '<': the message is later interpolated into markup
            # that is parsed as XML again (see _create_html_table).
            'msg': str(error.get('msg')).replace('<', '&lt;'),
            'verbose': error.get('verbose'),
        }
        for location in error.findall('location'):
            defect['file'] = location.get('file')
            defect['line'] = str(int(location.get('line')) - 1)
        defects.append(defect)
    return defects
def _create_html_index(self, files):
    """Build the cppcheck report index page and write it to the build tree.

    The ``CPPCHECK_HTML_FILE`` template is parsed, its title and header are
    filled in with the generator's name, and the findings table is added to
    the ``content`` div via ``_create_html_table``.  The result is serialised
    as HTML, prefixed with ``CCPCHECK_HTML_TYPE`` and written to
    ``cppcheck/index.html``.

    :param files: passed through unchanged to ``_create_html_table``.
    :returns: the node the index page was written to.
    """
    report_name = self.generator.get_name()
    document = ElementTree.fromstring(CPPCHECK_HTML_FILE)
    heading = document.find('head/title')
    heading.text = 'cppcheck - report - %s' % report_name

    # Locate the first <div id="page"> directly below <body>.
    # NOTE(review): `page` stays unbound if the template has no such div.
    for candidate in document.find('body').findall('div'):
        if candidate.get('id') == 'page':
            page = candidate
            break

    for section in page.findall('div'):
        section_id = section.get('id')
        if section_id == 'header':
            section.find('h1').text = 'cppcheck report - %s' % report_name
        if section_id == 'content':
            self._create_html_table(section, files)

    markup = ElementTree.tostring(document, method='html')
    markup = CCPCHECK_HTML_TYPE + markup
    index_node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
    index_node.write(markup)
    return index_node
def _create_html_table(self, content, files):
    """Append a table of cppcheck findings to ``content``.

    For every entry of ``files`` a header row linking to the file's HTML
    page is emitted, followed by one row per error.  Errors with a 'line'
    key are ordered by line number and link to that line.  Errors without
    one come last.

    :param content: element the finished table is appended to.
    :param files: mapping of file name -> dict with the keys 'htmlfile' and
        'errors' (a list of defect dicts with 'id', 'severity', 'msg' and
        optionally 'line').
    """
    def line_order(defect):
        # `in` and float('inf') replace dict.has_key() and sys.maxint,
        # neither of which exists on Python 3.
        return int(defect['line']) if 'line' in defect else float('inf')

    table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
    for name, val in files.items():
        f = val['htmlfile']
        s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
        table.append(ElementTree.fromstring(s))

        for e in sorted(val['errors'], key=line_order):
            if 'line' not in e:
                s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
            else:
                attr = ''
                if e['severity'] == 'error':
                    attr = 'class="error"'
                s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
            table.append(ElementTree.fromstring(s))
    content.append(table)
def _get_defects(self, xml_string):
    '''evaluate the xml string returned by cppcheck (on stderr) and use it to create
    a list of defects.

    Each ``<error>`` element yields a dict with the keys 'id', 'severity',
    'msg' and 'verbose'.  When the error has ``<location>`` children, 'file'
    and 'line' are taken from the last one, and the line number is stored
    as a string reduced by one.

    :param xml_string: XML report produced by cppcheck.
    :returns: list of defect dicts, in document order.
    '''
    defects = []
    for error in ElementTree.fromstring(xml_string).iter('error'):
        defect = {
            'id': error.get('id'),
            'severity': error.get('severity'),
            # Escape '<': the message is later interpolated into markup
            # that is parsed as XML again (see _create_html_table).
            'msg': str(error.get('msg')).replace('<', '&lt;'),
            'verbose': error.get('verbose'),
        }
        for location in error.findall('location'):
            defect['file'] = location.get('file')
            defect['line'] = str(int(location.get('line')) - 1)
        defects.append(defect)
    return defects
def _create_html_index(self, files):
    """Build the cppcheck report index page and write it to the build tree.

    The ``CPPCHECK_HTML_FILE`` template is parsed, its title and header are
    filled in with the generator's name, and the findings table is added to
    the ``content`` div via ``_create_html_table``.  The result is serialised
    as HTML, prefixed with ``CCPCHECK_HTML_TYPE`` and written to
    ``cppcheck/index.html``.

    :param files: passed through unchanged to ``_create_html_table``.
    :returns: the node the index page was written to.
    """
    report_name = self.generator.get_name()
    document = ElementTree.fromstring(CPPCHECK_HTML_FILE)
    heading = document.find('head/title')
    heading.text = 'cppcheck - report - %s' % report_name

    # Locate the first <div id="page"> directly below <body>.
    # NOTE(review): `page` stays unbound if the template has no such div.
    for candidate in document.find('body').findall('div'):
        if candidate.get('id') == 'page':
            page = candidate
            break

    for section in page.findall('div'):
        section_id = section.get('id')
        if section_id == 'header':
            section.find('h1').text = 'cppcheck report - %s' % report_name
        if section_id == 'content':
            self._create_html_table(section, files)

    markup = ElementTree.tostring(document, method='html')
    markup = CCPCHECK_HTML_TYPE + markup
    index_node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
    index_node.write(markup)
    return index_node
def _create_html_table(self, content, files):
    """Append a table of cppcheck findings to ``content``.

    For every entry of ``files`` a header row linking to the file's HTML
    page is emitted, followed by one row per error.  Errors with a 'line'
    key are ordered by line number and link to that line.  Errors without
    one come last.

    :param content: element the finished table is appended to.
    :param files: mapping of file name -> dict with the keys 'htmlfile' and
        'errors' (a list of defect dicts with 'id', 'severity', 'msg' and
        optionally 'line').
    """
    def line_order(defect):
        # `in` and float('inf') replace dict.has_key() and sys.maxint,
        # neither of which exists on Python 3.
        return int(defect['line']) if 'line' in defect else float('inf')

    table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
    for name, val in files.items():
        f = val['htmlfile']
        s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
        table.append(ElementTree.fromstring(s))

        for e in sorted(val['errors'], key=line_order):
            if 'line' not in e:
                s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
            else:
                attr = ''
                if e['severity'] == 'error':
                    attr = 'class="error"'
                s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
            table.append(ElementTree.fromstring(s))
    content.append(table)
def _create_html_index(self, files):
    """Build the cppcheck report index page and write it to the build tree.

    The ``CPPCHECK_HTML_FILE`` template is parsed, its title and header are
    filled in with the generator's name, and the findings table is added to
    the ``content`` div via ``_create_html_table``.  The result is serialised
    as HTML, prefixed with ``CCPCHECK_HTML_TYPE`` and written to
    ``cppcheck/index.html``.

    :param files: passed through unchanged to ``_create_html_table``.
    :returns: the node the index page was written to.
    """
    report_name = self.generator.get_name()
    document = ElementTree.fromstring(CPPCHECK_HTML_FILE)
    heading = document.find('head/title')
    heading.text = 'cppcheck - report - %s' % report_name

    # Locate the first <div id="page"> directly below <body>.
    # NOTE(review): `page` stays unbound if the template has no such div.
    for candidate in document.find('body').findall('div'):
        if candidate.get('id') == 'page':
            page = candidate
            break

    for section in page.findall('div'):
        section_id = section.get('id')
        if section_id == 'header':
            section.find('h1').text = 'cppcheck report - %s' % report_name
        if section_id == 'content':
            self._create_html_table(section, files)

    markup = ElementTree.tostring(document, method='html')
    markup = CCPCHECK_HTML_TYPE + markup
    index_node = self.generator.path.get_bld().find_or_declare('cppcheck/index.html')
    index_node.write(markup)
    return index_node
def _create_html_table(self, content, files):
    """Append a table of cppcheck findings to ``content``.

    For every entry of ``files`` a header row linking to the file's HTML
    page is emitted, followed by one row per error.  Errors with a 'line'
    key are ordered by line number and link to that line.  Errors without
    one come last.

    :param content: element the finished table is appended to.
    :param files: mapping of file name -> dict with the keys 'htmlfile' and
        'errors' (a list of defect dicts with 'id', 'severity', 'msg' and
        optionally 'line').
    """
    def line_order(defect):
        # `in` and float('inf') replace dict.has_key() and sys.maxint,
        # neither of which exists on Python 3.
        return int(defect['line']) if 'line' in defect else float('inf')

    table = ElementTree.fromstring(CPPCHECK_HTML_TABLE)
    for name, val in files.items():
        f = val['htmlfile']
        s = '<tr><td colspan="4"><a href="%s">%s</a></td></tr>\n' % (f,name)
        table.append(ElementTree.fromstring(s))

        for e in sorted(val['errors'], key=line_order):
            if 'line' not in e:
                s = '<tr><td></td><td>%s</td><td>%s</td><td>%s</td></tr>\n' % (e['id'], e['severity'], e['msg'])
            else:
                attr = ''
                if e['severity'] == 'error':
                    attr = 'class="error"'
                s = '<tr><td><a href="%s#line-%s">%s</a></td>' % (f, e['line'], e['line'])
                s+= '<td>%s</td><td>%s</td><td %s>%s</td></tr>\n' % (e['id'], e['severity'], attr, e['msg'])
            table.append(ElementTree.fromstring(s))
    content.append(table)
def get_xml_trees(db, host, pid, usernames, graceful=False):
    """Return the newest annotation XML of each requested user for a passage.

    Params: db, host, paragraph id, the list of usernames wanted,
    Optional:
        graceful: True if no exceptions are to be raised.  Users without
            a submission are then skipped, so the result may be shorter
            than ``usernames``.
    Raises an Exception if a user did not submit an annotation for the
    passage and ``graceful`` is false.
    Returns a list of xml root elements.
    """
    cur = get_cursor(db, host)
    xmls = []
    for username in usernames:
        username = str(username)  # precaution for cases bad input e.g. 101
        cur_uid = get_uid(db, host, username)
        # The leading space before ORDER BY keeps the keyword from being
        # glued to the second placeholder.
        cur.execute("SELECT xml FROM xmls WHERE paid=" + PLACE_HOLDER +
                    " AND uid=" + PLACE_HOLDER + " ORDER BY ts DESC",
                    (pid, cur_uid))
        raw_xml = cur.fetchone()
        if raw_xml:
            xmls.append(fromstring(raw_xml[0]))
        elif not graceful:
            raise Exception("The user " + username +
                            " did not submit an annotation for this passage")
        # graceful and nothing found: skip instead of subscripting None
    return xmls
def get_by_xid(db, host, xid, graceful=False):
    """Return the parsed XML stored under ``xid``.

    Optional:
        graceful: True if no exceptions are to be raised; ``None`` is
            returned for an unknown xid in that case.
    Raises an Exception if the xid does not exist and ``graceful`` is false.
    """
    # precaution for bad input e.g. 101->'101'
    xid = str(xid)
    cur = get_cursor(db, host)
    cur.execute("SELECT xml FROM xmls WHERE id" + "=" +
                PLACE_HOLDER, (int(xid),))
    row = cur.fetchone()
    if row:
        return fromstring(row[0])
    if not graceful:
        raise Exception("The xid " + xid + " does not exist")
    return None
newest_dhcp_lease.py 文件源码
项目:kolla-kubernetes-personal
作者: rthallisey
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def get_mac_address(conn, domain_name):
    """Get MAC address from domain XML.

    Looks the domain up by name and returns the ``address`` attribute of the
    ``<mac>`` element of the first ``<interface>`` whose ``<source>`` has
    ``network='vagrant-private-dhcp'``.

    :param conn: connection object offering ``lookupByName``.
    :param domain_name: name of the domain to inspect.
    :raises NoPrivateDHCPInterfaceException: if no interface matches.
    """
    domain_tree = etree.fromstring(conn.lookupByName(domain_name).XMLDesc())
    for interface in domain_tree.find('devices').iterfind('interface'):
        source = interface.find('source')
        if source is not None and source.get('network') == 'vagrant-private-dhcp':
            return interface.find('mac').get('address')
    raise NoPrivateDHCPInterfaceException()
def __init__(self, path, *args, **kwargs):
# Initialise the base class with the same arguments, then read section
# headers from self.fh, copying version info and the packet count onto the
# instance, until the 'pkts' section header is reached.
# NOTE(review): indentation was lost in this copy of the file; confirm the
# nesting of the final `break` against the original source.
cls = self.__class__
# NOTE(review): super(self.__class__, self) resolves to this same method
# when the instance belongs to a subclass, which would recurse endlessly;
# naming the defining class explicitly avoids that. Confirm no subclass
# relies on this __init__.
super(cls, self).__init__(path, *args, **kwargs)
while True:
# Each iteration consumes one section header from the file handle.
sess = PeektaggedSectionHeader(self.fh)
# Stop at the 'pkts' header; its payload is not read here.
if sess.tag == 'pkts':
break
# Read sess.len from the file handle as this section's payload.
sess.payload = self.fh.read(sess.len)
if sess.tag == PEEKTAGGED_FILE_MAGIC:
# This payload is XML whose root element must be 'VersionInfo'.
root = ET.fromstring(sess.payload)
if root.tag != 'VersionInfo':
raise PeektaggedException("Corrupted version info")
# Expose every child as an attribute named after its tag (text value).
for child in root:
setattr(self, child.tag, child.text)
elif sess.tag == 'sess':
# The 'sess' payload is XML; PacketCount becomes self.total_packets (int).
root = ET.fromstring(sess.payload)
for child in root:
if child.tag == 'PacketCount':
setattr(self, 'total_packets', int(child.text))
break
def CreateClassFromXMLString(target_class, xml_string, string_encoding=None):
    """Creates an instance of the target class from the string contents.

    Args:
      target_class: class The class which will be instantiated and populated
          with the contents of the XML. This class must have a _tag and a
          _namespace class variable.
      xml_string: str A string which contains valid XML. The root element
          of the XML string should match the tag and namespace of the desired
          class.
      string_encoding: str The character encoding which the xml_string should
          be converted to before it is interpreted and translated into
          objects. When omitted, the module-level XML_STRING_ENCODING is
          used; if that is empty too, the string is left unchanged.

    Returns:
      Whatever _CreateClassFromElementTree returns for the parsed tree and
      target_class.
    """
    target_encoding = string_encoding if string_encoding else XML_STRING_ENCODING
    # Only unicode text is encoded; byte strings are parsed as they are.
    if target_encoding and isinstance(xml_string, unicode):
        xml_string = xml_string.encode(target_encoding)
    parsed_tree = ElementTree.fromstring(xml_string)
    return _CreateClassFromElementTree(target_class, parsed_tree)
def parse(xml_string, target_class=None, version=1, encoding=None):
    """Parses the XML string according to the rules for the target_class.

    Args:
      xml_string: str or unicode
      target_class: XmlElement or a subclass. If None is specified, the
          XmlElement class is used.
      version: int (optional) The version of the schema which should be used when
          converting the XML into an object. The default is 1.
      encoding: str (optional) The character encoding used to encode a
          unicode xml_string before parsing. When None, the module-level
          STRING_ENCODING is used.

    Returns:
      The result of _xml_element_from_tree for the parsed tree.
    """
    if target_class is None:
        target_class = XmlElement
    if isinstance(xml_string, unicode):
        chosen_encoding = STRING_ENCODING if encoding is None else encoding
        xml_string = xml_string.encode(chosen_encoding)
    parsed_tree = ElementTree.fromstring(xml_string)
    return _xml_element_from_tree(parsed_tree, target_class, version)
def doSomethingWithResult(response):
    """Record the busy status reported in one Exchange availability reply.

    The status is the text of the last ``BusyType`` element of the reply
    ("Free" when there is none).  The room is the text of the last
    ``Address`` element of the request body.  A line is written to stderr
    and ``(status, rooms[room], room)`` is appended to the module-level
    ``result`` list.

    :param response: response object exposing ``text`` and ``request.body``,
        or None.
    :returns: "KO" when ``response`` is None, otherwise None.
    """
    if response is None:
        return "KO"

    status = "Free"
    reply_tree = ET.fromstring(response.text)
    # arrgh, namespaces!!
    for busy_type in reply_tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}BusyType"):
        status = busy_type.text

    request_tree = ET.fromstring(response.request.body)
    # NOTE(review): `room` stays unbound if the request has no Address element.
    for address in request_tree.findall(".//{http://schemas.microsoft.com/exchange/services/2006/types}Address"):
        room = address.text

    sys.stderr.write(str(now.isoformat())+": Status for room "+str(rooms[room])+": "+str(room)+" => "+status+"\n")
    result.append((status, rooms[room], room))
def snabb_state(query_output):
    """Print the state of every snabb instance as an XML fragment on stdout.

    The output is wrapped in ``<snabb>``, with one ``<instance>`` block per
    child of the parsed root.  Within an instance, PRINT_TAG is invoked for
    the id, pid and next-hop MAC tags of every child, and for the packet
    and drop counters found two levels below a ``pci`` child.

    :param query_output: XML document describing the instances.
    """
    root = ET.fromstring(query_output)
    # print(...) with a single argument behaves the same on Python 2 and 3;
    # the former `print "..."` statement does not compile on Python 3.
    print("<snabb>")
    for instance in root:
        # In each instance, we need to query the id, pci, pid.
        print("<instance>")
        for child in instance:
            PRINT_TAG(child, "id")
            PRINT_TAG(child, "pid")
            PRINT_TAG(child, "next_hop_mac_v4")
            PRINT_TAG(child, "next_hop_mac_v6")
            if child.tag == "pci":
                for pcis in child:
                    for pci_child in pcis:
                        PRINT_TAG(pci_child, "rxpackets")
                        PRINT_TAG(pci_child, "txpackets")
                        PRINT_TAG(pci_child, "rxdrop")
                        PRINT_TAG(pci_child, "txdrop")
        print("</instance>")
    print("</snabb>")
    return
test_rpc_get_lwaftr_statistics.py 文件源码
项目:vmx-docker-lwaftr
作者: Juniper
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def snabb_statistics(output,argv):
    """Print statistics for all, or one selected, snabb instance.

    The output is wrapped in ``<snabb>``.  If ``argv`` contains the keyword
    "id", the argument following it selects the instance whose ``<id>``
    text matches.  Otherwise every instance is reported through
    ``stats_per_instance``.  An ``id_error`` element is printed when no
    instance was reported.

    :param output: XML document describing the instances.
    :param argv: argument list, optionally containing ``"id", <value>``.
    """
    root = ET.fromstring(output)
    print("<snabb>")

    # The filter value is whatever follows the first "id" keyword.
    instance_id = ""
    for position, argument in enumerate(argv):
        if argument == "id":
            instance_id = argv[position + 1]
            break

    found = 0
    for instance in root:
        # Compare against the value located above rather than a fixed
        # argv slot, so the keyword may appear at any position.
        if instance_id != '' and instance.findtext("id") != instance_id:
            continue
        found += 1
        stats_per_instance(instance)

    if found == 0:
        print("<statistics><id_error>no instance found</id_error></statistics>")
    print("</snabb>")
    return
def snabb_statistics(output,argv):
    """Print statistics for all, or one selected, snabb instance.

    The output is wrapped in ``<snabb>``.  If ``argv`` contains the keyword
    "id", the argument following it selects the instance whose ``<id>``
    text matches.  Otherwise every instance is reported through
    ``stats_per_instance``.  An ``id_error`` element is printed when no
    instance was reported.

    :param output: XML document describing the instances.
    :param argv: argument list, optionally containing ``"id", <value>``.
    """
    root = ET.fromstring(output)
    print("<snabb>")

    # The filter value is whatever follows the first "id" keyword.
    instance_id = ""
    for position, argument in enumerate(argv):
        if argument == "id":
            instance_id = argv[position + 1]
            break

    found = 0
    for instance in root:
        # Compare against the value located above rather than a fixed
        # argv slot, so the keyword may appear at any position.
        if instance_id != '' and instance.findtext("id") != instance_id:
            continue
        found += 1
        stats_per_instance(instance)

    if found == 0:
        print("<statistics><id_error>no instance found</id_error></statistics>")
    print("</snabb>")
    return
def _http_get_and_parse(self, cmd, payload=None):
    """
    Call _http_get and parse the returned Foscam XML reply into a hash.

    Every element of the reply is visited.  The text of a ``result``
    element becomes the integer return code, the ``CGI_Result`` element
    is ignored, and every other element is stored as tag -> text.

    :param cmd: command passed through to ``_http_get``.
    :param payload: optional dict of parameters passed to ``_http_get``;
        a new empty dict is used when omitted.
    :returns: tuple ``(code, ret)``; ``code`` is -1 when ``_http_get``
        returned False, otherwise 0 unless the reply's ``result`` element
        says otherwise.
    """
    if payload is None:
        # A fresh dict per call; a `{}` default would be shared by all calls.
        payload = {}
    ret = {}
    data = self._http_get(cmd,payload)
    self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: data=%s" % (self.name,data))
    if data is False:
        code = -1
    else:
        # Return code is good, unless CGI result changes it later
        code = 0
        root = ET.fromstring(data)
        for child in root.iter():
            if child.tag == 'result':
                code = int(child.text)
            elif child.tag != 'CGI_Result':
                ret[child.tag] = child.text
    self.parent.logger.debug("FoscamHD2:_http_get_and_parse:%s: code=%d, ret=%s" % (self.name,code,ret))
    return code,ret
def ip(self, name):
    """Return the IP address of the named VM, or None if it is unknown.

    An address stored in a ``{kvirt}ip`` element below a ``{kvirt}info``
    element of the domain XML wins.  Otherwise the MAC address of the first
    ``<interface>`` is looked up in the DHCP leases of all networks, and the
    lease is only returned while the VM is active.

    :param name: name of the VM.
    :returns: the IP address, or None when the VM cannot be looked up, has
        no interface, is inactive, or has no matching lease.
    """
    conn = self.conn
    leases = {}
    for network in conn.listAllNetworks():
        for lease in network.DHCPLeases():
            leases[lease['mac']] = lease['ipaddr']
    try:
        vm = conn.lookupByName(name)
        root = ET.fromstring(vm.XMLDesc(0))
    except Exception:
        # Unknown VM or unparsable XML: report "no address".  Exception
        # (not a bare except) lets KeyboardInterrupt/SystemExit through.
        return None
    # Element.getiterator() was removed in Python 3.9; iter() replaces it.
    for element in root.iter('{kvirt}info'):
        e = element.find('{kvirt}ip')
        if e is not None:
            return e.text
    nic = next(root.iter('interface'), None)
    if nic is None:
        # A VM without any network interface has no address to report.
        return None
    mac = nic.find('mac').get('address')
    if vm.isActive() and mac in leases:
        return leases[mac]
    return None
def volumes(self, iso=False):
    """List ISO images or template volumes found in all storage pools.

    Every pool is refreshed and its volumes are classified by name.
    Names ending in 'iso' are ISOs.  Names ending in 'qcow2' or 'qc2', or
    equal to the basename of one of ``defaults.TEMPLATES``, are templates.
    Each hit is reported as ``<pool path>/<volume name>``.

    :param iso: return the ISO list when true, the template list otherwise.
    :returns: the selected list, sorted case-insensitively.
    """
    isos = []
    templates = []
    default_templates = [os.path.basename(t) for t in defaults.TEMPLATES.values() if t is not None]
    conn = self.conn
    for storage in conn.listStoragePools():
        storage = conn.storagePoolLookupByName(storage)
        storage.refresh(0)
        root = ET.fromstring(storage.XMLDesc(0))
        # Element.getiterator() was removed in Python 3.9; iter() replaces it.
        path_element = next(root.iter('path'), None)
        if path_element is None:
            # A pool without <path>: skip it instead of reusing the
            # previous pool's path (or hitting an unbound name).
            continue
        storagepath = path_element.text
        for volume in storage.listVolumes():
            if volume.endswith('iso'):
                isos.append("%s/%s" % (storagepath, volume))
            elif volume.endswith(('qcow2', 'qc2')) or volume in default_templates:
                templates.append("%s/%s" % (storagepath, volume))
    if iso:
        return sorted(isos, key=lambda s: s.lower())
    else:
        return sorted(templates, key=lambda s: s.lower())
def _uploadimage(self, name, pool='default', origin='/tmp', suffix='.ISO'):
    """Create a raw volume in a storage pool and upload a local image into it.

    The local file ``<origin>/<name><suffix>`` is streamed into a new volume
    ``<pool path>/<name><suffix>``.

    :param name: image name without suffix.
    :param pool: storage pool object, or the name of a pool to look up on
        the connection.
    :param origin: local directory containing the image file.
    :param suffix: file suffix appended to ``name``.
    """
    name = "%s%s" % (name, suffix)
    conn = self.conn
    if isinstance(pool, str):
        # The declared default is a pool *name*; resolve it to a pool
        # object, since a plain string has no XMLDesc()/createXML().
        pool = conn.storagePoolLookupByName(pool)
    root = ET.fromstring(pool.XMLDesc(0))
    # Element.getiterator() was removed in Python 3.9; iter() replaces it.
    for element in root.iter('path'):
        poolpath = element.text
        break
    imagepath = "%s/%s" % (poolpath, name)
    imagexml = self._xmlvolume(path=imagepath, size=0, diskformat='raw')
    pool.createXML(imagexml, 0)
    imagevolume = conn.storageVolLookupByPath(imagepath)
    stream = conn.newStream(0)
    imagevolume.upload(stream, 0, 0)
    # Image data is binary; text mode would try to decode it on Python 3.
    with open("%s/%s" % (origin, name), 'rb') as ori:
        stream.sendAll(self.handler, ori)
        stream.finish()