def remove_trailing_version_from_href(href):
    """Strip the trailing API version segment from ``href``.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'
    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    :raises ValueError: if the last path segment is not a version.
    """
    split_result = urlparse.urlsplit(href)
    leading_segments = split_result.path.rsplit('/', 1)
    last_segment = leading_segments.pop()
    # NOTE: this should match vX.X or vX
    if re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', last_segment) is None:
        raise ValueError('URL %s does not contain version' % href)
    components = list(split_result)
    # index 2 of a split result is the path component
    components[2] = url_join(*leading_segments)
    return urlparse.urlunsplit(components)
python类urlunsplit()的实例源码
def put(self, uri, data, content_type=''):
    """Store ``data`` under ``uri`` via the 'webdav.collection' model.

    :param uri: request URI; resolved to a database name and a
        database-relative URI by ``self._get_dburi``.
    :param data: payload handed to ``Collection.put``.
    :param content_type: content type handed to ``Collection.put``.
    :return: ``uri`` with its path replaced by the value returned from
        ``Collection.put`` when that value is truthy, otherwise that
        (falsy) value unchanged.
    :raises DAV_Forbidden: when the URI does not resolve to a database
        name and a database URI.
    :raises DAV_Error: with code 500 when an unexpected exception occurs;
        DAV_* exceptions are logged, rolled back and re-raised as-is.
    """
    dbname, dburi = self._get_dburi(uri)
    if not dbname or not dburi:
        raise DAV_Forbidden
    pool = Pool(Transaction().cursor.database_name)
    Collection = pool.get('webdav.collection')
    try:
        res = Collection.put(dburi, data, content_type, cache=CACHE)
        Transaction().cursor.commit()
    # "except ... as name" is accepted by Python 2.6+ and Python 3, unlike
    # the comma form which is a SyntaxError on Python 3.
    except (DAV_Error, DAV_NotFound, DAV_Secret, DAV_Forbidden) as exception:
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise
    except Exception as exception:
        self._log_exception(exception)
        Transaction().cursor.rollback()
        raise DAV_Error(500)
    if res:
        # swap the path component (index 2) of the original URI for the result
        uparts = list(urlparse.urlsplit(uri))
        uparts[2] = res
        res = urlparse.urlunsplit(uparts)
    return res
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def join_url(base_url, path):
    """Join a base url with a path, merging their paths and queries.

    Both arguments are split with fragments disabled; the scheme and
    network location of the result come from ``base_url``.  The path and
    query merging is delegated to ``_join_paths`` and ``_join_queries``.
    E.g.: See unit tests.

    :param base_url: Base url.
    :param path: Path.
    :return: Joined url.
    """
    # Example of usages see in unittests
    base_parts = urlparse.urlsplit(base_url, allow_fragments=False)
    path_parts = urlparse.urlsplit(path, allow_fragments=False)
    merged_path = _join_paths(base_parts.path, path_parts.path)
    merged_query = _join_queries(base_parts.query, path_parts.query)
    components = (
        base_parts.scheme,
        base_parts.netloc,
        merged_path,
        merged_query,
        base_parts.fragment,
    )
    return urlparse.urlunsplit(components)
