def _handle_response(self, resp, force_list=None):
    """Ensure response is successful and return body as XML.

    Args:
        resp: Response object exposing ``status_code``, ``headers``,
            ``text`` and ``content``.
        force_list: Forwarded unchanged to ``xmltodict.parse``.

    Returns:
        The parsed content of the ``QDocRoot`` element, or ``None`` when the
        status code is not 200, the ``Content-Type`` header is not exactly
        ``text/xml`` (or is absent), or ``authPassed`` equals ``"0"``. In the
        last case ``self._session_error`` is set to ``True``.
    """
    self._debuglog("Request executed: " + str(resp.status_code))
    if resp.status_code != 200:
        return None

    # .get() so that a reply without a Content-Type header is rejected like
    # any other non-XML reply instead of raising KeyError.
    if resp.headers.get("Content-Type") != "text/xml":
        # JSON requests not currently supported
        return None

    self._debuglog("Response Text: " + resp.text)
    data = xmltodict.parse(resp.content, force_list=force_list)['QDocRoot']

    # Only an explicit "0" marks a failed authentication; an empty or absent
    # authPassed element is not treated as a failure.
    if data.get('authPassed') == "0":
        self._session_error = True
        return None
    return data
# Example source code using Python parse() (collected snippets)
def preprocess_message(self, request):
    '''
    Decrypt the request body and return the contained message as a dict.

    The body is decrypted through ``component.crypto.decrypt_message`` using
    the ``msg_signature``, ``timestamp`` and ``nonce`` query parameters. The
    ``xml`` root of the decrypted document is converted to plain JSON types;
    ``CreateTime`` becomes a ``datetime`` and ``MsgId`` (when present) an int.
    '''
    crypto = get_component().crypto
    params = request.query_params
    decrypted = crypto.decrypt_message(
        request.body,
        params['msg_signature'],
        int(params['timestamp']),
        int(params['nonce']),
    )
    xml_root = xmltodict.parse(to_text(decrypted))['xml']
    # Round-trip through JSON to obtain plain dicts from the parser's mapping.
    payload = json.loads(json.dumps(xml_root))
    payload['CreateTime'] = datetime.fromtimestamp(int(payload['CreateTime']))
    if 'MsgId' in payload:
        payload['MsgId'] = int(payload['MsgId'])
    return payload
def web_response_to_json(response):
    """
    Modify the web response output to json format

    The content is first decoded as JSON; if that fails it is parsed as XML
    with ``xmltodict`` and normalised through a JSON round trip.

    param response: response object from requests library
    return: json object representing the response content
    raises exception.UnknownOutputFormat: when the response is falsy or its
        content is neither decodable as JSON nor parseable as XML
    """
    if not response:
        # This path used to fall through to an UnboundLocalError on return.
        raise exception.UnknownOutputFormat()

    try:
        return json.loads(response.content)
    except Exception:
        # Not JSON: fall back to XML. Anything that fails here as well is
        # reported as an unknown format.
        try:
            parsed_xml = xmltodict.parse(response.content)
            return json.loads(json.dumps(parsed_xml, sort_keys=True))
        except Exception:
            raise exception.UnknownOutputFormat()
def working_directory(directory):
    """Context manager that runs its body with ``directory`` as the cwd.

    The previous working directory is restored on exit, including when the
    body raises. The value bound by ``with ... as`` is ``directory`` itself.

    This function is used in ``with`` statements elsewhere in the file, so it
    has to return a real context manager rather than a bare generator.
    """
    # Imported locally because the decorator is needed at definition time and
    # this block cannot rely on a file-level contextlib import.
    from contextlib import contextmanager

    @contextmanager
    def _switch():
        owd = os.getcwd()
        try:
            os.chdir(directory)
            yield directory
        finally:
            os.chdir(owd)

    return _switch()
