python类parse()的实例源码

quoine.py 文件源码 项目:bitex 作者: nlsdfnbch 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def sign(self, uri, endpoint, endpoint_path, method_verb, *args, **kwargs):
        """Build the URL and request keyword arguments for a signed call.

        The optional ``params`` keyword argument is url-encoded onto
        ``endpoint_path`` for every verb except ``'POST'``; for ``'POST'``
        it is sent as the JSON body instead. The (possibly extended) path,
        a nonce and the API key are packed into a JWT signed with
        ``self.secret`` (HS256) and sent in the ``X-Quoine-Auth`` header.

        ``uri``, ``endpoint`` and ``*args`` are accepted but not used here;
        the returned URL is built from ``self.uri``.

        Returns a ``(url, request_kwargs)`` tuple.
        """
        payload = kwargs.get('params', {})
        is_post = method_verb == 'POST'

        if not is_post:
            # The signed path must include the encoded parameters.
            endpoint_path += urllib.parse.urlencode(payload)

        claims = {'path': endpoint_path, 'nonce': self.nonce(), 'token_id': self.key}
        request = {
            'headers': {
                'X-Quoine-API-Version': '2',
                'X-Quoine-Auth': jwt.encode(claims, self.secret, algorithm='HS256'),
                'Content-Type': 'application/json',
            },
        }
        if is_post:
            request['json'] = payload
        return self.uri + endpoint_path, request
helpers.py 文件源码 项目:bitcoin-arbitrage 作者: ucfyao 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def httpPost(url,resource,params):
    """POST form-encoded ``params`` to ``https://<url><resource>``.

    Args:
        url: Host (optionally ``host:port``) passed to ``HTTPSConnection``.
        resource: Request path.
        params: Mapping of form fields. It is emptied (``params.clear()``)
            after a successful request, as callers of the original
            implementation may rely on.

    Returns:
        The response body decoded as UTF-8, or ``False`` if anything went
        wrong (the traceback is printed).
    """
    headers = {
        "Content-type" : "application/x-www-form-urlencoded",
    }
    conn = None
    try:
        conn = httplib.HTTPSConnection(url, timeout=10)
        temp_params = urllib.parse.urlencode(params)
        conn.request("POST", resource, temp_params, headers)
        response = conn.getresponse()
        data = response.read().decode('utf-8')
        params.clear()
        return data
    except Exception:
        # Best-effort helper: report the failure and signal it with False.
        traceback.print_exc()
        return False
    finally:
        # Close the connection on every path, not only on success.
        if conn is not None:
            conn.close()
jenkins.py 文件源码 项目:bob 作者: BobBuildTool 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def doJenkinsSetUrl(recipes, argv):
    """Handle ``bob jenkins set-url <name> <url>``.

    Parses ``argv``, exits with status 1 if the alias is not among the
    known Jenkins servers, otherwise replaces the ``url`` entry of that
    server's stored configuration with the components of the new URL.
    ``recipes`` is not used.
    """
    parser = argparse.ArgumentParser(prog="bob jenkins set-url")
    parser.add_argument("name", help="Jenkins server alias")
    parser.add_argument("url", help="New URL")
    args = parser.parse_args(argv)
    alias = args.name

    if alias not in BobState().getAllJenkins():
        print("Jenkins '{}' not known.".format(alias), file=sys.stderr)
        sys.exit(1)

    parsed = urllib.parse.urlparse(args.url)
    # The stored path always ends with a slash.
    path = parsed.path if parsed.path.endswith("/") else parsed.path + "/"

    config = BobState().getJenkinsConfig(alias)
    config["url"] = dict(
        scheme=parsed.scheme,
        server=parsed.hostname,
        port=parsed.port,
        path=path,
        username=parsed.username,
        password=parsed.password,
    )
    BobState().setJenkinsConfig(alias, config)
