def update_query_parameters(url, query_parameters):
    """
    Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): Query parameters to merge into the url. A key
            that is already present in the url has its value(s) replaced.

    Returns:
        (str): The url with the merged query string. Scheme, netloc, path and
            fragment are passed through unchanged.
    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    # keep_blank_values=True so that parameters already present with an empty
    # value (e.g. "?next=") are carried over instead of being silently dropped.
    url_params = parse_qs(query_string, keep_blank_values=True)
    # Update url query parameters
    url_params.update(query_parameters)
    # doseq=True because parse_qs maps every key to a *list* of values.
    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
# Example source code for Python's urlunsplit()
def remove_trailing_version_from_href(href):
    """Strip the trailing api version segment from the href.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'

    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_href = urlparse.urlsplit(href)
    segments = split_href.path.rsplit('/', 1)
    last_segment = segments[-1]
    leading_segments = segments[:-1]

    # NOTE: this should match vX.X or vX
    version_pattern = r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)'
    if re.match(version_pattern, last_segment) is None:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Swap in the path rebuilt from everything before the version segment.
    components = list(split_href)
    components[2] = url_join(*leading_segments)
    return urlparse.urlunsplit(components)
def remove_trailing_version_from_href(href):
    """Strip the trailing api version segment from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'

    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'

    Raises ValueError when the last path segment is not a version.
    """
    split_href = urlparse.urlsplit(href)
    path_pieces = split_href.path.rsplit('/', 1)
    candidate = path_pieces[-1]
    remainder = path_pieces[:-1]

    # NOTE: this should match vX.X or vX
    version_pattern = r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)'
    if re.match(version_pattern, candidate) is None:
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)

    # Only the path component changes; every other component is kept.
    rebuilt = list(split_href)
    rebuilt[2] = url_join(*remainder)
    return urlparse.urlunsplit(rebuilt)
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'

    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'

    :param href: the URL to strip the version from.
    :returns: the href with its first path segment removed.
    :raises ValueError: if the first path segment is not a version; this
        includes hrefs whose path is empty or contains no '/'.
    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)

    # NOTE: this should match vX.X or vX
    expression = re.compile(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)')
    # An empty or slash-less path splits into a single element; check the
    # length so such hrefs fall through to the ValueError below instead of
    # failing with an IndexError.
    if len(url_parts) > 1 and expression.match(url_parts[1]):
        del url_parts[1]

    new_path = '/'.join(url_parts)

    # An unchanged path means no version segment was found.
    if new_path == parsed_url.path:
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)

    parsed_url = list(parsed_url)
    parsed_url[2] = new_path
    return parse.urlunsplit(parsed_url)
def _update_link_prefix(self, orig_url, prefix):
    """Return ``orig_url`` with its scheme and netloc taken from ``prefix``.

    Path, query and fragment of ``orig_url`` are kept. A falsy ``prefix``
    returns ``orig_url`` untouched.
    """
    if prefix:
        original = parse.urlsplit(orig_url)
        replacement = parse.urlsplit(prefix)
        # Only scheme and netloc come from the prefix.
        rebuilt = original._replace(scheme=replacement.scheme,
                                    netloc=replacement.netloc)
        return parse.urlunsplit(rebuilt)
    return orig_url
def _urlunsplit(scheme=None, netloc=None, path=None, query=None,
                fragment=None):
    """Variant of ``urlparse.urlunsplit`` that escapes its inputs.

    Path and fragment are quoted; a non-string query is sorted and
    urlencoded.

    :param scheme:
        URI scheme, e.g., `http` or `https`.
    :param netloc:
        Network location, e.g., `localhost:8080` or `www.google.com`.
    :param path:
        URI path.
    :param query:
        URI query as an escaped string, or a dictionary or list of key-values
        tuples to build a query.
    :param fragment:
        Fragment identifier, also known as "anchor".
    :returns:
        An assembled absolute or relative URI.
    """
    # Scheme and netloc are only used together; if either one is missing,
    # both are blanked out.
    if not (scheme and netloc):
        scheme, netloc = '', None

    if path:
        path = quote(_to_utf8(path))

    if query and not isinstance(query, six.string_types):
        pairs = six.iteritems(query) if isinstance(query, dict) else query
        # Sort args: commonly needed to build signatures for services.
        query = urlencode(sorted(pairs))

    if fragment:
        fragment = quote(_to_utf8(fragment))

    components = (scheme, netloc, path, query, fragment)
    return urlunsplit(components)
