def addTree(self, title, data, row=None, column=0, colspan=0, rowspan=0):
    """Create a tree widget from an XML string and register it under ``title``.

    The XML in ``data`` is parsed, a scroll pane is created inside the current
    container and positioned with ``row``/``column``/``colspan``/``rowspan``,
    and a tree node built from the document's root element is stored in
    ``self.n_trees[title]``.

    When ``parseString`` is ``False`` a warning is issued and nothing is
    created.
    """
    self.__verifyItem(self.n_trees, title, True)
    self.__importAjtree()

    if parseString is False:
        self.warn("Unable to parse xml files. .addTree() not available")
        return

    document = parseString(data)

    pane_options = dict(
        relief=RAISED,
        borderwidth=2,
        bg="white",
        highlightthickness=0,
        takefocus=1,
    )
    scroll_pane = ScrollPane(self.__getContainer(), **pane_options)
    self.__positionWidget(scroll_pane, row, column, colspan, rowspan, "NSEW")

    tree_data = ajTreeData(document.documentElement)
    # update() & expand() called in go() function
    self.n_trees[title] = ajTreeNode(scroll_pane.getPane(), None, tree_data)
# Source-code examples for Python's parseString()
def parseDomainManager(self):
    """Fill the domain-manager tree item with identifier, profile and properties.

    The domain manager's profile (DMD) is read through ``self.fileMgr``; the
    ``name`` attribute of its first ``localfile`` element is taken as the SPD
    path. The property definitions reachable from that SPD are stored in
    ``self.domMgrProps`` and passed, together with the values returned by
    ``self.domManager.query([])``, to ``self.buildPropertiesListView_old``.
    """
    self.addTreeWidgetItem(self.domMgrItem, 'Identifier:', self.domManager._get_identifier())
    profile_path = self.domManager._get_domainManagerProfile()
    self.addTreeWidgetItem(self.domMgrItem, 'Profile:', profile_path)
    self.domMgrPropsItem = self.addTreeWidgetItem(self.domMgrItem, 'Properties')

    # Read the DMD file to get the SPD file, which can then be used to get the properties.
    dmd_path = str(profile_path)
    _xmlFile = self.fileMgr.open(dmd_path, True)
    try:
        dmd = minidom.parseString(_xmlFile.read(_xmlFile.sizeOf()))
    finally:
        # Close the handle even when reading or parsing fails.
        _xmlFile.close()

    spdFile = dmd.getElementsByTagName('localfile')[0].getAttribute('name')
    if not spdFile.startswith("/"):
        # The original used os.path.dirname(xmlfile), but no ``xmlfile`` is
        # defined in this method; the relative path is resolved against the
        # DMD file that contains it.
        # NOTE(review): confirm no module-level ``xmlfile`` was intended.
        spdFile = os.path.join(os.path.dirname(dmd_path), spdFile)

    # Get the property name mapping.
    prfFile = getPropertyFile(spdFile, self.fileMgr)
    self.domMgrProps = parsePropertyFile(prfFile, self.fileMgr)

    # Create entries for all of the properties; a failed query is treated as
    # "no properties" rather than aborting the tree build.
    try:
        props = self.domManager.query([])
    except Exception:
        props = []
    self.buildPropertiesListView_old(self.domMgrPropsItem, props, self.domMgrProps)
def __init__(self, xml_string):
    """Parse an XML response and expose selected tags as attributes.

    If the first parse fails, the raw text is printed, every key of
    ``get_xml_unescape_map()`` is replaced by its mapped value and the parse
    is retried. That substitution is reversed on the extracted ``Key`` text,
    which is then passed through ``unescape`` with the unescape table.

    :param xml_string: XML document text.
    """
    self.xml_unescape_table = {}
    self.xml_map = {}
    try:
        self.xml = minidom.parseString(xml_string)
    except Exception:
        print(xml_string)
        # Fixed: this used to be assigned to the misspelled attribute
        # ``xml_unescape_tabl``, so the table passed to unescape() below
        # always stayed empty.
        self.xml_unescape_table = get_xml_unescape_table()
        self.xml_map = get_xml_unescape_map()
        for k, v in self.xml_map.items():
            xml_string = xml_string.replace(k, v)
        self.xml = minidom.parseString(xml_string)

    self.bucket = get_tag_text(self.xml, 'Bucket', convert_to_bool = False)
    self.object = get_tag_text(self.xml, 'Key', convert_to_bool = False)
    if self.xml_map:
        # Undo the substitution applied before the second parse attempt.
        for k, v in self.xml_map.items():
            self.object = self.object.replace(v, k)
        self.object = unescape(self.object, self.xml_unescape_table)
    self.key = get_tag_text(self.xml, 'Key', convert_to_bool = False)
    self.upload_id = get_tag_text(self.xml, 'UploadId')
    self.marker = get_tag_text(self.xml, 'Marker', convert_to_bool = False)