language.py 文件源码 项目:Jumper-Cogs 作者: Redjumpman 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
async def dict_search_args_parse(self, message):
        """Split a search message into ``(limit, query)``.

        A message of the form ``"<digits> <text>"`` yields
        ``(int(digits), text)``; any other non-empty message yields
        ``(1, message)``.

        If ``message`` is empty, an error is sent through ``self.bot.say``
        and ``None`` is returned.

        This is a coroutine: the body awaits ``self.bot.say``, which is
        only legal inside ``async def``.
        """
        if not message:
            await self.bot.say("Error in arg parse")
            return
        match = re.match(r"^([0-9]+)\s+(.*)$", message)
        if match:
            return int(match.group(1)), match.group(2)
        # No leading count: default to a single result for the whole text.
        return 1, message
html2text.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dumb_css_parser(data):
    """Return a dict mapping each CSS selector to its parsed properties.

    ``@import`` statements are stripped first. Every ``selector { body }``
    rule is then split and its body handed to ``dumb_property_dict``. If
    the rules cannot be paired up (``ValueError``), an empty dict is
    returned instead.
    """
    # A trailing ';' guarantees every @import statement has a terminator.
    data += ';'
    start = data.find('@import')
    while start != -1:
        end = data.find(';', start)
        data = data[:start] + data[end + 1:]
        start = data.find('@import')

    # Kept as a list of pairs fed to dict() rather than a dict
    # comprehension, to support older Python versions.
    rules = [chunk.split('{') for chunk in data.split('}') if '{' in chunk.strip()]
    try:
        return dict([(selector.strip(), dumb_property_dict(body))
                     for selector, body in rules])
    except ValueError:
        # Malformed CSS is not worth failing over.
        return {}
ACIS.py 文件源码 项目:IM_Climate 作者: IMDProjects 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _call_ACIS(self, kwargs, **moreKwargs):
        '''
        Send the request parameters to the ACIS web service.

        ``moreKwargs`` is merged into ``kwargs`` (in place), None-valued
        entries are stripped, and the result is JSON-encoded and sent as
        the ``params`` form field to ``baseURL + webServiceSource``.

        Returns the JSON response de-serialized into Python objects.
        '''
        kwargs.update(moreKwargs)
        self._input_dict = self._stripNoneValues(kwargs)
        self.url = self.baseURL + self.webServiceSource
        if pyVersion == 2:      # Python 2.x: urllib / urllib2
            form = urllib.urlencode({'params':json.dumps(self._input_dict)})
            reply = urllib2.urlopen(
                urllib2.Request(self.url, form, {'Accept':'application/json'}))
            jsonData = reply.read()
        elif pyVersion == 3:    # Python 3.x: urllib.parse / urllib.request
            form = urllib.parse.urlencode({'params':json.dumps(self._input_dict)})
            reply = urllib.request.urlopen(self.url, data = form.encode('utf-8'))
            jsonData = reply.read().decode()
        return json.loads(jsonData)
postcards_random.py 文件源码 项目:postcards 作者: abertschi 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _fetch_img_urls(self, keyword, safe_search=False):
        """Search Bing images for ``keyword`` and collect the result links.

        Returns a list of ``(image_name, thumbnail_url, image_url)`` tuples,
        one per ``<a class="iusc">`` element of the result page.
        """
        # bing img search, https://gist.github.com/stephenhouser/c5e2b921c3770ed47eb3b75efbc94799
        search_url = self._get_bing_url(keyword, safe_search=safe_search)
        self.logger.debug('search url {}'.format(search_url))

        user_agent = ("Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/43.0.2357.134 Safari/537.36")
        page_request = urllib.request.Request(search_url, headers={'User-Agent': user_agent})
        soup = BeautifulSoup(urllib.request.urlopen(page_request), 'html.parser')

        results = []
        for anchor in soup.find_all("a", {"class": "iusc"}):
            # Both attributes hold JSON: "mad" carries the thumbnail URL,
            # "m" the URL of the full-size image.
            thumb_url = json.loads(anchor["mad"])["turl"]
            full_url = json.loads(anchor["m"])["murl"]

            file_name = urllib.parse.urlsplit(full_url).path.split("/")[-1]
            results.append((file_name, thumb_url, full_url))

        return results
variable.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, var):
        """Store ``var`` and parse it.

        All attributes are reset to empty defaults first; ``parse()`` and
        then ``post_parse()`` are called afterwards.
        """
        #: The original string that comes through with the variable
        self.original = var
        #: The operator for the variable
        self.operator = ''
        #: List of safe characters when quoting the string
        self.safe = ''
        #: List of variables in this variable
        self.variables = []
        #: List of variable names
        self.variable_names = []
        #: List of defaults passed in
        self.defaults = {}
        # Parse the variable itself.
        self.parse()
        self.post_parse()