# def extractXML(project_dir, apk_location):
# """
# Parses AndroidManifest file and returns a dictionary object
# :param project_dir: Project Location
# :param apk_location: Apk location
# :return: Parsed AndroidManifest Dictionary
# """
# with working_directory(project_dir):
# subprocess.check_output(["./gradlew", "assembleRelease"])
# with working_directory("/tmp"):
# subprocess.call(["apktool", "d", apk_location])
# with working_directory("/tmp" + "/app-release/"):
# with open("AndroidManifest.xml") as fd:
# obj_file = xmltodict.parse(fd.read())
# return ast.literal_eval(json.dumps(obj_file))
def extractXML(apk_location, config_location):
    """
    Decode an APK with apktool and return its AndroidManifest as a dict.

    @param apk_location: path of the APK passed to ``apktool d`` (run in /tmp)
    @param config_location: path of an INI file; option ``app_flavor_name`` in
        section ``APP_NAME`` names the directory under /tmp that is expected
        to hold the decoded APK. Falls back to "app-external-release" when
        the option is not configured.
    @return: AndroidManifest.xml parsed into plain dicts, lists and strings
    """
    with working_directory("/tmp"):
        subprocess.call(["apktool", "d", apk_location])

    config = ConfigParser.ConfigParser()
    config.read(config_location)
    app_name = "app-external-release"
    try:
        configured_name = config.get("APP_NAME", "app_flavor_name")
    except ConfigParser.Error:
        # config.get raises when the section/option is missing, so without
        # this the default above could never be used.
        configured_name = None
    if configured_name is not None:
        app_name = configured_name

    with working_directory("/tmp/" + app_name):
        with open("AndroidManifest.xml") as fd:
            manifest = xmltodict.parse(fd.read())

    # json.loads (not ast.literal_eval): empty XML elements serialise to the
    # JSON literal null, which is not a valid Python literal.
    return json.loads(json.dumps(manifest))
def calcAvgPrecision(p, gt):
    """Return the average precision of ranked predictions against ground truth.

    Args:
        p: ranked predictions; the first item of each entry is the label.
        gt: collection of relevant labels.

    Returns:
        The sum of precision-at-rank over every rank whose label is in
        ``gt``, divided by ``len(gt)``. Returns ``0.0`` when ``gt`` is empty.
    """
    if len(gt) == 0:
        # No relevant labels: nothing to score, and avoids dividing by zero.
        return 0.0

    precision = 0.0
    hits = 0
    # Walk the ordered predictions; ranks are 1-based.
    for rank, prediction in enumerate(p, start=1):
        if prediction[0] in gt:
            hits += 1
            precision += float(hits) / rank

    # Normalise by the number of relevant labels in the ground truth.
    return precision / len(gt)
def neighbor_discover_regex(self):
    '''
    regex to parse output of discovery_command
    looking for:
    device_device
    device_ip
    device_ipv6 (optional)
    device_model
    local_interface
    device_interface
    device_version

    Returns the pattern as a string. Raw literals are used so the regex
    escapes are not treated as (invalid) Python string escapes.
    '''
    return (
        r"Device ID: (?P<device_device>[\w\d\_\-\.]+)[\W\w]+?\n"
        r"\s+IP [Aa]ddress: (?P<device_ip>[0-9\.]+)\n"
        r"(?:\s+IPv6 address: (?P<device_ipv6>[a-z0-9\:]+)(?:\s+\(global unicast\)\n)?)?"
        r"[\n\W\w]*?"
        r"Platform:\s*[Cc]isco\s(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n"
        r"Interface: (?P<local_interface>[A-Za-z0-9/\-]+)"
        r".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n"
        r"[\n\W\w\S\s]*?"
        r"Version.*\n"
        r"(?P<device_version>[\w\W]+?)\n"
    )
def neighbor_discover_regex(self):
    '''
    regex to parse output of discovery_command
    looking for:
    device_name
    device_ip
    device_ipv6 (optional; addresses starting with fe80 are not captured)
    device_model
    local_interface
    device_interface
    device_version

    Returns the pattern as a string. Raw literals are used so the regex
    escapes are not treated as (invalid) Python string escapes.
    '''
    return (
        r"Device ID:(?P<device_name>[\w\d\_\-\.]+)[\W\w]+?\n"
        r"\s+IPv4 [Aa]ddress: (?P<device_ip>[0-9\.]+)\n"
        r"(?:\s+IPv6 [Aa]ddress: (?!fe80)(?P<device_ipv6>[a-z0-9\:]+)\n)?"
        r"[\n\W\w]*?"
        r"Platform:\s*(?P<device_model>[\w\d\-\_\.]+)[\W\w\s]+?\n"
        r"Interface: (?P<local_interface>[A-Za-z0-9/]+)"
        r".*: (?P<device_interface>[A-Za-z0-9/\-]+)\n"
        r"[\n\W\w\S\s]*?"
        r"Version.*\n"
        r"(?P<device_version>[\w\W]+?)\n"
    )