def __init__(self, xml_string):
    """Parse an XML response and expose its tags and ``Part`` entries.

    :param xml_string: XML document text.
    :raises Exception: whatever ``minidom.parseString`` raises for malformed
        input; the raw text is printed first.
    """
    try:
        self.xml = minidom.parseString(xml_string)
    except Exception:
        print(xml_string)
        # Previously the error was swallowed and the next line failed with an
        # AttributeError on the missing ``self.xml``; surface the real cause.
        raise

    self.bucket = get_tag_text(self.xml, 'Bucket', convert_to_bool = False)
    self.type = get_tag_text(self.xml, 'Type', convert_to_bool = False)
    self.key = get_tag_text(self.xml, 'Key', convert_to_bool = False)
    self.last_modified = get_tag_text(self.xml, 'LastModified', convert_to_bool = False)
    self.etag = get_tag_text(self.xml, 'ETag', convert_to_bool = False)
    self.content_type = get_tag_text(self.xml, 'Content-Type')
    self.size = get_tag_text(self.xml, 'Size', convert_to_bool = False)
    # One Part object per <Part> element, in document order.
    self.parts = [Part(p) for p in self.xml.getElementsByTagName('Part')]
def get_broadcast_token(self, nick, uid):
    """ Token required to start a broadcast.

    :param nick: Client nick name
    :type nick: str
    :param uid: Client user identification
    :type uid: str | int
    :return: The broadcast token required to start a broadcast or None on failure.
    :rtype: str | None
    """
    _url = self._broadcast_token_url.format(self.room_name, nick, uid)
    if self.is_greenroom:
        # str.replace() returns a new string; the original discarded the
        # result, so the greenroom URL was never actually requested.
        _url = _url.replace('site=tinychat', 'site=greenroom')

    _response = util.web.http_get(url=_url, proxy=self.proxy)
    log.debug('broadcast token response: %s' % _response)

    if _response['content'] is None:
        return None

    _xml = parseString(_response['content'])
    root = _xml.getElementsByTagName('response')[0]
    result = root.getAttribute('result')
    if result == 'PW':
        # The 'PW' result code is handed back to the caller as-is.
        return result
    return root.getAttribute('token')
def addTree(self, title, data, row=None, column=0, colspan=0, rowspan=0):
    """Create a tree widget from an XML string and register it under ``title``.

    The XML in ``data`` is parsed, a scroll pane is created inside the current
    container and positioned with ``row``/``column``/``colspan``/``rowspan``,
    and a tree node built from the document's root element is stored in
    ``self.n_trees[title]``.

    When ``parseString`` is ``False`` a warning is issued and nothing is
    created.
    """
    self.__verifyItem(self.n_trees, title, True)
    self.__importAjtree()

    if parseString is False:
        self.warn("Unable to parse xml files. .addTree() not available")
        return

    document = parseString(data)

    pane_options = dict(
        relief=RAISED,
        borderwidth=2,
        bg="white",
        highlightthickness=0,
        takefocus=1,
    )
    scroll_pane = ScrollPane(self.__getContainer(), **pane_options)
    self.__positionWidget(scroll_pane, row, column, colspan, rowspan, "NSEW")

    tree_data = ajTreeData(document.documentElement)
    # update() & expand() called in go() function
    self.n_trees[title] = ajTreeNode(scroll_pane.getPane(), None, tree_data)
def get_xml(self, encoding='utf-8'):
    """
    Returns the XML code of this schedule.

    The returned byte string contains the XML in the requested encoding. Either save the byte string to a file
    opened in binary mode (the file is then encoded in the requested encoding), or convert it to a string with
    .decode(encoding).

    :param str encoding: The encoding of the XML.

    :rtype: bytes
    """
    root = Element(None)
    self.generate_xml(root)

    # Round-trip through minidom to get indented output.
    serialized = ElementTree.tostring(root)
    return minidom.parseString(serialized).toprettyxml(indent=' ', encoding=encoding)
