def get_keystone_url(auth_url, auth_version):
    """Build the http/https URL at which keystone can be contacted.

    :param auth_url: an http or https URL to inspect (e.g.
        'http://127.0.0.1:9898/').
    :param auth_version: a version string (e.g. v2, v3.0, etc)
    :returns: the keystone URL as a string
    """
    if _is_apiv3(auth_url, auth_version):
        api_version = 'v3'
    else:
        api_version = 'v2.0'
    # Strip any trailing '/' so urljoin() replaces the version component
    # of the URL rather than appending to it.
    return parse.urljoin(auth_url.rstrip('/'), api_version)
# Python urljoin() usage examples (collected snippets)
def _prepare_request(self, **kwargs):
    """Prepares a HTTP request.
    Args:
        kwargs (dict): keyword arguments for the authentication function
            (``_add_ecdsa_signature()`` or ``_add_basic_auth()``) and
            :py:class:`requests.Request` class.
    Raises:
        AssertionError: in case ``kwargs['path']`` doesn't start with ``/``.
    """
    kwargs.setdefault('headers', {})
    # Add appropriate authentication headers: an ECDSA signing key takes
    # precedence over email/password basic auth; with neither, the request
    # goes out unauthenticated.
    # NOTE(review): the auth helpers receive the full kwargs (still
    # containing 'path') — presumably the signature covers the path, so
    # 'path' must not be popped before this point; confirm before reordering.
    if isinstance(self.private_key, SigningKey):
        self._add_ecdsa_signature(kwargs)
    elif self.email and self.password:
        self._add_basic_auth(kwargs)
    # Generate URL from path
    path = kwargs.pop('path')
    # NOTE: `assert` is stripped under `python -O`; callers rely on the
    # documented AssertionError, so this stays an assert.
    assert path.startswith('/')
    kwargs['url'] = urljoin(self.api_url, path)
    return requests.Request(**kwargs).prepare()
def get_contents_if_file(contents_or_file_name):
    """Return the contents of a file when given its name or URI.

    If the value passed in is a file name or file URI, the file's contents
    are returned. If not -- or if there is an error reading the file
    contents -- the value passed in is returned unchanged.

    For example, a workflow definition will be returned if either the
    workflow definition file name, or file URI are passed in, or the
    actual workflow definition itself is passed in.
    """
    try:
        if parse.urlparse(contents_or_file_name).scheme:
            # Already a URI -- fetch it directly.
            definition_url = contents_or_file_name
        else:
            # Treat the value as a local path and turn it into a file: URI.
            absolute_path = os.path.abspath(contents_or_file_name)
            definition_url = parse.urljoin(
                'file:', request.pathname2url(absolute_path))
        return request.urlopen(definition_url).read().decode('utf8')
    except Exception:
        # Not a readable file or URI -- assume the value itself is the content.
        return contents_or_file_name
def get_upload_channels(upload_config_dir, subdir, channels=None):
    """Collect the channels configured for upload.

    Each upload configuration contributes one channel: the user name for
    token-based uploads, a server URL (with the subdir-formatted destination
    path) for server uploads, or an explicit channel name otherwise.  Any
    ``channels`` passed in are kept and the configured ones appended.

    (Original note: the thought here was to provide whatever channel you
    have set as an output also to be an input.  Killed in favor of setting
    channels in condarc in the docker image.)
    """
    channels = channels or []
    for cfg in load_yaml_config_dir(upload_config_dir):
        if 'token' in cfg:
            channels.append(cfg['user'])
        elif 'server' in cfg:
            destination = cfg['destination_path'].format(subdir=subdir)
            channels.append(
                parse.urljoin('http://' + cfg['server'], destination))
        else:
            channels.append(cfg['channel'])
    return channels
