def do_request(self, url, params=None, timeout=None):
    """POST ``params`` to ``url`` through the OAuth-signed session.

    :param url: address the request is posted to.
    :param params: payload handed to the session as ``data``.
    :param timeout: optional request timeout, in seconds; a falsy value
        selects ``self.default_timeout``.
    :type timeout: float
    @return: the response content
    :raises exceptions.FlickrError: when the status code is not 200.
    """
    response = self.session.post(
        url,
        data=params,
        auth=self.oauth,
        timeout=timeout or self.default_timeout,
    )

    # Happy path first: anything other than 200 is treated as a failure.
    if response.status_code == 200:
        return response.content

    # Log each '&'-separated piece of the body, URL-decoded, before raising.
    self.log.error('do_request: Status code %i received, content:', response.status_code)
    for chunk in response.text.split('&'):
        self.log.error(' %s', urllib_parse.unquote(chunk))
    raise exceptions.FlickrError('do_request: Status code %s received' % response.status_code)
# Example source code using Python's unquote()
def api_index(request):
    """Return user records as JSON for a request carrying a valid token.

    A user flagged ``is_admin`` receives every ``User``; any other user
    receives only the records matching their own ``user_id``. The JSON
    literal ``null`` is returned when ``User.DoesNotExist`` is raised, and
    an HTTP 401 response when the token check fails.
    """
    if not check_if_valid_token(request):
        return HttpResponse('Unauthorized', status=401)

    try:
        raw_token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
        uid = extrapolate_user(raw_token)
        requester = User.objects.get(user_id=uid)
        # Admins see everyone; everybody else only sees themselves.
        if requester.is_admin:
            visible = User.objects.all()
        else:
            visible = User.objects.filter(user_id=uid)
        payload = serializers.serialize("json", visible)
        return HttpResponse(payload, content_type='application/json')
    except User.DoesNotExist:
        return HttpResponse("null", content_type='application/json')
def check_if_valid_token(request):
    """Tell whether the request's Authorization header carries a valid token.

    The URL-decoded header is split on ``=``; the second piece is expected
    to look like ``<id>-<digest>``. The token is accepted when ``<digest>``
    equals the hexdigest of ``SHA.new()`` fed with
    ``ACCESS_TOKEN_SALT + ":" + <id>``.

    :param request: object exposing a ``META`` mapping of request headers.
    :return: True when the digest matches; False when the header is
        missing, malformed, or the digest differs.
    """
    if 'HTTP_AUTHORIZATION' not in request.META:
        return False

    token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])

    # A header without '=' carries no token payload; reject it instead of
    # failing with IndexError.
    pieces = token.split('=')
    if len(pieces) < 2:
        return False

    # NOTE(review): the pattern is intentionally left exactly as it was;
    # tightening it would change which tokens are accepted.
    match = re.search("(.*?)-(.*)", pieces[1])
    if match is None:
        # No '-' separator at all: previously crashed on None.group().
        return False

    user_id = match.group(1)
    digest = match.group(2)
    if not user_id or not digest:
        return False

    sha = SHA.new()
    sha.update(ACCESS_TOKEN_SALT + ":" + str(user_id))
    return digest == sha.hexdigest()
def mkfn(text):
    """Unquote ``text`` and replace every match of ``NAFN`` with ``_``."""
    decoded = unquote(text)
    return NAFN.sub("_", decoded)
def identify_and_tag_DOI(line):
    """Replace every DOI reference in a citation line with a tag.

    Each match of ``re_doi`` is swapped for ``<cds.DOI />`` and its ``doi``
    group is collected; a DOI containing a percent-encoded slash is
    unquoted first.

    @param line: (string) the reference line in which to search for DOI's.
    @return: the tagged line and a list of DOI strings (if any), in the
        order they appear in the line
    """
    tag = "<cds.DOI />"
    found = []

    # Walk the matches right-to-left so the offsets of the matches still to
    # be processed stay valid while the line is being rewritten.
    for hit in list(re_doi.finditer(line))[::-1]:
        doi = hit.group('doi')
        if '%2f' in doi.lower():
            doi = unquote(doi)
        line = line[:hit.start()] + tag + line[hit.end():]
        found.append(doi)

    # DOIs were gathered last-to-first; hand them back in reading order.
    return line, found[::-1]