# ------------------------------------------------------------------------------------------------------------------
def parse(self, xml, pvars):
    """Parse a property-list document and pick out the requested keys.

    :param xml: plist document as text.
    :param pvars: passed to ``self.getVars``; the keys of the mapping it
        returns are the ones looked up in the plist.
    :return: dict mapping each requested key to its plist value, or ``None``
        when the plist does not contain that key.
    """
    if sys.version_info >= (3, 0):
        payload = xml.encode()
        # readPlistFromBytes() was removed in Python 3.9; use loads() where it
        # exists and fall back to the old name only on interpreters without it.
        loader = getattr(plistlib, 'loads', None) or plistlib.readPlistFromBytes
        pl = loader(payload)
    else:
        pl = plistlib.readPlistFromString(xml)

    wanted = self.getVars(pvars)
    return {k: (pl[k] if k in pl else None) for k in wanted}
def _parse_igd_profile(profile_xml):
    """
    Walk the ``serviceType`` elements of the profile XML looking for a
    WANIPConnection or WANPPPConnection entry.

    For the first match, returns a tuple of the text of the first
    ``controlURL`` element under the service's parent node and the
    second-to-last ``:``-separated field of the service type value.
    Returns False when no service type matches.
    """
    document = parseString(profile_xml)
    for service_node in document.getElementsByTagName('serviceType'):
        matches_ip = _node_val(service_node).find('WANIPConnection') > 0
        if not (matches_ip or _node_val(service_node).find('WANPPPConnection') > 0):
            continue
        url_nodes = service_node.parentNode.getElementsByTagName('controlURL')
        control_url = url_nodes[0].childNodes[0].data
        upnp_schema = _node_val(service_node).split(':')[-2]
        return control_url, upnp_schema
    return False
def _parse_for_errors(soap_response):
    """Check a SOAP response for a server-side error.

    :param soap_response: object with a ``status`` attribute and a ``read()``
        method.
    :return: True when the status is not 500; False when the status is 500
        but the error body could not be parsed (the failure is logged).
    :raises Exception: when the status is 500 and an error code and
        description were extracted from the body.
    """
    if soap_response.status != 500:
        return True

    response_data = soap_response.read()
    try:
        err_dom = parseString(response_data)
        err_code = _node_val(err_dom.getElementsByTagName('errorCode')[0])
        err_msg = _node_val(
            err_dom.getElementsByTagName('errorDescription')[0]
        )
    except Exception as err:
        # "except Exception, err" is Python 2 only; "as" works on 2.6+ and 3.
        logging.error("Unable to parse SOAP error: {0}, response: {1}".format(err, response_data))
        return False

    message = 'SOAP request error: {0} - {1}'.format(err_code, err_msg)
    logging.error(message)
    # The unreachable "return False" that followed this raise was removed.
    raise Exception(message)
def test_valid_calls(endpoint, details):
    """Test all endpoints with valid calls.

    Posts the built Maltego payload to the endpoint, requires an HTTP 200
    reply and requires that no ``UIMessage`` element in the reply carries
    the ``FatalError`` message type.
    """
    target = MALTEGO_SERVER + endpoint
    logging.debug("Requesting %s" % target)

    payload = build_maltego_response(details)
    credentials = (API_USERNAME, API_KEY)
    # NOTE(review): verify=False turns off TLS certificate checking.
    reply = requests.post(target, data=payload, auth=credentials, verify=False)
    logging.debug("Response %s" % reply.content)
    assert reply.status_code == 200

    document = minidom.parseString(reply.content)
    for ui_message in document.getElementsByTagName('UIMessage'):
        message_type = ui_message.attributes['MessageType'].value
        assert message_type != 'FatalError'
def send_body(self, body):
    """Send the body

    The body is serialised with ``toxml()`` and posted to the BOSH service
    URL; the reply text is parsed with minidom.

    :param body: object providing ``toxml()``
    :return: A tuple in the form
        ``(:class:`~xml.dom.minidom.Element`, String)``
    """
    payload = body.toxml()
    endpoint = self.bosh_service.geturl()
    reply = self.request_session.post(endpoint, data=payload)

    # NOTE(review): for a non-200 reply the empty string is parsed, which
    # minidom rejects - confirm that surfacing a parse error is intended.
    data = reply.text if reply.status_code == 200 else ''

    document = minidom.parseString(data)
    return (document.documentElement, data)