def url_join(*parts, **kwargs):
    """
    Split every part as a URL and combine them into a single URL.

    The scheme and network location come from the last part that defines
    one (scheme falls back to the ``scheme`` keyword, default 'http').
    Paths are joined with '/', starting from the last part whose path
    begins with '/'.  Query and fragment are taken from the last part.
    adapted from: http://codereview.stackexchange.com/q/13027
    """
    def concat_paths(sequence):
        # ``sequence`` runs from the last part to the first; prepend so the
        # collected pieces end up in their original order.
        collected = []
        for piece in sequence:
            collected.insert(0, piece)
            if piece.startswith('/'):
                break
        return '/'.join(collected)

    split_parts = [urlsplit(part) for part in reversed(parts)]
    schemes, netlocs, paths, queries, fragments = zip(*split_parts)

    scheme = kwargs.get('scheme', 'http')
    for candidate in schemes:
        if candidate:
            scheme = candidate
            break

    netloc = ''
    for candidate in netlocs:
        if candidate:
            netloc = candidate
            break

    return urlunsplit(
        (scheme, netloc, concat_paths(paths), queries[0], fragments[0]))
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _get_env_info(self, script_url):
    """Derive ``base_folder_url`` and ``base_folder_path`` for ``script_url``.

    The script folder is obtained through the 'system_info' module; nothing
    is set when that lookup yields a falsy value.  Trailing components that
    the URL directory and the script folder have in common are stripped
    from both, and what remains is stored on ``self``.
    """
    script_folder = ModuleExec(
        'system_info', ['-info', 'script_folder']
    ).load_result_or_run('script_folder')
    if not script_folder:
        return

    split_url = urlparse.urlsplit(script_url)
    url_dir, _url_filename = os.path.split(split_url.path)
    url_pieces = url_dir.split(os.sep)
    path_pieces = script_folder.split(os.sep)

    # Count how many trailing components match, walking both from the end.
    shared = 0
    for url_piece, path_piece in zip(reversed(url_pieces), reversed(path_pieces)):
        if url_piece != path_piece:
            break
        shared += 1
    if shared:
        # guard needed: a slice of [-0:] would drop everything
        del url_pieces[-shared:]
        del path_pieces[-shared:]

    self.base_folder_url = urlparse.urlunsplit(
        split_url[:2] + (os.sep.join(url_pieces), ) + split_url[3:])
    self.base_folder_path = os.sep.join(path_pieces)
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def audit(arg):
# Obfuscated audit routine: walks the query-string parameters of the URL
# given in `arg` and hands each one to the helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` blocks) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
# `if 0:` never executes; the expression under it references names that are
# not defined in this block.
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names that the loop below checks against before `continue`.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
# iI1 receives the rebuilt URL, all pairs, and the current name and value.
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, element [1] of it is passed to security_info.
if IiII1I1i1i1ii:
security_info(IiII1I1i1i1ii[1])
return
# Dead code again: `if 0:` never executes.
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
def audit(arg):
# Obfuscated audit routine: for each entry of a small decoded list, walks the
# query-string parameters of the URL given in `arg` and hands each one to the
# helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` block) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
ooO0oooOoO0 = arg
II11i = urlparse.urlparse(ooO0oooOoO0)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme,
II11i.netloc,
II11i.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(II11i.query)
# Parameter names that the inner loop checks against before `continue`.
i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
# Two decoded values iterated by the outer loop and passed first to iI1.
i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
for I11 in i11:
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in i1111:
continue
debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O)
# iI1 receives the outer-loop value, the rebuilt URL, all pairs, and the
# current name and value.
Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, a decoded format string is filled with the outer-loop
# value and element [1] of the result, then passed to security_info.
if Oo0o0000o0o0:
security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
return
# Dead code: `if 0:` never executes; the names under it are not defined in
# this block.
if 0:
iiiii11iII1 % O0o
def generate_docservice_url(request, doc_id, temporary=True, prefix=None):
    """Build a signed '/get/<doc_id>' URL on the configured document service.

    The scheme and network location come from
    ``request.registry.docservice_url``.  The signed message is ``doc_id``
    (joined with a NUL byte to an expiry timestamp 300 seconds ahead when
    ``temporary`` is true), optionally prefixed with ``prefix + '/'``.

    :param request: object whose ``registry`` carries ``docservice_key``
        and ``docservice_url``.
    :param doc_id: document identifier placed in the path.
    :param temporary: when true, add an ``Expires`` query parameter.
    :param prefix: optional prefix, also emitted as ``Prefix``.
    :return: the URL with ``Signature`` and ``KeyID`` query parameters.
    """
    registry = request.registry
    signing_key = getattr(registry, 'docservice_key', None)
    service = urlparse(registry.docservice_url)

    params = {}
    message = doc_id
    if temporary:
        expiry = int(ttime()) + 300  # EXPIRES
        message = "{}\0{}".format(doc_id, expiry)
        params['Expires'] = expiry
    if prefix:
        message = '{}/{}'.format(prefix, message)
        params['Prefix'] = prefix

    raw_signature = signing_key.signature(message.encode("utf-8"))
    params['Signature'] = quote(b64encode(raw_signature))
    # only the first 8 hex characters of the verification key are exposed
    params['KeyID'] = signing_key.hex_vk()[:8]

    target_path = '/get/{}'.format(doc_id)
    return urlunsplit(
        (service.scheme, service.netloc, target_path, urlencode(params), ''))
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def request_url(secure, hostname, port, path, query=None):
    '''
    Assemble a url from its components, suitable for the request function.

    Args:
        secure (boolean): Whether or not to use HTTPS.
        hostname (str): The host name for the url.
        port (int): The port number, as an integer.
        path (str): The hierarchical path.
        query (dict): A dict of query parameters.

    Returns:
        (str) A complete url made up of the arguments.
    '''
    if secure:
        scheme = 'https'
    else:
        scheme = 'http'
    # a missing or empty query dict yields no query string at all
    if query:
        query_string = urlencode(query)
    else:
        query_string = ''
    host_and_port = '{0}:{1}'.format(hostname, port)
    return urlunsplit((scheme, host_and_port, path, query_string, ''))
