def urlsplit(url, scheme='', allow_fragments=True):
    """Split *url* into components, working around Python issue9374_.

    Affected urlsplit() versions fail to separate the query and fragment
    for unrecognized schemes, so this wrapper re-splits them by hand.

    .. _issue9374: http://bugs.python.org/issue9374

    The parameters are the same as urlparse.urlsplit.
    """
    scheme, netloc, path, query, fragment = parse.urlsplit(
        url, scheme, allow_fragments)
    if allow_fragments and '#' in path:
        # Fragment was left attached to the path; peel it off first.
        path, _sep, fragment = path.partition('#')
    if '?' in path:
        path, _sep, query = path.partition('?')
    return _ModifiedSplitResult(scheme, netloc, path, query, fragment)
Example source code using Python's urlsplit()
def _get_session(self, url):
    """Return a requests.Session for *url*, reusing pooled connections.

    A fresh session is created whenever the scheme://netloc portion of
    the url changes; otherwise the cached session is reused.  Returns
    None when neither a connection pool nor a cached session exists.
    """
    if not self._connection_pool:
        if self._session:
            return self._session
        return None
    scheme, netloc = parse.urlsplit(url)[:2]
    service_url = '%s://%s' % (scheme, netloc)
    if self._current_url != service_url:
        # Endpoint moved: drop any stale session before rebuilding.
        if self._session:
            self._session.close()
        self._current_url = service_url
        self._logger.debug(
            "New session created for: (%s)" % service_url)
        self._session = requests.Session()
        self._session.mount(service_url,
                            self._connection_pool.get(service_url))
    return self._session
# @set_headers_param
def _get_session(self, url):
    """Return a requests.Session for *url*, reusing pooled connections.

    A fresh session is created whenever the scheme://netloc portion of
    the url changes; otherwise the cached session is reused.  Returns
    None when neither a connection pool nor a cached session exists.
    """
    if not self._connection_pool:
        if self._session:
            return self._session
        return None
    scheme, netloc = parse.urlsplit(url)[:2]
    service_url = '%s://%s' % (scheme, netloc)
    if self._current_url != service_url:
        # Endpoint moved: drop any stale session before rebuilding.
        if self._session:
            self._session.close()
        self._current_url = service_url
        self._logger.debug(
            "New session created for: (%s)" % service_url)
        self._session = requests.Session()
        self._session.mount(service_url,
                            self._connection_pool.get(service_url))
    return self._session
def _get_session(self, url):
    """Return a requests.Session for *url*, reusing pooled connections.

    A fresh session is created whenever the scheme://netloc portion of
    the url changes; otherwise the cached session is reused.  Returns
    None when neither a connection pool nor a cached session exists.
    """
    if not self._connection_pool:
        if self._session:
            return self._session
        return None
    scheme, netloc = parse.urlsplit(url)[:2]
    service_url = '%s://%s' % (scheme, netloc)
    if self._current_url != service_url:
        # Endpoint moved: drop any stale session before rebuilding.
        if self._session:
            self._session.close()
        self._current_url = service_url
        self._logger.debug(
            "New session created for: (%s)" % service_url)
        self._session = requests.Session()
        self._session.mount(service_url,
                            self._connection_pool.get(service_url))
    return self._session
def load_tests(loader, tests, pattern):
    """Provide a TestSuite to the discovery process."""
    gnocchi_url = os.getenv('GNOCCHI_ENDPOINT')
    if not gnocchi_url:
        if os.getenv("GABBI_LIVE"):
            raise RuntimeError('"GNOCCHI_ENDPOINT" is not set')
        return None
    parsed_url = urlparse.urlsplit(gnocchi_url)
    # The endpoint's path becomes the prefix for every generated test.
    prefix = parsed_url.path.rstrip('/')
    # NOTE(chdent): gabbi requires a port be passed or it will
    # default to 8001, so we must dance a little dance to get
    # the right ports. Probably gabbi needs to change.
    # https://github.com/cdent/gabbi/issues/50
    port = parsed_url.port or (443 if parsed_url.scheme == 'https' else 80)
    test_dir = os.path.join(os.path.dirname(__file__), TESTS_DIR)
    return driver.build_tests(test_dir, loader,
                              host=parsed_url.hostname,
                              port=port,
                              prefix=prefix)
