def validate_href(self, image_href):
    """Validate an rsync image reference by running an rsync dry run.

    The path component of ``image_href`` (with leading slashes removed) is
    passed as the source to ``rsync --stats --dry-run`` with ``.`` as the
    destination.

    :param image_href: rsync resource reference, expected to look like
        ``rsync:host::module/path``.
    :returns: tuple ``(path, stdout, stderr)`` where ``path`` is the
        extracted rsync source and the other two come from the rsync call.
    :raises: exception.InvalidParameterValue if the reference has no path.
    :raises: exception.ImageRefValidationFailed if executing rsync raises
        ``ProcessExecutionError`` or ``OSError``.
    """
    rsync_source = urlparse.urlparse(image_href).path.lstrip('/')
    if not rsync_source:
        message = _("No path specified in rsync resource reference: %s. "
                    "Reference must be like rsync:host::module/path")
        raise exception.InvalidParameterValue(message % str(image_href))

    try:
        out, err = utils.execute(
            'rsync', '--stats', '--dry-run',
            rsync_source,
            ".",
            check_exit_code=[0],
            log_errors=processutils.LOG_ALL_ERRORS)
    except (processutils.ProcessExecutionError, OSError) as ex:
        details = dict(url=str(image_href), exc=str(ex))
        raise exception.ImageRefValidationFailed(
            _("Cannot fetch %(url)s resource. %(exc)s") % details)
    return rsync_source, out, err
# Example source code using Python's urlparse()
def get_glance_image_uuid_name(task, url):
    """Resolve a Glance image reference to its ``(uuid, name)`` pair.

    Accepted reference forms::

        glance:name
        glance://name
        glance:uuid
        glance://uuid
        name
        uuid

    :param task: object whose ``context`` is used to build the Glance client.
    :param url: image reference in one of the forms above.
    :returns: tuple ``(id, name)`` taken from the image info returned by
        the Glance client's ``show`` call.
    :raises: exception.InvalidImageRef if the reference has a scheme other
        than ``glance``.
    """
    parsed = urlparse.urlparse(url)
    scheme = parsed.scheme
    if scheme and scheme != 'glance':
        raise exception.InvalidImageRef("Only glance images are supported.")

    # "glance:x" puts the identifier in the path, "glance://x" in the netloc.
    image_ref = parsed.path if parsed.path else parsed.netloc
    glance = _get_glanceclient(task.context)
    info = glance.show(image_ref)
    return info['id'], info['name']
def url_download_raw_secured(context, node, url):
    """Download raw contents of the URL bypassing cache and temp files.

    Only URLs whose scheme (after the storage prefix has been applied) is
    ``glance`` or ``swift`` are accepted.

    :param context: passed to ``image_service.get_image_service``.
    :param node: passed to ``append_storage_prefix`` together with the URL.
    :param url: resource URL; trailing slashes are stripped before use.
    :returns: downloaded contents, or ``None`` when ``url`` is empty.
    :raises: bareon_exception.UnsafeUrlError if the scheme is not allowed.
    """
    if not url:
        return None

    full_url = append_storage_prefix(node, url.rstrip('/'))
    sources_with_tenant_isolation = ('glance', 'swift')
    if parse.urlparse(full_url).scheme not in sources_with_tenant_isolation:
        hint = ("Use one of the following storages "
                "for this resource: %s"
                % str(sources_with_tenant_isolation))
        raise bareon_exception.UnsafeUrlError(url=full_url, details=hint)

    storage = image_service.get_image_service(full_url, context=context)
    buf = StringIO.StringIO()
    try:
        storage.download(full_url, buf)
        return buf.getvalue()
    finally:
        # Always release the in-memory buffer, even if the download fails.
        buf.close()
def __init__(self, **kw):
    """Set up the spider's start URL, allowed domain and counters.

    Keyword arguments are forwarded unchanged to the parent initializer.
    The start URL is fixed; ``allowed_domains`` holds its host name with a
    leading ``www.`` removed.
    """
    super(FollowAllSpider, self).__init__(**kw)

    start_url = 'http://localhost/books.toscrape.com/index.html'
    if not start_url.startswith(('http://', 'https://')):
        start_url = 'http://%s/' % start_url
    self.url = start_url

    host = urlparse(start_url).hostname
    self.allowed_domains = [re.sub(r'^www\.', '', host)]
    self.link_extractor = LinkExtractor()
    self.cookies_seen = set()

    # Counters and the timestamp recorded at construction time.
    self.previtem = 0
    self.items = 0
    self.timesec = datetime.datetime.utcnow()