def validate_(self, value, context=None):
    """Validate ``value`` as a URL and optionally check that it can be opened.

    ``self.valid_url`` parses the value; a falsy result raises
    StopValidationError with the 'invalid_url' message.  When
    ``self.verify_exists`` is set, the URL is rebuilt from its parsed
    pieces, quoted, and opened; a URLError raises StopValidationError with
    the 'not_found' message.
    """
    url = self.valid_url(value)
    if not url:
        raise StopValidationError(self.messages['invalid_url'])
    if not self.verify_exists:
        return

    scheme = url['scheme']
    # prefer the IPv6 host, then the IPv4 host, then the encoded host name
    host = url['host6'] or url['host4'] or url['hostn_enc']
    netloc = host + ':' + (url['port'] or '')
    rebuilt = urlunsplit(
        (scheme, netloc, url['path'], url['query'], url['frag']))
    url_string = urlquote(rebuilt.encode('utf-8'), safe=VALID_CHAR_STRING)
    try:
        urlopen(url_string)
    except URLError:
        raise StopValidationError(self.messages['not_found'])
def parse_url(self, url):
    """
    Drop the ``list`` query parameter from ``url``.

    Conversion of an entire playlist is currently not supported, so the
    parameter is removed and the query string re-encoded.  URLs without a
    ``list`` parameter are returned unchanged.
    """
    query_params = parse_qs(urlparse(url).query)
    if not query_params.get('list', None):
        return url

    del query_params['list']
    pieces = urlsplit(url)
    # doseq=True: parse_qs maps every name to a list of values
    new_query = urllib.urlencode(query_params, True)
    return urlunsplit([
        pieces.scheme,
        pieces.netloc,
        pieces.path,
        new_query,
        pieces.fragment,
    ])
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def validate_(self, value, context=None):
    """Validate ``value`` as a URL and optionally check that it can be opened.

    ``self.valid_url`` parses the value; a falsy result raises
    StopValidationError with the 'invalid_url' message.  When
    ``self.verify_exists`` is set, the URL is rebuilt from its parsed
    pieces, quoted, and opened; a URLError raises StopValidationError with
    the 'not_found' message.
    """
    url = self.valid_url(value)
    if not url:
        raise StopValidationError(self.messages['invalid_url'])
    if not self.verify_exists:
        return

    scheme = url['scheme']
    # prefer the IPv6 host, then the IPv4 host, then the encoded host name
    host = url['host6'] or url['host4'] or url['hostn_enc']
    netloc = host + ':' + (url['port'] or '')
    rebuilt = urlunsplit(
        (scheme, netloc, url['path'], url['query'], url['frag']))
    url_string = urlquote(rebuilt.encode('utf-8'), safe=VALID_CHAR_STRING)
    try:
        urlopen(url_string)
    except URLError:
        raise StopValidationError(self.messages['not_found'])