def test_get_pbm_wsdl_location(self):
    # No version given -> no WSDL location.
    self.assertIsNone(pbm.get_pbm_wsdl_location(None))

    def expected_wsdl(version):
        wsdl_dir = os.path.abspath(os.path.dirname(pbm.__file__))
        wsdl_path = os.path.join(wsdl_dir, 'wsdl', version,
                                 'pbmService.wsdl')
        return urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))

    with mock.patch('os.path.exists') as path_exists:
        path_exists.return_value = True
        # Major, major.minor and major.minor.patch versions all resolve
        # to the major.minor WSDL directory.
        self.assertEqual(expected_wsdl('5'),
                         pbm.get_pbm_wsdl_location('5'))
        self.assertEqual(expected_wsdl('5.5'),
                         pbm.get_pbm_wsdl_location('5.5'))
        self.assertEqual(expected_wsdl('5.5'),
                         pbm.get_pbm_wsdl_location('5.5.1'))
        # When the WSDL file is missing, no location is returned.
        path_exists.return_value = False
        self.assertIsNone(pbm.get_pbm_wsdl_location('5.5'))
def get_pbm_wsdl_location(vc_version):
    """Return PBM WSDL file location corresponding to VC version.

    :param vc_version: a dot-separated version string. For example, "1.2".
    :return: the pbm wsdl file location, or None when the version is empty
        or no matching WSDL file exists.
    """
    if not vc_version:
        return
    # Only the major (and, when present, minor) components select the WSDL
    # directory; any patch level is ignored.
    major_minor = '.'.join(vc_version.split('.')[:2])
    wsdl_path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
                             'wsdl', major_minor, 'pbmService.wsdl')
    if not os.path.exists(wsdl_path):
        LOG.warning(_LW("PBM WSDL file %s not found."), wsdl_path)
        return
    pbm_wsdl = urlparse.urljoin('file:', urllib.pathname2url(wsdl_path))
    LOG.debug("Using PBM WSDL location: %s.", pbm_wsdl)
    return pbm_wsdl
def _request(self, endpoint, method="GET", lookup=None, data=None,
             params=None, userargs=None, password=None):
    """
    Generic request method designed to handle any morango endpoint.
    :param endpoint: constant representing which morango endpoint we are querying
    :param method: HTTP verb/method for request
    :param lookup: the pk value for the specific object we are querying
    :param data: dict that will be form-encoded in request
    :param params: dict to be sent as part of URL's query string
    :param userargs: Authorization credentials
    :param password: password portion of the credentials
    :return: ``Response`` object from request
    """
    # NOTE: mutable default arguments ({}) are shared across calls in
    # Python, so None sentinels are used and fresh dicts created here.
    # Behavior is unchanged for callers.
    data = {} if data is None else data
    params = {} if params is None else params
    # convert user arguments into query str for passing to auth layer
    if isinstance(userargs, dict):
        userargs = "&".join(["{}={}".format(key, val)
                             for (key, val) in iteritems(userargs)])
    # build up url and send request
    if lookup:
        lookup = lookup + '/'
    url = urljoin(urljoin(self.base_url, endpoint), lookup)
    auth = (userargs, password) if userargs else None
    resp = requests.request(method, url, json=data, params=params, auth=auth)
    # Raise requests.HTTPError for 4xx/5xx responses.
    resp.raise_for_status()
    return resp
def get_file_contents(from_data, files, base_url=None,
                      ignore_if=None):
    """Resolve file references in *from_data* and collect their contents.

    For every (key, value) pair not skipped by *ignore_if*, the value is
    resolved against *base_url*, its content fetched into the *files*
    mapping (templates are expanded via get_template_contents and
    re-serialised as JSON), and the entry in *from_data* is rewritten to
    the normalised absolute URL.
    """
    if not isinstance(from_data, dict):
        return
    for key, value in from_data.items():
        if ignore_if and ignore_if(key, value):
            continue
        # Ensure a trailing '/' so urljoin() keeps the base's last
        # path component instead of replacing it.
        if base_url and not base_url.endswith('/'):
            base_url = base_url + '/'
        str_url = parse.urljoin(base_url, value)
        if str_url not in files:
            contents = utils.read_url_content(str_url)
            if is_template(contents):
                template = get_template_contents(
                    template_url=str_url, files=files)[1]
                contents = jsonutils.dumps(template)
            files[str_url] = contents
        # replace the data value with the normalised absolute URL
        from_data[key] = str_url
