Example source code for Python's unquote_plus()
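
The snippets below are collected from real projects that call urllib.unquote_plus() (Python 2; on Python 3 the same function lives in urllib.parse). As a quick orientation, here is a minimal, self-contained sketch of what unquote_plus does compared to plain unquote:

import urllib

# unquote_plus() resolves %xx escapes AND turns '+' into a space;
# plain unquote() leaves the '+' untouched.
print urllib.unquote_plus('hello+world%21')   # -> 'hello world!'
print urllib.unquote('hello+world%21')        # -> 'hello+world!'

# On Python 3 the equivalent is:
#   from urllib.parse import unquote_plus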

proxylib.py (project: xxNet, author: drzorm)
def filter(self, handler):
        path = urlparse.urlsplit(handler.path).path
        if path.startswith('/'):
            path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
            if os.path.isdir(path):
                index_file = os.path.join(path, self.index_file)
                if not os.path.isfile(index_file):
                    content = self.format_index_html(path).encode('UTF-8')
                    headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
                else:
                    path = index_file
            if os.path.isfile(path):
                content_type = 'application/octet-stream'
                try:
                    import mimetypes
                    content_type = mimetypes.types_map.get(os.path.splitext(path)[1])
                    if os.path.splitext(path)[1].endswith(('crt', 'pem')):
                        content_type = 'application/x-x509-ca-cert'
                except StandardError as e:
                    logging.error('import mimetypes failed: %r', e)
                with open(path, 'rb') as fp:
                    content = fp.read()
                    headers = {'Connection': 'close', 'Content-Type': content_type}
                    return 'mock', {'status': 200, 'headers': headers, 'body': content}
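
The unquote_plus(...).decode('utf8') step above, shown in isolation (a self-contained sketch with a made-up path): the percent escapes are first resolved to UTF-8 bytes, which are then decoded into a unicode path.

import urllib

# Hypothetical path taken from a request line; the escapes encode UTF-8 bytes.
raw_path = '%E6%96%87%E4%BB%B6.txt'
print urllib.unquote_plus(raw_path).decode('utf8')   # prints the decoded name: 文件.txt
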
urls.py (project: reddit-service-ads-tracking, author: reddit)
def _encode_query(query):
    """
    `urlparse.parse_qsl` and `urllib.urlencode` modify
    blank query values, so we had to roll our own.
    """
    kvps = urllib.unquote_plus(query).split("&")
    encoded_pairs = []

    for kvp in kvps:
        if "=" not in kvp:
            encoded_pairs.append(urllib.quote_plus(kvp))
        else:
            key, value = kvp.split("=")
            encoded_pairs.append("%s=%s" % (
                urllib.quote_plus(key),
                urllib.quote_plus(value)
            ))

    return "&".join(encoded_pairs)
addon.py (project: plugin.video.rtbfauvio, author: Gaet81)
def get_params():
    param = {}
    if len(sys.argv) < 3:
        return {}
    paramstring = sys.argv[2]
    if len(paramstring) >= 2:
        params = sys.argv[2]
        cleanedparams = params.replace('?', '')
        if (params[len(params) - 1] == '/'):
            params = params[0:len(params) - 2]
        xbmc.log(str(cleanedparams),xbmc.LOGDEBUG)
        pairsofparams = cleanedparams.split('&')
        xbmc.log(str(pairsofparams),xbmc.LOGDEBUG)
        param = {}
        for i in range(len(pairsofparams)):
            splitparams = {}
            splitparams = pairsofparams[i].split('=')
            if (len(splitparams)) == 2:
                try:
                    param[splitparams[0]] = urllib.unquote_plus(splitparams[1])
                except:
                    pass
    return param
uri_parser.py (project: kekescan, author: xiaoxiaoleo)
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
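
A self-contained sketch of the decoding step above, with a made-up userinfo string: reserved characters must arrive percent-escaped, and because unquote_plus is used, a literal '+' in a password must itself be escaped as %2B.

from urllib import unquote_plus

userinfo = 'user%40example.com:p%40ss%2Bword'
user, _, passwd = userinfo.partition(':')
print unquote_plus(user)     # -> 'user@example.com'
print unquote_plus(passwd)   # -> 'p@ss+word'
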
uri_parser.py (project: kekescan, author: xiaoxiaoleo)
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
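
A hypothetical call, assuming the _parse_options helper above is in scope, showing how repeated readpreferencetags entries accumulate into a list while ordinary options are unquoted into flat values:

# Hypothetical usage; _parse_options is the helper defined above.
options = _parse_options(
    'readPreference=secondary'
    '&readpreferencetags=dc:ny,rack:1'
    '&readpreferencetags=dc:sf'
    '&w=majority', '&')
# options == {'readPreference': 'secondary',
#             'readpreferencetags': ['dc:ny,rack:1', 'dc:sf'],
#             'w': 'majority'}
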
stream2chromecast.py (project: CodeLabs, author: TheIoTLearningInitiative)
def do_GET(self):

        query = self.path.split("?",1)[-1]
        filepath = urllib.unquote_plus(query)

        self.suppress_socket_error_report = None

        self.send_headers(filepath)       

        print "sending data"      
        try: 
            self.write_response(filepath)
        except socket.error, e:     
            if isinstance(e.args, tuple):
                if e[0] in (errno.EPIPE, errno.ECONNRESET):
                   print "disconnected"
                   self.suppress_socket_error_report = True
                   return

            raise
GetAndResizeImages.py (project: ecs-refarch-batch-processing, author: awslabs)
def process_images():
    """Process the image

    No real error handling in this sample code. In case of error we'll put
    the message back in the queue and make it visible again. It will end up in
    the dead letter queue after five failed attempts.

    """
    for message in get_messages_from_sqs():
        try:
            message_content = json.loads(message.body)
            image = urllib.unquote_plus(message_content
                                        ['Records'][0]['s3']['object']
                                        ['key']).encode('utf-8')
            s3.download_file(input_bucket_name, image, image)
            resize_image(image)
            upload_image(image)
            cleanup_files(image)
        except:
            message.change_visibility(VisibilityTimeout=0)
            continue
        else:
            message.delete()
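
Why unquote_plus is needed here (the sketch below uses a made-up key): the object key in an S3 event notification arrives URL-encoded, with spaces shown as '+'.

import urllib

key_from_event = 'uploads/My+Photo+%281%29.jpg'
print urllib.unquote_plus(key_from_event)   # -> 'uploads/My Photo (1).jpg'
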
test_urllib.py (project: pefile.pypy, author: cloudtracer)
def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using quote(): not all characters escaped; %s" %
                         result)
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
LinkedinSpider.py (project: spiders, author: poodarchu)
def parse(self, response):
        """
        Default parse method; the rule is not useful for now.
        """
        # import pdb; pdb.set_trace()
        response = response.replace(url=HtmlParser.remove_url_parameter(response.url))
        hxs = HtmlXPathSelector(response)
        index_level = self.determine_level(response)
        log.msg("Parse: index level:" + str(index_level))
        if index_level in [1, 2, 3, 4]:
            self.save_to_file_system(index_level, response)
            relative_urls = self.get_follow_links(index_level, hxs)
            if relative_urls is not None:
                for url in relative_urls:
                    log.msg('yield process, url:' + url)
                    yield Request(url, callback=self.parse)
        elif index_level == 5:
            personProfile = HtmlParser.extract_person_profile(hxs)
            linkedin_id = self.get_linkedin_id(response.url)
            linkedin_id = UnicodeDammit(urllib.unquote_plus(linkedin_id)).markup
            if linkedin_id:
                personProfile['_id'] = linkedin_id
                personProfile['url'] = UnicodeDammit(response.url).markup
                yield personProfile
goslate.py (project: CVProject, author: hieuxinhe94)
def _translate_single_text(self, text, target_language, source_language):
        assert _is_bytes(text)
        def split_text(text):
            start = 0
            text = quote_plus(text)
            length = len(text)
            while (length - start) > self._MAX_LENGTH_PER_QUERY:
                for seperator in self._SEPERATORS:
                    index = text.rfind(seperator, start, start+self._MAX_LENGTH_PER_QUERY)
                    if index != -1:
                        break
                else:
                    raise Error('input too large')
                end = index + len(seperator)
                yield unquote_plus(text[start:end])
                start = end

            yield unquote_plus(text[start:])

        def make_task(text):
            return lambda: self._basic_translate(text, target_language, source_language)[0]

        results = list(self._execute(make_task(i) for i in split_text(text)))
        return tuple(''.join(i[n] for i in results) for n in range(len(self._writing)))
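
The splitting trick above, in isolation (a self-contained sketch): lengths are measured and split points chosen in the quoted form, which is what is actually sent on the wire, and each chunk is then unquoted back to text. The round trip through quote_plus/unquote_plus is lossless.

from urllib import quote_plus, unquote_plus

text = 'hello world. how are you?'
quoted = quote_plus(text)            # 'hello+world.+how+are+you%3F'
assert unquote_plus(quoted) == text
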
test_urllib.py (project: ndk-python, author: gittor)
def test_unquoting(self):
        # Make sure unquoting of all ASCII values works
        escape_list = []
        for num in range(128):
            given = hexescape(chr(num))
            expect = chr(num)
            result = urllib.unquote(given)
            self.assertEqual(expect, result,
                             "using unquote(): %s != %s" % (expect, result))
            result = urllib.unquote_plus(given)
            self.assertEqual(expect, result,
                             "using unquote_plus(): %s != %s" %
                             (expect, result))
            escape_list.append(given)
        escape_string = ''.join(escape_list)
        del escape_list
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using quote(): not all characters escaped; %s" %
                         result)
        result = urllib.unquote(escape_string)
        self.assertEqual(result.count('%'), 1,
                         "using unquote(): not all characters escaped: "
                         "%s" % result)
web_utils.py (project: lalascan, author: blackye)
def argument_query(query_str):
    if query_str and query_str.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
         " string, this may be an error!", stacklevel=3)
    if not query_str:
        query_params = {}
    else:
        try:
            # much faster than parse_qsl()
            query_params = dict(( map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                                  for token in query_str.split('&') ))
            if len(query_params) == 1 and not query_params.values()[0]:
                query_params = {}
            else:
                query = None
        except Exception:
            ##raise   # XXX DEBUG
            query_params = {}
    return query_params
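
The trick in the dict comprehension above, shown on its own (a self-contained sketch): appending '=' before splitting guarantees that every token yields a key/value pair, even when the value is missing.

from urllib import unquote_plus

for token in 'a=1&b&c=hello+world'.split('&'):
    print map(unquote_plus, (token + '=').split('=', 2)[:2])
# -> ['a', '1']
# -> ['b', '']
# -> ['c', 'hello world']
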


#------------------------------------------------------------------------------
web_utils.py (project: lalascan, author: blackye)
def query(self, query):
        if query and query.startswith('?'):
            warn("You don't need to use a leading '?' when setting the query"
                 " string, this may be an error!", stacklevel=3)
        if not query:
            query_params = {}
        else:
            try:
                # much faster than parse_qsl()
                query_params = dict(( map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                                      for token in query.split('&') ))
                if len(query_params) == 1 and not query_params.values()[0]:
                    query_params = {}
                else:
                    query = None
            except Exception:
                ##raise   # XXX DEBUG
                query_params = {}
        self.__query, self.__query_params = query, query_params
web_utils.py (project: lalascan, author: blackye)
def netloc(self, netloc):
        if '@' in netloc:
            auth, host = netloc.split('@', 1)
        else:
            auth, host = None, netloc
        port = ''
        if host and host[0] == '[':
            host, port = host[1:].split(']', 1)
            if ':' in port:
                _host, port = port.split(':', 1)
                if not host:
                    host = _host
        elif ':' in host:
            host, port = host.split(':', 1)
        if '%' in port:
            port = unquote(port)
        if port:
            port = int(port)
        if host:
            host = unquote_plus(host)
        self.auth = auth  # TODO: roll back changes if it fails
        self.host = host
        self.port = port
api.py (project: dancedeets-monorepo, author: mikelambert)
def _initialize(self, request):
        self.response.headers.add_header('Access-Control-Allow-Headers', 'Content-Type')

        # We use _initialize instead of webapp2's initialize, so that exceptions can be caught easily
        self.fbl = fb_api.FBLookup(None, None)

        if self.request.body:
            logging.info("Request body: %r", self.request.body)
            escaped_body = urllib.unquote_plus(self.request.body.strip('='))
            self.json_body = json.loads(escaped_body)
            logging.info("json_request: %r", self.json_body)
        else:
            self.json_body = None

        if self.requires_auth or self.supports_auth:
            if self.json_body.get('access_token'):
                access_token = self.json_body.get('access_token')
                self.fb_uid = get_user_id_for_token(access_token)
                self.fbl = fb_api.FBLookup(self.fb_uid, access_token)
                logging.info("Access token for user ID %s", self.fb_uid)
            elif self.requires_auth:
                self.add_error("Needs access_token parameter")
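
The body handling above, reduced to the plain calls (a self-contained sketch with a made-up payload): a JSON body that arrived form-encoded, percent-escaped with '+' for spaces and with a trailing '=', is unescaped before json.loads.

import json
import urllib

body = '%7B%22name%22%3A+%22dance+event%22%7D='
decoded = urllib.unquote_plus(body.strip('='))   # -> '{"name": "dance event"}'
print json.loads(decoded)['name']                # -> dance event
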
CommonFunctions.py (project: script.reddit.reader, author: gedisony)
def getParameters(parameterString):
    log("", 5)
    commands = {}
    if getXBMCVersion() >= 12.0:
        parameterString = urllib.unquote_plus(parameterString)
    splitCommands = parameterString[parameterString.find('?') + 1:].split('&')

    for command in splitCommands:
        if (len(command) > 0):
            splitCommand = command.split('=')
            key = splitCommand[0]
            try:
                value = splitCommand[1].encode("utf-8")
            except:
                log("Error utf-8 encoding argument value: " + repr(splitCommand[1]))
                value = splitCommand[1]

            commands[key] = value

    log(repr(commands), 5)
    return commands
utils.py (project: script.reddit.reader, author: gedisony)
def prettify_reddit_query(subreddit_entry):
    #for search queries; make the reddit query string presentable

    if subreddit_entry.startswith('?'):
        #log('************ prettify_reddit_query='+subreddit_entry)
        tbn=subreddit_entry.split('/')[-1]
        tbn=urllib.unquote_plus(tbn)

        tbn=tbn.replace('?q=','[LIGHT]search:[/LIGHT]' )
        tbn=tbn.replace('site:','' )
        tbn=tbn.replace('&sort=','[LIGHT] sort by:[/LIGHT]' )
        tbn=tbn.replace('&t=','[LIGHT] from:[/LIGHT]' )
        tbn=tbn.replace('subreddit:','r/' )
        tbn=tbn.replace('author:','[LIGHT] by:[/LIGHT]' )
        tbn=tbn.replace('&restrict_sr=on','' )
        tbn=tbn.replace('&restrict_sr=','' )
        tbn=tbn.replace('nsfw:no','' )
        tbn=tbn.replace('nsfw:yes','nsfw' )
        #log('************ prettify_reddit_query='+tbn)
        return tbn
    else:
        return subreddit_entry
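
A hypothetical call, assuming prettify_reddit_query above is in scope, showing the rewriting on a made-up search query string:

# Hypothetical usage; prettify_reddit_query is defined above.
q = '?q=cats+subreddit%3Aaww&sort=new&restrict_sr=on'
print prettify_reddit_query(q)
# -> [LIGHT]search:[/LIGHT]cats r/aww[LIGHT] sort by:[/LIGHT]new
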
uri_parser.py (project: flask-zhenai-mongo-echarts, author: Fretice)
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd
uri_parser.py (project: flask-zhenai-mongo-echarts, author: Fretice)
def _parse_options(opts, delim):
    """Helper method for split_options which creates the options dict.
    Also handles the creation of a list for the URI tag_sets/
    readpreferencetags portion."""
    options = {}
    for opt in opts.split(delim):
        key, val = opt.split("=")
        if key.lower() == 'readpreferencetags':
            options.setdefault('readpreferencetags', []).append(val)
        else:
            # str(option) to ensure that a unicode URI results in plain 'str'
            # option names. 'normalized' is then suitable to be passed as
            # kwargs in all Python versions.
            if str(key) in options:
                warnings.warn("Duplicate URI option %s" % (str(key),))
            options[str(key)] = unquote_plus(val)

    # Special case for deprecated options
    if "wtimeout" in options:
        if "wtimeoutMS" in options:
            options.pop("wtimeout")
        warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
                      " instead")

    return options
uri_parser.py (project: Data-visualization, author: insta-code1)
def parse_userinfo(userinfo):
    """Validates the format of user information in a MongoDB URI.
    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    Returns a 2-tuple containing the unescaped username followed
    by the unescaped password.

    :Parameters:
        - `userinfo`: A string of the form <username>:<password>

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    if '@' in userinfo or userinfo.count(':') > 1:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")
    user, _, passwd = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication.
    if not user:
        raise InvalidURI("The empty string is not valid username.")
    user = unquote_plus(user)
    passwd = unquote_plus(passwd)

    return user, passwd