def CgiDictFromParsedUrl(url):
    """Build the CGI variables that describe a parsed url.

    The result holds QUERY_STRING, SERVER_NAME and PATH_INFO, plus
    SERVER_PORT when the url names a port or uses the http/https scheme.

    Args:
      url: An instance of urlparse.SplitResult.

    Returns:
      A dict containing the CGI variables derived from url.
    """
    default_ports = {'https': '443', 'http': '80'}

    environ = {}
    if url.port is not None:
        environ['SERVER_PORT'] = str(url.port)
    elif url.scheme in default_ports:
        # No explicit port: fall back to the scheme's well-known one.
        environ['SERVER_PORT'] = default_ports[url.scheme]

    environ['QUERY_STRING'] = url.query
    environ['SERVER_NAME'] = url.hostname
    # An empty path is reported as the root.
    environ['PATH_INFO'] = urlparse.unquote(url.path) if url.path else '/'
    return environ
def unquote_utf8(qs):
    """Unquote ``qs``, treating the bytes involved as UTF-8.

    A ``text_type`` input is encoded to UTF-8 before unquoting; a
    ``byte_type`` result is decoded from UTF-8 before being returned.
    """
    raw = qs.encode('utf-8') if isinstance(qs, text_type) else qs
    unquoted = unquote(raw)
    return unquoted.decode("utf-8") if isinstance(unquoted, byte_type) else unquoted
def parse_url(url):
    """Split a connection URL into a dict of connection settings.

    The returned dict has the keys ``transport`` (the URL scheme), ``host``,
    ``port``, ``username``, ``password`` and ``virtual_host``, followed by
    one entry per query-string parameter. Components are URL-unquoted;
    missing host/username/password become None, a missing port becomes
    ``amqppy.DEFAULT_PORT`` and an empty path becomes ``"/"``.
    """
    scheme = urlparse.urlparse(url).scheme

    # parse with HTTP URL semantics; the slice skips "<scheme>://"
    parts = urlparse.urlparse('http://' + url[len(scheme) + 3:])

    def decoded(component):
        # Missing components come back as None from urlparse.
        return urlparse.unquote(component or '')

    # The virtual host is the path without its single leading slash.
    vhost = parts.path or ''
    if vhost[:1] == '/':
        vhost = vhost[1:]

    return dict(
        transport=scheme,
        host=decoded(parts.hostname) or None,
        port=parts.port or amqppy.DEFAULT_PORT,
        username=decoded(parts.username) or None,
        password=decoded(parts.password) or None,
        virtual_host=decoded(vhost) or "/",
        **dict(urlparse.parse_qsl(parts.query)))
def PLAY_SOURCE(payload):
    """Unquote ``payload``, pass it to ``control.play_source`` and return the result."""
    source = urlparse.unquote(payload)
    return control.play_source(source)
def api(request, id_number):
    """Return the token owner's user records as JSON.

    Responds with HTTP 401 when the token check fails. ``id_number`` is
    accepted but not used: the lookup is keyed on the user derived from
    the Authorization token.
    """
    if not check_if_valid_token(request):
        return HttpResponse('Unauthorized', status=401)

    raw_token = urlparse.unquote(request.META['HTTP_AUTHORIZATION'])
    uid = extrapolate_user(raw_token)
    own_records = User.objects.filter(user_id=uid)
    payload = serializers.serialize("json", own_records)
    return HttpResponse(payload, content_type='application/json')