def test_class_get_oauth_token_method(self, mocker, mocked_monzo):
    """Test class `_get_oauth_token` method"""
    # Stub out the OAuth2 session so no network traffic happens.
    mocked_session_cls = mocker.patch('pymonzo.monzo_api.OAuth2Session')
    mocked_fetch_token = mocker.MagicMock()
    mocked_session_cls.return_value.fetch_token = mocked_fetch_token

    result = mocked_monzo._get_oauth_token()

    # The method must return whatever fetch_token produced, and must have
    # constructed the session and token request with the stored credentials.
    assert result == mocked_fetch_token.return_value
    mocked_session_cls.assert_called_once_with(
        client_id=mocked_monzo._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )
    mocked_fetch_token.assert_called_once_with(
        token_url=urljoin(mocked_monzo.api_url, '/oauth2/token'),
        code=mocked_monzo._auth_code,
        client_secret=mocked_monzo._client_secret,
    )
def _get_oauth_token(self):
    """
    Get Monzo access token via OAuth2 `authorization code` grant type.
    Official docs:
        https://monzo.com/docs/#acquire-an-access-token
    :returns: OAuth 2 access token
    :rtype: dict
    """
    oauth_session = OAuth2Session(
        client_id=self._client_id,
        redirect_uri=config.PYMONZO_REDIRECT_URI,
    )
    # Exchange the stored authorization code for an access token.
    return oauth_session.fetch_token(
        token_url=urljoin(self.api_url, '/oauth2/token'),
        code=self._auth_code,
        client_secret=self._client_secret,
    )
def crawl(self, url, base_url):
    """Crawl an .html page and extract all URLs that look like part of the
    application, then download them in parallel using threads.
    """
    resp = requests.get(url)
    # resp.url reflects any redirects that were followed.
    final_base_url = resp.url
    doc = lxml.html.fromstring(resp.content)
    candidates = (urljoin(final_base_url, anchor.attrib.get("href", ""))
                  for anchor in doc.cssselect("a"))
    links = [link for link in candidates
             if is_likely_app_part(link, base_url)]
    # Fetch every link concurrently.
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        future_to_url = {executor.submit(self.fetch_file, link, base_url): link
                         for link in links}
        for future in concurrent.futures.as_completed(future_to_url):
            # Re-raise any worker exception in the main thread.
            future.result()
def _get_sushy_system(self, system_id):
    """Get the sushy system for system_id

    :param system_id: The identity of the System resource
    :returns: the Sushy system instance
    :raises: IloError
    """
    collection_path = self._sushy.get_system_collection_path()
    system_url = parse.urljoin(collection_path, system_id)
    try:
        return self._sushy.get_system(system_url)
    except sushy.exceptions.SushyError as exc:
        # Translate the sushy error into the driver's own exception type.
        msg = (self._('The Redfish System "%(system)s" was not found. '
                      'Error %(error)s') %
               {'system': system_id, 'error': str(exc)})
        LOG.debug(msg)
        raise exception.IloError(msg)
def _get_sushy_manager(self, manager_id):
    """Get the sushy Manager for manager_id

    :param manager_id: The identity of the Manager resource
    :returns: the Sushy Manager instance
    :raises: IloError
    """
    collection_path = self._sushy.get_manager_collection_path()
    manager_url = parse.urljoin(collection_path, manager_id)
    try:
        return self._sushy.get_manager(manager_url)
    except sushy.exceptions.SushyError as exc:
        # Translate the sushy error into the driver's own exception type.
        msg = (self._('The Redfish Manager "%(manager)s" was not found. '
                      'Error %(error)s') %
               {'manager': manager_id, 'error': str(exc)})
        LOG.debug(msg)
        raise exception.IloError(msg)