def write(self, filename):
    """Write all phone books to ``filename`` as indented ISO-8859-1 XML.

    Each entry of ``self.phonebookList`` contributes the element returned by
    its ``getXML()`` under a ``phonebooks`` root. The tree is serialised,
    re-parsed with minidom for pretty-printing and written in text mode.

    :param filename: path of the file to create or overwrite.
    """
    root = ET.Element("phonebooks")
    for book in self.phonebookList:
        root.append(book.getXML())

    # The dead "if False: tree.write(...)" branch and the ElementTree wrapper
    # that existed only for it were removed; output is unchanged.
    rough_string = ET.tostring(root, encoding="iso-8859-1", method="xml")
    reparsed = parseString(rough_string)
    pretty = reparsed.toprettyxml(indent=" ", encoding="iso-8859-1").decode("iso-8859-1")
    with open(filename, 'w', encoding="iso-8859-1") as outfile:
        outfile.write(pretty)
# sid: Login session ID
# phonebookid: 0 for main phone book
# 1 for next phone book in list, etc...
def get_requestUrl(dl_url, server, **options):
    """ Get the request url.

    Opens ``dl_url`` (extra keyword options are forwarded to
    ``utils_http.open_url``), parses the reply and reads the first
    ``statusModeResponse`` element.

    :return: ``server`` followed by a ``getreqstatus`` query for the returned
        request id, or None when the reply status is "2" (its ``msg``
        attribute is logged as an error).
    """
    stopWatch = stop_watch.localThreadStopWatch()
    stopWatch.start('get_request')
    try:
        log.info( "Requesting file to download (this can take a while)..." )

        # Get request id
        m = utils_http.open_url(dl_url, **options)
        responseStr = m.read()
        dom = minidom.parseString(responseStr)
        node = dom.getElementsByTagName('statusModeResponse')[0]
        status = node.getAttribute('status')
        if status == "2":
            msg = node.getAttribute('msg')
            log.error(msg)
            get_req_url = None
        else:
            requestId = node.getAttribute('requestId')
            # Get request url
            get_req_url = server + '?action=getreqstatus&requestid=' + requestId
    finally:
        # Stop the timer even when the request or the parse raises.
        stopWatch.stop('get_request')

    return get_req_url
def saveFile(filename, subdirs, filetype):
    """Pretty-print the XML in the request body and save it under ``filename``.

    The target directory is the configured directory for ``filetype``
    followed by the path derived from ``subdirs``.

    :return: ``{"success": True, "path": filename}`` after writing, or a
        ``{"success": False, "error": ...}`` dict when ``filename`` is empty.
    """
    dir_path = getDirectoryPath(subdirs)
    xmldom = parseString("".join(request.body))

    if filename == "":
        return {"success": False,
                "error": "Save called without a filename or content!"}

    cfg = readconfig()
    xmldir = cfg[filetype]
    # toprettyxml() already returns a single string; the former "".join()
    # around it was a no-op.
    indented_xml = xmldom.toprettyxml(newl='\n')
    corrected_xml = remove_extra_newlines_char_xml(indented_xml)
    # Call form of print works on both Python 2 and Python 3.
    print(corrected_xml)
    with open(xmldir + dir_path + os.sep + filename, "w") as f:
        f.write(corrected_xml)
    return {"success": True,
            "path": filename}
def get_output_response(api_response):
    """Decode an API response as XML, then JSON, then plain text.

    The chosen representation is also reported through ``pNote``.

    :param api_response: response object exposing ``text`` and ``json()``,
        or None.
    :return: a minidom document when the text parses as XML; otherwise the
        result of ``api_response.json()`` when the text is valid JSON;
        otherwise the text encoded to ASCII with unencodable characters
        dropped; None when ``api_response`` is None.
    """
    if api_response is None:
        return None

    try:
        output_response = parseString("".join(api_response.text))
    except Exception:
        # "except Exception" rather than a bare except, so KeyboardInterrupt
        # and SystemExit are no longer swallowed by the format probing.
        try:
            JSON.loads(api_response.text)
        except Exception:
            output_response = api_response.text.encode('ascii', 'ignore')
            pNote("api_response Text: \n {0}".format(output_response))
        else:
            output_response = api_response.json()
            pNote("api_response (JSON format): \n {0}".
                  format(JSON.dumps(output_response, indent=4)))
    else:
        pNote("api_response (XML format): \n {0}".
              format(output_response.toprettyxml(newl='\n')))
    return output_response
