def test_get_server_details_with_limit(self):
req = fakes.HTTPRequest.blank('/fake/servers/detail?limit=3')
res = self.controller.detail(req)
servers = res['servers']
self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
[s['id'] for s in servers])
servers_links = res['servers_links']
self.assertEqual('next', servers_links[0]['rel'])
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers/detail', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected = {'limit': ['3'], 'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected))
python类parse_qs()的实例源码
test_servers.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 28
收藏 0
点赞 0
评论 0
test_servers.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 26
收藏 0
点赞 0
评论 0
def test_get_server_details_with_limit_and_other_params(self):
req = fakes.HTTPRequest.blank('/fake/servers/detail'
'?limit=3&blah=2:t'
'&sort_key=id1&sort_dir=asc')
res = self.controller.detail(req)
servers = res['servers']
self.assertEqual([fakes.get_fake_uuid(i) for i in range(len(servers))],
[s['id'] for s in servers])
servers_links = res['servers_links']
self.assertEqual('next', servers_links[0]['rel'])
# Retrieve the parameters from the next link, they should contain the
# same limit, filter, and sort information as the original request as
# well as a marker; this ensures that the caller can simply use the
# "next" link and that they do not need to manually insert the limit
# and sort information.
href_parts = urlparse.urlparse(servers_links[0]['href'])
self.assertEqual('/v2/fake/servers/detail', href_parts.path)
params = urlparse.parse_qs(href_parts.query)
expected = {'limit': ['3'], 'blah': ['2:t'],
'sort_key': ['id1'], 'sort_dir': ['asc'],
'marker': [fakes.get_fake_uuid(2)]}
self.assertThat(params, matchers.DictMatches(expected))
def delete(self, req, id):
if self.ext_mgr.is_loaded('os-extended-quotas'):
context = req.environ['nova.context']
authorize_delete(context)
params = urlparse.parse_qs(req.environ.get('QUERY_STRING', ''))
user_id = params.get('user_id', [None])[0]
if user_id and not self.ext_mgr.is_loaded('os-user-quotas'):
raise webob.exc.HTTPNotFound()
try:
nova.context.authorize_project_context(context, id)
# NOTE(alex_xu): back-compatible with db layer hard-code admin
# permission checks. This has to be left only for API v2.0
# because this version has to be stable even if it means that
# only admins can call this method while the policy could be
# changed.
nova.context.require_admin_context(context)
if user_id:
QUOTAS.destroy_all_by_project_and_user(context,
id, user_id)
else:
QUOTAS.destroy_all_by_project(context, id)
return webob.Response(status_int=202)
except exception.Forbidden:
raise webob.exc.HTTPForbidden()
raise webob.exc.HTTPNotFound()
def test_simple_notification(self):
responses.add('POST', 'https://api.pushover.net/1/messages.json', body=SUCCESS)
self.plugin.set_option('userkey', 'abcdef', self.project)
self.plugin.set_option('apikey', 'ghijkl', self.project)
group = self.create_group(message='Hello world', culprit='foo.bar')
event = self.create_event(
group=group,
message='Hello world',
tags={'level': 'warning'},
)
rule = Rule.objects.create(project=self.project, label='my rule')
notification = Notification(event=event, rule=rule)
with self.options({'system.url-prefix': 'http://example.com'}):
self.plugin.notify(notification)
request = responses.calls[0].request
payload = parse_qs(request.body)
assert payload == {
'message': ['{}\n\nTags: level=warning'.format(event.get_legacy_message())],
'title': ['Bar: Hello world'],
'url': ['http://example.com/baz/bar/issues/{}/'.format(group.id)],
'url_title': ['Issue Details'],
'priority': ['0'],
'user': ['abcdef'],
'token': ['ghijkl'],
}
def test_notification_without_culprit(self):
responses.add('POST', 'http://example.com/slack')
self.plugin.set_option('webhook', 'http://example.com/slack', self.project)
self.plugin.set_option('exclude_culprit', True, self.project)
group = self.create_group(message='Hello world', culprit='foo.bar')
event = self.create_event(group=group, message='Hello world', tags={'level': 'warning'})
rule = Rule.objects.create(project=self.project, label='my rule')
notification = Notification(event=event, rule=rule)
with self.options({'system.url-prefix': 'http://example.com'}):
self.plugin.notify(notification)
request = responses.calls[0].request
payload = json.loads(parse_qs(request.body)['payload'][0])
assert payload == {
'username':
'Sentry',
'attachments': [
{
'color': '#f18500',
'fields': [
{
'short': True,
'value': 'foo Bar',
'title': 'Project'
},
],
'fallback': '[foo Bar] Hello world',
'title': 'Hello world',
'title_link': 'http://example.com/baz/bar/issues/1/?referrer=slack',
},
],
}
def test_notification_without_project(self):
responses.add('POST', 'http://example.com/slack')
self.plugin.set_option('webhook', 'http://example.com/slack', self.project)
self.plugin.set_option('exclude_project', True, self.project)
group = self.create_group(message='Hello world', culprit='foo.bar')
event = self.create_event(group=group, message='Hello world', tags={'level': 'warning'})
rule = Rule.objects.create(project=self.project, label='my rule')
notification = Notification(event=event, rule=rule)
with self.options({'system.url-prefix': 'http://example.com'}):
self.plugin.notify(notification)
request = responses.calls[0].request
payload = json.loads(parse_qs(request.body)['payload'][0])
assert payload == {
'username':
'Sentry',
'attachments': [
{
'color': '#f18500',
'fields': [
{
'short': False,
'value': 'foo.bar',
'title': 'Culprit',
},
],
'fallback': '[foo Bar] Hello world',
'title': 'Hello world',
'title_link': 'http://example.com/baz/bar/issues/1/?referrer=slack',
},
],
}
def query(self):
return parse.parse_qs(self.parsed_url.query,
keep_blank_values=True)
def query(self):
return parse.parse_qs(self.parsed_url.query,
keep_blank_values=True)
def query(self):
return parse.parse_qs(self.parsed_url.query,
keep_blank_values=True)
def test_reference_name(self):
full_url = protocol.ticket_request_url(
"http://example.co.uk/path/to/resource", reference_name="1",
start=2, end=100)
parsed = urlparse(full_url)
self.assertEqual(parsed.scheme, "http")
self.assertEqual(parsed.netloc, "example.co.uk")
self.assertEqual(parsed.path, "/path/to/resource")
query = parse_qs(parsed.query)
self.assertEqual(query["referenceName"], ["1"])
self.assertEqual(query["start"], ["2"])
self.assertEqual(query["end"], ["100"])
self.assertEqual(len(query), 3)
def test_reference_md5(self):
md5 = "b9185d4fade27aa27e17f25fafec695f"
full_url = protocol.ticket_request_url(
"https://example.com/resource", reference_md5=md5)
parsed = urlparse(full_url)
self.assertEqual(parsed.scheme, "https")
self.assertEqual(parsed.netloc, "example.com")
self.assertEqual(parsed.path, "/resource")
query = parse_qs(parsed.query)
self.assertEqual(query["referenceMD5"], [md5])
self.assertEqual(len(query), 1)
def test_format(self):
for data_format in ["cram", "CRAM", "BAM"]:
full_url = protocol.ticket_request_url(
"http://example.co.uk/path/to/resource", data_format=data_format)
parsed = urlparse(full_url)
self.assertEqual(parsed.scheme, "http")
self.assertEqual(parsed.netloc, "example.co.uk")
self.assertEqual(parsed.path, "/path/to/resource")
query = parse_qs(parsed.query)
self.assertEqual(query["format"], [data_format.upper()])
def test_embedded_query_strings(self):
full_url = protocol.ticket_request_url("http://a.com/stuff?a=a&b=b")
query = parse_qs(urlparse(full_url).query)
self.assertEqual(query["a"], ["a"])
self.assertEqual(query["b"], ["b"])
full_url = protocol.ticket_request_url(
"http://a.com/stuff?a=a&b=b", reference_name="123")
query = parse_qs(urlparse(full_url).query)
self.assertEqual(query["a"], ["a"])
self.assertEqual(query["b"], ["b"])
self.assertEqual(query["referenceName"], ["123"])
def ticket_request_url(
url, fmt=None, reference_name=None, reference_md5=None,
start=None, end=None, fields=None, tags=None, notags=None,
data_format=None):
parsed_url = urlparse(url)
get_vars = parse_qs(parsed_url.query)
# TODO error checking
if reference_name is not None:
get_vars["referenceName"] = reference_name
if reference_md5 is not None:
get_vars["referenceMD5"] = reference_md5
if start is not None:
get_vars["start"] = int(start)
if end is not None:
get_vars["end"] = int(end)
if data_format is not None:
get_vars["format"] = data_format.upper()
# if fields is not None:
# get_vars["fields"] = ",".join(fields)
# if tags is not None:
# get_vars["tags"] = ",".join(tags)
# if notags is not None:
# get_vars["notags"] = ",".join(notags)
new_url = list(parsed_url)
new_url[4] = urlencode(get_vars, doseq=True)
return urlunparse(new_url)