def request(self, method, path, allowed_statuses=None, **kwargs):
    """Send an authenticated HTTP request and return the response.

    The target URL is built from ``self.base_url`` and ``path``. The
    ``Authorization`` and ``User-Agent`` headers are always set on the
    ``headers`` mapping found in (or added to) ``kwargs``; every other
    keyword argument is forwarded to ``requests.request`` untouched.

    Unless the response status code is listed in ``allowed_statuses``, the
    response is handed to ``self._check_response_error`` and its status is
    then asserted to be below 300.

    Any exception raised while sending the request or while running
    ``self._debug_response`` is logged at debug level and re-raised.
    """
    from . import __version__

    target = urljoin(self.base_url, path, '/')

    request_headers = kwargs.setdefault('headers', {})
    request_headers['Authorization'] = 'Token token="{}"'.format(self.access_token)
    request_headers['User-Agent'] = 'python-percy/{}'.format(__version__)

    try:
        response = requests.request(method, target, **kwargs)
        self._debug_response(response)
    except Exception as ex:
        l.debug('%s %s -> Exception: %s: %s', method, target, ex.__class__.__name__, ex.args)
        raise

    # Statuses the caller explicitly allows bypass the error checks.
    if allowed_statuses and response.status_code in allowed_statuses:
        return response

    # NOTE(review): the assert is taken to belong to the same branch as the
    # error check -- confirm against the upstream file's indentation.
    self._check_response_error(response)
    assert response.status_code < 300, (response.status_code, response.content)
    return response
python类urljoin()的实例源码
def user_login(self, username, password):
    """Log in to the dashboard by submitting its login form.

    Fetches the dashboard page, extracts the login URL, CSRF token and
    default region from it with ``HorizonHTMLParser``, then posts the
    credentials to the login URL through the opener returned by
    ``self._get_opener()``.
    """
    dashboard_url = CONF.dashboard.dashboard_url
    page = self._get_opener().open(dashboard_url).read()
    # read() returns bytes on Python 3, while the HTML parser expects text.
    if isinstance(page, bytes):
        page = page.decode('utf-8')

    # Grab the CSRF token and default region
    parser = HorizonHTMLParser()
    parser.feed(page)

    # construct login url for dashboard, discovery accommodates non-/ web
    # root for dashboard
    login_url = parse.urljoin(dashboard_url, parser.login)

    # Prepare login form request
    req = request.Request(login_url)
    req.add_header('Content-type', 'application/x-www-form-urlencoded')
    req.add_header('Referer', dashboard_url)

    # Pass the default domain name regardless of the auth version in order
    # to test the scenario of when horizon is running with keystone v3
    params = {'username': username,
              'password': password,
              'region': parser.region,
              'domain': CONF.auth.default_credentials_domain_name,
              'csrfmiddlewaretoken': parser.csrf_token}

    # POST data must be bytes on Python 3; url-encoded output is plain ASCII.
    form_body = parse.urlencode(params).encode('ascii')
    self._get_opener().open(req, form_body)
def get_index(self):
    '''Fetch the periodic job index page and return links to its jobs.

    Returns an empty list when the page cannot be fetched, the response
    is not OK, or no link matches JOBRE. Otherwise returns the first
    NLINKS matching links, each prefixed with the index URL.
    '''
    index_url = urljoin(self.config.log_url, self.args.job)
    page = get_html(index_url)
    if page is None or not page.ok:
        return []
    text = page.content.decode() if page.content else ''
    # Search every line once and keep only the lines that carry an href.
    found = (HREF.search(line) for line in text.splitlines())
    targets = [match.group(1) for match in found if match]
    job_links = ["/".join((index_url, target))
                 for target in targets if JOBRE.match(target)]
    # Slicing an empty list already gives [], so no separate branch is needed.
    return job_links[:NLINKS]
def get_index(self):
    '''Fetch the periodic job index page and return links to its jobs.

    Returns an empty list when the page cannot be fetched, the response
    is not OK, or no link matches JOBRE. Otherwise returns the first
    NLINKS matching links, each prefixed with the index URL.
    '''
    index_url = urljoin(self.config.log_url, self.args.job)
    page = get_html(index_url)
    if page is None or not page.ok:
        return []
    text = page.content.decode() if page.content else ''
    # Search every line once and keep only the lines that carry an href.
    found = (HREF.search(line) for line in text.splitlines())
    targets = [match.group(1) for match in found if match]
    job_links = ["/".join((index_url, target))
                 for target in targets if JOBRE.match(target)]
    # Slicing an empty list already gives [], so no separate branch is needed.
    return job_links[:NLINKS]