def update_query_parameters(url, query_parameters):
    """Return url with updated query parameters.

    Arguments:
        url (str): Original url whose query parameters need to be updated.
        query_parameters (dict): Query parameters (name -> value list) to
            add to, or override in, the url's existing query string.

    Returns:
        str: The url re-assembled with the merged, re-encoded query string.
    """
    scheme, netloc, path, query_string, fragment = urlsplit(url)
    url_params = parse_qs(query_string)
    # Update url query parameters; supplied names replace any existing
    # values for the same name, other parameters are preserved.
    url_params.update(query_parameters)
    return urlunsplit(
        (scheme, netloc, path, urlencode(url_params, doseq=True), fragment),
    )
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.masakari.com/ha/v1.1'
    Returns: 'http://www.masakari.com/ha'
    Given: 'http://www.masakari.com/v1.1'
    Returns: 'http://www.masakari.com'
    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)
    last_component = url_parts.pop()
    # NOTE: this should match vX.X or vX
    if not re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', last_component):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)
    rebuilt = list(parsed_url)
    rebuilt[2] = url_join(*url_parts)
    return urlparse.urlunsplit(rebuilt)
def _init_from_url(self, url):
    """Initialize self.host, self.port and self.script_name from *url*.

    Falls back to the scheme's default port (443 for https, otherwise
    80) when the url carries no explicit port.
    """
    parsed_url = urlparse.urlsplit(url)
    # Use SplitResult.port instead of netloc.split(':'): splitting on
    # every colon raises ValueError for IPv6 literals such as
    # http://[::1]:8080/ or netlocs containing userinfo.
    port = parsed_url.port
    if port is None:
        port = 443 if parsed_url.scheme == 'https' else 80
        host = parsed_url.netloc
    else:
        # Strip only the trailing :port, preserving the host's case
        # and any IPv6 brackets.
        host = parsed_url.netloc.rsplit(':', 1)[0]
    path = parsed_url.path
    # A bare "/" (or empty) path means no script prefix.
    if path == '/' or not path:
        self.script_name = ''
    else:
        self.script_name = path
    self.host = host
    self.port = int(port)
def resolve(self, url, env, hostname):
    """Route *url* for *hostname*, filling WSGI request keys in *env*.

    Known proxy hosts keep their own path+query as REQUEST_URI; any
    other hostname goes through the prefix resolver.  PATH_INFO and
    QUERY_STRING are then derived from REQUEST_URI.
    """
    if hostname in self.proxy_apps:
        parts = urlsplit(url)
        request_uri = parts.path
        if parts.query:
            request_uri = request_uri + '?' + parts.query
        env['REQUEST_URI'] = request_uri
        env['wsgiprox.matched_proxy_host'] = hostname
        env['wsgiprox.proxy_host'] = hostname
    else:
        env['REQUEST_URI'] = self.prefix_resolver(url, env)
        env['wsgiprox.proxy_host'] = self.proxy_host
    path_part, _sep, query_part = env['REQUEST_URI'].partition('?')
    env['PATH_INFO'] = path_part
    env['QUERY_STRING'] = query_part
def remove_trailing_version_from_href(href):
    """Removes the api version from the href.

    Given: 'http://www.nova.com/compute/v1.1'
    Returns: 'http://www.nova.com/compute'
    Given: 'http://www.nova.com/v1.1'
    Returns: 'http://www.nova.com'
    """
    parsed_url = urlparse.urlsplit(href)
    url_parts = parsed_url.path.rsplit('/', 1)
    tail = url_parts.pop()
    # NOTE: this should match vX.X or vX
    if not re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', tail):
        LOG.debug('href %s does not contain version', href)
        raise ValueError(_('href %s does not contain version') % href)
    rebuilt = list(parsed_url)
    rebuilt[2] = url_join(*url_parts)
    return urlparse.urlunsplit(rebuilt)
