def lambda_handler(self, event, context):
    self.logger.info("Working on bucket/key in S3...")
    # Extract the key and bucket names for the asset uploaded to S3
    key = event['key']
    bucket = event['bucket']
    self.logger.info("Bucket: {} \t Key: {}".format(bucket, key))
    # Generate a signed URL for the uploaded asset
    signed_url = self.get_signed_url(self.SIGNED_URL_EXPIRATION, bucket, key)
    self.logger.info("Signed URL: {}".format(signed_url))
    # Launch MediaInfo with the signed URL of the uploaded asset as its input.
    # MediaInfo extracts the technical metadata from the asset and emits it as
    # XML, which is captured in xml_output.
    xml_output = subprocess.check_output(["mediainfo", "--full", "--output=XML", signed_url])
    self.logger.info("Output: {}".format(xml_output))
    xml_json = xmltodict.parse(xml_output)
    return self.write_job_spec_to_file(xml_json, bucket, key)
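# A minimal, hedged sketch (not from the project above) of what xmltodict.parse returns
# for MediaInfo-style XML: nested elements become nested dict keys, repeated siblings
# become lists, and attributes get an '@' prefix by default.
import xmltodict

sample = """<Mediainfo>
  <File>
    <track type="General"><Format>MPEG-4</Format></track>
    <track type="Video"><Width>1920</Width></track>
  </File>
</Mediainfo>"""

doc = xmltodict.parse(sample)
tracks = doc['Mediainfo']['File']['track']      # two <track> siblings -> a list
print(tracks[0]['@type'], tracks[0]['Format'])  # General MPEG-4
print(tracks[1]['@type'], tracks[1]['Width'])   # Video 1920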
def _decrypt_message(self, msg, msg_signature, timestamp, nonce):
    """Verify the authenticity of a message and return the decrypted plaintext.

    :param msg: ciphertext, the body of the POST request
    :param msg_signature: the msg_signature URL query parameter
    :param timestamp: the timestamp URL query parameter
    :param nonce: the nonce URL query parameter
    :return: the decrypted plaintext
    """
    timestamp = to_binary(timestamp)
    nonce = to_binary(nonce)
    if isinstance(msg, six.string_types):
        try:
            msg = xmltodict.parse(to_text(msg))['xml']
        except Exception as e:
            raise ParseError(e)
    encrypt = msg['Encrypt']
    signature = get_sha1_signature(self.__token, timestamp, nonce, encrypt)
    if signature != msg_signature:
        raise ValidateSignatureError()
    return self.__pc.decrypt(encrypt, self.__id)
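# For context, a hedged sketch of the signature scheme the check above relies on: WeChat
# computes msg_signature as the SHA-1 hex digest of the lexicographically sorted token,
# timestamp, nonce and Encrypt values; get_sha1_signature above presumably does the same.
import hashlib

def sha1_signature(token, timestamp, nonce, encrypted_msg):
    parts = sorted([token, timestamp, nonce, encrypted_msg])
    return hashlib.sha1(''.join(parts).encode('utf-8')).hexdigest()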
def _parse(self, xml_file):
    if xml_file:
        data = ET.parse(xml_file).getroot()
    else:
        self.log.error("Failed to retrieve suppliers")
        return False
    # data is an XML ElementTree element
    self.id_map = {}
    self.catid = ''
    self.catname = ''
    for elem in data.iter('SupplierMapping'):
        self.mfrid = elem.attrib['supplier_id']
        self.mfrname = elem.attrib['name']
        if not self.mfrname:
            self.mfrname = "Unknown"
        self.id_map[self.mfrid] = self.mfrname
    self.log.info("Parsed {} Manufacturers from IceCat Supplier Map".format(len(self.id_map)))
def _parse(self, xml_file):
    if xml_file.endswith('.gz'):
        with gzip.open(xml_file, 'rb') as f:
            data = ET.parse(f).getroot()
    else:
        data = ET.parse(xml_file).getroot()
    # data is an XML ElementTree element
    self.id_map = {}
    self.catid = ''
    self.catname = ''
    # 'langid' is assumed to be provided by the enclosing class (IceCat language id)
    self.findpath = 'Name[@langid="' + langid + '"]'
    for elem in data.iter('Category'):
        self.catid = elem.attrib['ID']
        for name in elem.iterfind(self.findpath):
            self.catname = name.attrib['Value']
            # only need one match
            break
        if not self.catname:
            self.catname = "Unknown"
        self.id_map[self.catid] = self.catname
    self.log.info("Parsed {} Categories from IceCat CategoriesList".format(len(self.id_map)))
def _parse(self, xml_file):
    self.xml_file = xml_file
    self.key_count = 0
    if not self.suppliers:
        self.suppliers = IceCatSupplierMapping(log=self.log, auth=self.auth, data_dir=self.data_dir)
    if not self.categories:
        self.categories = IceCatCategoryMapping(log=self.log, data_dir=self.data_dir, auth=self.auth)
    print("Parsing products from index file:", xml_file)
    with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as self.bar:
        with open(self.xml_file, 'rb') as f:
            self.o = xmltodict.parse(f, attr_prefix='', postprocessor=self._postprocessor,
                                     namespace_separator='', process_namespaces=True,
                                     namespaces=self._namespaces)
    # peel down to the file key
    self.o = self.o['icecat-interface']['files.index']['file']
    self.log.info("Parsed {} products from IceCat catalog".format(len(self.o)))
    return len(self.o)
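# A hedged sketch (illustrative, not the project's code) of the two xmltodict hooks used
# above: attr_prefix='' drops the usual '@' prefix on attribute keys, and a postprocessor
# can rewrite each (key, value) pair as it is parsed.
import xmltodict

def _numeric(path, key, value):
    # illustrative postprocessor: convert numeric text to int, leave everything else alone
    try:
        return key, int(value)
    except (ValueError, TypeError):
        return key, value

doc = xmltodict.parse('<file Limited="0"><Product_ID>12345</Product_ID></file>',
                      attr_prefix='', postprocessor=_numeric)
print(doc['file']['Product_ID'] + 1)   # 12346 -- the text node was parsed as an int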
def list_publish_profiles(resource_group_name, name, slot=None):
    import xmltodict
    content = _generic_site_operation(resource_group_name, name,
                                      'list_publishing_profile_xml_with_secrets', slot)
    full_xml = ''
    for f in content:
        full_xml += f.decode()

    profiles = xmltodict.parse(full_xml, xml_attribs=True)['publishData']['publishProfile']
    converted = []
    for profile in profiles:
        new = {}
        for key in profile:
            # strip the leading '@' that xmltodict adds for XML attributes
            new[key.lstrip('@')] = profile[key]
        converted.append(new)
    return converted
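# For reference, a small standalone example (not Azure CLI code) of why the '@' stripping
# above is needed: xmltodict prefixes XML attribute names with '@' by default.
import xmltodict

doc = xmltodict.parse('<publishProfile profileName="web-deploy" publishMethod="MSDeploy"/>')
profile = doc['publishProfile']
print(list(profile))                                  # ['@profileName', '@publishMethod']
print({k.lstrip('@'): v for k, v in profile.items()})
# {'profileName': 'web-deploy', 'publishMethod': 'MSDeploy'}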
def validate_result(self, test, result, traceback=""):
    """Validate that adding a result gives the expected output.

    Args:
        test (rotest.core.case.TestCase): the test whose result was added.
        result (str): result to add to the test.
        traceback (str): the traceback of the test.

    Raises:
        AssertionError. the result wasn't added as expected.
    """
    if isinstance(test, TestBlock):
        return

    result_xml_file = os.path.join(test.work_dir,
                                   XMLHandler.XML_REPORT_PATH)
    expected_xml_file = self.expected_xml_files.next()

    expected_xml = xmltodict.parse(open(expected_xml_file, "rt").read(),
                                   dict_constructor=dict)
    result_xml = xmltodict.parse(open(result_xml_file, "rt").read(),
                                 dict_constructor=dict)

    self.assertEqual(expected_xml, result_xml)
def post(self, request, *args, **kwargs):
    pay = PayApi()
    data = request.body
    data = dict(xmltodict.parse(data)['xml'])
    result = {}
    sign = data['sign']
    del data['sign']
    # check_sign = wx.get_sign(data)
    if sign:
        order_id = data['out_trade_no'][10:]
        pay_number = data['transaction_id']
        result = self.handle_order(order_id, pay_number)
    else:
        result['return_code'] = 'FAIL'
        result['return_msg'] = 'ERROR'
    result_xml = pay.dict_to_xml(result)
    return HttpResponse(result_xml)
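# Side note (a hedged alternative, not the project's dict_to_xml helper): xmltodict can
# also serialize a dict back to the WeChat-style reply XML via xmltodict.unparse.
import xmltodict

reply = {'xml': {'return_code': 'SUCCESS', 'return_msg': 'OK'}}
print(xmltodict.unparse(reply, full_document=False))
# <xml><return_code>SUCCESS</return_code><return_msg>OK</return_msg></xml>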
async def refresh_properties(self):
    if self._instance.game.game == 'tm':
        method = 'Trackmania.UI.GetProperties'
    else:
        method = 'Shootmania.UI.GetProperties'

    try:
        self._raw = await self._instance.gbx(method)
        self._properties = xd.parse(self._raw['raw_1'])
    except Exception as e:
        self._properties = dict()
        self._raw = None
def _transform_res(res, transform: str='xml'):
    if transform == 'xml':
        content = xmltodict.parse(res.text)
        return content['GoodreadsResponse']
    if transform == 'json':
        content = json.loads(res.text)
        # This is just for consistency of return values across
        # different methods in this class - the ordering is not meaningful
        return OrderedDict(content.items())
    return res.text
def getJsonFromPlex(url):
    response = requests.get(url)
    xml_obj = xmltodict.parse(response.text)
    # round-trip through json to turn the nested OrderedDicts into plain dicts/lists
    json_obj = json.loads(json.dumps(xml_obj))
    return json_obj
def _parse_xml(self, xml):
    d = xmltodict.parse(xml)
    self.ssid = d['WLANProfile']['SSIDConfig']['SSID']['name']
def __init__(self, xml_doc):
    with open(xml_doc) as fd:
        self._doc = xmltodict.parse(fd.read(), process_namespaces=True,
                                    namespaces=namespaces)
    self.peilmetingen = self._get_peilmetingen_df()
    self.observaties = self._get_observaties_df()
    self.metadata_locatie = \
        self._doc["kern:dov-schema"]["grondwaterlocatie"]
    self.metadata_filters = self._get_filter_metadata()
def parse_wfs(response, layer, version):
    """A generator that parses the response from a WFS, depending on the
    server version.

    Parameters
    ----------
    response : StringIO
        The response from a wfs.getfeature() query (OWSLib)
    layer : str
        The WFS layer that is queried
    version : str
        The version of the WFS server: only '1.1.0' and '2.0.0' are supported
    """
    if version == "1.1.0":
        # convert the layer prefix to null
        layer = 'null:' + layer.split(':')[1]
        # convert the response to a dictionary
        doc = xmltodict.parse(response)
        # yield the members of the dict
        for a in doc['wfs:FeatureCollection']['gml:featureMembers']:
            yield (a[layer])
    elif version == "2.0.0":
        # convert the response to a dictionary
        doc = xmltodict.parse(response.read())
        # yield the members of the dict
        for a in doc['wfs:FeatureCollection']['wfs:member']:
            yield (a[layer])
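# A quick standalone illustration (assumed example, not from the project above) of why the
# keys above keep their namespace prefixes: without process_namespaces=True, xmltodict
# leaves prefixes in element names and exposes xmlns declarations as '@xmlns:...' keys.
import xmltodict

snippet = ('<wfs:FeatureCollection xmlns:wfs="http://www.opengis.net/wfs" '
           'xmlns:gml="http://www.opengis.net/gml"><gml:featureMembers/>'
           '</wfs:FeatureCollection>')
doc = xmltodict.parse(snippet)
print(list(doc['wfs:FeatureCollection']))
# ['@xmlns:wfs', '@xmlns:gml', 'gml:featureMembers']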
async def process_message(self, msg: Message):
    command, text = self.parse_message(msg)

    if text.lower() in self.help_words:
        return await msg.answer("??????:\n" + "\n".join(self.description) + "\n\n????????? ????:\n" +
                                ', '.join([k.capitalize() for k in self.news.keys()]))

    url = self.news["???????"]
    if text.lower() in self.news:
        url = self.news[text]

    async with aiohttp.ClientSession() as sess:
        async with sess.get(url) as resp:
            xml = xmltodict.parse(await resp.text())

    if "rss" not in xml or "channel" not in xml["rss"] or "item" not in xml["rss"]["channel"]:
        return await msg.answer(self.error)

    items = xml["rss"]["channel"]["item"]
    item = choice(items)

    if "title" not in item or "description" not in item:
        return await msg.answer(self.error)

    return await msg.answer(f'?? {item["title"]}\n'
                            f'?? {item["description"]}')
def preprocess_message(self, request):
    component = get_component()
    content = component.crypto.decrypt_message(
        request.body,
        request.query_params['msg_signature'],
        int(request.query_params['timestamp']),
        int(request.query_params['nonce'])
    )
    message = xmltodict.parse(to_text(content))['xml']
    # round-trip through json to turn the nested OrderedDicts into plain dicts
    cc = json.loads(json.dumps(message))
    cc['CreateTime'] = datetime.fromtimestamp(int(cc['CreateTime']))
    if 'MsgId' in cc:
        cc['MsgId'] = int(cc['MsgId'])
    return cc
def wx_xml2dict(xmlstr):
    return xmltodict.parse(xmlstr)['xml']
def display(self, response, headers, webobject=True):
    resp_json = ""
    request_id = None
    try:
        if headers and headers.get('x-jcs-request-id'):
            request_id = headers.get('x-jcs-request-id')
        elif headers and headers.get('request-id'):
            request_id = headers.get('request-id')
        if response:
            if webobject:
                resp_dict = json.loads(response)
            else:
                resp_dict = response
            if not request_id:
                request_id = utils.requestid_in_response(resp_dict)
            resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
    except Exception:
        # the response wasn't JSON; try to interpret it as XML instead
        try:
            resp_ordereddict = xmltodict.parse(response)
            resp_json = json.dumps(resp_ordereddict, indent=4,
                                   sort_keys=True)
            resp_dict = json.loads(resp_json)
            if not request_id:
                request_id = utils.requestid_in_response(resp_dict)
            resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
            resp_json = resp_json.replace("\\n", "\n")
            resp_json = resp_json.replace("\\", "")
        except Exception as e:
            raise e
            # raise exception.UnknownOutputFormat()

    # Handle request-id display
    if not request_id:
        raise exception.UnknownOutputFormat()

    output_msg = resp_json
    output_msg += "\nRequest-Id: " + request_id
    print(output_msg)
def load_xml_into_raw_dict(filename):
    """
    Returns a raw dict containing an xml dump
    using `xmltodict.parse`.
    """
    with open(filename) as xapi_dump:
        dump = xmltodict.parse(xapi_dump.read())
    return dump['database']
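# A hedged caveat when loading whole dumps this way (standalone example, hypothetical
# element names): an element that occurs once parses to a dict while a repeated element
# parses to a list; force_list makes the shape predictable.
import xmltodict

one = xmltodict.parse('<database><vm>a</vm></database>', force_list=('vm',))
many = xmltodict.parse('<database><vm>a</vm><vm>b</vm></database>', force_list=('vm',))
print(one['database']['vm'])    # ['a']
print(many['database']['vm'])   # ['a', 'b']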