def main():
    """ This function basically creates the xml file by calling various other
    functions, runs the file and then saves it.

    It reads ``data.xml`` (located relative to the script's directory),
    populates a new ``data`` root from its first-level ``tag`` children,
    writes the pretty-printed result to ``temp.xml`` and passes that file
    name to ``save_file``.
    """
    root = Element('data')
    dir_path = os.path.dirname(os.path.realpath(sys.argv[0]))
    rel_path = get_relative_path(dir_path, "data.xml")
    tree = xml.etree.ElementTree.parse(rel_path)
    input_root = tree.getroot()
    nodes = get_firstlevel_children(input_root, "tag")
    populate_xml(root, nodes)

    temp_xml = 'temp.xml'
    pretty_xml = minidom.parseString(xml.etree.ElementTree.tostring(root))\
        .toprettyxml(indent=" ")
    # The with-block flushes and closes the file on exit; the explicit
    # flush()/close() calls inside it were redundant.
    with open(temp_xml, "w") as config_file:
        config_file.write(pretty_xml)
    save_file(temp_xml)
def getVmIsoList(self):
    """Collect the storage volumes that look like ISO images.

    Every volume of every storage pool on ``self.conn`` is inspected. A
    volume is reported when its format type or its path ends with ``.iso``.

    :return: list of dicts with the keys ``vol_type``, ``vol_name``,
        ``vol_size``, ``vol_available`` and ``vol_path``. If libvirt raises
        part-way through, the entries gathered so far are returned.
    """
    isoList = []
    try:
        for pool_name in self.conn.listStoragePools():
            pool = self.conn.storagePoolLookupByName(pool_name)
            for stgvolname in pool.listVolumes():
                volData = dict()
                stgvol = pool.storageVolLookupByName(stgvolname)
                info = stgvol.info()
                try:
                    xml = minidom.parseString(stgvol.XMLDesc(0))
                    target = xml.getElementsByTagName('target')[0]
                    volData['vol_type'] = target.getElementsByTagName('format')[0].getAttribute('type')
                except Exception:
                    # Best effort: a volume without a readable format is still
                    # listed. The fallback spelling is kept byte-for-byte
                    # because callers may compare against it.
                    volData['vol_type'] = 'unkonwn'
                volData['vol_name'] = stgvol.name()
                # Each value is divided by 1024 three times
                # (presumably bytes to GiB - TODO confirm).
                volData['vol_size'] = info[1] / 1024/ 1024/ 1024
                volData['vol_available'] = info[2] / 1024/ 1024/ 1024
                volData['vol_path'] = stgvol.path()
                if volData['vol_type'].endswith('.iso') or volData['vol_path'].endswith('.iso'):
                    isoList.append(volData)
        return isoList
    except libvirt.libvirtError:
        return isoList
def delInstanceDisk(self,instance,volPath):
    '''Detach the disk whose source file equals ``volPath`` from ``instance``.

    The instance's XML description is searched for ``disk`` elements whose
    first ``source`` child has a ``file`` attribute equal to ``volPath``;
    when several match, the last one is used.

    Returns the result of ``instance.detachDeviceFlags`` on success, an
    error-message string when libvirt raises, or False when no disk matches.
    '''
    diskXml = None
    domXml = minidom.parseString(instance.XMLDesc(0))
    for ds in domXml.getElementsByTagName('disk'):
        try:
            path = ds.getElementsByTagName('source')[0].getAttribute('file')
        except IndexError:
            # Disk without a <source> element: nothing to compare against.
            continue
        if path == volPath:
            diskXml = ds.toxml()

    if not diskXml:
        return False
    try:
        return instance.detachDeviceFlags(diskXml,3)
    except libvirt.libvirtError as e:
        # "except X, e" is Python 2 only. The message prefix is reproduced
        # as found in the source (its original characters were lost).
        return '??????????????{result}'.format(result=e.get_error_message())