def __init__(self, requirement_string):
    """Parse ``requirement_string`` and populate the requirement's fields.

    Sets ``name``, ``url`` (``None`` when absent), ``extras`` (a set),
    ``specifier`` (a ``SpecifierSet``) and ``marker`` (``None`` when
    absent).

    :raises InvalidRequirement: if the string cannot be parsed, or if a
        URL is given that lacks a scheme or a network location.
    """
    try:
        parsed = REQUIREMENT.parseString(requirement_string)
    except ParseException as err:
        # Show a short window of the input starting at the failure point.
        snippet = requirement_string[err.loc:err.loc + 8]
        raise InvalidRequirement(
            "Invalid requirement, parse error at \"{0!r}\"".format(snippet))

    self.name = parsed.name

    if parsed.url:
        pieces = urlparse.urlparse(parsed.url)
        # Both a scheme and a network location are required.
        if not pieces.scheme or not pieces.netloc:
            raise InvalidRequirement("Invalid URL given")
        self.url = parsed.url
    else:
        self.url = None

    extras = parsed.extras.asList() if parsed.extras else []
    self.extras = set(extras)
    self.specifier = SpecifierSet(parsed.specifier)
    self.marker = parsed.marker if parsed.marker else None
def params(arg):
    """
    Parse `arg` and return a dict describing the TPDA arguments.

    Resolution order:

    - if `arg` is a string, it is parsed as a URL and its query string
      is used; a URL without a query raises `exceptions.IncompleteURI`
    - if `arg` has an `items()` method, its items are used
      (works for multidicts, ...)
    - if `arg` is an iterable, it is used as
      ((name, value), (name, value), ...)

    Any other kind of `arg` yields `None`.
    """
    if isinstance(arg, str):
        query = parse.urlparse(arg).query
        if not query:
            raise exceptions.IncompleteURI
        pairs = parse.parse_qsl(query)
        return Params(pairs).params()

    if hasattr(arg, 'items'):
        return Params(arg.items()).params()

    if hasattr(arg, '__iter__'):
        return Params(arg).params()

    return None
def media_for(hash_key, scheme=None, style=None):
    """Build the media URL for a stored hash key.

    The part of ``hash_key`` before the first ``!`` is parsed as a URL.
    If it names a host that is not in ``Config.QINIU_DOMAINS`` the key is
    returned as given, except that for ``wx.qlogo.cn`` every ``http://``
    is replaced with ``https://``. Otherwise the URL path (minus one
    leading slash) is appended to ``Config.QINIU_HOST``, a trailing
    ``.amr`` is swapped for ``.mp3``, and ``!<style>`` is appended when a
    style is given.

    :param hash_key: stored key or URL; a falsy value yields ``None``.
    :param scheme: accepted but not used by this function.
    :param style: optional style suffix.
    :returns: the resulting URL string, or ``None``.
    """
    if not hash_key:
        return None

    parsed = urlparse(hash_key.split('!')[0])
    domain = parsed.hostname
    if domain and domain not in Config.QINIU_DOMAINS:
        if domain == 'wx.qlogo.cn':
            return hash_key.replace('http://', 'https://')
        return hash_key

    key = parsed.path
    if key.startswith('/'):
        key = key[1:]

    media_url = '%s/%s' % (Config.QINIU_HOST, key)
    if media_url.endswith('.amr'):
        media_url = '%s.mp3' % media_url[:-4]
    # media_url always contains "/", so only the style needs checking.
    if style:
        media_url = '%s!%s' % (media_url, style)
    return media_url