# This is purposely vulnerable; see https://github.com/OWASP/railsgoat/wiki/Extras:-Broken-Regular-Expression
def db_url_parse(url, engine=None, conn_max_age=0):
    """
    Parses a database URL.

    :param url: database URL, e.g. ``scheme://user:password@host:port/name?opt=value``.
    :param engine: engine to report; when None it is looked up in
        ``DATABASE_ENGINE_SCHEMES`` by URL scheme.
    :param conn_max_age: value stored under ``CONN_MAX_AGE``.
    :return: dict with NAME, USER, PASSWORD, HOST, PORT and CONN_MAX_AGE,
        plus OPTIONS when the URL has query parameters and ENGINE when an
        engine is known. The literal ``sqlite://:memory:`` URL yields only
        ENGINE and NAME.
    :raises KeyError: when ``engine`` is None and the scheme is not in
        ``DATABASE_ENGINE_SCHEMES``.
    """
    if url == "sqlite://:memory:":
        # urlparse will choke on :memory:
        return {
            "ENGINE": DATABASE_ENGINE_SCHEMES["sqlite"],
            "NAME": ":memory:",
        }

    parsed = urlparse.urlparse(url)

    # split query strings from path
    path = parsed.path[1:]
    if "?" in path and not parsed.query:
        # maxsplit=1: cut at the first "?" only, so a second "?" inside the
        # query cannot yield a third piece and break the unpacking.
        path, query = path.split("?", 1)
    else:
        query = parsed.query
    query = urlparse.parse_qs(query)

    # sqlite with no path should assume :memory: (sqlalchemy behavior)
    if parsed.scheme == "sqlite" and path == "":
        path = ":memory:"

    # handle postgresql percent-encoded paths
    hostname = parsed.hostname or ""
    if "%2f" in hostname.lower():
        hostname = hostname.replace("%2f", "/").replace("%2F", "/")

    config = {
        "NAME": urlparse.unquote(path or ""),
        "USER": urlparse.unquote(parsed.username or ""),
        "PASSWORD": urlparse.unquote(parsed.password or ""),
        "HOST": hostname,
        "PORT": parsed.port or "",
        "CONN_MAX_AGE": conn_max_age,
    }

    if engine is None:
        engine = DATABASE_ENGINE_SCHEMES[parsed.scheme]

    # pass the query string into OPTIONS; the last value of a repeated
    # parameter wins
    options = {}
    for key, values in query.items():
        if parsed.scheme == "mysql" and key == "ssl-ca":
            options["ssl"] = {"ca": values[-1]}
            continue
        options[key] = values[-1]

    # postgresql schema URLs
    if "currentSchema" in options and engine == "django.db.backends.postgresql_psycopg2":
        options["options"] = "-c search_path={0}".format(options["currentSchema"])

    if options:
        config["OPTIONS"] = options
    if engine:
        config["ENGINE"] = engine
    return config
def get(self, path="/", *args, **kwargs):
    """Dispatch a GET request to one of the export/inspection handlers.

    Named request parameters are merged into ``kwargs``: a parameter with a
    single value is stored as a scalar, one with several values as a list,
    and values for a name already present in ``kwargs`` are appended to it.
    The second segment of ``path`` selects the handler (exportDb,
    exportBlob, download, info, listCursors or listKinds); the remaining
    URL-unquoted segments are passed to it positionally and its return
    value is written to the response. A ``_ah`` segment is accepted and
    ignored.

    :raises NotImplementedError: when a parameter is called ``self``, the
        path has fewer than two segments, or the handler name is unknown.
    """
    self.response.headers['Content-Type'] = 'text/plain'
    # Prevent Hash-collision attacks
    assert len(self.request.arguments()) < 50
    # Fill the (surprisingly empty) kwargs dict with named request params
    request_args = {k: self.request.get_all(k) for k in self.request.arguments()}
    for key, values in request_args.items():
        if not values:
            continue
        if key not in kwargs:
            # Unwrap single values so handlers receive a plain scalar.
            kwargs[key] = values[0] if len(values) == 1 else values
        elif isinstance(kwargs[key], list):
            kwargs[key] = kwargs[key] + values
        else:
            kwargs[key] = [kwargs[key]] + values
    if "self" in kwargs:  # self is reserved for bound methods
        raise NotImplementedError()

    path = urlparse.urlparse(path).path
    pathlist = [urlparse.unquote(x) for x in path.strip("/").split("/")]
    if len(pathlist) < 2:
        raise NotImplementedError()
    tfunc = pathlist[1]
    pathlist = pathlist[2:]

    # Only these names may be resolved on self; anything else is rejected.
    exposed = ("exportDb", "exportBlob", "download", "info", "listCursors", "listKinds")
    if tfunc in exposed:
        self.response.write(getattr(self, tfunc)(*pathlist, **kwargs))
    elif tfunc != "_ah":
        raise NotImplementedError()