def validate_(self, value, context=None):
    """Validate ``value`` as a URL and optionally check that it can be opened.

    ``self.valid_url`` parses the value; a falsy result raises
    StopValidationError with the 'invalid_url' message.  When
    ``self.verify_exists`` is set, the URL is rebuilt from its parsed
    pieces, quoted, and opened; a URLError raises StopValidationError with
    the 'not_found' message.
    """
    url = self.valid_url(value)
    if not url:
        raise StopValidationError(self.messages['invalid_url'])
    if not self.verify_exists:
        return

    scheme = url['scheme']
    # prefer the IPv6 host, then the IPv4 host, then the encoded host name
    host = url['host6'] or url['host4'] or url['hostn_enc']
    netloc = host + ':' + (url['port'] or '')
    rebuilt = urlunsplit(
        (scheme, netloc, url['path'], url['query'], url['frag']))
    url_string = urlquote(rebuilt.encode('utf-8'), safe=VALID_CHAR_STRING)
    try:
        urlopen(url_string)
    except URLError:
        raise StopValidationError(self.messages['not_found'])
def normalize_url(url):
    """
    Turn a loosely written URL into one with an explicit scheme.

    :param url: a bare host name, a host with a path, a scheme-relative
        URL ('//host/path') or a complete URL.
    :return: the URL with 'http' filled in where no scheme was given; an
        empty string for a path-only value such as '/xxxxx/index.php'.
    """
    # only hostname
    if '/' not in url:
        return 'http://{}'.format(url)

    pieces = urlparse.urlsplit(url)

    if not pieces.netloc:
        # /xxxxx/index.php has no host at all and is rejected;
        # www.test.com/index.php just lacks the scheme
        return '' if url.startswith('/') else 'http://{}'.format(url)

    if pieces.scheme:
        return url

    # //www.test.com/index.php
    return urlparse.urlunsplit(
        ('http', pieces.netloc, pieces.path or '/', pieces.query,
         pieces.fragment))
def _generate_url(self, path, query=None, frag=None):
    '''Build a URL for ``path`` from the configured host and port.

    The scheme is 'https' when ``CONF.vrm_ssl`` is set, otherwise 'http'.
    ``self.BASIC_URI`` is prepended to ``path`` unless it already starts
    with it.

    :param path: URL path
    :param query: query string handed to urlunsplit
    :param frag: fragment handed to urlunsplit
    :return: url
    '''
    scheme = 'https' if CONF.vrm_ssl else 'http'
    netloc = str(FC_DRIVER_CONF.fc_ip) + ':' + str(CONF.vrm_port)
    if path.startswith(self.BASIC_URI):
        full_path = path
    else:
        full_path = self.BASIC_URI + str(path)
    return urlparse.urlunsplit((scheme, netloc, full_path, query, frag))
def _generate_url(self, path, query=None, frag=None):
    '''Build a URL for ``path`` from the configured host and port.

    The scheme is 'https' when ``CONF.vrm_ssl`` is set, otherwise 'http'.
    ``self.BASIC_URI`` is prepended to ``path`` unless it already starts
    with it.

    :param path: URL path
    :param query: query string handed to urlunsplit
    :param frag: fragment handed to urlunsplit
    :return: url
    '''
    scheme = 'https' if CONF.vrm_ssl else 'http'
    netloc = str(FC_DRIVER_CONF.fc_ip) + ':' + str(CONF.vrm_port)
    if path.startswith(self.BASIC_URI):
        full_path = path
    else:
        full_path = self.BASIC_URI + str(path)
    return urlparse.urlunsplit((scheme, netloc, full_path, query, frag))