def get_index(self):
    '''Fetch the periodic job index page and return links to its jobs.

    Returns an empty list when the page cannot be fetched, the response
    is not OK, or no link matches JOBRE. Otherwise returns the first
    NLINKS matching links, each prefixed with the index URL.
    '''
    index_url = urljoin(self.config.log_url, self.args.job)
    page = get_html(index_url)
    if page is None or not page.ok:
        return []
    text = page.content.decode() if page.content else ''
    # Search every line once and keep only the lines that carry an href.
    found = (HREF.search(line) for line in text.splitlines())
    targets = [match.group(1) for match in found if match]
    job_links = ["/".join((index_url, target))
                 for target in targets if JOBRE.match(target)]
    # Slicing an empty list already gives [], so no separate branch is needed.
    return job_links[:NLINKS]
def _call(method, path, api_version=None, **kwargs):
    # type: (str, str, Optional[str], **Any) -> requests.Response
    """Issue a GET or POST against the server API and return the response.

    ``path`` is resolved against ``<base_url>/api/server/``. The
    ``X-Cloak-API-Version`` header is set from ``api_version`` when given,
    otherwise from the module-level ``default_api_version`` when that is
    not None. Remaining keyword arguments are passed to the session call.

    Raises:
        NotImplementedError: if ``method`` is neither 'GET' nor 'POST'.
        ServerApiError: if the response status is outside 200-399.
    """
    url = urljoin(urljoin(base_url, '/api/server/'), path)

    headers = kwargs.setdefault('headers', {})
    version = api_version if api_version is not None else default_api_version
    if version is not None:
        headers['X-Cloak-API-Version'] = version

    if method == 'GET':
        response = session.get(url, **kwargs)
    elif method == 'POST':
        response = session.post(url, **kwargs)
    else:
        raise NotImplementedError()

    # Chained comparison instead of ``xrange``, which does not exist on
    # Python 3; the accepted range (200 <= status < 400) is unchanged.
    if not 200 <= response.status_code < 400:
        raise ServerApiError(response)

    return response
def log_curl_request(self, method, url, kwargs):
    """Log, at debug level, a curl command line mirroring the request.

    Headers come from ``kwargs['headers']`` after being passed through
    ``self._process_header``; TLS options are derived from
    ``self.session.verify`` and ``self.session.cert``; a request body, if
    present, is logged with passwords masked.
    """
    parts = ['curl -i -X %s' % method]
    parts.extend(
        "-H '%s: %s'" % self._process_header(name, content)
        for name, content in kwargs['headers'].items()
    )

    verify = self.session.verify
    if not verify:
        parts.append('-k')
    elif isinstance(verify, six.string_types):
        parts.append('--cacert %s' % verify)

    if self.session.cert:
        parts.append('--cert %s' % self.session.cert[0])
        parts.append('--key %s' % self.session.cert[1])

    if 'body' in kwargs:
        masked_body = strutils.mask_password(kwargs['body'])
        parts.append("-d '%s'" % masked_body)

    parts.append(urlparse.urljoin(self.endpoint_trimmed, url))
    LOG.debug(' '.join(parts))
def resource_type_dict(cls, request=None):
    """
    Build the ``dict`` of ResourceType metadata describing the user object.

    The ``meta.location`` entry is the resource-types URL for
    ``cls.resource_type`` joined onto the base SCIM location obtained for
    ``request``.
    """
    type_id = cls.resource_type
    relative_path = reverse('scim:resource-types', kwargs={'uuid': type_id})
    base_location = get_base_scim_location_getter()(request)
    meta = {
        'location': urljoin(base_location, relative_path),
        'resourceType': 'ResourceType'
    }
    return {
        'schemas': [constants.SchemaURI.RESOURCE_TYPE],
        'id': type_id,
        'name': 'User',
        'endpoint': reverse('scim:users'),
        'description': 'User Account',
        'schema': constants.SchemaURI.USER,
        'meta': meta
    }
def resource_type_dict(cls, request=None):
    """
    Build the ``dict`` of ResourceType metadata describing the group object.

    The ``meta.location`` entry is the resource-types URL for
    ``cls.resource_type`` joined onto the base SCIM location obtained for
    ``request``.
    """
    type_id = cls.resource_type
    relative_path = reverse('scim:resource-types', kwargs={'uuid': type_id})
    base_location = get_base_scim_location_getter()(request)
    meta = {
        'location': urljoin(base_location, relative_path),
        'resourceType': 'ResourceType'
    }
    return {
        'schemas': [constants.SchemaURI.RESOURCE_TYPE],
        'id': type_id,
        'name': 'Group',
        'endpoint': reverse('scim:groups'),
        'description': 'Group',
        'schema': constants.SchemaURI.GROUP,
        'meta': meta
    }
