def validate_config(self, organization, config, actor=None):
    """Resolve the repository referenced by ``config['url']`` and enrich config.

    When ``config`` has no truthy ``'url'`` it is returned untouched.
    Otherwise the URL is split into its network location and the text after
    the last ``_git/`` marker, the repository is looked up through the client
    returned by ``self.get_client(actor)``, and ``config`` is updated in place
    with ``instance``, ``project``, ``name``, ``external_id`` and ``url``.

    Lookup failures are handed to ``self.raise_error`` together with the
    client's ``auth``.

    Returns the same ``config`` mapping that was passed in.
    """
    if not config.get('url'):
        return config

    api = self.get_client(actor)
    # parse out the repo name and the instance
    parsed = urlparse(config['url'])
    host = parsed.netloc
    repo_name = parsed.path.rsplit('_git/', 1)[-1]
    # An explicit project wins; otherwise the repo name doubles as project.
    project = config.get('project') or repo_name
    try:
        repo = api.get_repo(host, repo_name, project)
    except Exception as e:
        self.raise_error(e, identity=api.auth)

    config.update(
        instance=host,
        project=project,
        name=repo['name'],
        external_id=six.text_type(repo['id']),
        url=repo['_links']['web']['href'],
    )
    return config
# Example source code for Python's urlparse()
def _connect_request(self, vdi_ref):
    """Issue the GET request that starts a VDI export and return the response.

    A task is created on ``self.session`` (stored in ``self.task_ref``), a
    connection matching the scheme of ``self.host_url`` is opened, and the
    export path obtained from ``utils.get_vdi_export_path`` is requested.

    Any exception is logged at debug level and re-raised unchanged.
    """
    # request connection to xapi url service for VDI export
    try:
        # create task for VDI export
        instance_name = self.instance['name']
        self.task_ref = self.session.task.create(
            'VDI_EXPORT_for_' + instance_name,
            'Exporting VDI for instance: %s' % instance_name)
        LOG.debug("task_ref is %s" % self.task_ref)

        # connect to XS
        target = urlparse.urlparse(self.host_url)
        scheme = target.scheme
        # NOTE(review): any other scheme leaves ``conn`` unbound, so the
        # request below fails with a NameError-style exception that is
        # logged and re-raised by the handler.
        if scheme == 'http':
            conn = httplib.HTTPConnection(target.netloc)
            LOG.debug("using http")
        elif scheme == 'https':
            conn = httplib.HTTPSConnection(target.netloc)
            LOG.debug("using https")

        export_path = utils.get_vdi_export_path(
            self.session, self.task_ref, vdi_ref)
        conn.request('GET', export_path)
        conn_resp = conn.getresponse()
    except Exception:
        LOG.debug('request connect for vdi export failed')
        raise
    return conn_resp
def make_temp_file_path_from_url(temp_dir_path, url):
    """Build a temporary file path from the last path component of ``url``.

    The file name is the basename of the URL path (trailing slashes
    ignored), joined onto ``temp_dir_path`` with POSIX separators.

    Raises:
        InvalidFilePathError: if ``url`` is not a string, its path is a
            null string, no file name can be derived from it, or
            ``temp_dir_path`` is not a string.
    """
    try:
        parsed_path = urlparse(url).path
    except AttributeError:
        raise InvalidFilePathError("url must be a string")

    if typepy.is_null_string(parsed_path):
        raise InvalidFilePathError(
            "invalid URL path: {}".format(parsed_path))

    file_name = os.path.basename(parsed_path.rstrip("/"))
    if typepy.is_null_string(file_name):
        # Give symbol replacement a chance before giving up on the name.
        file_name = pathvalidate.replace_symbol(
            file_name, replacement_text="_")
        if typepy.is_null_string(file_name):
            raise InvalidFilePathError("invalid URL: {}".format(url))

    try:
        joined = posixpath.join(temp_dir_path, file_name)
    except (TypeError, AttributeError):
        raise InvalidFilePathError("temp_dir_path must be a string")
    return joined