def parseSimpleTag(text,ignoreEntities=None):
    """Parse simple-tag annotated text into a ``kindred.Document``.

    The text is wrapped in a ``<doc>`` element, parsed, and handed to
    ``parseSimpleTag_helper``, which yields the plain text, the entities and
    the relations. Either every entity or none must carry a source ID; when
    none do, sequential IDs starting at 1 are assigned (and no relations are
    allowed). Entities are then merged by matching ID.

    :param text: annotated text (without the enclosing ``<doc>`` element).
    :param ignoreEntities: forwarded to the helper; defaults to an empty list.
    :return: the combined ``kindred.Document``.
    """
    # A fresh list per call instead of a shared mutable default argument.
    if ignoreEntities is None:
        ignoreEntities = []

    docText = u"<doc>%s</doc>" % text
    xmldoc = minidom.parseString(docText.encode('utf8'))
    docNode = xmldoc.childNodes[0]
    text,unmergedEntities,relations = parseSimpleTag_helper(docNode,ignoreEntities=ignoreEntities)

    missingSourceEntityID = [ e.sourceEntityID == '' for e in unmergedEntities ]
    assert all(missingSourceEntityID) or (not any(missingSourceEntityID)), 'All entities or none (not some) should be given IDs'
    assert (not any(missingSourceEntityID)) or len(relations) == 0, "Cannot include relations with no-ID entities"

    if all(missingSourceEntityID):
        for i,e in enumerate(unmergedEntities):
            e.sourceEntityID = i+1

    entities = mergeEntitiesWithMatchingIDs(unmergedEntities)
    combinedData = kindred.Document(text,entities=entities,relations=relations)
    return combinedData
def do_NOTIFY(self):
    """Handle a NOTIFY request.

    The request body is parsed as XML and the ``LastChange``, ``Revision``,
    ``SystemUpdateID`` and ``BufferFilled`` values are extracted. A non-empty
    ``LastChange`` is parsed again, stored in ``uuid_store`` and forwarded to
    ``raumfeld_handler``; if it mentions the zone-config rooms path the
    reload flag is set. The request is always answered with status 200.
    """
    global needs_to_reload_zone_config
    global raumfeld_handler

    udn = "uuid:"+self.path[1:]
    friendly_name = RfCmd.map_udn_to_friendly_name(udn)

    body_length = int(self.headers['content-length'])
    raw_body = self.rfile.read(body_length)
    document = minidom.parseString(raw_body.decode('UTF-8'))
    #print("\n\n#NOTIFY:\n" + result.toprettyxml())

    wanted_tags = ['LastChange',
                   'Revision',
                   'SystemUpdateID',
                   'BufferFilled']
    notification_content = XmlHelper.xml_extract_dict(document, wanted_tags)

    last_change_text = notification_content['LastChange']
    if len(last_change_text):
        if '/Preferences/ZoneConfig/Rooms' in last_change_text:
            needs_to_reload_zone_config = True
        last_change = minidom.parseString(last_change_text)
        uuid_store.set(udn, friendly_name[0], friendly_name[1], last_change)
        raumfeld_handler.set_subscription_values("uuid:" + self.path[1:], last_change)
        #print("\n\n#NOTIFY LastChange: "+self.path+"\n"+last_change.toprettyxml())

    self.send_response(200)
    self.end_headers()
def get_services_from_location(location):
    """Fetch a device description and list the services it declares.

    :param location: passed to ``UpnpSoap.get``.
    :return: list with one dict per ``service`` element (keys
        ``serviceType``, ``controlURL``, ``eventSubURL``, ``SCPDURL`` and
        ``serviceId``, as extracted by ``XmlHelper.xml_extract_dict``), or
        None when no data was returned or any error occurred (the error is
        printed).
    """
    wanted_tags = ['serviceType',
                   'controlURL',
                   'eventSubURL',
                   'SCPDURL',
                   'serviceId']
    try:
        (_, xml_data) = UpnpSoap.get(location)
        if xml_data is not False:
            xml_root = minidom.parseString(xml_data)
            return [XmlHelper.xml_extract_dict(service, wanted_tags)
                    for service in xml_root.getElementsByTagName("service")]
    except Exception as e:
        # The message used to name a different function
        # (get_subscription_urls), which pointed debugging at the wrong place.
        print("Error get_services_from_location:{0}".format(e))
    return None
