def mountIso(self,instance,dev, image):
tree = ElementTree.fromstring(self.getInsXMLDesc(instance,0))
for disk in tree.findall('devices/disk'):
if disk.get('device') == 'cdrom':
for elm in disk:
if elm.tag == 'target':
if elm.get('dev') == dev:
src_media = ElementTree.Element('source')
src_media.set('file', image)
disk.append(src_media)
if instance.state()[0] == 1:
xml_disk = ElementTree.tostring(disk)
try:
instance.attachDevice(xml_disk)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
xmldom = self.getInsXMLDesc(instance,1)
if instance.state()[0] == 5:
xmldom = ElementTree.tostring(tree)
try:
return self.defineXML(xmldom)
except libvirt.libvirtError,e:
return '??????????{result}'.format(result=e.get_error_message())
python类Element()的实例源码
def setInterfaceBandwidth(self,instance,port,bandwidth):
'''????'''
domXml = instance.XMLDesc(0)
root = ElementTree.fromstring(domXml)
try:
for dev in root.findall('.//devices/'):
if dev.tag == 'interface':
for iter in dev:
if iter.tag == 'target' and iter.get('dev') == port:
bwXml = ElementTree.SubElement(dev,'bandwidth')
inbdXml = ElementTree.Element('inbound')
inbdXml.set('average',str(int(bandwidth)*1024))
inbdXml.set('peak',str(int(bandwidth)*1024))
inbdXml.set('burst','1024')
outbdXml = ElementTree.Element('outbound')
outbdXml.set('average',str(int(bandwidth)*1024))
outbdXml.set('peak',str(int(bandwidth)*1024))
outbdXml.set('burst','1024')
bwXml.append(inbdXml)
bwXml.append(outbdXml)
domXml = ElementTree.tostring(root)
except Exception,e:
return {"status":"faild",'data':e}
if self.defineXML(domXml):return {"status":"success",'data':None}
def create_xml_page(dictionary: dict, creator_name='DocSeg') -> 'Page':
page = Page.from_dict(dictionary)
# Create xml
root = ET.Element('PcGts')
root.set('xmlns', _ns['p'])
# Metadata
generated_on = str(datetime.datetime.now())
metadata = ET.SubElement(root, 'Metadata')
creator = ET.SubElement(metadata, 'Creator')
creator.text = creator_name
created = ET.SubElement(metadata, 'Created')
created.text = generated_on
last_change = ET.SubElement(metadata, 'LastChange')
last_change.text = generated_on
root.append(page.to_xml())
for k, v in _attribs.items():
root.attrib[k] = v
return root
def _build_xml(self, tag, val=None, children=Void()):
"""Construct an xml element with a given tag, value, and children."""
attribs = {'val': val} if val is not None else {}
# If children is a list then convert each element separately, if it is
# a tuple, treat it as a single element
if isinstance(children, self.Void):
children = ()
elif isinstance(children, list):
children = [self._from_value(child) for child in children]
else:
children = (self._from_value(children),)
xml = ET.Element(tag, attribs)
xml.extend(children)
return xml
def query(self, cmd, state, *args, **kwargs):
"""Create an XML string for the 'interp' command."""
# Attrs:
# raw - ?
# bool - Verbose output
# int - The current state id
# Args:
# string - The query to evaluate
elt = ET.Element('call', {'val': 'interp',
'raw': 'true',
'verbose': 'true',
'id': str(state)})
elt.text = cmd
return ('Query',
ET.tostring(elt,
kwargs.get('encoding', 'utf-8')))
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def build_soap_xml(items, namespace=None):
"""Build a SOAP XML.
:param items: a list of dictionaries where key is the element name
and the value is the element text.
:param namespace: the namespace for the elements, None for no
namespace. Defaults to None
:returns: a XML string.
"""
def _create_element(name, value=None):
xml_string = name
if namespace:
xml_string = "{%(namespace)s}%(item)s" % {'namespace': namespace,
'item': xml_string}
element = ElementTree.Element(xml_string)
element.text = value
return element
soap_namespace = "http://www.w3.org/2003/05/soap-envelope"
envelope_element = ElementTree.Element("{%s}Envelope" % soap_namespace)
body_element = ElementTree.Element("{%s}Body" % soap_namespace)
for item in items:
for i in item:
insertion_point = _create_element(i)
if isinstance(item[i], dict):
for j, value in item[i].items():
insertion_point.append(_create_element(j, value))
else:
insertion_point.text = item[i]
body_element.append(insertion_point)
envelope_element.append(body_element)
return ElementTree.tostring(envelope_element)
def onFinishBuilding(app, exception):
currentVersion = app.env.config["version"]
if "latest_docs_version" in app.env.config["html_context"].keys():
latestVersion = app.env.config["html_context"]["latest_docs_version"]
else:
latestVersion = "dev"
base_domain = app.env.config["html_context"]["SITEMAP_DOMAIN"]
file_path = "./_build/algolia_index/index.json"
sitemap_path = "./_build/sitemap/sitemap_" + currentVersion + ".xml"
checkDirectory(file_path)
checkDirectory(sitemap_path)
f = open(file_path, 'w+')
root = ET.Element("urlset")
root.set("xmlns", "http://www.sitemaps.org/schemas/sitemap/0.9")
for link in indexObjs:
url = ET.SubElement(root, "url")
ET.SubElement(url, "loc").text = base_domain + str(currentVersion) + "/" + link["url"]
ET.SubElement(url, "changefreq").text = "daily"
ET.SubElement(url, "priority").text = "1" if ( currentVersion == latestVersion ) else "0.5"
ET.ElementTree(root).write(sitemap_path)
f.write(json.dumps(indexObjs))
def eccu_doc_for_purge_dir(path_to_purge):
root = ET.Element('eccu')
parent = root
names = (name for name in path_to_purge.split('/') if name)
for name in names:
parent = ET.SubElement(parent, 'match:recursive-dirs', {'value': name})
revalidate = ET.SubElement(parent, 'revalidate')
revalidate.text = 'now'
return ET.tostring(root, encoding='ascii') #'<?xml version="1.0"?>\n' + ET.tostring(root)
#xml = StringIO()
#xml.write('<?xml version="1.0"?>\n')
#xml.write(ET.tostring(root))
#return xml
def add_to_document(self, parent):
"""Adds an ``Argument`` object to this ElementTree document.
Adds an <arg> subelement to the parent element, typically <args>
and sets up its subelements with their respective text.
:param parent: An ``ET.Element`` to be the parent of a new <arg> subelement
:returns: An ``ET.Element`` object representing this argument.
"""
arg = ET.SubElement(parent, "arg")
arg.set("name", self.name)
if self.title is not None:
ET.SubElement(arg, "title").text = self.title
if self.description is not None:
ET.SubElement(arg, "description").text = self.description
if self.validation is not None:
ET.SubElement(arg, "validation").text = self.validation
# add all other subelements to this Argument, represented by (tag, text)
subelements = [
("data_type", self.data_type),
("required_on_edit", self.required_on_edit),
("required_on_create", self.required_on_create)
]
for name, value in subelements:
ET.SubElement(arg, name).text = str(value).lower()
return arg
def to_xml(self):
"""Creates an ``ET.Element`` representing self, then returns it.
:returns root, an ``ET.Element`` representing this scheme.
"""
root = ET.Element("scheme")
ET.SubElement(root, "title").text = self.title
# add a description subelement if it's defined
if self.description is not None:
ET.SubElement(root, "description").text = self.description
# add all other subelements to this Scheme, represented by (tag, text)
subelements = [
("use_external_validation", self.use_external_validation),
("use_single_instance", self.use_single_instance),
("streaming_mode", self.streaming_mode)
]
for name, value in subelements:
ET.SubElement(root, name).text = str(value).lower()
endpoint = ET.SubElement(root, "endpoint")
args = ET.SubElement(endpoint, "args")
# add arguments as subelements to the <args> element
for arg in self.arguments:
arg.add_to_document(args)
return root
def write_to(self, stream):
"""Write an XML representation of self, an ``Event`` object, to the given stream.
The ``Event`` object will only be written if its data field is defined,
otherwise a ``ValueError`` is raised.
:param stream: stream to write XML to.
"""
if self.data is None:
raise ValueError("Events must have at least the data field set to be written to XML.")
event = ET.Element("event")
if self.stanza is not None:
event.set("stanza", self.stanza)
event.set("unbroken", str(int(self.unbroken)))
# if a time isn't set, let Splunk guess by not creating a <time> element
if self.time is not None:
ET.SubElement(event, "time").text = str(self.time)
# add all other subelements to this Event, represented by (tag, text)
subelements = [
("source", self.source),
("sourcetype", self.sourceType),
("index", self.index),
("host", self.host),
("data", self.data)
]
for node, value in subelements:
if value is not None:
ET.SubElement(event, node).text = value
if self.done:
ET.SubElement(event, "done")
stream.write(ET.tostring(event))
stream.flush()
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def send_feedback(self):
"""Print stored items to console/Alfred as XML."""
root = ET.Element('items')
for item in self._items:
root.append(item.elem)
sys.stdout.write('<?xml version="1.0" encoding="utf-8"?>\n')
sys.stdout.write(ET.tostring(root).encode('utf-8'))
sys.stdout.flush()
####################################################################
# Updating methods
####################################################################
def CDATA(text=None):
"""
A CDATA element factory function that uses the function itself as the tag
(based on the Comment factory function in the ElementTree implementation).
"""
element = etree.Element(CDATA)
element.text = text
return element
# We're replacing the _write method of the ElementTree class so that it would
# recognize and correctly print out CDATA sections.
def __walk(self, node, parent):
name = self.__genName(node.tag)
tag = self.__getNamespace(node.tag)
if parent is None:
self.root = name
self.lines.append("%s = ET.Element(%s)" % (name, tag))
else:
self.lines.append("%s = ET.SubElement(%s, %s)" % (name, parent, tag))
# handles text
try:
t = node.text.strip()
if t == '': t = None
except:
t = None
if t is not None:
self.lines.append("%s.text = kwargs.get('', '%s') # PARAMETERIZE" % (name, t))
# handles attributes
for key,val in node.items():
key = self.__getNamespace(key)
self.lines.append("%s.set(%s, kwargs.get('', '%s')) # PARAMETERIZE" % (name, key, val))
for i in node.getchildren():
self.__walk(i, name)
def add_to_document(self, parent):
"""Adds an ``Argument`` object to this ElementTree document.
Adds an <arg> subelement to the parent element, typically <args>
and sets up its subelements with their respective text.
:param parent: An ``ET.Element`` to be the parent of a new <arg> subelement
:returns: An ``ET.Element`` object representing this argument.
"""
arg = ET.SubElement(parent, "arg")
arg.set("name", self.name)
if self.title is not None:
ET.SubElement(arg, "title").text = self.title
if self.description is not None:
ET.SubElement(arg, "description").text = self.description
if self.validation is not None:
ET.SubElement(arg, "validation").text = self.validation
# add all other subelements to this Argument, represented by (tag, text)
subelements = [
("data_type", self.data_type),
("required_on_edit", self.required_on_edit),
("required_on_create", self.required_on_create)
]
for name, value in subelements:
ET.SubElement(arg, name).text = str(value).lower()
return arg
def to_xml(self):
"""Creates an ``ET.Element`` representing self, then returns it.
:returns root, an ``ET.Element`` representing this scheme.
"""
root = ET.Element("scheme")
ET.SubElement(root, "title").text = self.title
# add a description subelement if it's defined
if self.description is not None:
ET.SubElement(root, "description").text = self.description
# add all other subelements to this Scheme, represented by (tag, text)
subelements = [
("use_external_validation", self.use_external_validation),
("use_single_instance", self.use_single_instance),
("streaming_mode", self.streaming_mode)
]
for name, value in subelements:
ET.SubElement(root, name).text = str(value).lower()
endpoint = ET.SubElement(root, "endpoint")
args = ET.SubElement(endpoint, "args")
# add arguments as subelements to the <args> element
for arg in self.arguments:
arg.add_to_document(args)
return root
def write_to(self, stream):
"""Write an XML representation of self, an ``Event`` object, to the given stream.
The ``Event`` object will only be written if its data field is defined,
otherwise a ``ValueError`` is raised.
:param stream: stream to write XML to.
"""
if self.data is None:
raise ValueError("Events must have at least the data field set to be written to XML.")
event = ET.Element("event")
if self.stanza is not None:
event.set("stanza", self.stanza)
event.set("unbroken", str(int(self.unbroken)))
# if a time isn't set, let Splunk guess by not creating a <time> element
if self.time is not None:
ET.SubElement(event, "time").text = str(self.time)
# add all other subelements to this Event, represented by (tag, text)
subelements = [
("source", self.source),
("sourcetype", self.sourceType),
("index", self.index),
("host", self.host),
("data", self.data)
]
for node, value in subelements:
if value is not None:
ET.SubElement(event, node).text = value
if self.done:
ET.SubElement(event, "done")
stream.write(ET.tostring(event))
stream.flush()