def load_gt(xml_file):
    """Load ground-truth boxes from an annotation XML file.

    Args:
        xml_file: path of an XML file whose root element is ``annotation``.

    Returns:
        A numpy array with one ``[xmin, ymin, xmax, ymax, trackid]`` row of
        ints per ``object`` element; an empty array (after printing a notice)
        when the annotation has no ``object`` element.
    """
    with open(xml_file) as f:
        annotation = dict(xmltodict.parse(f.read())['annotation'])

    if 'object' not in annotation:
        # Parenthesised form prints the same text on Python 2 and 3.
        print("xml {} has no objects.".format(xml_file))
        return np.asarray([])

    objects = annotation['object']
    # A lone <object> is parsed as a single mapping rather than a list.
    if not isinstance(objects, list):
        objects = [objects]

    rows = []
    for box in objects:
        track_id = str(box['trackid'])
        bndbox = box['bndbox']
        # Build a real list: on Python 3 ``map`` is lazy and would end up as
        # an opaque object inside the array.
        rows.append([
            int(bndbox['xmin']),
            int(bndbox['ymin']),
            int(bndbox['xmax']),
            int(bndbox['ymax']),
            int(track_id),
        ])
    return np.asarray(rows)
def mock_urlopen(request, cafile=None):
    """Stand-in for ``urlopen`` that answers with canned test responses.

    The request payload is parsed as XML. For the URL
    'https://api.sofort.com/api/xml', a ``transaction_request/transaction``
    value of '123-abc-received' or '123-abc-loss' selects the matching entry
    of ``TEST_RESPONSES``; otherwise the response payload is an empty dict.

    Args:
        request: object exposing ``get_full_url()`` and ``data``.
        cafile: accepted for signature compatibility; not used.

    Returns:
        A ``MockResponse``. If a lookup raises KeyError it wraps ``False`` and
        no headers are added; otherwise XML Content-type/Accept headers are
        set on it.
    """
    response = {}
    url = request.get_full_url()
    try:
        data = xmltodict.parse(request.data)
    except Exception:
        # Missing or unparseable payload: continue as if it were empty.
        # ``Exception`` rather than a bare except so that KeyboardInterrupt
        # and SystemExit are not swallowed.
        data = {}

    try:
        if (url == 'https://api.sofort.com/api/xml'
                and 'transaction_request' in data
                and 'transaction' in data['transaction_request']):
            transaction = data['transaction_request']['transaction']
            if transaction in ('123-abc-received', '123-abc-loss'):
                response = TEST_RESPONSES[transaction]
    except KeyError:
        result = MockResponse(False)
    else:
        result = MockResponse(response)
        result.headers.update({'Content-type': 'application/xml; charset=UTF-8'})
        result.headers.update({'Accept': 'application/xml; charset=UTF-8'})
    return result