def create_temp_url(swift_client, name, timeout, container=None):
    """Create an empty Swift object and return a temporary PUT URL for it.

    The container is created via ``put_container``. If the account has no
    ``x-account-meta-temp-url-key`` header, one is posted first. An empty
    object with a random name is then stored in the container.

    :param swift_client: Swift client exposing ``url``, ``put_container``,
        ``head_account``, ``post_account`` and ``put_object``.
    :param name: used to build the container name when ``container`` is
        not given (``<name>-<uuid>``).
    :param timeout: URL lifetime; multiplied by 60 before being passed to
        ``generate_temp_url``.
    :param container: optional container name.
    :returns: the temporary URL built from the client URL's scheme and
        network location plus the generated temp-URL path.
    """
    if not container:
        container = '%(name)s-%(uuid)s' % {
            'name': name, 'uuid': uuid.uuid4()}
    object_name = str(uuid.uuid4())
    swift_client.put_container(container)

    key_header = 'x-account-meta-temp-url-key'
    account_headers = swift_client.head_account()
    if key_header not in account_headers:
        new_key = six.text_type(uuid.uuid4())[:32]
        swift_client.post_account({key_header: new_key})
    # Re-read the account headers so a freshly posted key is picked up.
    key = swift_client.head_account()[key_header]

    # The last URL segment of the client endpoint is used as the project.
    project_path = swift_client.url.rsplit('/', 1)[-1]
    object_path = '/v1/%s/%s/%s' % (project_path, container, object_name)
    tempurl = swiftclient_utils.generate_temp_url(
        object_path, timeout * 60, key, 'PUT')

    endpoint = urlparse.urlparse(swift_client.url)
    put_url = '%s://%s%s' % (endpoint.scheme, endpoint.netloc, tempurl)
    swift_client.put_object(container, object_name, '')
    return put_url
def _pagination(self, collection, path, **params):
    """Yield successive pages of a collection, following pagination links.

    Each page is fetched with ``self.get(path, params=params)`` and
    yielded as returned. After a page has been yielded, its
    ``'<collection>_links'`` entry is searched for a link whose ``rel`` is
    ``'previous'`` (when a truthy ``page_reverse`` parameter was passed)
    or ``'next'`` (otherwise). The parsed query string of that link's
    ``href`` replaces ``params`` for the following request.

    Iteration stops when no such link is found, or when a ``KeyError`` is
    raised while inspecting the links (for example, the page has no links
    entry).
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    while True:
        page = self.get(path, params=params)
        yield page
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] == wanted_rel:
                    query = urlparse.urlparse(link['href']).query
                    params = urlparse.parse_qs(query)
                    break
            else:
                # No link with the wanted relation: this was the last page.
                return
        except KeyError:
            return
def _pagination(self, collection, path, **params):
    """Yield successive pages of a collection, following pagination links.

    Each page is fetched with ``self.get(path, params=params)`` and
    yielded as returned. After a page has been yielded, its
    ``'<collection>_links'`` entry is searched for a link whose ``rel`` is
    ``'previous'`` (when a truthy ``page_reverse`` parameter was passed)
    or ``'next'`` (otherwise). The parsed query string of that link's
    ``href`` replaces ``params`` for the following request.

    Iteration stops when no such link is found, or when a ``KeyError`` is
    raised while inspecting the links (for example, the page has no links
    entry).
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    while True:
        page = self.get(path, params=params)
        yield page
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] == wanted_rel:
                    query = urlparse.urlparse(link['href']).query
                    params = urlparse.parse_qs(query)
                    break
            else:
                # No link with the wanted relation: this was the last page.
                return
        except KeyError:
            return
def _pagination(self, collection, path, **params):
    """Yield successive pages of a collection, following pagination links.

    Each page is fetched with ``self.get(path, params=params)`` and
    yielded as returned. After a page has been yielded, its
    ``'<collection>_links'`` entry is searched for a link whose ``rel`` is
    ``'previous'`` (when a truthy ``page_reverse`` parameter was passed)
    or ``'next'`` (otherwise). The parsed query string of that link's
    ``href`` replaces ``params`` for the following request.

    Iteration stops when no such link is found, or when a ``KeyError`` is
    raised while inspecting the links (for example, the page has no links
    entry).
    """
    wanted_rel = 'previous' if params.get('page_reverse', False) else 'next'
    while True:
        page = self.get(path, params=params)
        yield page
        try:
            for link in page['%s_links' % collection]:
                if link['rel'] == wanted_rel:
                    query = urlparse.urlparse(link['href']).query
                    params = urlparse.parse_qs(query)
                    break
            else:
                # No link with the wanted relation: this was the last page.
                return
        except KeyError:
            return