def get_plane_uri(cls, observation_uri, product_id):
    """Initializes a Plane URI instance.

    Arguments:
    observation_uri : the uri of the observation
    product_id : ID of the product
    """
    caom_util.type_check(observation_uri, ObservationURI,
                         "observation_uri",
                         override=False)
    # Label this check with the argument actually being validated; the
    # original mistakenly reported "observation_uri" in the error here.
    caom_util.type_check(product_id, str, "product_id",
                         override=False)
    caom_util.validate_path_component(cls, "product_id", product_id)

    path = urlsplit(observation_uri.uri).path
    uri = SplitResult(ObservationURI._SCHEME, "", path + "/" +
                      product_id, "", "").geturl()
    return cls(uri)
# Properties
def uri(self, value):
    """Set the URI after validating scheme, round-trip and path shape.

    Raises:
        ValueError: if the scheme is not the allowed one, the value does
            not survive an urlsplit/geturl round trip, or no product ID
            can be extracted from the path.
    """
    caom_util.type_check(value, str, "uri", override=False)
    tmp = urlsplit(value)
    if tmp.scheme != ObservationURI._SCHEME:
        raise ValueError("{} doesn't have an allowed scheme".format(value))
    if tmp.geturl() != value:
        raise ValueError("Failed to parse uri correctly: {}".format(value))
    # Raises ValueError itself if the path has other than 3 components.
    (collection, observation_id, product_id) = tmp.path.split("/")
    if product_id is None:
        # Fixed typo in message ("Faield" -> "Failed").
        raise ValueError("Failed to get product ID from uri: {}"
                         .format(value))
    self._product_id = product_id
    self._observation_uri = \
        ObservationURI.get_observation_uri(collection, observation_id)
    self._uri = value
def _url_scheme(self, url):
    """Return the scheme component of *url* (empty string if absent)."""
    parts = urlsplit(url)
    return parts.scheme
def stack_output(output):
    """Render a stack output value as HTML-safe markup.

    http(s) urls become clickable links; dicts and lists are
    pretty-printed as JSON; everything else is escaped and wrapped
    in a <pre> element.  Falsy input renders as an empty string.
    """
    if not output:
        return u''
    if isinstance(output, six.string_types):
        parts = urlparse.urlsplit(output)
        if parts.netloc and parts.scheme in ('http', 'https'):
            url = html.escape(output)
            safe_link = u'<a href="%s" target="_blank">%s</a>' % (url, url)
            return safestring.mark_safe(safe_link)
    if isinstance(output, (dict, list)):
        output = json.dumps(output, indent=2)
    return safestring.mark_safe(u'<pre>%s</pre>' % html.escape(output))
def remove_version_from_href(href):
    """Removes the first api version from the href.

    Given: 'http://www.meteos.com/v1.1/123'
    Returns: 'http://www.meteos.com/123'
    Given: 'http://www.meteos.com/v1.1'
    Returns: 'http://www.meteos.com'
    """
    parsed_url = parse.urlsplit(href)
    url_parts = parsed_url.path.split('/', 2)
    # NOTE: this should match vX.X or vX
    if re.match(r'^v([0-9]+|[0-9]+\.[0-9]+)(/.*|$)', url_parts[1]):
        del url_parts[1]
    new_path = '/'.join(url_parts)
    if new_path == parsed_url.path:
        # Nothing was removed, so there was no version segment.
        msg = 'href %s does not contain version' % href
        LOG.debug(msg)
        raise ValueError(msg)
    rebuilt = list(parsed_url)
    rebuilt[2] = new_path
    return parse.urlunsplit(rebuilt)
def _update_link_prefix(self, orig_url, prefix):
    """Replace the scheme and netloc of *orig_url* with those of *prefix*.

    Returns *orig_url* unchanged when no prefix is configured.
    """
    if not prefix:
        return orig_url
    url_parts = list(parse.urlsplit(orig_url))
    # Keep path/query/fragment; take scheme and authority from prefix.
    url_parts[:2] = list(parse.urlsplit(prefix))[:2]
    return parse.urlunsplit(url_parts)
def _get_url_parts(url):
    """Normalize *url* via _clean_url() and return its urlsplit parts."""
    return urlsplit(_clean_url(url))
