def filter(self, handler):
    """Answer a request with a local file or a generated directory index.

    The path component of ``handler.path`` is unquoted, stripped of leading
    slashes and used as a filesystem path relative to the current working
    directory (an empty path becomes ``'.'``).

    * Directory containing ``self.index_file``: that file is served.
    * Directory without it: the HTML from ``self.format_index_html(path)``
      is served.
    * Regular file: its bytes are served with a guessed ``Content-Type``.

    Returns:
        ``('mock', {'status': 200, 'headers': ..., 'body': ...})`` when
        something was served, otherwise ``None``.
    """
    path = urlparse.urlsplit(handler.path).path
    if not path.startswith('/'):
        return None
    # NOTE(review): the unquoted path is used as-is, so '..' segments can
    # reach files outside the working directory -- confirm whether the
    # caller restricts access before exposing this to untrusted clients.
    path = urllib.unquote_plus(path.lstrip('/') or '.').decode('utf8')
    if os.path.isdir(path):
        index_file = os.path.join(path, self.index_file)
        if not os.path.isfile(index_file):
            content = self.format_index_html(path).encode('UTF-8')
            headers = {'Content-Type': 'text/html; charset=utf-8', 'Connection': 'close'}
            return 'mock', {'status': 200, 'headers': headers, 'body': content}
        path = index_file
    if os.path.isfile(path):
        content_type = 'application/octet-stream'
        extension = os.path.splitext(path)[1]
        try:
            import mimetypes
            # Fall back to the generic type for unknown extensions; a plain
            # .get(extension) would replace the default with None.
            content_type = mimetypes.types_map.get(extension, content_type)
            if extension.endswith(('crt', 'pem')):
                content_type = 'application/x-x509-ca-cert'
        except StandardError as e:
            logging.error('import mimetypes failed: %r', e)
        with open(path, 'rb') as fp:
            content = fp.read()
        headers = {'Connection': 'close', 'Content-Type': content_type}
        return 'mock', {'status': 200, 'headers': headers, 'body': content}
    return None
python类unquote_plus()的实例源码
def _encode_query(query):
"""
`urlparse.parse_qsl` and `urllib.encodeurl` modify
blank query values so we had to roll our own.
"""
kvps = urllib.unquote_plus(query).split("&")
encoded_pairs = []
for kvp in kvps:
if "=" not in kvp:
encoded_pairs.append(urllib.quote_plus(kvp))
else:
key, value = kvp.split("=")
encoded_pairs.append("%s=%s" % (
urllib.quote_plus(key),
urllib.quote_plus(value)
))
return "&".join(encoded_pairs)
def get_params():
    """Parse the plugin parameter string in ``sys.argv[2]`` into a dict.

    Every ``?`` is removed from the string, the remainder is split on ``&``
    and each ``key=value`` pair is stored with its value passed through
    ``urllib.unquote_plus``.  Tokens that do not split into exactly two
    parts on ``=`` are ignored.

    Returns:
        dict mapping parameter names to unquoted values; empty when fewer
        than three argv entries exist or the parameter string is shorter
        than two characters.
    """
    if len(sys.argv) < 3:
        return {}
    param = {}
    paramstring = sys.argv[2]
    if len(paramstring) >= 2:
        # The original also trimmed a trailing '/' from a copy of the string
        # that was never read again; that dead code has been dropped, so the
        # parsed result is unchanged.
        cleanedparams = paramstring.replace('?', '')
        xbmc.log(str(cleanedparams), xbmc.LOGDEBUG)
        pairsofparams = cleanedparams.split('&')
        xbmc.log(str(pairsofparams), xbmc.LOGDEBUG)
        for pair in pairsofparams:
            splitparams = pair.split('=')
            if len(splitparams) != 2:
                continue
            try:
                param[splitparams[0]] = urllib.unquote_plus(splitparams[1])
            except Exception as exc:
                # Best effort: skip a value that cannot be unquoted, but
                # leave a trace instead of swallowing it silently.
                xbmc.log('get_params: skipping %r: %r' % (pair, exc),
                         xbmc.LOGDEBUG)
    return param