def get_density_map(data_name, save_data, show_image=False):
    """Build a vehicle density map from an annotation XML file.

    For every ``vehicle`` under ``annotation`` the value ``1 / box_area`` is
    added to the pixels covered by its ``bndbox`` on a 240x352 float32 image.

    Args:
        data_name: path of the annotation file; must end with ".xml".
        save_data: when true, the map is written with ``ndarray.tofile`` to
            ``data_name`` with 'xml' replaced by 'desmap'.
        show_image: when true, the map is passed to ``show_mask`` together
            with ``data_name`` with 'xml' replaced by 'jpg'.

    Returns:
        None.
    """
    assert(data_name.endswith(".xml"))
    with open(data_name) as xml_d:
        doc = xmltodict.parse(xml_d.read())

    img = np.zeros((240, 352), np.float32)

    # No <vehicle> element means an all-zero map; a lone <vehicle> is parsed
    # as a single mapping rather than a list.
    vehicles = doc['annotation'].get('vehicle') or []
    if not isinstance(vehicles, list):
        vehicles = [vehicles]

    for vehicle in vehicles:
        bbox = vehicle['bndbox']
        xmin = int(bbox['xmin'])
        ymin = int(bbox['ymin'])
        xmax = int(bbox['xmax'])
        ymax = int(bbox['ymax'])
        area = (ymax - ymin) * (xmax - xmin)
        if area == 0:
            # A degenerate box covers no pixels; skipping it avoids a
            # division by zero.
            continue
        img[ymin:ymax, xmin:xmax] += 1 / float(area)

    # NOTE(review): str.replace swaps every 'xml' in the path, not only the
    # extension - confirm no directory or file stem contains 'xml'.
    if show_image:
        show_mask(img, data_name.replace('xml', 'jpg'))
    if save_data:
        img.tofile(data_name.replace('xml', 'desmap'))
def xml_to_comparable_dict(xml):
    """Parse XML into plain dicts whose embedded lists are sorted predictably.

    Mappings become ``dict`` objects and every tuple/set/list becomes a list
    sorted by a text key derived from its items, so that two parsed documents
    can be compared without regard to element order.
    """
    sequence_types = (tuple, set, list)

    def _sort_key(node):
        """Build a text sort key for a value, recursing into containers."""
        if hasattr(node, 'items'):
            pairs = sorted([(name, _sort_key(child)) for name, child in node.items()])
            return six.text_type(pairs)
        if isinstance(node, sequence_types):
            return six.text_type(sorted(node, key=_sort_key))
        return six.text_type(node)

    def _unorder(node):
        """Convert mappings to `dict` and sequences to sorted lists."""
        if hasattr(node, 'items'):
            return {name: _unorder(child) for name, child in node.items()}
        if isinstance(node, sequence_types):
            converted = tuple(_unorder(child) for child in node)
            return sorted(converted, key=_sort_key)
        return node

    return _unorder(xmltodict.parse(xml))
def parse_bracketed(s):
    '''Parse word features of the form [abc=... def=...].

    XML fragments inside the input are swapped for placeholder tokens before
    the key=value split and restored afterwards, so features containing XML
    are handled as well.

    Returns a ``(word, attrs)`` tuple: ``word`` is the unescaped value of the
    ``Text`` feature (None if absent) and ``attrs`` maps every other feature
    name to its unescaped value.
    '''
    word = None
    attrs = {}
    placeholders = {}

    # Hide XML fragments behind tokens that contain no whitespace or '='
    for index, fragment in enumerate(re.findall(r"(<[^<>]+>.*<\/[^<>]+>)", s)):
        token = "^^^%d^^^" % index
        placeholders[token] = fragment
        s = s.replace(fragment, token)

    # Read the key=value pairs, restoring hidden fragments where needed
    for attr, val in re.findall(r"([^=\s]*)=([^\s]*)", s):
        if val in placeholders:
            val = remove_escapes(placeholders[val])
        cleaned = remove_escapes(val)
        if attr == 'Text':
            word = cleaned
        else:
            attrs[attr] = cleaned
    return (word, attrs)
def toDict(self, body):
    """Parse an XML document and normalise its leaf values in place.

    Attributes are parsed without a prefix (``attr_prefix=""``). Afterwards
    every empty value (``None``) is replaced by an empty ``OrderedDict`` and
    the strings ``"true"`` / ``"false"`` by the matching booleans. This is
    applied to values held directly by a mapping and to items of lists.

    Args:
        body: XML text to parse.

    Returns:
        The parsed (and normalised) root mapping.
    """
    root = xmltodict.parse(body, attr_prefix="")

    def convert(value):
        # Map a scalar leaf to its normalised form.
        if value is None:
            return OrderedDict()
        if value == "false":
            return False
        if value == "true":
            return True
        return value

    def walker(node):
        # ``dict`` also covers OrderedDict, so the walk works whichever
        # mapping type the parser returns.
        if isinstance(node, dict):
            for key, val in list(node.items()):
                if isinstance(val, (dict, list)):
                    walker(val)
                else:
                    node[key] = convert(val)
        elif isinstance(node, list):
            for index, item in enumerate(node):
                if isinstance(item, (dict, list)):
                    walker(item)
                else:
                    node[index] = convert(item)

    walker(root)
    return root