__init__.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, credentials, host, request_uri, headers, response, content, http):
        """Log in via Google ClientLogin and store the resulting token.

        The service name comes from the ``googlelogin`` challenge in the
        ``www-authenticate`` header (default ``'xapi'``, switched to
        ``'cl'`` for calendar URIs). The credentials are then POSTed to the
        ClientLogin endpoint; ``self.Auth`` becomes the returned ``Auth``
        value, or ``""`` when the login is rejected with 403.

        Raises:
            KeyError: if a non-403 response carries no ``Auth`` line.
        """
        from urllib.parse import urlencode
        Authentication.__init__(self, credentials, host, request_uri, headers, response, content, http)
        challenge = _parse_www_authenticate(response, 'www-authenticate')
        service = challenge['googlelogin'].get('service', 'xapi')
        # Bloggger actually returns the service in the challenge
        # For the rest we guess based on the URI
        if service == 'xapi' and  request_uri.find("calendar") > 0:
            service = "cl"
        # No point in guessing Base or Spreadsheet

        auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent'])
        resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'})
        if resp.status == 403:
            # Rejected login: there is no token to parse out of the body.
            self.Auth = ""
            return
        # The response body is bytes on Python 3; decode it before
        # splitting on str separators.
        if isinstance(content, bytes):
            content = content.decode('utf-8')
        # Only "key=value" lines are meaningful; skip anything else rather
        # than letting dict() fail on a one-element item.
        d = dict(line.split("=", 1) for line in content.split('\n') if "=" in line)
        self.Auth = d['Auth']
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testGet301(self):
        # Test that we automatically follow 301 redirects
        # and that we cache the 301 response
        uri = urllib.parse.urljoin(base, "301/onestep.asis")
        destination = urllib.parse.urljoin(base, "302/final-destination.txt")
        # First request: the redirect is followed and the 301 itself is
        # expected to come from the network, not the cache.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertTrue('content-location' in response)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, False)

        # Second request for the same URI: same final result, but the 301
        # is now expected to be answered from the cache.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response['content-location'], destination)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 301)
        self.assertEqual(response.previous.fromcache, True)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testGet302RedirectionLimit(self):
        # Test that we can set a lower redirection limit
        # and that we raise an exception when we exceed
        # that limit.
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "302/twostep.asis")
        # With exceptions enabled, exceeding redirections=1 must raise
        # RedirectLimit; any other outcome fails the test.
        try:
            (response, content) = self.http.request(uri, "GET", redirections = 1)
            self.fail("This should not happen")
        except httplib2.RedirectLimit:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        # The failure is now expected to be reported as a 500 response
        # that still carries the original 302 status and body.
        (response, content) = self.http.request(uri, "GET", redirections = 1)
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected more"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"<html>"))
        self.assertTrue(response.previous != None)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testGet302NoLocation(self):
        # Test that we throw an exception when we get
        # a 302 with no Location: header.
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "302/no-location.asis")
        # With exceptions enabled, only RedirectMissingLocation is an
        # acceptable outcome.
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.RedirectMissingLocation:
            pass
        except Exception as e:
            self.fail("Threw wrong kind of exception ")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        # The same failure is now expected as a 500 response that keeps
        # the original 302 status and the response body.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Redirected but"))
        self.assertEqual("302", response['status'])
        self.assertTrue(content.startswith(b"This is content"))
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def testGet304(self):
        # Test that we use ETags properly to validate our cache
        uri = urllib.parse.urljoin(base, "304/test_etag.txt")
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
        self.assertNotEqual(response['etag'], "")

        # Request again, then force revalidation: the result should still
        # be a 200 served from the cache.
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity'})
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'cache-control': 'must-revalidate'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        # The on-disk cache entry must start with a status line. Use a
        # context manager so the file is closed even if reading fails.
        cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
        with open(cache_file_name, "r") as f:
            status_line = f.readline()

        self.assertTrue(status_line.startswith("status:"))

        # A HEAD request is expected to be answered from the cache too.
        (response, content) = self.http.request(uri, "HEAD", headers = {'accept-encoding': 'identity'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)

        # A range request is expected to bypass the cache and yield 206.
        (response, content) = self.http.request(uri, "GET", headers = {'accept-encoding': 'identity', 'range': 'bytes=0-0'})
        self.assertEqual(response.status, 206)
        self.assertEqual(response.fromcache, False)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testGet307(self):
        # Test that we do follow 307 redirects but
        # do not cache the 307
        uri = urllib.parse.urljoin(base, "307/onestep.asis")
        # First request: redirect followed, nothing from the cache yet.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)

        # Second request: the final response is expected from the cache,
        # while the 307 itself must still not be cached.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, True)
        self.assertEqual(content, b"This is the final destination.\n")
        self.assertEqual(response.previous.status, 307)
        self.assertEqual(response.previous.fromcache, False)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testNoVary(self):
        # This test is currently a no-op: its whole body below is
        # commented out, so it always passes.
        pass
        # when there is no vary, a different Accept header (e.g.) should not
        # impact if the cache is used
        # test that the vary header is not sent
        # uri = urllib.parse.urljoin(base, "vary/no-vary.asis")
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertFalse('vary' in response)
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
        #
        # (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/html'})
        # self.assertEqual(response.status, 200)
        # self.assertEqual(response.fromcache, True, msg="Should be from cache")
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testVaryHeaderDouble(self):
        # Test caching of a response that has a vary header, using
        # requests that send both Accept and Accept-Language.
        uri = urllib.parse.urljoin(base, "vary/accept-double.asis")
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.status, 200)
        self.assertTrue('vary' in response)

        # we are from cache
        (response, content) = self.http.request(uri, "GET", headers={
            'Accept': 'text/plain', 'Accept-Language': 'da, en-gb;q=0.8, en;q=0.7'})
        self.assertEqual(response.fromcache, True, msg="Should be from cache")

        # Dropping Accept-Language is expected to miss the cache.
        (response, content) = self.http.request(uri, "GET", headers={'Accept': 'text/plain'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False)

        # get the resource again, not from cache, varied headers don't match exact
        (response, content) = self.http.request(uri, "GET", headers={'Accept-Language': 'da'})
        self.assertEqual(response.status, 200)
        self.assertEqual(response.fromcache, False, msg="Should not be from cache")
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testGetGZipFailure(self):
        # Test that we raise a good exception when the gzip fails
        self.http.force_exception_to_status_code = False
        uri = urllib.parse.urljoin(base, "gzip/failed-compression.asis")
        # With exceptions enabled, only FailedToDecompressContent is an
        # acceptable outcome.
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        # The failure is now expected to be reported as a 500 response.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def testGetDeflateFailure(self):
        # Test that we raise a good exception when the deflate fails
        self.http.force_exception_to_status_code = False

        uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
        # With exceptions enabled, only FailedToDecompressContent is an
        # acceptable outcome.
        try:
            (response, content) = self.http.request(uri, "GET")
            self.fail("Should never reach here")
        except httplib2.FailedToDecompressContent:
            pass
        except Exception:
            self.fail("Threw wrong kind of exception")

        # Re-run the test with out the exceptions
        self.http.force_exception_to_status_code = True

        # The failure is now expected to be reported as a 500 response.
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 500)
        self.assertTrue(response.reason.startswith("Content purported"))
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testBasicAuth(self):
        # Test Basic Authentication
        # Without credentials both the file and its directory must be
        # refused with 401.
        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # After adding credentials the directory, and then the file,
        # must be served with 200.
        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testBasicAuthTwoDifferentCredentials(self):
        # Test Basic Authentication with multiple sets of credentials
        # Without matching credentials both the file and its directory
        # must be refused with 401.
        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        uri = urllib.parse.urljoin(base, "basic2/")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 401)

        # After adding the fred/barney credentials the directory, and
        # then the file, must be served with 200.
        self.http.add_credentials('fred', 'barney')
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)

        uri = urllib.parse.urljoin(base, "basic2/file.txt")
        (response, content) = self.http.request(uri, "GET")
        self.assertEqual(response.status, 200)