sf_scrummaster.py 文件源码
项目:software-factory
作者: softwarefactory-project
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def __init__(self, url, api_key, project_group, board):
    """Connect to the storyboard API and load the project group's data.

    Looks up ``project_group`` by name and fetches all of its stories.
    When ``board`` is given, also resolves the board id by title and
    collects its non-archived worklists whose title is in ``LANES``.

    Raises:
        Exception: if the project group or the board cannot be found.
    """
    self.url = url
    self.cookie = get_cookie(url, api_key=api_key)
    api_root = urljoin(url, "storyboard_api")
    self.client = SFStoryboard(api_root, self.cookie)

    try:
        self.project_group = self.client.project_groups.find(
            name=project_group)
    except exceptions.NotFound:
        raise Exception('projects group not found')

    self.stories = self.client.stories.get_all(
        project_group_id=self.project_group.id)

    self.board_id = None
    if not board:
        return

    try:
        self.board_id = self.client.boards.find(title=board).id
    except exceptions.NotFound:
        raise Exception('board not found')

    # NOTE(review): lane loading is taken to happen only when a board was
    # requested -- confirm against the upstream file's indentation.
    worklists = self.client.worklists.get_all(board_id=self.board_id)
    self.board_lanes = {
        lane.title: lane
        for lane in worklists
        if not lane.archived and lane.title in LANES
    }
def __network_ping(self):
    """Return True if the depot responds to a request for ``versions/0``.

    An HTTP error counts as a live depot only when its code is
    NOT_MODIFIED; any other HTTP error or a URL error yields False.
    """
    try:
        ping_url = urljoin(self.get_depot_url(), "versions/0")
        # Disable SSL peer verification, we just want to check
        # if the depot is running.
        unverified = ssl._create_unverified_context()
        handle = urlopen(ping_url, context=unverified)
        handle.close()
    except HTTPError as err:
        # Server returns NOT_MODIFIED if catalog is up
        # to date
        return err.code == http_client.NOT_MODIFIED
    except URLError:
        return False
    return True
def test_bug_5366(self):
    """Publish a package with slashes in the name, and then verify
    that the depot manifest and info operations work regardless of
    the encoding."""
    depot_url = self.dc.get_depot_url()
    published = self.pkgsend_bulk(depot_url, self.system10)
    fmri = published[0]

    operations = ("info/0/{0}", "manifest/0/{0}")
    # First, try it un-encoded; second, try it encoded.
    for name in (fmri, quote(fmri)):
        for template in operations:
            urlopen(urljoin(depot_url, template.format(name)))
def test_bug_15482(self):
    """Test to make sure BUI search doesn't trigger a traceback."""
    # Now update the repository configuration while the depot is
    # stopped so changes won't be overwritten on exit.
    self.__update_repo_config()

    # Start the depot.
    self.dc.start()

    # Then, publish some packages we can abuse for testing.
    durl = self.dc.get_depot_url()
    self.pkgsend_bulk(durl, self.quux10, refresh_index=True)

    # Each search page is requested in turn and its body read in full.
    search_pages = (
        "en/search.shtml?action=Search&token=*",
        "en/advanced_search.shtml?action=Search&token=*",
        "en/advanced_search.shtml?token=*&show=a&rpp=50&"
        "action=Advanced+Search",
    )
    for page in search_pages:
        urlopen(urljoin(durl, page)).read()
def _network_ping(self):
    """Return True if the depot at ``self.url`` answers ``versions/0``.

    An HTTP error counts as a live depot only when its code is FORBIDDEN;
    any other HTTP error or a URL error yields False.
    """
    try:
        # Ping the versions URL, rather than the default /
        # so that we don't initialize the BUI code yet.
        repourl = urljoin(self.url, "versions/0")
        # Disable SSL peer verification, we just want to check
        # if the depot is running.
        response = urlopen(repourl,
                           context=ssl._create_unverified_context())
        # Close the response right away so the connection is not leaked.
        response.close()
    except HTTPError as e:
        return e.code == http_client.FORBIDDEN
    except URLError:
        return False
    return True
bittorrent.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def _lookup_torrent_url_fn():
    """Load a "fetcher" func to get the right torrent URL.

    Returns a function mapping an image id to
    ``<torrent_base_url>/<image_id>.torrent`` (joined with urljoin).
    Raises RuntimeError when ``xenserver.torrent_base_url`` is not set.
    """
    if not CONF.xenserver.torrent_base_url:
        raise RuntimeError(_('Cannot create default bittorrent URL'
                             ' without xenserver.torrent_base_url'
                             ' configuration option set.'))

    if '/' not in CONF.xenserver.torrent_base_url:
        LOG.warning(_LW('Value specified in conf file for'
                        ' xenserver.torrent_base_url does not contain a'
                        ' slash character, therefore it will not be used'
                        ' as part of the torrent URL. Specify a valid'
                        ' base URL as defined by RFC 1808 (see step 6).'))

    def _default_torrent_url_fn(image_id):
        # The base URL is read from CONF at call time, not captured here.
        torrent_name = "%s.torrent" % image_id
        return urlparse.urljoin(CONF.xenserver.torrent_base_url,
                                torrent_name)

    return _default_torrent_url_fn