def _parse_cas_xml_response(cls, response_text):
    """Extract the response type and data from a CAS XML response.

    :param response_text: XML text; an empty value yields
        ``('noResponse', {})``.
    :return: tuple ``(cas_type, cas_data)`` where ``cas_type`` is the name of
        the first child element of the root with the ``cas:`` prefix removed
        and ``cas_data`` is what ``cls._parse_cas_xml_data`` returns for it.
        A root without element children also yields ``('noResponse', {})``.
    :raises Exception: when the root element is not ``cas:serviceResponse``.
    """
    cas_type = 'noResponse'
    cas_data = {}
    if not response_text:
        return cas_type, cas_data

    xml_document = parseString(response_text)
    node_element = xml_document.documentElement
    if node_element.nodeName != 'cas:serviceResponse':
        # Same exception type as before, now with a message for diagnosis.
        raise Exception(
            'Unexpected CAS response root element: {0!r}'.format(node_element.nodeName)
        )

    for child in node_element.childNodes:
        # Skip whitespace text nodes and comments between elements.
        if child.nodeType == child.ELEMENT_NODE:
            cas_type = child.nodeName.replace("cas:", "")
            cas_data = cls._parse_cas_xml_data(child)
            break
    return cas_type, cas_data
def get_broadcast_token(self, nick, uid):
    """ Token required to start a broadcast.

    :param nick: Client nick name
    :type nick: str
    :param uid: Client user identification
    :type uid: str | int
    :return: The broadcast token required to start a broadcast or None on failure.
    :rtype: str | None
    """
    _url = self._broadcast_token_url.format(self.room_name, nick, uid)
    if self.is_greenroom:
        # str.replace() returns a new string; the original discarded the
        # result, so the greenroom URL was never actually requested.
        _url = _url.replace('site=tinychat', 'site=greenroom')

    _response = util.web.http_get(url=_url, proxy=self.proxy)
    log.debug('broadcast token response: %s' % _response)

    if _response['content'] is None:
        return None

    _xml = parseString(_response['content'])
    root = _xml.getElementsByTagName('response')[0]
    result = root.getAttribute('result')
    if result == 'PW':
        # The 'PW' result code is handed back to the caller as-is.
        return result
    return root.getAttribute('token')
def upload(self,filename, **params):
    """Upload a photo file and return the resulting ``flickr.Photo``.

    The API signature is computed from ``params`` before the API key,
    signature and auth token are added to the same dict. The file is sent as
    the ``photo`` part of a multipart POST to ``/services/upload/`` on
    ``api.flickr.com``.

    :param filename: path of the image file to upload.
    :param params: extra upload arguments forwarded to the API.
    :return: ``flickr.Photo`` built from the ``photoid`` in the response.
    """
    args = params
    sig = flickr._get_api_sig(params=params)

    args['api_key'] = flickr.API_KEY
    args['api_sig'] = sig
    args['auth_token'] = flickr.userToken()

    # open() in a with-block replaces the Python-2-only file() builtin and
    # closes the handle even if the read fails.
    with open(filename, 'rb') as f:
        photo_data = f.read()

    # now make a "files" array to pass to uploader
    files = [('photo', filename, photo_data)]
    response = post_multipart('api.flickr.com', '/services/upload/', args, files)

    # use get data since error checking is handled by it already
    data = flickr._get_data(minidom.parseString(response))
    photo = flickr.Photo(data.rsp.photoid.text)

    return photo
def module_run(self):
    """Export the configured tables to a pretty-printed XML file.

    Reads the ``filename`` and comma-separated ``tables`` options, selects
    every column of each table, converts the rows to dicts keyed by column
    name, writes the ``dicttoxml`` rendering (indented by four spaces) to
    the file and reports how many records were written.
    """
    filename = self.options['filename']
    with codecs.open(filename, 'wb', encoding='utf-8') as outfile:
        # build a list of table names
        tables = [x.strip() for x in self.options['tables'].split(',')]
        data_dict = {}
        cnt = 0
        for table in tables:
            columns = [x[0] for x in self.get_columns(table)]
            # NOTE(review): identifiers are interpolated into the SQL text;
            # the table names come from the module options.
            rows = self.query('SELECT "%s" FROM "%s" ORDER BY 1' % ('", "'.join(columns), table))
            # Pair each column name with the value at the same position.
            records = [dict(zip(columns, row)) for row in rows]
            data_dict[table] = records
            cnt += len(records)
        # write the xml to a file
        reparsed = parseString(dicttoxml(data_dict))
        outfile.write(reparsed.toprettyxml(indent=' '*4))
    self.output('%d records added to \'%s\'.' % (cnt, filename))