def get_filename(self, task, default_ext):
    """Set the path where the image will be saved.

    The default strategy is to use an increasing 6-digit number as
    the filename. You can override this method if you want to set custom
    naming rules. The file extension is kept if it can be obtained from
    the last segment of the url path, otherwise ``default_ext`` is used
    as extension.

    Args:
        task (dict): The task dict got from ``task_queue``; its
            ``'file_url'`` entry is the url that was downloaded.
        default_ext (str): Extension used when the url has none.

    Returns:
        Filename with extension.
    """
    url_path = urlparse(task['file_url'])[2]
    # Only the final path segment can carry a file extension. Looking at
    # the whole path would turn '/v1.2/image' into the extension '2/image'.
    last_segment = url_path.rsplit('/', 1)[-1]
    _stem, dot, suffix = last_segment.rpartition('.')
    extension = suffix if dot and suffix else default_ext
    file_idx = self.fetched_num + self.file_idx_offset
    return '{:06d}.{}'.format(file_idx, extension)
def do_GET(self):
    """Answer a GET with a plain-text notice and record the query parameters.

    The query string of ``self.path`` is parsed and stored on
    ``self.server.callback_result`` as a dict: a parameter given once maps
    to its single value, a parameter given several times maps to the list
    of values.
    """
    self.send_response(200)
    self.send_header('Content-Type', 'text/plain')
    self.end_headers()

    query = parse_qs(urlparse(self.path).query)
    collapsed = {}
    for key, values in query.items():
        count = len(values)
        if count == 0:
            collapsed[key] = None
        elif count == 1:
            collapsed[key] = values[0]
        else:
            collapsed[key] = values

    self.server.callback_result = collapsed
    self.wfile.write(b'You can close this window now')
def sort_url_by_query_keys(url):
    """Return ``url`` with its query-string parameters ordered by key.

    For example, an input of '/v2/tasks?sort_key=id&sort_dir=asc&limit=10'
    returns '/v2/tasks?limit=10&sort_dir=asc&sort_key=id'. This is to
    prevent non-deterministic ordering of the query string causing
    problems with unit tests.

    :param url: url which will be ordered by query keys
    :returns url: url with ordered query keys
    """
    pieces = urlparse.urlparse(url)
    # Blank values are kept; the sort is stable, so repeated keys keep
    # their relative order.
    pairs = urlparse.parse_qsl(pieces.query, True)
    pairs.sort(key=lambda pair: pair[0])
    ordered_query = urlparse.urlencode(pairs, True)
    return urlparse.urlunparse(pieces._replace(query=ordered_query))
def strip_version(endpoint):
    """Strip version from the last component of endpoint if present.

    :param endpoint: endpoint URL as a string
    :returns: a tuple ``(endpoint, version)``. When the URL path consists
              of a version such as ``v1`` or ``v2.0``, ``endpoint`` is
              reduced to ``scheme://netloc`` and ``version`` is that
              number as a float. Otherwise ``endpoint`` only loses its
              trailing slashes and ``version`` is ``None``.
    :raises ValueError: if ``endpoint`` is not a string
    """
    # NOTE(flaper87): This shouldn't be necessary if
    # we make endpoint the first argument. However, we
    # can't do that just yet because we need to keep
    # backwards compatibility.
    if not isinstance(endpoint, six.string_types):
        raise ValueError("Expected endpoint")

    version = None
    # Get rid of trailing '/' if present
    endpoint = endpoint.rstrip('/')
    url_parts = parse.urlparse(endpoint)
    (scheme, netloc, path, __, __, __) = url_parts
    path = path.lstrip('/')
    # regex to match 'v1' or 'v2.0' etc. It is anchored so that a path
    # that merely starts with a version (e.g. 'v1/images') is left alone
    # instead of reaching float() and raising an accidental ValueError.
    found = re.match(r'v(\d+\.?\d*)$', path)
    if found:
        version = float(found.group(1))
        endpoint = scheme + '://' + netloc
    return endpoint, version
def validate_link(self, link, bookmark=False):
    """Checks if the given link can get correct data.

    The scheme and net location are removed from ``link`` and the
    remainder is requested through ``self.get_json`` with an empty path
    prefix. Returns True when that request succeeds and False when it
    raises, or when a bookmark link's path starts with ``PATH_PREFIX``.
    """
    parsed = urlparse.urlparse(link)
    # bookmark link should not have the version in the URL
    if bookmark and parsed.path.startswith(PATH_PREFIX):
        return False

    # removes the scheme and net location parts of the link
    relative = urlparse.urlunparse(parsed._replace(scheme='', netloc=''))
    try:
        self.get_json(relative, path_prefix='')
    except Exception:
        return False
    return True