def _get_node_ip_heartbeat(task):
    """Return the host part of the node's agent callback URL.

    The URL is read from ``task.node.driver_internal_info['agent_url']``.
    The host is taken from the parsed URL's ``hostname`` attribute rather
    than by splitting the network location on ``:``. Splitting breaks on
    bracketed IPv6 literals (``http://[fe80::1]:9999`` would give ``[``)
    and on ``user:password@host`` forms (would give ``user``).

    :param task: object exposing ``node.driver_internal_info`` as a mapping.
    :returns: host name or IP address as a string, lower-cased and without
        port, credentials or IPv6 brackets; an empty string when no agent
        URL is recorded or the URL has no network location.
    """
    callback_url = task.node.driver_internal_info.get('agent_url', '')
    # ``hostname`` is None when there is no network location; keep
    # returning an empty string in that case, as before.
    return urlparse.urlparse(callback_url).hostname or ''
def _parse_root_device_hints(node):
    """Convert the node's ``root_device`` property into a plain hints dict.

    Hints are parsed with ``irlib_utils.parse_root_device_hints``. String
    values starting with ``'== '`` are converted to ``int``; string values
    starting with ``'s== '`` are URL-unquoted; non-string values are kept
    unchanged. Any other string value is treated as an unsupported
    advanced hint.

    :param node: object exposing ``properties`` (a mapping) and ``uuid``.
    :returns: dict of hints; empty when the node has no ``root_device``.
    :raises: exception.InvalidParameterValue if parsing the hints raises
        ``ValueError`` or if any advanced hints are present.
    """
    raw_hints = node.properties.get('root_device')
    if not raw_hints:
        return {}

    try:
        parsed_hints = irlib_utils.parse_root_device_hints(raw_hints)
    except ValueError as e:
        raise exception.InvalidParameterValue(
            _('Failed to validate the root device hints for node %(node)s. '
              'Error: %(error)s') % {'node': node.uuid, 'error': e})

    supported = {}
    unsupported = {}
    for name, value in parsed_hints.items():
        if not isinstance(value, six.string_types):
            supported[name] = value
            continue
        if value.startswith('== '):
            supported[name] = int(value[3:])
        elif value.startswith('s== '):
            supported[name] = urlparse.unquote(value[4:])
        else:
            unsupported[name] = value

    if unsupported:
        raise exception.InvalidParameterValue(
            _('Ansible-deploy does not support advanced root device hints '
              'based on oslo.utils operators. '
              'Present advanced hints for node %(node)s are %(hints)s.') % {
                  'node': node.uuid, 'hints': unsupported})
    return supported
def _prepare_variables(task):
    """Build the variables dict for a deployment from the task's node.

    The result always has an ``image`` entry, assembled from every
    ``instance_info`` key that starts with ``image_`` (prefix removed)
    plus ``mem_req``. It gains a ``configdrive`` entry when the instance
    has a config drive, and a ``root_device_hints`` entry when the node
    defines any hints.

    A config drive whose value is an ``http``/``https`` URL is passed by
    location; any other value is written to a local file whose path is
    passed instead.

    :param task: object exposing ``node`` with ``instance_info`` and
        ``uuid``.
    :returns: dict of variables.
    """
    node = task.node
    instance_info = node.instance_info

    prefix = 'image_'
    image = {key[len(prefix):]: value
             for key, value in instance_info.items()
             if key.startswith(prefix)}
    image['mem_req'] = _calculate_memory_req(task)

    # NOTE(pas-ha) checksum can be in <algo>:<checksum> format
    # as supported by various Ansible modules, mostly good for
    # standalone Ironic case when instance_info is populated manually.
    # With no <algo> we take that instance_info is populated from Glance,
    # where API reports checksum as MD5 always.
    checksum = image.get('checksum')
    if checksum and ':' not in checksum:
        image['checksum'] = 'md5:%s' % checksum

    _add_ssl_image_options(image)
    variables = {'image': image}

    configdrive = instance_info.get('configdrive')
    if configdrive:
        if urlparse.urlparse(configdrive).scheme in ('http', 'https'):
            drive = {'type': 'url', 'location': configdrive}
        else:
            # Not a URL: store the config drive contents in a local file.
            location = _get_configdrive_path(node.uuid)
            with open(location, 'w') as f:
                f.write(configdrive)
            drive = {'type': 'file', 'location': location}
        variables['configdrive'] = drive

    hints = _parse_root_device_hints(node)
    if hints:
        variables['root_device_hints'] = hints

    return variables
def get_addon_host_ident():
    """Return the host name used to identify this installation to add-ons.

    The host is taken from the ``system.url-prefix`` option. When it is
    missing, empty, ``localhost`` or ``127.0.0.1``, the fixed identifier
    ``app.dev.getsentry.com`` is returned instead.
    """
    host = urlparse(options.get('system.url-prefix')).hostname
    local_hosts = ('localhost', '127.0.0.1', None, '')
    return 'app.dev.getsentry.com' if host in local_hosts else host