def audit(arg):
# Obfuscated audit routine: for each entry of a small decoded list, walks the
# query-string parameters of the URL given in `arg` and hands each one to the
# helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` block) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
ooO0oooOoO0 = arg
II11i = urlparse.urlparse(ooO0oooOoO0)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme,
II11i.netloc,
II11i.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(II11i.query)
# Parameter names that the inner loop checks against before `continue`.
i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
# Two decoded values iterated by the outer loop and passed first to iI1.
i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
for I11 in i11:
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in i1111:
continue
debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O)
# iI1 receives the outer-loop value, the rebuilt URL, all pairs, and the
# current name and value.
Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, a decoded format string is filled with the outer-loop
# value and element [1] of the result, then passed to security_info.
if Oo0o0000o0o0:
security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
return
# Dead code: `if 0:` never executes; the names under it are not defined in
# this block.
if 0:
iiiii11iII1 % O0o
def audit(arg):
# Obfuscated audit routine: walks the query-string parameters of the URL
# given in `arg` and hands each one to the helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` blocks) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
# `if 0:` never executes; the expression under it references names that are
# not defined in this block.
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names that the loop below checks against before `continue`.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
# iI1 receives the rebuilt URL, all pairs, and the current name and value.
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, element [1] of it is passed to security_info.
if IiII1I1i1i1ii:
security_info(IiII1I1i1i1ii[1])
return
# Dead code again: `if 0:` never executes.
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
def _convert_to_idn(url):
"""Convert a URL to IDN notation"""
# this function should only be called with a unicode string
# strategy: if the host cannot be encoded in ascii, then
# it'll be necessary to encode it in idn form
parts = list(urlparse.urlsplit(url))
try:
parts[1].encode('ascii')
except UnicodeEncodeError:
# the url needs to be converted to idn notation
host = parts[1].rsplit(':', 1)
newhost = []
port = u''
if len(host) == 2:
port = host.pop()
for h in host[0].split('.'):
newhost.append(h.encode('idna').decode('utf-8'))
parts[1] = '.'.join(newhost)
if port:
parts[1] += ':' + port
return urlparse.urlunsplit(parts)
else:
return url
def serialize(url='', data={}):
    """ Returns a URL with a query string of the given data.

    Parameters already present in the URL's query string are kept; the
    items of ``data`` are appended after them, sorted by key, with keys and
    values passed through ``b``.  A leading '?' is stripped from the result.
    """
    split = urlparse.urlsplit(url)
    pairs = urlparse.parse_qsl(split.query)
    for key, value in sorted(data.items()):
        pairs.append((b(key), b(value)))
    query = urlencode(pairs, doseq=True)
    joined = urlparse.urlunsplit(
        (split.scheme, split.netloc, split.path, query, split.fragment))
    return joined.lstrip('?')
# print(serialize('http://www.google.com', {'q': 'cats'})) # http://www.google.com?q=cats
#---- REQUESTS & STREAMS --------------------------------------------------------------------------
# The download(url) function returns the HTML (JSON, image data, ...) at the given url.
# If this fails it will raise NotFound (404), Forbidden (403) or TooManyRequests (420).
def redirect(self, url, **kwargs):
    """Redirect to ``url``, first making a '/'-rooted url absolute.

    A url starting with '/' is resolved against the current request url,
    with the scheme and domain adjusted as described below.  Any other url
    is passed to the parent handler unchanged.
    """
    if url.startswith('/'):
        current = urlparse.urlsplit(self.request.url)
        # Redirect to the www.dancedeets.com domain if they requested the raw hostname
        if current.netloc == 'dancedeets.com':
            domain = self._get_full_hostname()
        else:
            domain = current.netloc
        # Redirect to https on prod, as relying on url.scheme would send it back to http, due to the nginx http-based proxy
        if self.request.app.prod_mode:
            scheme = 'https'
        else:
            scheme = 'http'
        absolute_base = urlparse.urlunsplit(
            [scheme, domain, current.path, current.query, current.fragment])
        url = str(urlparse.urljoin(absolute_base, url))
    return super(BaseRequestHandler, self).redirect(url, **kwargs)