def get_contents_if_file(contents_or_file_name):
    """Get the contents of a file.

    If the value passed in is a file name or file URI, return the
    contents. If not, or there is an error reading the file contents,
    return the value passed in as the contents.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.

    :param contents_or_file_name: a file name, a URL, or literal contents
    :returns: the UTF-8 decoded contents read from the file or URL, or the
              argument itself when nothing could be read
    """
    # Local import: keeps this block self-contained without touching the
    # module's import section.
    from contextlib import closing

    try:
        if parse.urlparse(contents_or_file_name).scheme:
            definition_url = contents_or_file_name
        else:
            path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:',
                request.pathname2url(path)
            )
        # Close the response explicitly so the underlying file or socket
        # is released as soon as the contents have been read.
        with closing(request.urlopen(definition_url)) as response:
            return response.read().decode('utf8')
    except Exception:
        # Deliberate best effort: anything unreadable is treated as the
        # contents themselves.
        return contents_or_file_name
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def __init__(self, url, protocols=None, agent=None):
    """Record the connection parameters derived from ``url``.

    Stores the scheme, host, numeric port, ``host:port`` string and the
    resource (path plus optional query) on the instance, then creates a
    fresh ``self.State()``. When the URL names no port, 443 is used for
    the ``wss`` scheme and 80 for everything else.

    Raises:
        ValueError: if the port part of the URL is not all digits.
    """
    self.url = url
    self.protocols = protocols if protocols else []
    self.agent = agent if agent else constants.USER_AGENT
    self._headers = []

    parsed = urlparse(url)
    self.scheme = parsed.scheme

    host, _, port_text = parsed.netloc.partition(':')
    if not port_text:
        port_text = '80' if self.scheme != 'wss' else '443'
    if not port_text.isdigit():
        raise ValueError('illegal port value')
    port = int(port_text)

    self.host = host
    self.port = port
    self._host_port = "{}:{}".format(host, port)

    resource = parsed.path or '/'
    if parsed.query:
        resource = "{}?{}".format(resource, parsed.query)
    self.resource = resource

    self.state = self.State()
def test_redirects_without_credentials(self):
    """A view wrapped in ``oauth_required`` redirects to the authorize URL.

    The redirect must point at ``/oauth2/oauth2authorize/`` and carry the
    originally requested path in its ``return_url`` query parameter.
    """
    request = self.factory.get('/test')
    request.session = self.session

    @decorators.oauth_required
    def test_view(request):
        return http.HttpResponse('test')  # pragma: NO COVER

    response = test_view(request)
    self.assertIsInstance(response, http.HttpResponseRedirect)

    location = parse.urlparse(response['Location'])
    self.assertEqual(location.path, '/oauth2/oauth2authorize/')
    self.assertIn('return_url=%2Ftest', location.query)
    self.assertEqual(
        response.status_code, http.HttpResponseRedirect.status_code)
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def extract_image_url(text):
    """Return the image URL found in ``text``, or the stripped text itself.

    ``text`` is first passed through ``_strip_url``. If it looks like CSS
    style content, the first group matched by ``_CSS_IMAGERE`` is used
    instead. The URL path is then narrowed to the part matched by
    ``_IMAGE_PATH_RE`` or, when the URL has a query string, by
    ``_GENERIC_PATH_RE``. When neither matches, the text is returned
    as is.
    """
    text = _strip_url(text)
    if not text:
        return text

    # check if the text is style content
    css_match = _CSS_IMAGERE.search(text)
    if css_match:
        text = css_match.groups()[0]

    parsed = urlparse(text)
    found = _IMAGE_PATH_RE.search(parsed.path)
    if not found and parsed.query:
        found = _GENERIC_PATH_RE.search(parsed.path)
    if not found:
        return text

    rebuilt = urlunparse(parsed._replace(path=found.group()))
    return rebuilt or text