def parse_userinfo(userinfo):
    """Split and unescape the user information part of a MongoDB URI.

    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    :Parameters:
      - `userinfo`: A string of the form <username>:<password>

    :Returns:
      A 2-tuple ``(username, password)``, both passed through
      `unquote_plus`.

    :Raises:
      `InvalidURI` if `userinfo` contains '@', more than one ':', or an
      empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_unescaped_reserved = '@' in userinfo or userinfo.count(':') > 1
    if has_unescaped_reserved:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    username, _, password = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication, so only the
    # username is required to be non-empty.
    if not username:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(username), unquote_plus(password)
def _parse_options(opts, delim):
"""Helper method for split_options which creates the options dict.
Also handles the creation of a list for the URI tag_sets/
readpreferencetags portion."""
options = {}
for opt in opts.split(delim):
key, val = opt.split("=")
if key.lower() == 'readpreferencetags':
options.setdefault('readpreferencetags', []).append(val)
else:
# str(option) to ensure that a unicode URI results in plain 'str'
# option names. 'normalized' is then suitable to be passed as
# kwargs in all Python versions.
if str(key) in options:
warnings.warn("Duplicate URI option %s" % (str(key),))
options[str(key)] = unquote_plus(val)
# Special case for deprecated options
if "wtimeout" in options:
if "wtimeoutMS" in options:
options.pop("wtimeout")
warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
" instead")
return options
def do_GET(self):
    """Handle a GET request whose query string names the file to send.

    Everything after the first '?' in ``self.path`` (or the whole path when
    there is no '?') is unquoted and passed to ``self.send_headers`` and
    ``self.write_response``.  If the client drops the connection while the
    response is written (EPIPE / ECONNRESET),
    ``self.suppress_socket_error_report`` is set to True and the error is
    swallowed; any other socket error is re-raised.
    """
    requested = urllib.unquote_plus(self.path.split("?", 1)[-1])
    self.suppress_socket_error_report = None
    self.send_headers(requested)
    print("sending data")
    try:
        self.write_response(requested)
    except socket.error as err:
        peer_gone = (isinstance(err.args, tuple)
                     and err.args[0] in (errno.EPIPE, errno.ECONNRESET))
        if not peer_gone:
            raise
        print("disconnected")
        self.suppress_socket_error_report = True
GetAndResizeImages.py 文件源码
项目:ecs-refarch-batch-processing
作者: awslabs
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def process_images():
    """Process the images announced by the messages on the SQS queue.

    For every message the S3 object key is read from the first record of
    the JSON body, unquoted, and the object is downloaded, resized,
    uploaded and cleaned up locally.  On success the message is deleted.

    No real error handling in this sample code. In case of error we'll put
    the message back in the queue and make it visible again. It will end up in
    the dead letter queue after five failed attempts.
    """
    for message in get_messages_from_sqs():
        try:
            message_content = json.loads(message.body)
            image = urllib.unquote_plus(message_content
                                        ['Records'][0]['s3']['object']
                                        ['key']).encode('utf-8')
            s3.download_file(input_bucket_name, image, image)
            resize_image(image)
            upload_image(image)
            cleanup_files(image)
        except Exception:
            # Catch Exception, not a bare except, so KeyboardInterrupt and
            # SystemExit still stop the worker instead of being treated as
            # a failed image.
            message.change_visibility(VisibilityTimeout=0)
            continue
        else:
            message.delete()
def test_unquoting(self):
    """Check unquote() and unquote_plus() on every escaped ASCII value.

    Each of the 128 ASCII characters is escaped with ``hexescape`` and must
    round-trip through both functions.  The concatenation of all escapes is
    then unquoted as one string by both functions.
    """
    # Make sure unquoting of all ASCII values works
    escape_list = []
    for num in range(128):
        given = hexescape(chr(num))
        expect = chr(num)
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" %
                         (expect, result))
        escape_list.append(given)
    escape_string = ''.join(escape_list)
    del escape_list
    # Exactly one '%' is expected to remain -- presumably the decoded
    # percent sign itself.  The original ran the unquote() check twice (the
    # first copy mislabelled as quote()); the second check now covers
    # unquote_plus() instead.
    result = urllib.unquote(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote(): not all characters escaped: "
                     "%s" % result)
    result = urllib.unquote_plus(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote_plus(): not all characters escaped: "
                     "%s" % result)
def parse(self, response):
    """
    Default parse callback (generator).

    The response URL is first normalised with
    ``HtmlParser.remove_url_parameter``.  For index levels 1-4 the page is
    saved and a ``Request`` is yielded for every follow link; for level 5
    the person profile is extracted and yielded if a LinkedIn id could be
    derived from the URL.  Other levels yield nothing.
    """
    # import pdb; pdb.set_trace()
    stripped_url = HtmlParser.remove_url_parameter(response.url)
    response = response.replace(url=stripped_url)
    hxs = HtmlXPathSelector(response)
    index_level = self.determine_level(response)
    log.msg("Parse: index level:" + str(index_level))

    if index_level in [1, 2, 3, 4]:
        self.save_to_file_system(index_level, response)
        relative_urls = self.get_follow_links(index_level, hxs)
        if relative_urls is None:
            return
        for url in relative_urls:
            log.msg('yield process, url:' + url)
            yield Request(url, callback=self.parse)
        return

    if index_level == 5:
        personProfile = HtmlParser.extract_person_profile(hxs)
        raw_id = self.get_linkedin_id(response.url)
        linkedin_id = UnicodeDammit(urllib.unquote_plus(raw_id)).markup
        if linkedin_id:
            personProfile['_id'] = linkedin_id
            personProfile['url'] = UnicodeDammit(response.url).markup
            yield personProfile
def _translate_single_text(self, text, target_language, source_lauguage):
    """Translate one byte string, splitting it into query-sized chunks.

    The text is URL-quoted and cut so that every quoted chunk is at most
    ``self._MAX_LENGTH_PER_QUERY`` long, each cut ending right after the
    last occurrence of the first separator in ``self._SEPERATORS`` found
    inside the window.  The chunks are translated through
    ``self._execute`` and the results are joined per writing.

    Raises ``Error('input too large')`` when no separator is found inside a
    window.
    """
    assert _is_bytes(text)

    def split_text(text):
        # Work on the quoted form: the length limit applies to what is sent.
        quoted = quote_plus(text)
        total = len(quoted)
        cursor = 0
        while (total - cursor) > self._MAX_LENGTH_PER_QUERY:
            window_end = cursor + self._MAX_LENGTH_PER_QUERY
            for seperator in self._SEPERATORS:
                cut = quoted.rfind(seperator, cursor, window_end)
                if cut != -1:
                    break
            else:
                # No separator at all inside the window.
                raise Error('input too large')
            chunk_end = cut + len(seperator)
            yield unquote_plus(quoted[cursor:chunk_end])
            cursor = chunk_end
        yield unquote_plus(quoted[cursor:])

    def make_task(text):
        # Bind `text` per task so every deferred call keeps its own chunk.
        def task():
            return self._basic_translate(text, target_language, source_lauguage)[0]
        return task

    tasks = (make_task(chunk) for chunk in split_text(text))
    results = list(self._execute(tasks))
    return tuple(''.join(part[n] for part in results)
                 for n in range(len(self._writing)))
def test_unquoting(self):
    """Check unquote() and unquote_plus() on every escaped ASCII value.

    Each of the 128 ASCII characters is escaped with ``hexescape`` and must
    round-trip through both functions.  The concatenation of all escapes is
    then unquoted as one string by both functions.
    """
    # Make sure unquoting of all ASCII values works
    escape_list = []
    for num in range(128):
        given = hexescape(chr(num))
        expect = chr(num)
        result = urllib.unquote(given)
        self.assertEqual(expect, result,
                         "using unquote(): %s != %s" % (expect, result))
        result = urllib.unquote_plus(given)
        self.assertEqual(expect, result,
                         "using unquote_plus(): %s != %s" %
                         (expect, result))
        escape_list.append(given)
    escape_string = ''.join(escape_list)
    del escape_list
    # Exactly one '%' is expected to remain -- presumably the decoded
    # percent sign itself.  The original ran the unquote() check twice (the
    # first copy mislabelled as quote()); the second check now covers
    # unquote_plus() instead.
    result = urllib.unquote(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote(): not all characters escaped: "
                     "%s" % result)
    result = urllib.unquote_plus(escape_string)
    self.assertEqual(result.count('%'), 1,
                     "using unquote_plus(): not all characters escaped: "
                     "%s" % result)
def argument_query(query_str):
    """Parse a URL query string into a dict of unquoted parameters.

    Each ``&``-separated token is converted with ``to_utf8`` and split on
    ``=`` into a key and a value (value ``''`` when there is no ``=``);
    both are passed through ``unquote_plus``.

    :param query_str: query string, normally without the leading ``?``
        (a warning is issued when one is present).
    :returns: dict of parameters.  Empty when the query is empty, when
        parsing raises, or when the query consists of a single key with an
        empty value.
    """
    if query_str and query_str.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
             " string, this may be an error!", stacklevel=3)
    if not query_str:
        return {}
    try:
        # much faster than parse_qsl()
        query_params = dict(( map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                              for token in query_str.split('&') ))
        # next(iter(...)) instead of values()[0]: identical on Python 2 and
        # does not depend on values() returning a list.
        if len(query_params) == 1 and not next(iter(query_params.values())):
            query_params = {}
    except Exception:
        ##raise # XXX DEBUG
        query_params = {}
    return query_params
#------------------------------------------------------------------------------
def query(self, query):
    """Set the query string, parsing it into parameters when possible.

    A non-empty query is split on ``&`` and ``=`` and unquoted.  When that
    yields real parameters they are stored and the raw string is dropped
    (stored as ``None``).  When the query is empty, parsing raises, or the
    result is a single key with an empty value, the raw string is kept and
    the parameters are stored as an empty dict.
    """
    if query and query.startswith('?'):
        warn("You don't need to use a leading '?' when setting the query"
             " string, this may be an error!", stacklevel=3)

    query_params = {}
    if query:
        try:
            # much faster than parse_qsl()
            pairs = ( map(unquote_plus, (to_utf8(token) + '=').split('=', 2)[:2])
                      for token in query.split('&') )
            parsed = dict(pairs)
            lone_blank = len(parsed) == 1 and not parsed.values()[0]
            if not lone_blank:
                query_params = parsed
                query = None
        except Exception:
            ##raise # XXX DEBUG
            query_params = {}

    self.__query, self.__query_params = query, query_params
def netloc(self, netloc):
    """Set auth, host and port from a ``[auth@]host[:port]`` string.

    Everything before the first '@' becomes ``auth`` (``None`` when there
    is no '@').  A host starting with '[' is read up to the closing ']',
    with an optional ``:port`` after it; otherwise host and port are split
    on the first ':'.  The port is unquoted if it contains '%' and
    converted to ``int`` (it stays ``''`` when absent); a non-empty host is
    passed through ``unquote_plus``.
    """
    auth = None
    host = netloc
    if '@' in netloc:
        auth, host = netloc.split('@', 1)

    port = ''
    if host and host[0] == '[':
        host, port = host[1:].split(']', 1)
        if ':' in port:
            before_colon, port = port.split(':', 1)
            if not host:
                # Empty brackets: use whatever sits between ']' and ':'.
                host = before_colon
    elif ':' in host:
        host, port = host.split(':', 1)

    if '%' in port:
        port = unquote(port)
    if port:
        port = int(port)
    if host:
        host = unquote_plus(host)

    self.auth = auth  # TODO: roll back changes if it fails
    self.host = host
    self.port = port
def _initialize(self, request):
    """Decode the JSON request body and set up the Facebook lookup.

    Adds the ``Access-Control-Allow-Headers`` response header, stores the
    parsed body in ``self.json_body`` (``None`` when the request has no
    body) and, when the handler requires or supports auth and the body
    carries an ``access_token``, resolves ``self.fb_uid`` and replaces
    ``self.fbl`` with an authenticated lookup.  A missing token is reported
    through ``self.add_error`` only when auth is required.

    NOTE(review): the ``request`` argument is not read here; the method
    uses ``self.request`` throughout.
    """
    self.response.headers.add_header('Access-Control-Allow-Headers', 'Content-Type')
    # We use _initialize instead of webapp2's initialize, so that exceptions can be caught easily
    self.fbl = fb_api.FBLookup(None, None)

    if self.request.body:
        logging.info("Request body: %r", self.request.body)
        unescaped_body = urllib.unquote_plus(self.request.body.strip('='))
        self.json_body = json.loads(unescaped_body)
        logging.info("json_request: %r", self.json_body)
    else:
        self.json_body = None

    if self.requires_auth or self.supports_auth:
        # json_body is None for an empty request body; guard the lookup so
        # a missing body becomes "no token" rather than an AttributeError.
        access_token = self.json_body.get('access_token') if self.json_body else None
        if access_token:
            self.fb_uid = get_user_id_for_token(access_token)
            self.fbl = fb_api.FBLookup(self.fb_uid, access_token)
            logging.info("Access token for user ID %s", self.fb_uid)
        elif self.requires_auth:
            self.add_error("Needs access_token parameter")
def getParameters(parameterString):
    """Parse a plugin parameter string into a dict of commands.

    Everything after the first '?' (or the whole string when there is no
    '?') is split on '&'.  Each non-empty token is split on its first '='
    into key and value; a token without '=' gets an empty value.  Values
    are UTF-8 encoded when possible, otherwise stored unchanged.  On XBMC
    12.0 and later the whole string is unquoted before it is split.

    :param parameterString: raw parameter string.
    :returns: dict mapping parameter names to values.
    """
    log("", 5)
    commands = {}
    if getXBMCVersion() >= 12.0:
        parameterString = urllib.unquote_plus(parameterString)
    splitCommands = parameterString[parameterString.find('?') + 1:].split('&')

    for command in splitCommands:
        if not command:
            continue
        # partition() never fails: a token without '=' used to raise an
        # uncaught IndexError, and a value containing '=' was truncated.
        key, _, rawValue = command.partition('=')
        try:
            value = rawValue.encode("utf-8")
        except UnicodeError:
            log("Error utf-8 encoding argument value: " + repr(rawValue))
            value = rawValue
        commands[key] = value

    log(repr(commands), 5)
    return commands
def prettify_reddit_query(subreddit_entry):
    """Make a reddit search query string presentable.

    Entries that do not start with '?' are returned unchanged.  Otherwise
    the part after the last '/' is unquoted and the query keywords are
    replaced, in order, by label markup or removed.
    """
    if not subreddit_entry.startswith('?'):
        return subreddit_entry

    # Order matters: '&restrict_sr=on' must be handled before the shorter
    # '&restrict_sr=' prefix.
    substitutions = (
        ('?q=', '[LIGHT]search:[/LIGHT]'),
        ('site:', ''),
        ('&sort=', '[LIGHT] sort by:[/LIGHT]'),
        ('&t=', '[LIGHT] from:[/LIGHT]'),
        ('subreddit:', 'r/'),
        ('author:', '[LIGHT] by:[/LIGHT]'),
        ('&restrict_sr=on', ''),
        ('&restrict_sr=', ''),
        ('nsfw:no', ''),
        ('nsfw:yes', 'nsfw'),
    )
    pretty = urllib.unquote_plus(subreddit_entry.split('/')[-1])
    for needle, replacement in substitutions:
        pretty = pretty.replace(needle, replacement)
    return pretty
def parse_userinfo(userinfo):
    """Split and unescape the user information part of a MongoDB URI.

    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    :Parameters:
      - `userinfo`: A string of the form <username>:<password>

    :Returns:
      A 2-tuple ``(username, password)``, both passed through
      `unquote_plus`.

    :Raises:
      `InvalidURI` if `userinfo` contains '@', more than one ':', or an
      empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_unescaped_reserved = '@' in userinfo or userinfo.count(':') > 1
    if has_unescaped_reserved:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    username, _, password = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication, so only the
    # username is required to be non-empty.
    if not username:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(username), unquote_plus(password)