def do_upload(self, filename, url, params=None, fileobj=None, timeout=None):
    """Performs a file upload to the given URL with the given parameters, signed with OAuth.

    :param filename: path of the file to upload; its base name becomes the
        default ``title`` and it is opened when no ``fileobj`` is given.
    :param url: address the upload is posted to.
    :param params: optional dict of request parameters; it is copied, so
        the caller's dict is left untouched.
    :param fileobj: optional open binary file object to upload instead of
        opening ``filename``; it is not closed by this method.
    :param timeout: optional request timeout, in seconds.
    :type timeout: float
    @return: the response content
    :raises exceptions.FlickrError: when the status code is not 200.
    """
    # Work on a private copy: this tolerates the default ``params=None`` and
    # keeps the 'title'/'photo' entries (including the open file handle)
    # out of the caller's dict.
    params = dict(params) if params else {}

    # work-around to allow non-ascii characters in file name
    # Flickr doesn't store the name but does use it as a default title
    if 'title' not in params:
        params['title'] = os.path.basename(filename).encode('utf8')

    # work-around for Flickr expecting 'photo' to be excluded
    # from the oauth signature:
    # 1. create a dummy request without 'photo'
    # 2. create real request and use auth headers from the dummy one
    dummy_req = requests.Request('POST', url, data=params,
                                 auth=self.oauth)
    prepared = dummy_req.prepare()
    headers = prepared.headers
    self.log.debug('do_upload: prepared headers = %s', headers)

    # Only close what this method opened; a caller-supplied file object
    # remains the caller's responsibility.
    opened_here = not fileobj
    if opened_here:
        fileobj = open(filename, 'rb')
    try:
        params['photo'] = ('dummy name', fileobj)
        m = MultipartEncoder(fields=params)
        auth = {'Authorization': headers.get('Authorization'),
                'Content-Type': m.content_type}
        self.log.debug('POST %s', auth)
        req = self.session.post(url, data=m, headers=auth,
                                timeout=timeout or self.default_timeout)
    finally:
        if opened_here:
            fileobj.close()

    # check the response headers / status code.
    if req.status_code != 200:
        self.log.error('do_upload: Status code %i received, content:', req.status_code)
        for part in req.text.split('&'):
            self.log.error(' %s', urllib_parse.unquote(part))
        raise exceptions.FlickrError('do_upload: Status code %s received' % req.status_code)
    return req.content
def selectLanguage( self, path ):
"""
Tries to select the best language for the current request.

The strategy is chosen by conf["viur.languageMethod"]:
"session" reads the language from the session (falling back to the
X-Appengine-Country request header), "domain" maps the request host
through conf["viur.domainLanguageMapping"], and "url" looks at the
first segment of the path. The chosen language is stored in
self.language.

Returns the path. It is returned unchanged except in "url" mode when
the first segment is a known language; then the first
len(segment)+1 characters are cut off.
"""
if translations is None:
# This project doesn't use the multi-language feature, nothing to do here
return( path )
if conf["viur.languageMethod"] == "session":
# We store the language inside the session, try to load it from there
if not session.current.getLanguage():
# Nothing stored in the session yet: try the country header
if "X-Appengine-Country" in self.request.headers.keys():
lng = self.request.headers["X-Appengine-Country"].lower()
# Accept the header value only if it is an available language or an alias
if lng in conf["viur.availableLanguages"]+list( conf["viur.languageAliasMap"].keys() ):
session.current.setLanguage( lng )
self.language = lng
# NOTE(review): indentation is lost in this listing; this else presumably
# pairs with the "lng in ..." test just above - confirm against the original file
else:
session.current.setLanguage( conf["viur.defaultLanguage"] )
else:
self.language = session.current.getLanguage()
elif conf["viur.languageMethod"] == "domain":
host = self.request.host_url.lower()
host = host[ host.find("://")+3: ].strip(" /") #strip http(s)://
# A leading "www." is ignored when looking up the host
if host.startswith("www."):
host = host[ 4: ]
if host in conf["viur.domainLanguageMapping"].keys():
self.language = conf["viur.domainLanguageMapping"][ host ]
else: # We have no language configured for this domain, try to read it from session
if session.current.getLanguage():
self.language = session.current.getLanguage()
elif conf["viur.languageMethod"] == "url":
# Lower-cased, URL-unquoted segments of the path component
tmppath = urlparse.urlparse( path ).path
tmppath = [ urlparse.unquote( x ) for x in tmppath.lower().strip("/").split("/") ]
if len( tmppath )>0 and tmppath[0] in conf["viur.availableLanguages"]+list( conf["viur.languageAliasMap"].keys() ):
self.language = tmppath[0]
return( path[ len( tmppath[0])+1: ] ) #Return the path stripped by its language segment
else: # This URL doesn't contain a language prefix, try to read it from session
if session.current.getLanguage():
self.language = session.current.getLanguage()
elif "X-Appengine-Country" in self.request.headers.keys():
lng = self.request.headers["X-Appengine-Country"].lower()
if lng in conf["viur.availableLanguages"] or lng in conf["viur.languageAliasMap"]:
self.language = lng
return( path )