def _fix_esx_url(url, host, port):
    """Fix netloc in the case of an ESX host.

    In the case of an ESX host, the netloc is set to '*' in the URL
    returned in HttpNfcLeaseInfo. It should be replaced with host name
    or IP address. URLs with any other netloc are returned unchanged.
    """
    urlp = urlparse.urlparse(url)
    if urlp.netloc != '*':
        return url

    # IPv6 literals must be bracketed so the port separator is unambiguous.
    if netutils.is_valid_ipv6(host):
        netloc = '[%s]:%d' % (host, port)
    else:
        netloc = "%s:%d" % (host, port)
    return urlparse.urlunparse(urlp._replace(netloc=netloc))
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def _decorate_request(self, filters, method, url, headers=None, body=None,
                      auth_data=None):
    """Return ``(url, headers, body)`` with the auth token header added.

    ``auth_data`` (defaulting to ``self.auth_data``) must be a pair whose
    first item is the token. The incoming ``headers`` are deep-copied, so
    the caller's mapping is never modified; ``method`` and ``body`` are
    passed through untouched.
    """
    if auth_data is None:
        auth_data = self.auth_data
    token, _ = auth_data

    # NOTE(review): the base URL is the request URL itself (the lookup via
    # self.base_url(filters=..., auth_data=...) was disabled), so a
    # non-empty ``url`` ends up joined with itself below. Confirm this is
    # intended.
    base_url = url

    # build authenticated request
    # returns new request, it does not touch the original values
    new_headers = {} if headers is None else copy.deepcopy(headers)
    new_headers['X-Auth-Token'] = str(token)

    if url is not None and url != "":
        # Join base URL and url, and remove multiple contiguous slashes
        joined = urlparse.urlparse("/".join([base_url, url]))
        squeezed = re.sub("/{2,}", "/", joined.path)
        final_url = urlparse.urlunparse(joined._replace(path=squeezed))
    else:
        final_url = base_url

    # no change to method or body
    return str(final_url), new_headers, body
def visit_ls(self, node, children):
    """List the entries under the current URL path and write them out.

    The path of ``self.context_override.url`` is split into its non-empty
    segments and passed to ``self.context.root.ls``. On a terminal the
    names are colorized and laid out with ``colformat``; otherwise one
    plain name is written per line. Nothing is written when there are no
    entries.

    Returns the ``node`` that was passed in.
    """
    path = urlparse(self.context_override.url).path
    path = filter(None, path.split('/'))
    entries = self.context.root.ls(*path)
    if self.output.isatty():
        names = []
        # The loop variable must not be called ``node``: that would rebind
        # the parameter and make this method return the last listed entry.
        for entry in entries:
            token_type = String if entry.data.get('type') == 'dir' else Name
            names.append(self._colorize(entry.name, token_type))
        lines = list(colformat(names))
    else:
        lines = [entry.name for entry in entries]
    if lines:
        self.output.write('\n'.join(lines))
    return node
def _drop_db(self):
    """Drop and re-create the MySQL database named in ``self.url``.

    The database name, host, user and optional password are taken from
    the URL, and a ``mysql`` command line is run through ``execute_cmd``.
    """
    # NOTE(review): the command is assembled by string interpolation, so
    # this presumably expects a trusted connection URL. Confirm at caller.
    parsed = urlparse.urlparse(self.url)
    database = parsed.path.strip('/')

    netloc_parts = parsed.netloc.split('@')
    host = netloc_parts[1]
    credentials = netloc_parts[0].split(':')
    user = credentials[0]

    # Only pass -p when a non-blank password is present in the URL.
    if len(credentials) > 1 and credentials[1].strip():
        password = "-p\"%s\"" % credentials[1]
    else:
        password = ""

    sql = ("drop database if exists %(database)s; create "
           "database %(database)s;") % dict(database=database)
    cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
           "-e \"%(sql)s\"") % dict(user=user, password=password,
                                    host=host, sql=sql)
    execute_cmd(cmd)
