python类parse()的实例源码

extract_metadata.py 文件源码 项目:mu 作者: excamera 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def lambda_handler(self, event, context):
      # Loop through records provided by S3 Event trigger
      self.logger.info("Working on bucket-key in S3...")
      # Extract the Key and Bucket names for the asset uploaded to S3
      key = event['key']
      bucket = event['bucket']
      self.logger.info("Bucket: {} \t Key: {}".format(bucket, key))
      # Generate a signed URL for the uploaded asset
      signed_url = self.get_signed_url(self.SIGNED_URL_EXPIRATION, bucket, key)
      self.logger.info("Signed URL: {}".format(signed_url))
      # Launch MediaInfo
      # Pass the signed URL of the uploaded asset to MediaInfo as an input
      # MediaInfo will extract the technical metadata from the asset
      # The extracted metadata will be outputted in XML format and
      # stored in the variable xml_output
      xml_output = subprocess.check_output(["mediainfo", "--full", "--output=XML", signed_url])
      self.logger.info("Output: {}".format(xml_output))
      xml_json = xmltodict.parse(xml_output)
      return self.write_job_spec_to_file(xml_json, bucket, key)
crypto.py 文件源码 项目:wechat-async-sdk 作者: ihjmh 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _decrypt_message(self, msg, msg_signature, timestamp, nonce):
        """???????????????????

        :param msg: ?????POST?????
        :param msg_signature: ??????URL???msg_signature
        :param timestamp: ??????URL???timestamp
        :param nonce: ??????URL???nonce
        :return: ??????
        """
        timestamp = to_binary(timestamp)
        nonce = to_binary(nonce)
        if isinstance(msg, six.string_types):
            try:
                msg = xmltodict.parse(to_text(msg))['xml']
            except Exception as e:
                raise ParseError(e)

        encrypt = msg['Encrypt']
        signature = get_sha1_signature(self.__token, timestamp, nonce, encrypt)
        if signature != msg_signature:
            raise ValidateSignatureError()
        return self.__pc.decrypt(encrypt, self.__id)
IceCat.py 文件源码 项目:pyIceCat 作者: moonlitesolutions 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def _parse(self, xml_file):

        if xml_file:
            data = ET.parse(xml_file).getroot()
        else:
            self.log.error("Failed to retrieve suppliers")
            return False

        '''
        Data is an XML ElementTree Object
        '''
        self.id_map = {}
        self.catid = ''
        self.catname = ''

        for elem in data.iter('SupplierMapping'):
            self.mfrid = elem.attrib['supplier_id']
            self.mfrname = elem.attrib['name']
            if not self.mfrname:
                self.mfrname = "Unknown"
            self.id_map[self.mfrid] = self.mfrname
        self.log.info("Parsed {} Manufacturers from IceCat Supplier Map".format(str(len(self.id_map.keys()))))
IceCat.py 文件源码 项目:pyIceCat 作者: moonlitesolutions 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def _parse(self, xml_file):
        if xml_file.endswith('.gz'):
            with gzip.open(xml_file, 'rb') as f:
                data = ET.parse(f).getroot()
        else:
             data = ET.parse(xml_file).getroot()

        '''
        Data is an XML ElementTree Object
        '''
        self.id_map = {}
        self.catid = ''
        self.catname = ''
        self.findpath = 'Name[@langid="' + langid + '"]'
        for elem in data.iter('Category'):
            self.catid = elem.attrib['ID']
            for name in elem.iterfind(self.findpath):
                self.catname = name.attrib['Value']
                # only need one match
                break
            if not self.catname:
                self.catname = "Unknown"
            self.id_map[self.catid] = self.catname

        self.log.info("Parsed {} Categories from IceCat CategoriesList".format(str(len(self.id_map.keys()))))
IceCat.py 文件源码 项目:pyIceCat 作者: moonlitesolutions 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _parse(self, xml_file):
        self.xml_file = xml_file
        self.key_count = 0

        if not self.suppliers:
            self.suppliers = IceCatSupplierMapping(log=self.log, auth=self.auth, data_dir=self.data_dir)
        if not self.categories:
            self.categories = IceCatCategoryMapping(log=self.log, data_dir=self.data_dir, auth=self.auth)


        print("Parsing products from index file:", xml_file)
        with progressbar.ProgressBar(max_value=progressbar.UnknownLength) as self.bar:
            with open(self.xml_file, 'rb') as f:
                self.o = xmltodict.parse(f, attr_prefix='', postprocessor=self._postprocessor,
                    namespace_separator='', process_namespaces=True, namespaces=self._namespaces)
            f.closed

            # peel down to file key
            self.o = self.o['icecat-interface']['files.index']['file']
            self.log.info("Parsed {} products from IceCat catalog".format(str(len(self.o))))
        return len(self.o)