def audit(arg):
# Obfuscated audit routine: walks the query-string parameters of the URL
# given in `arg` and hands each one to the helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` blocks) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
Ii1iI = arg
Oo = urlparse.urlparse(Ii1iI)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
I1Ii11I1Ii1i = urlparse.urlunsplit((Oo.scheme,
Oo.netloc,
Oo.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(Oo.query)
# `if 0:` never executes; the expression under it references names that are
# not defined in this block.
if 0:
iiI1iIiI.ooo0Oo0 * i1 - Oooo0000 * i1IIi11111i / o000o0o00o0Oo
# Parameter names that the loop below checks against before `continue`.
oo = [decode('\xb0\xee\xa7\xb8\xcd[\xa7y\xe8\x81\xf1'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc6'), decode('\xaa\xd9\x8f\x84\xcd|\x9b_\xc6\xea\xc2')]
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in oo:
continue
debug(decode('\xa0\xfb\xad\xb5\xce%\xddE\x8c\xe7\xcb'), O0O0OO0O0O0, I1Ii11I1Ii1i)
# iI1 receives the rebuilt URL, all pairs, and the current name and value.
IiII1I1i1i1ii = iI1(I1Ii11I1Ii1i, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, element [1] of it is passed to security_info.
if IiII1I1i1i1ii:
security_info(IiII1I1i1i1ii[1])
return
# Dead code again: `if 0:` never executes.
if 0:
OOo0o0 / OOoOoo00oo - iI1OoOooOOOO + i1iiIII111ii + i1iIIi1
def audit(arg):
# Obfuscated audit routine: for each entry of a small decoded list, walks the
# query-string parameters of the URL given in `arg` and hands each one to the
# helper `iI1`.
# NOTE(review): indentation was lost in this listing, so the nesting of the
# statements below (notably `return` and the `if 0:` block) cannot be
# confirmed from here.
# NOTE(review): `decode`, `debug`, `iI1` and `security_info` are defined
# outside this block; `decode` presumably de-obfuscates the byte-string
# constants - TODO confirm.
ooO0oooOoO0 = arg
II11i = urlparse.urlparse(ooO0oooOoO0)
# Rebuild the URL from scheme, netloc and path only; the query and fragment
# slots are filled with decode('').
i1oOOoo00O0O = urlparse.urlunsplit((II11i.scheme,
II11i.netloc,
II11i.path,
decode(''),
decode('')))
# List of (name, value) pairs parsed from the query string.
Oo0Ooo = urlparse.parse_qsl(II11i.query)
# Parameter names that the inner loop checks against before `continue`.
i1111 = [decode('\x19\xd8C\xe3UG<\xeb\xc4\x8ft'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1k'), decode('<\xc7k\xdaUY\x05\xf7\xf8\xe1{')]
# Two decoded values iterated by the outer loop and passed first to iI1.
i11 = [decode('\x1f\xfec'), decode('*\xdcR\xf4')]
for I11 in i11:
for O0O0OO0O0O0, iiiii in Oo0Ooo:
if O0O0OO0O0O0 in i1111:
continue
debug(decode('\x18\xe9R\xc5S:G\xb7\xe8\xe5-\x94\xc9\xdd\xa9\x14'), I11, O0O0OO0O0O0, i1oOOoo00O0O)
# iI1 receives the outer-loop value, the rebuilt URL, all pairs, and the
# current name and value.
Oo0o0000o0o0 = iI1(I11, i1oOOoo00O0O, Oo0Ooo, O0O0OO0O0O0, iiiii)
# On a truthy result, a decoded format string is filled with the outer-loop
# value and element [1] of the result, then passed to security_info.
if Oo0o0000o0o0:
security_info(decode('a\xb6Z\x9e\x0c+4') % (I11, Oo0o0000o0o0[1]))
return
# Dead code: `if 0:` never executes; the names under it are not defined in
# this block.
if 0:
iiiii11iII1 % O0o