def collect_usage(self):
    """Trigger a usage-collection cycle on the API endpoint.

    Returns the decoded JSON response on success; prints (and returns
    None) on connection failure; raises AttributeError on a non-200 reply.
    """
    url = urlparse.urljoin(self.endpoint, "collect_usage")
    headers = {"Content-Type": "application/json",
               "X-Auth-Token": self.auth_token}
    try:
        response = requests.post(url, headers=headers,
                                 verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Usage cycle failed: %s code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
def last_collected(self):
    """Fetch the timestamp of the last usage collection from the endpoint.

    Returns the decoded JSON response on success; prints (and returns
    None) on connection failure; raises AttributeError on a non-200 reply.
    """
    url = urlparse.urljoin(self.endpoint, "last_collected")
    headers = {"Content-Type": "application/json",
               "X-Auth-Token": self.auth_token}
    try:
        response = requests.get(url, headers=headers,
                                verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Get last collected failed: %s code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
def _query_usage(self, tenant, start, end, endpoint):
    """Query usage for *tenant* between *start* and *end* from *endpoint*.

    Returns the decoded JSON response on success; prints (and returns
    None) on connection failure; raises AttributeError on a non-200 reply.
    """
    url = urlparse.urljoin(self.endpoint, endpoint)
    headers = {"X-Auth-Token": self.auth_token}
    params = {"tenant": tenant, "start": start, "end": end}
    try:
        response = requests.get(url, headers=headers, params=params,
                                verify=not self.insecure)
        if response.status_code == 200:
            return response.json()
        raise AttributeError("Get usage failed: %s code: %s" %
                             (response.text, response.status_code))
    except ConnectionError as e:
        print(e)
def testGet300WithLocation(self):
    # Test the we automatically follow 300 redirects if a
    # Location: header is provided
    uri = urllib_parse.urljoin(base, "300/with-location-header.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    # response.previous holds the intermediate redirect response.
    self.assertEqual(response.previous.status, 300)
    self.assertEqual(response.previous.fromcache, False)
    # Confirm that the intermediate 300 is not cached: the second,
    # identical request must hit the server again, not the cache.
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 300)
    self.assertEqual(response.previous.fromcache, False)
def testGet301(self):
    # Test that we automatically follow 301 redirects
    # and that we cache the 301 response
    uri = urllib_parse.urljoin(base, "301/onestep.asis")
    destination = urllib_parse.urljoin(base, "302/final-destination.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    # content-location should reveal where the redirect actually landed.
    self.assertTrue('content-location' in response)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, False)
    # Second request: the permanent (301) redirect must now be served
    # from the cache (previous.fromcache is True).
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response['content-location'], destination)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 301)
    self.assertEqual(response.previous.fromcache, True)
def testGet302RedirectionLimit(self):
    # Test that we can set a lower redirection limit
    # and that we raise an exception when we exceed
    # that limit.
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "302/twostep.asis")
    try:
        # Two redirects against a limit of one must raise RedirectLimit.
        (response, content) = self.http.request(uri, "GET", redirections=1)
        self.fail("This should not happen")
    except httplib2.RedirectLimit:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception ")
    # Re-run the test with out the exceptions: the error is reported as
    # a synthetic 500 response instead of being raised.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET", redirections=1)
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected more"))
    # The last real (302) response is still visible on the result.
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"<html>"))
    self.assertTrue(response.previous is not None)
def testGetIgnoreEtag(self):
    # Test that we can forcibly ignore ETags
    uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")
    # With caching active, a revalidation request carries If-None-Match.
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
    d = self.reflector(content)
    self.assertTrue('HTTP_IF_NONE_MATCH' in d)
    # After enabling ignore_etag, the same revalidation must NOT send
    # an If-None-Match header.
    self.http.ignore_etag = True
    (response, content) = self.http.request(uri, "GET", headers={
        'accept-encoding': 'identity', 'cache-control': 'max-age=0'})
    d = self.reflector(content)
    self.assertEqual(response.fromcache, False)
    self.assertFalse('HTTP_IF_NONE_MATCH' in d)