custom.py 文件源码 项目:azure-cli 作者: Azure 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def list_publish_profiles(resource_group_name, name, slot=None):
    import xmltodict

    content = _generic_site_operation(resource_group_name, name,
                                      'list_publishing_profile_xml_with_secrets', slot)
    full_xml = ''
    for f in content:
        full_xml += f.decode()

    profiles = xmltodict.parse(full_xml, xml_attribs=True)['publishData']['publishProfile']
    converted = []
    for profile in profiles:
        new = {}
        for key in profile:
            # strip the leading '@' xmltodict put in for attributes
            new[key.lstrip('@')] = profile[key]
        converted.append(new)

    return converted
test_xml_handler.py 文件源码 项目:rotest 作者: gregoil 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def validate_result(self, test, result, traceback=""):
        """Validate adding result gives the expected output.

        Args:
            test (rotest.core.case.TestCase): the test its result was added.
            result (str): result to add to the test.
            traceback (str): the traceback of the test.

        Raises:
            AssertionError. the result wasn't added as expected.
        """
        if isinstance(test, TestBlock):
            return

        result_xml_file = os.path.join(test.work_dir,
                                       XMLHandler.XML_REPORT_PATH)

        expected_xml_file = self.expected_xml_files.next()

        expected_xml = xmltodict.parse(open(expected_xml_file, "rt").read(),
                                       dict_constructor=dict)
        result_xml = xmltodict.parse(open(result_xml_file, "rt").read(),
                                     dict_constructor=dict)

        self.assertEqual(expected_xml, result_xml)
views.py 文件源码 项目:django-wechat-pay 作者: ChanMo 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def post(self, request, *args, **kwargs):
        pay = PayApi()
        data = request.body
        data = dict(xmltodict.parse(data)['xml'])
        result = {}
        sign = data['sign']
        del data['sign']
        #check_sign = wx.get_sign(data)
        if sign:
            order_id = data['out_trade_no'][10:]
            pay_number = data['transaction_id']
            result = self.handle_order(order_id, pay_number)
        else:
            result['return_code'] = 'FAIL'
            result['return_msg'] = 'ERROR'

        result_xml = pay.dict_to_xml(result)
        return HttpResponse(result_xml)
ui_properties.py 文件源码 项目:PyPlanet 作者: PyPlanet 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def refresh_properties(self):
        if self._instance.game.game == 'tm':
            method = 'Trackmania.UI.GetProperties'
        else:
            method = 'Shootmania.UI.GetProperties'
        try:
            self._raw = await self._instance.gbx(method)
            self._properties = xd.parse(self._raw['raw_1'])
        except Exception as e:
            self._properties = dict()
            self._raw = None
transport.py 文件源码 项目:goodreads-api-client-python 作者: mdzhang 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def _transform_res(res, transform: str='xml'):
        if transform == 'xml':
            content = xmltodict.parse(res.text)
            return content['GoodreadsResponse']
        if transform == 'json':
            content = json.loads(res.text)
            # This is just for consistency of return values across
            # different methods in this class - the ordering is not meaningful
            return OrderedDict(content.items())
        return res.text
methods.py 文件源码 项目:plexMusicPlayer 作者: Tyzer34 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def getJsonFromPlex(url):
    response = requests.get(url)
    xml_obj = xmltodict.parse(response.text)
    json_obj = json.loads(json.dumps(xml_obj))
    return json_obj
Win32Wifi.py 文件源码 项目:win32wifi 作者: kedos 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def _parse_xml(self, xml):
        d = xmltodict.parse(xml)
        self.ssid = d['WLANProfile']['SSIDConfig']['SSID']['name']
dovseries.py 文件源码 项目:pydov 作者: DOV-Vlaanderen 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, xml_doc):
        with open(xml_doc) as fd:
            self._doc = xmltodict.parse(fd.read(), process_namespaces=True,
                                        namespaces=namespaces)
            self.peilmetingen = self._get_peilmetingen_df()
            self.observaties = self._get_observaties_df()
            self.metadata_locatie = \
                self._doc["kern:dov-schema"]["grondwaterlocatie"]
            self.metadata_filters = self._get_filter_metadata()