def get_host(path):
    """Return the network location (host[:port]) of *path*."""
    split_result = urlparse.urlsplit(path)
    return split_result.netloc
def can_fetch(self, user_agent, url):
    # Old-style Tornado coroutine: fetches and caches robots.txt for the
    # url's domain, then "returns" (via gen.Return) whether *user_agent*
    # may fetch *url* according to that robots.txt.
    # Parsed RobotFileParser objects are cached per netloc in
    # self.robots_txt_cache and refreshed once older than
    # self.robot_txt_age seconds.
    parsed = urlsplit(url)
    domain = parsed.netloc
    if domain in self.robots_txt_cache:
        robot_txt = self.robots_txt_cache[domain]
        if time.time() - robot_txt.mtime() > self.robot_txt_age:
            # Cache entry expired; force a re-fetch below.
            robot_txt = None
    else:
        robot_txt = None
    if robot_txt is None:
        robot_txt = RobotFileParser()
        try:
            response = yield gen.maybe_future(self.http_client.fetch(
                urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
            content = response.body
        except tornado.httpclient.HTTPError as e:
            # Best-effort: a failed fetch is treated as an empty
            # robots.txt (i.e. everything allowed).
            logger.error('load robots.txt from %s error: %r', domain, e)
            content = ''
        try:
            content = content.decode('utf8', 'ignore')
        except UnicodeDecodeError:
            content = ''
        robot_txt.parse(content.splitlines())
        self.robots_txt_cache[domain] = robot_txt
    # Generator-based coroutine return (pre-"return value" syntax).
    raise gen.Return(robot_txt.can_fetch(user_agent, url))
def _get_domain_bucket(self, url):
    """Return (creating if needed) the per-hostname cache dict for *url*.

    Any :port suffix is stripped from the netloc before the lookup.
    """
    netloc = urlparse.urlsplit(url).netloc
    hostname = netloc.partition(':')[0]
    return self.domain_cache.setdefault(hostname, {})
def get_path(url):
    """Return *url* stripped of scheme and netloc, with path defaulting to '/'."""
    parts = urlsplit(url)
    relative = ('', '', parts.path or '/', parts.query, parts.fragment)
    return urlunsplit(relative)
def _reverse_domain_storage(item, media_root):
    """Re-store each object's file under a reversed-domain directory tree.

    For every entry in item['objects'], the file named by
    obj_stored_url is copied beneath media_root into directories built
    from the reversed labels of the original url's domain (extension
    dropped), and obj_stored_url is rewritten to the new relative path.
    Existing destination files are not overwritten.
    """
    for obj in item.get('objects', []):
        stored_url = obj['obj_stored_url']
        assert '/' not in stored_url
        domain = urlsplit(obj['obj_original_url']).netloc
        # Drop any :port suffix before reversing the labels.
        domain = domain.partition(':')[0]
        parents = [label for label in reversed(domain.split('.')) if label]
        os.makedirs(os.path.join(media_root, *parents), exist_ok=True)
        base_name, _ext = os.path.splitext(stored_url)
        new_stored_url = os.path.sep.join(parents + [base_name])
        dest = os.path.join(media_root, new_stored_url)
        if not os.path.exists(dest):
            shutil.copy(os.path.join(media_root, stored_url), dest)
        obj['obj_stored_url'] = new_stored_url
def take_action(self, args):
    """Download a recipe from remote URL and save it to a local file under contrib directory.

    Args:
        args (:obj:`dict`): Parsed command line arguments.
            "url" is an URL where a recipe will be downloaded from.
    """
    file_url = args.url
    # The last path component of the url becomes the local file name.
    filename = parse.urlsplit(file_url).path.split('/')[-1]
    contrib = utils.get_property_from_config_file('defaults', 'contrib')
    self._download_recipe(file_url, filename, contrib)
def canonicalize_url(api_root_url):
    """Normalize *api_root_url* via urlsplit and ensure a trailing slash."""
    normalized = urlparse.urlsplit(api_root_url).geturl()
    if normalized.endswith("/"):
        return normalized
    return normalized + "/"
def open(self, target_uri, **kwargs):
    """Open target uri.

    :param target_uri: Uri to open
    :type target_uri: string
    :returns: Target object
    """
    parts = urlsplit(target_uri, scheme=self.default_opener)
    opener = self.get_opener(parts.scheme)
    conformed_query = opener.conform_query(parts.query)
    target = opener.get_target(
        parts.scheme,
        parts.path,
        parts.fragment,
        parts.username,
        parts.password,
        parts.hostname,
        parts.port,
        conformed_query,
        **kwargs
    )
    # Remember the original uri on the target for later reference.
    target.opener_path = target_uri
    return target
def __init__(self, url_string):
    """Split *url_string* and expose each component as an attribute."""
    split_url = urllib_parse.urlsplit(url_string)
    # Copy each component of interest off the SplitResult.
    for component in ('scheme', 'username', 'password', 'hostname',
                      'port', 'path', 'query', 'fragment'):
        setattr(self, component, getattr(split_url, component))
def get_driver(conf):
    """Return the configured driver."""
    scheme = parse.urlsplit(conf.indexer.url).scheme
    manager = driver.DriverManager('gnocchi.indexer', scheme)
    return manager.driver(conf)
def _fully_qualify(environ, url):
    """Turn a URL path into a fully qualified URL."""
    split_url = urlparse.urlsplit(url)
    server_name = environ.get('SERVER_NAME')
    server_port = str(environ.get('SERVER_PORT'))
    server_scheme = environ.get('wsgi.url_scheme')
    # Only non-default ports appear in the authority component.
    if server_port in ('80', '443'):
        netloc = server_name
    else:
        netloc = '%s:%s' % (server_name, server_port)
    return urlparse.urlunsplit((server_scheme, netloc, split_url.path,
                                split_url.query, split_url.fragment))
def _parse_url(self, url):
    """Create a url from test data.

    If provided with a full URL, just return that. If SSL is requested
    set the scheme appropriately.

    Scheme and netloc are saved for later use in comparisons.
    """
    query_params = self.test_data['query_parameters']
    ssl = self.test_data['ssl']
    parsed_url = urlparse.urlsplit(url)
    if not parsed_url.scheme:
        # Relative url in the test: qualify it against the test host.
        full_url = utils.create_url(url, self.host, port=self.port,
                                    prefix=self.prefix, ssl=ssl)
        # parse again to set updated netloc and scheme
        parsed_url = urlparse.urlsplit(full_url)
    self.scheme = parsed_url.scheme
    self.netloc = parsed_url.netloc
    query_string = parsed_url.query
    if query_params:
        query_string = self._update_query_params(query_string,
                                                 query_params)
    return urlparse.urlunsplit((parsed_url.scheme, parsed_url.netloc,
                                parsed_url.path, query_string, ''))
def create_url(base_url, host, port=None, prefix='', ssl=False):
    """Given pieces of a path-based url, return a fully qualified url."""
    # A host with : in it at this stage is assumed to be an IPv6
    # address of some kind (they come in many forms). Port should
    # already have been stripped off.
    if ':' in host and not (host.startswith('[') and host.endswith(']')):
        host = '[%s]' % host
    netloc = host
    if port and not _port_follows_standard(port, ssl):
        netloc = '%s:%s' % (host, port)
    scheme = 'https' if ssl else 'http'
    parsed_url = urlparse.urlsplit(base_url)
    query_string = parsed_url.query
    path = parsed_url.path
    # Guard against a prefix of None or the url already having the
    # prefix. Without the startswith check, the tests in prefix.yaml
    # fail. This is a pragmatic fix which does this for any URL in a
    # test request that does not have a scheme and does not
    # distinguish between URLs in a gabbi test file and those
    # generated by the server. Ideally we would not mutate nor need
    # to check URLs returned from the server. Doing that, however,
    # would require more complex data handling than we have now and
    # this covers most common cases and will be okay until someone
    # reports a bug.
    if prefix and not path.startswith(prefix):
        path = '%s/%s' % (prefix.rstrip('/'), path.lstrip('/'))
    return urlparse.urlunsplit((scheme, netloc, path, query_string, ''))