httplib2test.py 文件源码 项目:httplib2 作者: httplib2 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testDigestAuthStale(self):
        # Test that we can handle a nonce becoming stale
        uri = urllib.parse.urljoin(base, "digest-expire/file.txt")
        self.http.add_credentials('joe', 'password')
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
        # NOTE(review): `info` is parsed here but never used afterwards.
        info = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)

        time.sleep(3)
        # Sleep long enough that the nonce becomes stale

        # The retry must not come from the cache, must be flagged as a
        # stale digest, and must still end in a 200.
        (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"})
        self.assertFalse(response.fromcache)
        self.assertTrue(response._stale_digest)
        # NOTE(review): `info3` is likewise parsed but never used.
        info3 = httplib2._parse_www_authenticate(response, 'authentication-info')
        self.assertEqual(response.status, 200)
gushiwen.py 文件源码 项目:sc_spider 作者: wings27 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def parse_songci(self, response):
        # Build one SongCiItem from a poem page: URL, title (and tune
        # name when the heading contains a '·' separator), dynasty/author
        # metadata and the poem text. The item is yielded even when some
        # fields could not be extracted.
        item = SongCiItem()
        item['url'] = response.url
        full_title = response.css('div.son1>h1::text').extract_first()
        if full_title:
            try:
                item['tune_name'], item['title'] = full_title.split('·')
            except ValueError:
                # Not exactly two parts: keep the whole heading as title.
                item['title'] = full_title

        son2_p = response.css('div.son2>p')
        for p in son2_p:
            # NOTE(review): both keys below read '??', which looks like
            # non-ASCII labels lost to an encoding problem. As written the
            # dict literal has a duplicate key, so only the 'author'
            # mapping survives — restore the original labels from upstream.
            for name, field in {'??': 'dynasty', '??': 'author'}.items():
                if name in p.css('::text').extract_first():
                    item[field] = p.css('::text').extract()[1]
        content = ''.join(response.css('div#cont::text').extract()).strip()
        if content:
            item['content'] = content
        else:
            # Fallback: take every text node after a marker paragraph.
            all_p_texts = son2_p.css('::text').extract()
            try:
                # NOTE(review): '???' is presumably another label damaged
                # by the same encoding problem — confirm against upstream.
                item['content'] = '\n'.join(all_p_texts[all_p_texts.index('???') + 1:]).strip()
            except ValueError:
                # Marker not found: log it and yield the item without content.
                self.logger.error('Cannot parse item. url=%s', response.url)
        yield item
warequest.py 文件源码 项目:whatsapp-rest-webservice 作者: svub 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sendGetRequest(self, parser = None):
        """Send this request with GET and return the parsed response.

        The parser is the ``parser`` argument, else ``self.parser``, else a
        new ``ResponseParser``. Entries of ``self.headers`` override the
        default ``User-Agent`` and ``Accept`` headers.

        Returns ``{}`` when the response status is not ``WARequest.OK``;
        otherwise the result of ``parser.parse`` on the decoded body.
        """
        self.response = None
        params = self.params

        parser = parser or self.parser or ResponseParser()

        headers = {"User-Agent": self.getUserAgent(), "Accept": parser.getMeta()}
        # Caller-supplied headers win over the defaults above.
        headers.update(self.headers.items())

        host, port, path = self.getConnectionParameters()

        self.response = WARequest.sendRequest(host, port, path, headers, params, "GET")

        if not self.response.status == WARequest.OK:
            logger.error("Request not success, status was %s"%self.response.status)
            return {}

        body = self.response.read()
        logger.info(body)

        self.sent = True
        return parser.parse(body.decode(), self.pvars)
warequest.py 文件源码 项目:whatsapp-rest-webservice 作者: svub 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def sendPostRequest(self, parser = None):
        """Send this request with POST and return the parsed response.

        The parser is the ``parser`` argument, else ``self.parser``, else a
        new ``ResponseParser``. Entries of ``self.headers`` override the
        default ``User-Agent``, ``Accept`` and form ``Content-Type`` headers.

        Returns ``{}`` when the response status is not ``WARequest.OK``;
        otherwise the result of ``parser.parse`` on the decoded body.
        """
        self.response = None
        params = self.params

        parser = parser or self.parser or ResponseParser()

        headers = {
            "User-Agent": self.getUserAgent(),
            "Accept": parser.getMeta(),
            "Content-Type": "application/x-www-form-urlencoded",
        }
        # Caller-supplied headers win over the defaults above.
        headers.update(self.headers.items())

        host, port, path = self.getConnectionParameters()
        self.response = WARequest.sendRequest(host, port, path, headers, params, "POST")

        if not self.response.status == WARequest.OK:
            logger.error("Request not success, status was %s" % self.response.status)
            return {}

        body = self.response.read()
        logger.info(body)

        self.sent = True
        return parser.parse(body.decode(), self.pvars)
TestAuthentication.py 文件源码 项目:Securitas 作者: ArrenH 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create Suds clients for the local WSDL files.

        Builds ``file:`` URLs for the query and auth WSDL files, creates a
        client for each using the ``vip_certificate.crt`` client
        certificate, and wraps the auth client in ``SymantecUserServices``.
        """
        import os
        import urllib.parse
        from urllib.request import pathname2url

        def wsdl_url(relative_path):
            # file: URL for a WSDL path relative to the working directory
            return urllib.parse.urljoin('file:', pathname2url(os.path.abspath(relative_path)))

        query_services_url = wsdl_url('../wsdl_files/vipuserservices-query-1.7.wsdl')
        userservices_url = wsdl_url('../wsdl_files/vipuserservices-auth-1.7.wsdl')

        # The same certificate file is passed for both transport arguments.
        query_services_client = Client(
            query_services_url,
            transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))
        user_services_client = Client(
            userservices_url,
            transport=HTTPSClientCertTransport('vip_certificate.crt', 'vip_certificate.crt'))

        self.test_user_services_object = SymantecUserServices(user_services_client)
TestUserManagement.py 文件源码 项目:Securitas 作者: ArrenH 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        """Create the Suds client for the local management WSDL file.

        Builds a ``file:`` URL for the management WSDL, creates a client
        using the ``vip_certificate.crt`` client certificate, and wraps it
        in ``SymantecManagementServices``.
        """
        import os
        import urllib.parse
        from urllib.request import pathname2url

        wsdl_path = os.path.abspath('../wsdl_files/vipuserservices-mgmt-1.7.wsdl')
        managementservices_url = urllib.parse.urljoin('file:', pathname2url(wsdl_path))

        # The same certificate file is passed for both transport arguments.
        cert_transport = HTTPSClientCertTransport('vip_certificate.crt',
                                                  'vip_certificate.crt')
        self.management_client = Client(managementservices_url, transport=cert_transport)

        self.test_management_services_object = SymantecManagementServices(self.management_client)
        pass
__init__.py 文件源码 项目:DASS 作者: JulesMichael 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __creatBlocks(self):
        """
        Second part of parsing: find blocks and create a nested list.

        Walks ``self.lines`` together with ``self.indentationList`` and
        builds, as text, a Python list literal in which deeper-indented
        lines open a nested list. The text is finally evaluated with
        ``ast.literal_eval`` and stored in ``self.blocks``. Each line is
        passed through ``urllib.parse.quote_plus`` before being embedded
        in double quotes, so the stored strings are URL-quoted.
        """
        # Pair every line with its indentation value.
        w = list(zip(self.lines, self.indentationList))
        # `level` counts the nested lists currently open.
        self.blocks, indentation, level = "[", 0, 0
        for i in w:
            if i[1] > indentation:
                # Deeper than the previous line: open a nested list.
                level = level + 1
                self.blocks += ",[" + '"' + urllib.parse.quote_plus(i[0]) + '"'
            elif i[1] == 0:
                # Back at the top level: close every open nested list
                # (skipped for the very first entry).
                if len(self.blocks) > 1:
                    self.blocks += "]" * (level) + ','
                self.blocks += '"' + urllib.parse.quote_plus(i[0]) + '"'
                level = 0
            elif i[1] < indentation:
                # NOTE(review): list.index() never returns len(w), so this
                # condition is always true.
                if w.index(i) != len(w):
                    # NOTE(review): exactly one list is closed here no
                    # matter how far the indentation dropped — confirm
                    # multi-level dedents are handled as intended.
                    self.blocks += "]" + "," + '"' + \
                        urllib.parse.quote_plus(i[0]) + '"'
                    level += -1
            elif i[1] == indentation:
                # Same depth: append as a sibling.
                self.blocks += "," + '"' + urllib.parse.quote_plus(i[0]) + '"'
            indentation = i[1]
        # Close whatever is still open plus the outermost list.
        self.blocks += "]" * (level + 1)
        self.blocks = ast.literal_eval(self.blocks)
feedparser.py 文件源码 项目:SublimeRSS 作者: JaredMHall 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _makeSafeAbsoluteURI(base, rel=None):
    """Join ``rel`` onto ``base`` and return it only if its scheme is allowed.

    Returns ``''`` when the resulting scheme is not listed in
    ``ACCEPTABLE_URI_SCHEMES``. When that list is empty no filtering is
    done and the plain join is returned.
    """
    # bail if ACCEPTABLE_URI_SCHEMES is empty
    if not ACCEPTABLE_URI_SCHEMES:
        return _urljoin(base, rel or '')
    if not base:
        return rel or ''

    if rel:
        joined = _urljoin(base, rel)
        scheme = joined.strip().split(':', 1)[0]
        return joined if scheme in ACCEPTABLE_URI_SCHEMES else ''

    # Nothing to join: vet the base URI on its own.
    try:
        base_scheme = urllib.parse.urlparse(base)[0]
    except ValueError:
        return ''
    if base_scheme and base_scheme not in ACCEPTABLE_URI_SCHEMES:
        return ''
    return base
feedparser.py 文件源码 项目:SublimeRSS 作者: JaredMHall 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def http_error_401(self, req, fp, code, msg, headers):
        """Retry a request rejected with 401 using digest authentication.

        If the request carried an ``Authorization`` header and the server
        answered with ``WWW-Authenticate``, the user name and password are
        recovered from the header we sent, the realm is taken from the
        server's header, and the request is retried through
        ``http_error_auth_reqed``. Otherwise the default error handler is
        used.
        """
        # Check if
        # - server requires digest auth, AND
        # - we tried (unsuccessfully) with basic auth, AND
        # If all conditions hold, parse authentication information
        # out of the Authorization header we sent the first time
        # (for the username and password) and the WWW-Authenticate
        # header the server sent back (for the realm) and retry
        # the request with the appropriate digest auth headers instead.
        # This evil genius hack has been brought to you by Aaron Swartz.
        host = urllib.parse.urlparse(req.get_full_url())[1]
        if base64 is None or 'Authorization' not in req.headers \
                          or 'WWW-Authenticate' not in headers:
            return self.http_error_default(req, fp, code, msg, headers)
        auth = _base64decode(req.headers['Authorization'].split(' ')[1])
        # Split on the first colon only: the password may contain colons
        # itself, which would otherwise break the two-name unpacking.
        user, passw = auth.split(':', 1)
        realm = re.findall('realm="([^"]*)"', headers['WWW-Authenticate'])[0]
        self.add_password(realm, host, user, passw)
        retry = self.http_error_auth_reqed('www-authenticate', host, req, headers)
        self.reset_retry_count()
        return retry
feedparser.py 文件源码 项目:SublimeRSS 作者: JaredMHall 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _convert_to_idn(url):
    """Convert a URL to IDN notation"""
    # this function should only be called with a unicode string
    # strategy: if the host cannot be encoded in ascii, then
    # it'll be necessary to encode it in idn form
    parts = list(urllib.parse.urlsplit(url))
    try:
        parts[1].encode('ascii')
    except UnicodeEncodeError:
        # the url needs to be converted to idn notation
        host = parts[1].rsplit(':', 1)
        newhost = []
        port = ''
        if len(host) == 2:
            port = host.pop()
        for h in host[0].split('.'):
            newhost.append(h.encode('idna').decode('utf-8'))
        parts[1] = '.'.join(newhost)
        if port:
            parts[1] += ':' + port
        return urllib.parse.urlunsplit(parts)
    else:
        return url


问题


面经


文章

微信
公众号

扫码关注公众号