def get_request_params(self, api_url):
    """Build the keyword arguments for requests.request that send the payload to the ESP.

    :param api_url: the base api_url for the backend
    :return: dict with method, url, params, data, headers, files and auth
    """
    endpoint = self.get_api_endpoint()
    # Without an endpoint the base api_url is used as-is.
    target_url = api_url if endpoint is None else urljoin(api_url, endpoint)
    # json= is deliberately absent: serialization is done here (via
    # serialize_data) to provide extra context in error messages.
    return {
        'method': self.method,
        'url': target_url,
        'params': self.params,
        'data': self.serialize_data(),
        'headers': self.headers,
        'files': self.files,
        'auth': self.auth,
    }
def do_GET(self):
    """Serve either the ticket document or one of the test instances.

    A request for ``self.ticket_path`` gets a JSON ticket listing the URL
    and headers of every test instance. A request for a test instance's
    URL gets that instance's error code, its data, or (when the instance
    is marked ``truncate``) its data minus the final byte while still
    advertising the full Content-Length. Anything else gets a 404.
    """
    instances = self.server.test_instances
    by_url = {inst.url: inst for inst in instances}

    if self.path == self.ticket_path:
        self.send_response(200)
        self.end_headers()
        ticket_urls = [
            {
                "url": urljoin(SERVER_URL, inst.url),
                "headers": inst.headers
            } for inst in self.server.test_instances]
        payload = {"htsget": {"urls": ticket_urls}}
        self.wfile.write(json.dumps(payload).encode())
        return

    if self.path not in by_url:
        self.send_error(404)
        return

    target = by_url[self.path]
    if target.error_code is not None:
        self.send_error(target.error_code)
        return

    self.send_response(200)
    self.send_header("Content-Length", len(target.data))
    self.end_headers()
    if target.truncate:
        # Drop the last byte and flush so the short body goes out immediately.
        self.wfile.write(target.data[:-1])
        self.wfile.flush()
    else:
        self.wfile.write(target.data)
def _get_endpoint_url(request, endpoint_type, catalog=None):
    """Return the identity endpoint URL joined with the active API version.

    When the request's user has a service catalog the endpoint is looked
    up with ``base.url_for``; otherwise it is the session's
    ``region_endpoint``, defaulting to ``settings.OPENSTACK_KEYSTONE_URL``.
    Trailing slashes are stripped before ``'v<active version>'`` is joined
    on with urljoin. ``catalog`` is accepted but not used here.
    """
    if getattr(request.user, "service_catalog", None):
        endpoint = base.url_for(request,
                                service_type='identity',
                                endpoint_type=endpoint_type)
    else:
        fallback = settings.OPENSTACK_KEYSTONE_URL
        endpoint = request.session.get('region_endpoint', fallback)

    # TODO(gabriel): When the Service Catalog no longer contains API versions
    # in the endpoints this can be removed.
    version_segment = 'v%s' % VERSIONS.active
    return urlparse.urljoin(endpoint.rstrip('/'), version_segment)
def resolve_service_href(self, href):
    """Return the API service object addressed by ``href``.

    ``href`` is resolved against ``self.api.url``; the API root and the
    first remaining slash are then removed to obtain the service path
    passed to ``self.api.service``.
    """
    api_root = self.api.url
    full_href = urljoin(api_root, href)
    # Drop the root of the ovirt location ...
    without_root = full_href.replace(api_root, "")
    # ... then the first "/" in what is left. str() ensures the path is a
    # str rather than a unicode object on python 2.
    service_path = str(without_root.replace("/", "", 1))
    return self.api.service(service_path)
def http_request(self, method, path, data=None, params=None):
    """Wrap HTTP calls to the Threat Stack API.

    Builds the URL from ``self.BASE_URL`` and ``path``, sends the request
    with the API key in the ``Authorization`` header (plus the
    organization header when ``self.org_id`` is set) and passes the
    response to ``self.handle_response``.

    Raises:
        errors.APIRateLimitError: if the API answers with HTTP 429.
    """
    url = urljoin(self.BASE_URL, path)

    headers = {"Authorization": self.api_key}
    if self.org_id:
        headers[self.org_id_header] = self.org_id

    req = Request(
        method,
        url,
        headers=headers,
        data=data,
        params=params
    )

    # Use the session as a context manager so it is closed (and its
    # pooled connections released) once the response has been handled.
    with Session() as s:
        prepped = req.prepare()
        resp = s.send(prepped, timeout=self.timeout)

        if resp.status_code == 429:
            raise errors.APIRateLimitError("Threat Stack API rate limit exceeded")
        return self.handle_response(resp)