def testGet307(self):
    # Test that we do follow 307 redirects but
    # do not cache the 307
    uri = urllib_parse.urljoin(base, "307/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
    # Second request: the final destination may come from cache
    # (fromcache True), but the temporary 307 itself must not.
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    self.assertEqual(content, b"This is the final destination.\n")
    self.assertEqual(response.previous.status, 307)
    self.assertEqual(response.previous.fromcache, False)
def testGetGZipFailure(self):
    # Test that we raise a good exception when the gzip fails
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "gzip/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")
    # Re-run the test with out the exceptions: the decompression error
    # is reported as a synthetic 500 response instead of being raised.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    self.http.force_exception_to_status_code = False
    uri = urllib_parse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
        self.fail("Should never reach here")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception:
        self.fail("Threw wrong kind of exception")
    # Re-run the test with out the exceptions: the decompression error
    # is reported as a synthetic 500 response instead of being raised.
    self.http.force_exception_to_status_code = True
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetCacheControlNoCache(self):
    # Test Cache-Control: no-cache on requests
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")
    # Second identical request is served from the cache.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    # A request carrying Cache-Control: no-cache must bypass the cache.
    (response, content) = self.http.request(
        uri, "GET", headers={
            'accept-encoding': 'identity',
            'Cache-Control': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
def testGetCacheControlPragmaNoCache(self):
    # Test Pragma: no-cache on requests
    uri = urllib_parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertNotEqual(response['etag'], "")
    # Second identical request is served from the cache.
    (response, content) = self.http.request(
        uri, "GET", headers={'accept-encoding': 'identity'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)
    # A request carrying the HTTP/1.0-style Pragma: no-cache header must
    # bypass the cache just like Cache-Control: no-cache.
    (response, content) = self.http.request(
        uri, "GET", headers={
            'accept-encoding': 'identity',
            'Pragma': 'no-cache'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, False)
def testBasicAuthTwoDifferentCredentials(self):
    # Test Basic Authentication with multiple sets of credentials
    # Without credentials both the file and the directory are protected.
    uri = urllib_parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)
    uri = urllib_parse.urljoin(base, "basic2/")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 401)
    # After adding matching credentials both requests succeed.
    self.http.add_credentials('fred', 'barney')
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
    uri = urllib_parse.urljoin(base, "basic2/file.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 200)
def testDigestAuthNextNonceAndNC(self):
    # Test that if the server sets nextnonce that we reset
    # the nonce count back to 1
    uri = urllib_parse.urljoin(base, "digest/file.txt")
    self.http.add_credentials('joe', 'password')
    (response, content) = self.http.request(
        uri, "GET", headers={"cache-control": "no-cache"})
    info = httplib2._parse_www_authenticate(
        response, 'authentication-info')
    self.assertEqual(response.status, 200)
    # Second request: if the first reply advertised a nextnonce, the
    # client must have restarted its nonce count at 1.
    (response, content) = self.http.request(
        uri, "GET", headers={"cache-control": "no-cache"})
    info2 = httplib2._parse_www_authenticate(
        response, 'authentication-info')
    self.assertEqual(response.status, 200)
    if 'nextnonce' in info:
        self.assertEqual(info2['nc'], 1)
def testReflector(self):
    # The reflector CGI echoes back the request environment; check that a
    # User-Agent header was sent with the request.
    uri = urllib_parse.urljoin(base, "reflector/reflector.cgi")
    (response, content) = self.http.request(uri, "GET")
    d = self.reflector(content)
    self.assertTrue('HTTP_USER_AGENT' in d)
# NOTE: disabled because this isn't relevant to the shim.
# def testConnectionClose(self):
# uri = "http://www.google.com/"
# (response, content) = self.http.request(uri, "GET")
# for c in self.http.connections.values():
# self.assertNotEqual(None, c.sock)
# (response, content) = self.http.request(
# uri, "GET", headers={"connection": "close"})
# for c in self.http.connections.values():
# self.assertEqual(None, c.sock)
def get_full_url(self, routename, **kwargs):
    """
    Construct a full URL for *routename* by resolving the bottle app's
    get_url() result against the configured base URL.
    For example:
        https://example.com/hello?world=1
    XXX: Needs UT
    """
    relative_url = self.app.get_url(routename, **kwargs)
    return urljoin(self.base_url, relative_url)
############################################################
# CBVs (class based views)
############################################################
def show(self, fmt="html", header_block=None, footer_block=None):
    """
    Show the block in a browser.
    :param fmt: The format of the saved block. Supports the same output as `Block.save`
    :param header_block: passed through to `publish`
    :param footer_block: passed through to `publish`
    :return: Path to the block file.
    """
    file_name = str_base(hash(self._id)) + "." + fmt
    target = os.path.expanduser(
        os.path.join(user_config["tmp_html_dir"], file_name))
    file_path = self.publish(target, header_block=header_block,
                             footer_block=footer_block)
    try:
        url_base = user_config["public_dir"]
    except KeyError:
        # No public directory configured: open the local file directly.
        path = os.path.expanduser(file_path)
    else:
        # Published under a public URL: open that instead of the local path.
        path = urljoin(url_base,
                       os.path.expanduser(
                           user_config["tmp_html_dir"] + "/" + file_name))
    webbrowser.open_new_tab(path)
    return path