def get_path(url):
    """Return ``url`` without scheme and netloc.

    The result is the path (``'/'`` when the url has none) followed by the
    query and fragment, if any.
    """
    parts = urlsplit(url)
    path = parts.path if parts.path else '/'
    return urlunsplit(('', '', path, parts.query, parts.fragment))
def __str__(self):
    """Assemble the URL text from the object's five component attributes."""
    components = (
        self.scheme,
        self.netloc,
        self.path,
        self.query,
        self.fragment,
    )
    return urllib_parse.urlunsplit(components)
def _fully_qualify(environ, url):
    """Build a fully qualified URL from a URL path and a WSGI environ.

    Scheme, host and port are read from ``environ``; path, query and
    fragment are taken from ``url``.
    """
    pieces = urlparse.urlsplit(url)
    host = environ.get('SERVER_NAME')
    port = str(environ.get('SERVER_PORT'))
    scheme = environ.get('wsgi.url_scheme')

    # Ports 80 and 443 are left implicit; any other port is spelled out.
    netloc = host if port in ['80', '443'] else '%s:%s' % (host, port)

    return urlparse.urlunsplit(
        (scheme, netloc, pieces.path, pieces.query, pieces.fragment))
def _parse_url(self, url):
    """Create a url from test data.

    A url that already carries a scheme is used as given; otherwise a
    full url is built with ``utils.create_url`` from the host, port,
    prefix and ssl settings. Scheme and netloc are stored on ``self``
    for later use in comparisons. Any fragment is dropped from the
    returned url.
    """
    query_params = self.test_data['query_parameters']
    ssl = self.test_data['ssl']

    parts = urlparse.urlsplit(url)
    if not parts.scheme:
        # Split the generated url again to pick up the scheme and netloc
        # that create_url filled in.
        parts = urlparse.urlsplit(
            utils.create_url(url, self.host, port=self.port,
                             prefix=self.prefix, ssl=ssl))

    self.scheme, self.netloc = parts.scheme, parts.netloc

    query_string = parts.query
    if query_params:
        query_string = self._update_query_params(query_string, query_params)

    return urlparse.urlunsplit(
        (parts.scheme, parts.netloc, parts.path, query_string, ''))
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Return a fully qualified url built from path-based url pieces.

    The scheme is ``https`` when ``ssl`` is true, ``http`` otherwise. The
    fragment of ``base_url`` is dropped.
    """
    # A host containing ':' is taken to be an IPv6 address (the port is
    # expected to have been stripped already), so it gets bracketed unless
    # it already is.
    already_bracketed = host.startswith('[') and host.endswith(']')
    if ':' in host and not already_bracketed:
        host = '[%s]' % host

    netloc = host
    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)

    scheme = 'https' if ssl else 'http'

    split_base = urlparse.urlsplit(base_url)
    path = split_base.path

    # Skip the prefix when it is empty/None or the path already starts with
    # it. The startswith check is a pragmatic guard: urls without a scheme
    # may come from the test file or from the server, and the latter can
    # already carry the prefix. Ideally server-provided urls would not be
    # touched at all, but telling the two apart needs more elaborate data
    # handling than is available here.
    if prefix and not path.startswith(prefix):
        path = '%s/%s' % (prefix.rstrip('/'), path.lstrip('/'))

    return urlparse.urlunsplit((scheme, netloc, path, split_base.query, ''))
def _update_link_prefix(self, orig_url, prefix):
    """Re-root ``orig_url`` under ``prefix``.

    Scheme and netloc come from ``prefix``, and the prefix path is put in
    front of the original path. Trailing slashes are stripped from the
    result. A falsy ``prefix`` returns ``orig_url`` untouched.
    """
    if prefix:
        target = urlparse.urlsplit(orig_url)
        base = urlparse.urlsplit(prefix)
        relocated = (
            base.scheme,
            base.netloc,
            base.path + target.path,
            target.query,
            target.fragment,
        )
        return urlparse.urlunsplit(relocated).rstrip('/')
    return orig_url
def get_path(url):
    """Return the path, query and fragment of ``url`` as one string.

    Scheme and netloc are dropped; an empty path becomes ``'/'``.
    """
    split_url = urlsplit(url)
    relative = split_url._replace(
        scheme='',
        netloc='',
        path=split_url.path or '/',
    )
    return urlunsplit(relative)
def _url_from_primitives(self):
    """Build the base url from ``self.host``, ``self.port`` and
    ``self.script_name``.

    Port 443 selects ``https``; every other port (or none) selects
    ``http``. Ports 80 and 443 are not written into the url.
    """
    scheme = 'https' if self.port == 443 else 'http'
    explicit_port = self.port and self.port not in [443, 80]
    port_suffix = ':%s' % self.port if explicit_port else ''
    return urlparse.urlunsplit(
        (scheme, self.host + port_suffix, self.script_name, None, None))
def __init__(self, bus, gconf):
    """Set up the plugin state.

    Bus is the cherrypy engine and gconf is a dictionary
    containing the CherryPy configuration.
    """
    SimplePlugin.__init__(self, bus)

    bonjour_available = "pybonjour" in globals()
    if not bonjour_available:
        # Without pybonjour, start and exit are replaced by no-ops and the
        # rest of the setup is skipped.
        self.start = lambda: None
        self.exit = lambda: None
        return

    self.name = self.wanted_name = \
        "pkg(7) mirror on {0}".format(socket.gethostname())
    self.regtype = "_pkg5._tcp"
    self.port = gconf["server.socket_port"]
    self.sd_hdl = None
    self.reg_ok = False

    # https only when both a certificate and a private key are configured.
    tls_configured = (gconf["server.ssl_certificate"] and
                      gconf["server.ssl_private_key"])
    proto = "https" if tls_configured else "http"

    netloc = "{0}:{1}".format(socket.getfqdn(), self.port)
    self.url = urlunsplit((proto, netloc, '', '', ''))
def _get_url(self):
    """Return the base URL built from this object's connection settings.

    The URL is assembled from three attributes:

    * ``scheme`` (falls back to 'https' when unset)
    * ``hostname``
    * ``port`` (left out of the URL when unset)

    ``port`` never influences ``scheme``: with ``port`` set to 80 and
    ``scheme`` unset, the scheme is still 'https'.

    :return: A URL.
    :rtype: str

    """
    scheme = self.scheme or 'https'
    netloc = self.hostname
    if self.port:
        netloc = '{0}:{1}'.format(self.hostname, self.port)
    return urlunsplit((scheme, netloc, '', '', ''))
def _update_link_prefix(self, orig_url, prefix):
    """Return ``orig_url`` moved under ``prefix``.

    The scheme and netloc of ``prefix`` replace those of ``orig_url`` and
    the prefix path is prepended to the original path; trailing slashes
    are stripped from the result. With a falsy ``prefix`` the url is
    returned unchanged.
    """
    if not prefix:
        return orig_url

    target = urlparse.urlsplit(orig_url)
    base = urlparse.urlsplit(prefix)
    moved = target._replace(
        scheme=base.scheme,
        netloc=base.netloc,
        path=base.path + target.path,
    )
    return urlparse.urlunsplit(moved).rstrip('/')
def get_request_uri(request):
    """Return the absolute url of ``request``, with basic auth inlined.

    Starts from ``request.build_absolute_uri()``; when
    ``get_request_basic_auth`` returns a value, it is placed in front of
    the netloc followed by ``'@'``.
    """
    url = request.build_absolute_uri()
    basic_auth = get_request_basic_auth(request)
    if basic_auth is None:
        return url

    # The credentials belong in the netloc, so the url is split and rebuilt.
    parts = urlsplit(url)
    netloc_with_auth = basic_auth + '@' + parts.netloc
    return urlunsplit(
        (parts.scheme, netloc_with_auth, parts.path, parts.query,
         parts.fragment))
def encode_uri(uri):
    """Return ``uri`` encoded down to ASCII.

    * the netloc is IDNA-encoded,
    * the path is UTF-8 percent-encoded with ``quote_plus`` (``/`` is kept),
    * the query string is parsed with ``parse_qsl`` and re-encoded with
      ``urlencode``.

    Scheme and fragment are passed through unchanged.

    :param uri: the URI as text.
    :returns: the encoded URI.
    """
    split = list(urlsplit(uri))
    split[1] = split[1].encode('idna').decode('ascii')
    # quote_plus already returns a str; calling .decode() on its result
    # raises AttributeError on Python 3.
    split[2] = quote_plus(split[2].encode('utf-8'), '/')
    # parse_qsl returns unquoted values and urlencode quotes them again, so
    # the values must not be quoted here as well (that would double-encode
    # them, e.g. turning "a b" into "a%2Bb").
    query = [(q, v.encode('utf-8')) for (q, v) in parse_qsl(split[3])]
    split[3] = urlencode(query)
    return urlunsplit(split)
def _cs_request(self, url, method, **kwargs):
    """Send an authenticated request and return ``(resp, body)``.

    Authenticates first when no management url is set. When ``url`` is
    None the request goes to the management url with its trailing
    ``vN/<id>`` part removed; otherwise ``url`` is appended to the service
    url (if a service catalog is set) or to the management url.

    On ``exceptions.Unauthorized`` the client re-authenticates and retries
    once; if the retry is unauthorized too, the first error is raised.
    """
    if not self.management_url:
        self.authenticate()

    if url is not None:
        if self.service_catalog:
            base_url = self.get_service_url(self.service_type)
        else:
            # NOTE(melwitt): The service catalog is not available
            # when bypass_url is used.
            base_url = self.management_url
        url = base_url + url
    else:
        # To get API version information, it is necessary to GET
        # a dh endpoint directly without "v2/<tenant-id>".
        endpoint = parse.urlsplit(self.management_url)
        root_path = re.sub(r'v[1-9]/[a-z0-9]+$', '', endpoint.path)
        url = parse.urlunsplit(
            (endpoint.scheme, endpoint.netloc, root_path, None, None))

    # Perform the request once. If we get a 401 back then it
    # might be because the auth token expired, so try to
    # re-authenticate and try again. If it still fails, bail.
    try:
        token = self.auth_token
        headers = kwargs.setdefault('headers', {})
        headers['X-Auth-Token'] = token
        headers['Content-Type'] = 'application/json'
        if self.projectid:
            headers['X-Auth-Project-Id'] = self.projectid
        response, payload = self._time_request(url, method, **kwargs)
        return response, payload
    except exceptions.Unauthorized as first_failure:
        try:
            # first discard auth token, to avoid the possibly expired
            # token being re-used in the re-authentication attempt
            self.unauthenticate()
            # overwrite bad token
            self.keyring_saved = False
            self.authenticate()
            kwargs['headers']['X-Auth-Token'] = self.auth_token
            response, payload = self._time_request(url, method, **kwargs)
            return response, payload
        except exceptions.Unauthorized:
            raise first_failure
def _cs_request(self, url, method, **kwargs):
    """Send an authenticated request and return ``(resp, body)``.

    Authenticates first when no management url is set. When ``url`` is
    None the request goes to the management url with its trailing
    ``vN/<id>`` part removed; otherwise ``url`` is appended to the service
    url (if a service catalog is set) or to the management url.

    On ``exceptions.Unauthorized`` the client re-authenticates and retries
    once; if the retry is unauthorized too, the first error is raised.
    """
    if not self.management_url:
        self.authenticate()

    if url is not None:
        if self.service_catalog:
            base_url = self.get_service_url(self.service_type)
        else:
            # NOTE(melwitt): The service catalog is not available
            # when bypass_url is used.
            base_url = self.management_url
        url = base_url + url
    else:
        # To get API version information, it is necessary to GET
        # a nova endpoint directly without "v2/<tenant-id>".
        endpoint = parse.urlsplit(self.management_url)
        root_path = re.sub(r'v[1-9]/[a-z0-9]+$', '', endpoint.path)
        url = parse.urlunsplit(
            (endpoint.scheme, endpoint.netloc, root_path, None, None))

    # Perform the request once. If we get a 401 back then it
    # might be because the auth token expired, so try to
    # re-authenticate and try again. If it still fails, bail.
    try:
        token = self.auth_token
        headers = kwargs.setdefault('headers', {})
        headers['X-Auth-Token'] = token
        headers['Content-Type'] = 'application/json'
        if self.projectid:
            headers['X-Auth-Project-Id'] = self.projectid
        response, payload = self._time_request(url, method, **kwargs)
        return response, payload
    except exceptions.Unauthorized as first_failure:
        try:
            # first discard auth token, to avoid the possibly expired
            # token being re-used in the re-authentication attempt
            self.unauthenticate()
            # overwrite bad token
            self.keyring_saved = False
            self.authenticate()
            kwargs['headers']['X-Auth-Token'] = self.auth_token
            response, payload = self._time_request(url, method, **kwargs)
            return response, payload
        except exceptions.Unauthorized:
            raise first_failure
def _cs_request(self, url, method, **kwargs):
    """Send an authenticated request and return ``(resp, body)``.

    Authenticates first when no management url is set. When ``url`` is
    None the request goes to the management url with its trailing
    ``vN/<id>`` part removed; otherwise ``url`` is appended to the service
    url (if a service catalog is set) or to the management url.

    On ``exceptions.Unauthorized`` the client re-authenticates and retries
    once; if the retry is unauthorized too, the first error is raised.
    """
    if not self.management_url:
        self.authenticate()

    if url is not None:
        if self.service_catalog:
            base_url = self.get_service_url(self.service_type)
        else:
            # NOTE(melwitt): The service catalog is not available
            # when bypass_url is used.
            base_url = self.management_url
        url = base_url + url
    else:
        # To get API version information, it is necessary to GET
        # a icgw endpoint directly without "v2/<tenant-id>".
        endpoint = parse.urlsplit(self.management_url)
        root_path = re.sub(r'v[1-9]/[a-z0-9]+$', '', endpoint.path)
        url = parse.urlunsplit(
            (endpoint.scheme, endpoint.netloc, root_path, None, None))

    # Perform the request once. If we get a 401 back then it
    # might be because the auth token expired, so try to
    # re-authenticate and try again. If it still fails, bail.
    try:
        token = self.auth_token
        headers = kwargs.setdefault('headers', {})
        headers['X-Auth-Token'] = token
        headers['Content-Type'] = 'application/json'
        if self.projectid:
            headers['X-Auth-Project-Id'] = self.projectid
        response, payload = self._time_request(url, method, **kwargs)
        return response, payload
    except exceptions.Unauthorized as first_failure:
        try:
            # first discard auth token, to avoid the possibly expired
            # token being re-used in the re-authentication attempt
            self.unauthenticate()
            # overwrite bad token
            self.keyring_saved = False
            self.authenticate()
            kwargs['headers']['X-Auth-Token'] = self.auth_token
            response, payload = self._time_request(url, method, **kwargs)
            return response, payload
        except exceptions.Unauthorized:
            raise first_failure