def get_connection_params(endpoint, **kwargs):
    """Work out how to open an HTTP(S) connection to ``endpoint``.

    :param endpoint: endpoint URL; only ``http`` and ``https`` schemes
                     are supported
    :param kwargs: optional ``timeout`` (defaults to 600 when missing or
                   falsy) and, for https, ``ca_file``, ``cert_file``,
                   ``key_file`` and ``insecure``
    :returns: a tuple ``(connection_class, args, kwargs)`` where ``args``
              is ``(hostname, port, path)``
    :raises exceptions.EndpointException: for any other scheme
    """
    parts = urlparse.urlparse(endpoint)

    # trim API version and trailing slash from endpoint
    path = parts.path.rstrip('/')
    # str.rstrip() takes a set of characters, not a suffix, so it would
    # also eat trailing characters of an unrelated path segment. Remove
    # the version only when the path really ends with it.
    if API_VERSION and path.endswith(API_VERSION):
        path = path[:-len(API_VERSION)]

    _args = (parts.hostname, parts.port, path)
    timeout = kwargs.get('timeout')
    _kwargs = {'timeout': float(timeout) if timeout else 600}

    if parts.scheme == 'https':
        _class = VerifiedHTTPSConnection
        _kwargs['ca_file'] = kwargs.get('ca_file', None)
        _kwargs['cert_file'] = kwargs.get('cert_file', None)
        _kwargs['key_file'] = kwargs.get('key_file', None)
        _kwargs['insecure'] = kwargs.get('insecure', False)
    elif parts.scheme == 'http':
        _class = six.moves.http_client.HTTPConnection
    else:
        msg = 'Unsupported scheme: %s' % parts.scheme
        raise exceptions.EndpointException(msg)

    return (_class, _args, _kwargs)
def test_redirects_without_credentials(self):
    """A view wrapped in ``oauth_required`` redirects to the authorize URL.

    The redirect must point at ``/oauth2/oauth2authorize/`` and carry the
    originally requested path in its ``return_url`` query parameter.
    """
    request = self.factory.get('/test')
    request.session = self.session

    @decorators.oauth_required
    def test_view(request):
        return http.HttpResponse('test')  # pragma: NO COVER

    response = test_view(request)
    self.assertIsInstance(response, http.HttpResponseRedirect)

    location = parse.urlparse(response['Location'])
    self.assertEqual(location.path, '/oauth2/oauth2authorize/')
    self.assertIn('return_url=%2Ftest', location.query)
    self.assertEqual(
        response.status_code, http.HttpResponseRedirect.status_code)
def test_absolute_url(self):
    """
    Does the server return absolute URIs?

    Relies on the root returning a ``self`` object: its ``href`` must be
    an absolute ``http`` URL on the origin's host with path ``/``, and
    the ``1.0`` bucket link must end in ``buckets/1.0``.
    """
    response = requests.get(self.root_url)
    payload = response.json()

    root_href = payload.get('href')
    assert root_href is not None
    parsed_root = urlparse(root_href)

    # Test in decreasing order of likely correctness:
    # Path => Domain => Scheme
    assert parsed_root.path == '/'
    # Get the origin name, minus the scheme.
    expected_netloc = urlparse(self.origin).netloc
    assert parsed_root.netloc == expected_netloc
    assert parsed_root.scheme == 'http'

    bucket_href = payload.get('buckets').get('1.0')
    assert urlparse(bucket_href).path.endswith('buckets/1.0')
def add_capability(self, name, uri):
    """
    Add a capability of the RETS board

    A URI without a host is treated as relative and is resolved against
    the scheme, host and port of the stored ``Login`` capability.

    :param name: The name of the capability
    :param uri: The capability URI given by the RETS board
    :return: None
    :raises ValueError: if ``uri`` is relative and no ``Login`` URL is stored
    """
    parse_results = urlparse(uri)
    if parse_results.hostname is None:
        # relative URL given, so build this into an absolute URL
        login_url = self.capabilities.get('Login')
        if not login_url:
            logger.error("There is no login URL stored, so additional capabilities cannot be added.")
            raise ValueError("Cannot automatically determine absolute path for {0!s} given.".format(uri))

        parts = urlparse(login_url)
        # Keep an explicit port from the login URL; dropping it would
        # point the capability at the default port of the scheme.
        port = ':{}'.format(parts.port) if parts.port else ''
        uri = parts.scheme + '://' + parts.hostname + port + '/' + uri.lstrip('/')

    self.capabilities[name] = uri
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def _get_proxy_config(self):
    """
    Determine if self._url should be passed through a proxy. If so, return
    the appropriate proxy_server and proxy_port. Assumes https_proxy is used
    when ssl_enabled=True.

    Returns ``(None, None)`` when no proxy variable is set or when
    ``self._server`` occurs in the ``no_proxy`` environment variable.
    """
    variable = "https_proxy" if self._ssl_enabled else "http_proxy"
    proxy = os.environ.get(variable)

    no_proxy = os.environ.get("no_proxy")
    # Plain substring test against the raw no_proxy value.
    bypass = no_proxy and self._server in no_proxy

    if not proxy or bypass:
        return None, None

    parsed = urlparse(proxy)
    return parsed.hostname, parsed.port