# File source: 7_6_search_amazon_for_books.py
# Project: Python-Network-Programming-Cookbook-Second-Edition
# Author: PacktPublishing
# Project source
# File source
# Views: 24
# Favorites: 0
# Likes: 0
# Comments: 0
def search_for_books(tag, index):
    """Search Amazon for Books

    :param tag: keywords to search for.
    :param index: search index passed to the item search.
    :return: list with one dict per ``Item`` element of the response, each
        holding ``Title``, ``Author`` and ``URL``; a value is the empty
        string when the item has no usable element of that name.
    """
    amazon = bottlenose.Amazon(ACCESS_KEY, SECRET_KEY, AFFILIATE_ID)
    results = amazon.ItemSearch(
        SearchIndex = index,
        Sort = "relevancerank",
        Keywords = tag
    )
    parsed_result = xml.parseString(results)

    all_items = []
    attrs = ['Title','Author', 'URL']

    for item in parsed_result.getElementsByTagName('Item'):
        parse_item = {}
        for attr in attrs:
            parse_item[attr] = ""
            try:
                parse_item[attr] = item.getElementsByTagName(attr)[0].childNodes[0].data
            except (IndexError, AttributeError):
                # Element missing, empty, or not starting with a text node:
                # keep the empty default. Other errors are no longer hidden.
                pass
        all_items.append(parse_item)
    return all_items
def gen_xml(rowList):
    """Render rows as an ``<items>`` XML document string.

    Each row becomes an ``<item>`` with the attributes ``autocomplete``,
    ``uid``, ``arg`` (taken from the row's ``title``) and ``valid``, plus
    ``<title>``, ``<subtitle>`` and ``<icon>`` children. Missing or falsy
    values become empty strings, except the icon text, which is used as
    returned by ``row.get('icon')``.
    """
    # (attribute name on <item>, key read from the row)
    attribute_sources = (
        ('autocomplete', 'autocomplete'),
        ('uid', 'uid'),
        ('arg', 'title'),
        ('valid', 'valid'),
    )

    root = Element('items')
    for entry in rowList:
        node = SubElement(root, 'item')
        for attr_name, row_key in attribute_sources:
            node.set(attr_name, entry.get(row_key) or '')

        for child_tag in ('title', 'subtitle'):
            SubElement(node, child_tag).text = entry.get(child_tag) or ''
        SubElement(node, 'icon').text = entry.get('icon')

    return minidom.parseString(etree.tostring(root)).toxml()
def glyphUpdateFromSVG(g, svgCode):
    """Replace the contours of glyph ``g`` with one path taken from SVG code.

    With a single ``<path>`` that path is used. With several, the first one
    whose ``id`` does not contain ``stroke`` is used, falling back to the
    first path. Raises ``Exception`` when the SVG contains no path.
    """
    root = xmlparseString(svgCode).documentElement
    candidates = findPathNodes(root)
    if len(candidates) == 0:
        raise Exception('no <path> found in SVG')

    chosen = candidates[0]
    if len(candidates) != 1:
        for node in candidates:
            node_id = node.getAttribute('id')
            if node_id is not None and node_id.find('stroke') == -1:
                chosen = node
                break

    tr = nodeTranslation(chosen)
    d = chosen.getAttribute('d')
    g.clearContours()
    drawSVGPath(g, d, tr)
def svgGetPaths(svgCode):
    """Collect ``(d, translation)`` pairs for the paths in SVG code.

    When the code contains ``Figma</desc>``, paths whose ``id`` contains
    ``stroke`` are left out. Returns the list of pairs together with the
    Figma flag.

    NOTE(review): when no path is found the second element returned is the
    tuple ``(0,0)`` rather than the flag - confirm callers expect that.
    """
    root = xmlparseString(svgCode).documentElement
    nodes = findPathNodes(root)
    isFigmaSVG = svgCode.find('Figma</desc>') != -1

    if len(nodes) == 0:
        return nodes, (0,0)

    collected = []
    for node in nodes:
        node_id = node.getAttribute('id')
        # Only Figma exports get their stroke paths filtered out.
        if isFigmaSVG and node_id is not None and node_id.find('stroke') != -1:
            continue
        tr = nodeTranslation(node)
        d = node.getAttribute('d')
        collected.append((d, tr))

    return collected, isFigmaSVG