def call_api(device_ip, token, resource, xml_attribs=True):
    """GET ``http://<device_ip><resource>`` and return the parsed XML reply.

    Args:
        device_ip: host part of the URL.
        token: sent as the ``__RequestVerificationToken`` header unless None.
        resource: path appended to the host.
        xml_attribs: forwarded to ``xmltodict.parse``.

    Returns:
        The parsed reply, or ``False`` when the request itself fails (the
        error is printed).

    Raises:
        Exception: on a non-200 status code, or when the parsed reply
            contains an ``error`` element.
    """
    headers = {'__RequestVerificationToken': token} if token is not None else {}
    try:
        reply = requests.get(url='http://' + device_ip + resource, headers=headers,
                             allow_redirects=False, timeout=(2.0, 2.0))
    except requests.exceptions.RequestException as e:
        print("Error: " + str(e))
        return False

    if reply.status_code != 200:
        raise Exception('Received status code ' + str(reply.status_code) + ' for URL ' + reply.url)

    parsed = xmltodict.parse(reply.text, xml_attribs=xml_attribs)
    if 'error' in parsed:
        raise Exception('Received error code ' + parsed['error']['code'] + ' for URL ' + reply.url)
    return parsed
def test_getcomps_comps(self):
    """The get_comps fixture yields ten comparables, each loaded into a Place."""
    with open('./testdata/get_comps.xml', 'r') as f:
        raw_xml = f.read()

    data = xmltodict.parse(raw_xml)
    comps = data.get('Comps:comps')['response']['properties']['comparables']['comp']

    comp_places = []
    for datum in comps:
        place = Place()
        place.set_data(datum)
        comp_places.append(place)

    self.assertEqual(10, len(comp_places))
def decode_json(self):
    """
    Decode the message and convert to a standard JSON dictionary.

    The result is cached in ``self.decoded_json_cache`` and reused on later
    calls. When ``xmltodict`` is importable and the decoded message has a
    ``Msg`` entry, that entry is parsed from XML before serialisation.

    :returns: a string that contains the converted JSON document.
    """
    if self.decoded_json_cache:
        return self.decoded_json_cache

    decoded = self.decode()
    try:
        import xmltodict
        if "Msg" in decoded:
            decoded["Msg"] = xmltodict.parse(decoded["Msg"])
    except ImportError:
        # xmltodict is optional; without it "Msg" is serialised as-is.
        pass

    self.decoded_json_cache = json.dumps(decoded, cls=SuperEncoder)
    return self.decoded_json_cache
def _dispatch(self, xml):
    """Route an incoming XML message to the matching ``on_<MsgType>`` method.

    Args:
        xml: XML text whose root element is ``xml`` and which contains a
            ``MsgType`` element.

    Returns:
        Whatever the handler returns, or ``None`` (after logging a warning)
        when this object has no handler for the message type.
    """
    logger.info('post xml:%s', xml)
    # Parse the posted document and take its root element
    root = xmltodict.parse(xml)['xml']
    # Pick the handler named after the message type
    msg_type = root['MsgType']
    handler = getattr(self, 'on_%s' % msg_type, None)
    if handler is None:
        # Calling the None default would raise TypeError.
        logger.warning('no handler for MsgType: %s', msg_type)
        return None
    result = handler(root)
    logger.info('return: %s', result)
    return result