def base_url(url):
    """Return ``<scheme>://<netloc>`` for the given URL.

    Path, parameters, query and fragment are dropped; the network location
    is kept verbatim, including any port or credentials.
    """
    parts = urlparse(url)
    scheme, netloc = parts.scheme, parts.netloc
    return '%s://%s' % (scheme, netloc)
def from_request(cls, request):
    """Build an instance describing an unreachable host.

    :param cls: class (or callable) invoked with the message.
    :param request: object whose ``url`` attribute holds the request URL.
    :returns: ``cls`` called with ``'Unable to reach host: <netloc>'``,
        where ``<netloc>`` is the network location of ``request.url``.
    """
    netloc = urlparse(request.url).netloc
    message = 'Unable to reach host: {}'.format(netloc)
    return cls(message)
def get_decorator(placebo):
    """Build an httmock decorator that answers requests for ``placebo``.

    The placebo's URL (parsed first if it is a string) and method supply
    the match criteria; empty components are left out of the match. The
    mocked response takes its status code, content and headers from the
    placebo's ``_get_status``, ``_get_body`` and ``_get_headers``.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    target = placebo._get_url()
    if isinstance(target, six.string_types):
        target = parse.urlparse(target)

    # TODO should we remove empty parts of url?
    criteria = (
        ('scheme', target.scheme),
        ('netloc', target.netloc),
        ('path', target.path),
        ('method', placebo._get_method()),
        ('query', target.query),
    )
    match_kwargs = {key: value for key, value in criteria if value}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        parsed = parse.urlparse(url.geturl())
        # If the body is empty httmock returns None,
        # but we want it to always be a string.
        body = request.body or ''
        headers = request.headers
        status = placebo._get_status(parsed, headers, body)
        content = placebo._get_body(parsed, headers, body)
        response_headers = placebo._get_headers(parsed, headers, body)
        return {'status_code': status,
                'content': content,
                'headers': response_headers}

    return httmock.with_httmock(mock_response)
def __init__(self, url, headers, body):
    """Store the URL (as text and in parsed form), headers and body.

    :param url: either an already parsed URL (``ParseResult`` or
        ``SplitResult``) or a URL string. For any other type neither
        ``url`` nor ``parsed_url`` is set.
    :param headers: stored as ``self.headers``.
    :param body: stored as ``self.body``.
    """
    parsed_types = (parse.ParseResult, parse.SplitResult)
    if isinstance(url, parsed_types):
        self.url, self.parsed_url = url.geturl(), url
    elif isinstance(url, six.string_types):
        self.url, self.parsed_url = url, parse.urlparse(url)

    self.headers = headers
    self.body = body
def get_decorator(placebo):
    """Build an httmock decorator that answers requests for ``placebo``.

    The placebo's URL (parsed first if it is a string) and method supply
    the match criteria; empty components are left out of the match. The
    mocked response takes its status code, content and headers from the
    placebo's ``_get_status``, ``_get_body`` and ``_get_headers``.

    :param placebo: object providing ``_get_url``, ``_get_method``,
        ``_get_status``, ``_get_body`` and ``_get_headers``.
    :returns: the result of ``httmock.with_httmock`` for the mock handler.
    """
    target = placebo._get_url()
    if isinstance(target, six.string_types):
        target = parse.urlparse(target)

    # TODO should we remove empty parts of url?
    criteria = (
        ('scheme', target.scheme),
        ('netloc', target.netloc),
        ('path', target.path),
        ('method', placebo._get_method()),
        ('query', target.query),
    )
    match_kwargs = {key: value for key, value in criteria if value}

    @httmock.urlmatch(**match_kwargs)
    def mock_response(url, request):
        # Convert parse result type from SplitResult to ParseResult
        parsed = parse.urlparse(url.geturl())
        # If the body is empty httmock returns None,
        # but we want it to always be a string.
        body = request.body or ''
        headers = request.headers
        status = placebo._get_status(parsed, headers, body)
        content = placebo._get_body(parsed, headers, body)
        response_headers = placebo._get_headers(parsed, headers, body)
        return {'status_code': status,
                'content': content,
                'headers': response_headers}

    return httmock.with_httmock(mock_response)