def __init__(self, base_url='https://hydro1.gesdisc.eosdis.nasa.gov/data/NLDAS/', **layout_kwargs):
    """Display a selection widget listing the links found under ``base_url``.

    ``layout_kwargs`` are forwarded to ``Layout`` for the path label, with
    ``min_width`` defaulting to ``'30%'``. The label shows the path of
    ``base_url``; picking an entry triggers ``self.on_value_change``.
    """
    self.base_url = base_url
    self.selected_url = None

    layout_kwargs.setdefault('min_width', '30%')
    self.label_layout = Layout(**layout_kwargs)

    top_links = self.get_links(
        base_url,
        href_filter=self.dir_and_not_data,
    )
    dd = widgets.Select(options=top_links, description='')
    dd.observe(partial(self.on_value_change, url=self.base_url), names='value')

    lbl = widgets.Label(urlparse(self.base_url).path, layout=self.label_layout)
    hbox = widgets.HBox([lbl, dd])
    self.elts = [hbox, lbl, dd]
    display(hbox)
def on_value_change(self, change, url):
    """React to a new selection by descending one level or picking a file.

    ``change['new']`` is the chosen URL. ``None`` is ignored, a URL ending
    in ``.grb`` is handed to ``self.select_url``, and anything else
    replaces the current widgets with a new selection list for that URL
    (unless it has no links).
    """
    next_url = change['new']
    if next_url is None:  # 'Select...' chosen
        return
    if next_url.endswith('.grb'):  # File reached
        return self.select_url(next_url)

    # Tear down the widgets of the level being left.
    for widget in self.elts:
        widget.close()

    if next_url == self.base_url:
        href_filter = self.dir_and_not_data
    else:
        href_filter = self.dir_or_grib
    links = self.get_links(next_url, href_filter=href_filter)
    if not links:
        return

    next_dd = widgets.Select(options=links, description='')
    next_dd.observe(partial(self.on_value_change, url=next_url), names='value')
    lbl = widgets.Label(urlparse(next_url).path, layout=self.label_layout)
    hbox = widgets.HBox([lbl, next_dd])
    self.elts = [hbox, lbl, next_dd]
    display(hbox)
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and store its components on the instance.

    Sets ``name``, ``url``, ``extras``, ``specifier`` and ``marker`` from
    the result of ``REQUIREMENT.parseString``.

    Raises:
        InvalidRequirement: if the string cannot be parsed, or if it
            carries a URL that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as e:
        # Quote up to eight characters starting at the failure offset.
        start = e.loc
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(
                requirement_string[start:start + 8]))

    self.name = parsed.name

    if not parsed.url:
        self.url = None
    else:
        url_bits = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not (url_bits.scheme and url_bits.netloc):
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url

    extras = parsed.extras
    self.extras = set(extras.asList()) if extras else set()
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker or None
def validate_href(self, image_href):
    """Check that ``image_href`` names an object the swift client can see.

    The path of the reference is split at its first ``/`` into a
    container and an object name, and the object is looked up with
    ``self.client.head_object``.

    Returns a ``(container, object_name, headers)`` tuple.

    Raises:
        exception.ImageRefValidationFailed: if the reference has no path
            or the HEAD request fails with a swift operation error.
    """
    object_path = urlparse.urlparse(image_href).path.lstrip('/')
    if not object_path:
        raise exception.ImageRefValidationFailed(
            _("No path specified in swift resource reference: %s. "
              "Reference must be like swift:container/path")
            % str(image_href))

    container, _sep, object_name = object_path.partition('/')
    try:
        headers = self.client.head_object(container, object_name)
    except exception.SwiftOperationError as e:
        raise exception.ImageRefValidationFailed(
            _("Cannot fetch %(url)s resource. %(exc)s") %
            dict(url=str(image_href), exc=str(e)))
    return (container, object_name, headers)