def get_shelves(gr_user_id, goodreads_key):
    """Pulls user's shelves out of their user info.

    Args:
        gr_user_id: Goodreads user id placed in the request URL.
        goodreads_key: API key placed in the request URL.

    Returns:
        A dict mapping shelf name to ``{'id', 'item_count', 'pages'}``, where
        ``pages`` is ``item_count // 200 + 1``.
    """
    # NOTE(review): the literal '$' before the key is sent as part of the key
    # value - confirm that this is intended.
    user = requests.get('https://www.goodreads.com/user/show.xml?key=$%s&v=2&id=%s' % (goodreads_key, gr_user_id))
    user_info = xmltodict.parse(user.content)

    user_shelves = user_info['GoodreadsResponse']['user']['user_shelves']['user_shelf']
    # A lone <user_shelf> is parsed as a single mapping rather than a list.
    if not isinstance(user_shelves, list):
        user_shelves = [user_shelves]

    # extract user shelves: name, id, book count; eventually this should save to DB
    shelves = {}
    for user_shelf in user_shelves:
        num_of_books = int(user_shelf['book_count']['#text'])
        shelves[user_shelf['name']] = {
            'id': user_shelf['id']['#text'],
            'item_count': num_of_books,
            # Floor division keeps the page count an int on Python 3 too, so
            # it can be used as a range() bound.
            'pages': num_of_books // 200 + 1,
        }
    return shelves
def get_books_from_shelves(shelves, goodreads_key):
    """Takes in dictionary of user's shelves; returns list of all books on shelves.

    Args:
        shelves: dict mapping shelf name to a dict with a ``'pages'`` count.
        goodreads_key: API key placed in the request URL.

    Return list: books stored in tuples: (shelf name, book info).
    """
    # NOTE(review): gr_user_id is not a parameter of this function; it has to
    # exist as a module-level name - confirm against the rest of the file.
    all_books = []
    for shelf, shelf_info in shelves.items():
        for page in range(1, shelf_info['pages'] + 1):
            shelf_response = requests.get('https://www.goodreads.com/review/list.xml?key=$%s&v=2&id=%s&shelf=%s&per_page=200&page=%d'
                                          % (goodreads_key, gr_user_id, shelf, page))
            parsed_shelf = xmltodict.parse(shelf_response.content)

            # An empty page has no <review> children, and a page with exactly
            # one review is parsed as a single mapping rather than a list.
            reviews = parsed_shelf['GoodreadsResponse']['reviews'] or {}
            page_books = reviews.get('review') or []
            if not isinstance(page_books, list):
                page_books = [page_books]

            all_books.extend((shelf, book) for book in page_books)
    return all_books