dovboringen.py 文件源码 项目:pydov 作者: DOV-Vlaanderen 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def parse_wfs(response, layer, version):
        """A generator to parse the response from a wfs, depending on the
        server version

        Parameters
        ----------
        response : StringIO
            The response from a wfs.getfeature() query (OWSlib)
        layer : str
            The wfs layer that is queried
        version : str
            The version of the WFS server: only '1.1.0' and '2.0.0'

        """
        if version == "1.1.0":
            # convert layer preposition to null
            layer = 'null:' + layer.split(':')[1]
            # convert the response to a dictionary
            doc = xmltodict.parse(response)
            # yield the layers of the dict
            for a in doc['wfs:FeatureCollection']['gml:featureMembers']:
                yield (a[layer])
        elif version == "2.0.0":
            # convert the response to a dictionary
            doc = xmltodict.parse(response.read())
            # yield the layers of the dict
            for a in doc['wfs:FeatureCollection']['wfs:member']:
                yield (a[layer])
yandex_news.py 文件源码 项目:sketal 作者: vk-brain 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def process_message(self, msg: Message):
        command, text = self.parse_message(msg)

        if text.lower() in self.help_words:
            return await msg.answer("??????:\n" + "\n".join(self.description) + "\n\n????????? ????:\n" +
                                    ', '.join([k.capitalize() for k in self.news.keys()]))

        url = self.news["???????"]
        if text.lower() in self.news:
            url = self.news[text]

        async with aiohttp.ClientSession() as sess:
            async with sess.get(url) as resp:
                xml = xmltodict.parse(await resp.text())

                if "rss" not in xml or "channel" not in xml["rss"] or "item" not in xml["rss"]["channel"]:
                    return await msg.answer(self.error)

                items = xml["rss"]["channel"]["item"]
                item = choice(items)

                if "title" not in item or "description" not in item:
                    return await msg.answer(self.error)

                return await msg.answer(f'?? {item["title"]}\n'
                                        f'?? {item["description"]}')
views.py 文件源码 项目:django-wechat-example 作者: wechatpy 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def preprocess_message(self, request):
        component = get_component()
        content = component.crypto.decrypt_message(
            request.body,
            request.query_params['msg_signature'],
            int(request.query_params['timestamp']),
            int(request.query_params['nonce'])
        )
        message = xmltodict.parse(to_text(content))['xml']
        cc = json.loads(json.dumps(message))
        cc['CreateTime'] = int(cc['CreateTime'])
        cc['CreateTime'] = datetime.fromtimestamp(cc['CreateTime'])
        if 'MsgId' in cc:
            cc['MsgId'] = int(cc['MsgId'])
        return cc
wxapi.py 文件源码 项目:Server 作者: malaonline 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def wx_xml2dict(xmlstr):
    return xmltodict.parse(xmlstr)['xml']
output.py 文件源码 项目:jcsclient 作者: jiocloudservices 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def display(self, response, headers, webobject=True):
        resp_json = ""
        request_id = None
        try:
            if headers and headers.get('x-jcs-request-id'):
                request_id = headers.get('x-jcs-request-id')
            elif headers and headers.get('request-id'):
                request_id = headers.get('request-id')
            if response:
                if webobject:
                    resp_dict = json.loads(response)
                else:
                    resp_dict = response
                if not request_id:
                    request_id = utils.requestid_in_response(resp_dict)
                resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
        except:
            try:
                resp_ordereddict = xmltodict.parse(response)
                resp_json = json.dumps(resp_ordereddict, indent=4,
                                       sort_keys=True)
                resp_dict = json.loads(resp_json)
                if not request_id:
                    request_id = utils.requestid_in_response(resp_dict)
                resp_json = json.dumps(resp_dict, indent=4, sort_keys=True)
                resp_json = resp_json.replace("\\n", "\n")
                resp_json = resp_json.replace("\\", "")
            except Exception as e:
                raise e
                #raise exception.UnknownOutputFormat()
        # Handle request-id displaying
        if not request_id:
            raise exception.UnknownOutputFormat()
        output_msg = resp_json
        output_msg += "\nRequest-Id: " + request_id
        print(output_msg)
lib.py 文件源码 项目:xapitodict 作者: mseri 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def load_xml_into_raw_dict(filename):
    """
    Returns a raw dict containing an xml dump
    using `xmltodict.parse`.
    """
    with open(filename) as xapi_dump:
        dump = xmltodict.parse(xapi_dump.read())
        return dump['database']


问题


面经


文章

微信
公众号

扫码关注公众号