def _parse_options(opts, delim):
"""Helper method for split_options which creates the options dict.
Also handles the creation of a list for the URI tag_sets/
readpreferencetags portion."""
options = {}
for opt in opts.split(delim):
key, val = opt.split("=")
if key.lower() == 'readpreferencetags':
options.setdefault('readpreferencetags', []).append(val)
else:
# str(option) to ensure that a unicode URI results in plain 'str'
# option names. 'normalized' is then suitable to be passed as
# kwargs in all Python versions.
if str(key) in options:
warnings.warn("Duplicate URI option %s" % (str(key),))
options[str(key)] = unquote_plus(val)
# Special case for deprecated options
if "wtimeout" in options:
if "wtimeoutMS" in options:
options.pop("wtimeout")
warnings.warn("Option wtimeout is deprecated, use 'wtimeoutMS'"
" instead")
return options
def parse_userinfo(userinfo):
    """Split and unescape the user information part of a MongoDB URI.

    Reserved characters like ':', '/', '+' and '@' must be escaped
    following RFC 2396.

    :Parameters:
      - `userinfo`: A string of the form <username>:<password>

    :Returns:
      A 2-tuple ``(username, password)``, both passed through
      `unquote_plus`.

    :Raises:
      `InvalidURI` if `userinfo` contains '@', more than one ':', or an
      empty username.

    .. versionchanged:: 2.2
       Now uses `urllib.unquote_plus` so `+` characters must be escaped.
    """
    has_unescaped_reserved = '@' in userinfo or userinfo.count(':') > 1
    if has_unescaped_reserved:
        raise InvalidURI("':' or '@' characters in a username or password "
                         "must be escaped according to RFC 2396.")

    username, _, password = _partition(userinfo, ":")
    # No password is expected with GSSAPI authentication, so only the
    # username is required to be non-empty.
    if not username:
        raise InvalidURI("The empty string is not valid username.")

    return unquote_plus(username), unquote_plus(password)