##### ADD BOOK ####
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.

    Uses GR method book.show; saves data to library table. Every ``Book``
    whose language or original publication year is unset is looked up, and
    each updated row is committed individually.
    """
    to_update = Book.query.filter((Book.language.is_(None)) | (Book.original_pub_year.is_(None))).all()

    for book in to_update:
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml" % (book.goodreads_bid, goodreads_key))
        book_info = xmltodict.parse(response.content)['GoodreadsResponse']['book']

        # An element carrying attributes but no text has no '#text' entry;
        # leave the year unset then instead of raising KeyError.
        year_node = book_info['work']['original_publication_year']
        year_text = year_node.get('#text') if isinstance(year_node, dict) else year_node
        if year_text is not None:
            book.original_pub_year = int(year_text)

        book.language = book_info['language_code']
        db.session.add(book)
        db.session.commit()
###### ADD USERBOOK ######
def fetch_book_data():
    """Based on book's Goodreads ID, fetch language & original publication year.

    Uses GR method book.show; saves data to library table. Every ``Book``
    whose language or original publication year is unset is looked up, and
    each updated row is committed individually.
    """
    to_update = Book.query.filter((Book.language.is_(None)) | (Book.original_pub_year.is_(None))).all()

    for book in to_update:
        response = requests.get("https://www.goodreads.com/book/show/%s?key=%s&format=xml" % (book.goodreads_bid, goodreads_key))
        book_info = xmltodict.parse(response.content)['GoodreadsResponse']['book']

        # An element carrying attributes but no text has no '#text' entry;
        # leave the year unset then instead of raising KeyError.
        year_node = book_info['work']['original_publication_year']
        year_text = year_node.get('#text') if isinstance(year_node, dict) else year_node
        if year_text is not None:
            book.original_pub_year = int(year_text)

        book.language = book_info['language_code']
        db.session.add(book)
        db.session.commit()
###################################
# FUNCTION CALLS
# connect_to_db(app)
# fetch_book_metadata()
def parser(self, response):
    """Collect hash values from a search response into ``self.hash_list``.

    Args:
        response: object exposing ``text`` (an XML document); a falsy value
            is ignored.

    Returns:
        ``[]`` (after logging through ``self.error_log``) when the text
        mentions 'API limit'; otherwise ``self.hash_list``, extended with the
        ``str/#text`` value of every ``response/result/doc`` entry found.
    """
    if response:
        if 'API limit' in response.text:
            self.error_log.error_log('Sorry API Limit reached (300 q/month). Get new: https://totalhash.cymru.com/contact-us/',
                                     self.station_name)
            return []

        xml_dict = xmltodict.parse(response.text)
        try:
            docs = xml_dict['response']['result']['doc']  # Unpacking xmltodict
        except (KeyError, TypeError):
            docs = []

        # A lone <doc> is parsed as a single mapping rather than a list;
        # iterating it directly would walk its keys and lose the record.
        if not isinstance(docs, list):
            docs = [docs]

        try:
            for record in docs:
                self.hash_list.append(record['str']['#text'])  # Append hash_values
        except (KeyError, TypeError):
            # Best effort: stop at the first record without the expected
            # shape and keep what was collected so far.
            pass
    return self.hash_list  # Empty when nothing was found
def woeid_search(query):
    """
    Find the first Where On Earth place for the given query.

    Sends a YQL ``geo.places`` query and returns the first ``place`` entry of
    the reply as parsed by xmltodict (a mapping), so that location data can
    still be retrieved from it. Returns None if the reply has no ``query``,
    ``results`` or ``place`` node.
    """
    glados.log('woeid_search for: "{}"'.format(query))
    # NOTE(review): the search text is interpolated into the YQL statement
    # unescaped; a '"' in it alters the statement - confirm this is acceptable.
    query = urllib.parse.quote('select * from geo.places where text="{}"'.format(query))
    query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
    glados.log('Request: {}'.format(query))

    # Close the HTTP response deterministically once the body has been read.
    with urllib.request.urlopen(query) as reply:
        body = reply.read()

    parsed = xmltodict.parse(body).get('query')
    results = parsed.get('results') if parsed is not None else None
    place = results.get('place') if results is not None else None
    if place is None:
        return None
    # Several matches are parsed as a list; only the first one is returned.
    if isinstance(place, list):
        return place[0]
    return place
async def get_woeid(self, message, user):
    """Tell the channel which location is stored for a user.

    Declared ``async`` because the body awaits ``self.client.send_message``;
    ``await`` inside a plain ``def`` is a syntax error.

    Args:
        message: incoming message; its channel receives the answer and its
            author's name is used when ``user`` is empty.
        user: name to look up in ``self.woeid_db`` (lower-cased); '' means
            the message author.
    """
    if user == '':
        user = message.author.name

    # Only the database lookup is expected to raise KeyError, so only that
    # lookup is guarded.
    try:
        woeid = self.woeid_db[user.lower()]
    except KeyError:
        await self.client.send_message(message.channel, 'No location set. You can use .setlocation to set one')
        return

    query = urllib.parse.quote('select * from weather.forecast where woeid="{}" and u=\'c\''.format(woeid))
    query = 'http://query.yahooapis.com/v1/public/yql?q=' + query
    glados.log('Request: {}'.format(query))

    # NOTE(review): urlopen blocks the event loop while the request runs.
    with urllib.request.urlopen(query) as reply:
        body = reply.read()

    parsed = xmltodict.parse(body).get('query')
    results = parsed.get('results') if parsed is not None else None
    channel = results.get('channel') if results is not None else None
    if channel is None:
        await self.client.send_message(message.channel, 'Couldn\'t look up location. The WOEID of {} is: {}'.format(user, woeid))
        return

    location = channel.get('title')
    await self.client.send_message(message.channel, 'Location of {} is {}'.format(user, location))
def _transform_yang_to_dict(yang_model_string):
    """Convert a YANG model given as text into a dict.

    The model is added to a fresh ``Context``, emitted by ``YINPlugin`` into
    an in-memory buffer, and the emitted XML is parsed with ``xmltodict``.
    """
    class Opts(object):
        # Option holder assigned to ``ctx.opts`` before emitting.
        def __init__(self, yin_canonical=False, yin_pretty_strings=True):
            self.yin_canonical = yin_canonical
            self.yin_pretty_strings = yin_pretty_strings

    ctx = Context(FileRepository())
    module = ctx.add_module('yang', yang_model_string, format='yang')
    plugin = YINPlugin()
    ctx.opts = Opts()

    yin_buffer = StringIO()
    plugin.emit(ctx=ctx, modules=[module], fd=yin_buffer)
    return xmltodict.parse(yin_buffer.getvalue())
def import_user_xml(xml_file='', _user=None):
    '''
    Import an XML file created by the ShakeCast workbook; Users

    The ``UserRow`` entries under ``UserTable`` are converted to plain dicts
    and handed to ``import_user_dicts``.

    Args:
        xml_file (string): The filepath to the xml_file that will be uploaded
        _user (int): User id of admin making inventory changes

    Returns:
        dict: a dictionary that contains information about the function run

        ::
            data = {'status': either 'finished' or 'failed',
                    'message': message to be returned to the UI,
                    'log': message to be added to ShakeCast log
                           and should contain info on error}
    '''
    # Read the raw bytes and parse those: handing a text-mode file object to
    # the parser fails on Python 3, where expat requires read() to return
    # bytes.
    with open(xml_file, 'rb') as xml_fd:
        user_xml_dict = json.loads(json.dumps(xmltodict.parse(xml_fd.read())))

    user_list = user_xml_dict['UserTable']['UserRow']
    # A lone UserRow is parsed as a single mapping rather than a list.
    if not isinstance(user_list, list):
        user_list = [user_list]

    return import_user_dicts(user_list, _user)
def determine_xml(xml_file=''):
    '''
    Determine what type of XML file this is; facility, group,
    user, master, or unknown

    The decision is made on the text form of the document's root element;
    the first marker found in it, in the order below, wins.
    '''
    root_text = str(ET.parse(xml_file).getroot())
    markers = (
        ('FacilityTable', 'facility'),
        ('GroupTable', 'group'),
        ('UserTable', 'user'),
        ('Inventory', 'master'),
    )
    for marker, xml_type in markers:
        if marker in root_text:
            return xml_type
    return 'unknown'
def submit_file(self, file_obj, filename="sample"):
    """Submits a file to WildFire for analysis

    Posts the file and the API key to ``<api_root>/submit/file`` through
    ``self.session`` and returns the ``wildfire/upload-file-info`` part of
    the XML reply.

    Args:
        file_obj (file): The file to send
        filename (str): An optional filename

    Returns:
        dict: Analysis results

    Raises:
        WildFireException: If an API error occurs
            (NOTE(review): nothing in this method raises it directly -
            presumably it comes from the session layer; confirm.)
    """
    endpoint = "{0}{1}".format(self.api_root, "/submit/file")
    payload = {"apikey": self.api_key}
    upload = {"file": (filename, file_obj)}
    reply = self.session.post(endpoint, data=payload, files=upload)
    parsed = xmltodict.parse(reply.text)
    return parsed['wildfire']['upload-file-info']
def submit_remote_file(self, url):
    """Submits a file from a remote URL for analysis

    Posts the URL and the API key to ``<api_root>/submit/url`` through
    ``self.session`` and returns the ``wildfire/upload-file-info`` part of
    the XML reply.

    Args:
        url (str): The URL where the file is located

    Returns:
        dict: Analysis results

    Raises:
        WildFireException: If an API error occurs
            (NOTE(review): nothing in this method raises it directly -
            presumably it comes from the session layer; confirm.)

    Notes:
        This is for submitting files located at remote URLs, not web pages.

    See Also:
        submit_urls(self, urls)
    """
    endpoint = "{0}{1}".format(self.api_root, "/submit/url")
    payload = {"apikey": self.api_key, "url": url}
    reply = self.session.post(endpoint, data=payload)
    parsed = xmltodict.parse(reply.text)
    return parsed['wildfire']['upload-file-info']