def test_create_instance_with_group_hint(self):
    """Create a server with a ``group`` scheduler hint and check membership.

    An InstanceGroup owned by the request's project/user is created, its
    UUID is passed as the ``group`` scheduler hint, and after the
    controller creates the server the group is re-read to verify that the
    new server's id is listed among its members.
    """
    nova_context = self.req.environ['nova.context']

    group = objects.InstanceGroup(nova_context)
    group.project_id = nova_context.project_id
    group.user_id = nova_context.user_id
    group.create()

    def fake_instance_destroy(context, uuid, constraint):
        # Stand-in for nova.db.instance_destroy; always yields stub #1.
        return fakes.stub_instance(1)

    self.stub_out('nova.db.instance_destroy', fake_instance_destroy)
    self.ext_mgr.extensions = {
        'OS-SCH-HNT': 'fake',
        'os-server-group-quotas': 'fake',
    }
    self.body['server']['scheduler_hints'] = {'group': group.uuid}
    self.req.body = jsonutils.dump_as_bytes(self.body)

    create_result = self.controller.create(self.req, self.body)
    server = create_result.obj['server']

    # Re-read the group so the membership check sees the stored state.
    group = objects.InstanceGroup.get_by_uuid(nova_context, group.uuid)
    self.assertIn(server['id'], group.members)
python类dump_as_bytes()的实例源码
test_servers.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
test_extensions.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def test_controller_action_extension_early(self):
    """POST an action to an extended resource using the 'early' stub.

    Builds a router with a ``tweedles`` resource extension plus a
    controller extension backed by ``StubEarlyExtensionController``, then
    asserts the response is 200 and its body equals ``extension_body``.
    """
    base_controller = StubActionController(response_body)
    member_actions = {'action': 'POST'}
    resource_extension = base_extensions.ResourceExtension(
        'tweedles', base_controller, member_actions=member_actions)

    early_controller = StubEarlyExtensionController(extension_body)
    stub_extension = StubControllerExtension()
    controller_extension = base_extensions.ControllerExtension(
        stub_extension, 'tweedles', early_controller)

    ext_manager = StubExtensionManager(
        resource_ext=resource_extension,
        controller_ext=controller_extension)
    router = compute.APIRouter(ext_manager)

    http_request = webob.Request.blank("/fake/tweedles/foo/action")
    http_request.method = 'POST'
    http_request.headers['Content-Type'] = 'application/json'
    http_request.body = jsonutils.dump_as_bytes({'fooAction': True})

    http_response = http_request.get_response(router)
    self.assertEqual(200, http_response.status_int)
    self.assertEqual(extension_body, http_response.body)
test_extensions.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_controller_action_extension_late(self):
    """POST an action to an extended resource using the 'late' stub.

    Same wiring as the 'early' variant, but the controller extension is
    backed by ``StubLateExtensionController`` and the base controller
    returns a dict. Asserts a 200 response whose body equals
    ``extension_body``.
    """
    # Need a dict for the body to convert to a ResponseObject
    base_controller = StubActionController({'foo': response_body})
    member_actions = {'action': 'POST'}
    resource_extension = base_extensions.ResourceExtension(
        'tweedles', base_controller, member_actions=member_actions)

    late_controller = StubLateExtensionController(extension_body)
    stub_extension = StubControllerExtension()
    controller_extension = base_extensions.ControllerExtension(
        stub_extension, 'tweedles', late_controller)

    ext_manager = StubExtensionManager(
        resource_ext=resource_extension,
        controller_ext=controller_extension)
    router = compute.APIRouter(ext_manager)

    http_request = webob.Request.blank("/fake/tweedles/foo/action")
    http_request.method = 'POST'
    http_request.headers['Content-Type'] = 'application/json'
    http_request.body = jsonutils.dump_as_bytes({'fooAction': True})

    http_response = http_request.get_response(router)
    self.assertEqual(200, http_response.status_int)
    self.assertEqual(extension_body, http_response.body)
test_server_metadata.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_invalid_metadata_items_on_create(self):
    """Check that invalid metadata bodies are rejected on create.

    Three bodies are tried in turn (260-character key, 260-character
    value, empty key); each must make ``self.controller.create`` raise the
    exception class paired with it below.
    """
    self.stub_out('nova.db.instance_metadata_update',
                  return_create_instance_metadata)
    req = self._get_request()
    req.method = 'POST'
    req.headers["content-type"] = "application/json"

    invalid_cases = (
        # test for long key
        ({"metadata": {"a" * 260: "value1"}}, self.validation_ex_large),
        # test for long value
        ({"metadata": {"key": "v" * 260}}, self.validation_ex_large),
        # test for empty key.
        ({"metadata": {"": "value1"}}, self.validation_ex),
    )
    for data, expected_error in invalid_cases:
        req.body = jsonutils.dump_as_bytes(data)
        self.assertRaises(expected_error,
                          self.controller.create, req, self.uuid,
                          body=data)
test_image_metadata.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 15
收藏 0
点赞 0
评论 0
def test_create(self, get_mocked, update_mocked, quota_mocked):
    """Add one metadata key to image 123 and verify the calls made.

    The three ``*_mocked`` arguments are mock objects (presumably injected
    by patch decorators that are outside this view). The test asserts the
    get, quota and update mocks were each called once with the merged
    properties and that the controller returns the merged metadata.
    """
    updated_image = copy.deepcopy(get_image_123())
    updated_image['properties']['key7'] = 'value7'
    update_mocked.return_value = updated_image

    body = {"metadata": {"key7": "value7"}}
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    res = self.controller.create(req, '123', body=body)

    get_mocked.assert_called_once_with(mock.ANY, '123')

    expected_image = copy.deepcopy(get_image_123())
    expected_image['properties'] = {
        'key1': 'value1',  # existing meta
        'key7': 'value7',  # new meta
    }
    quota_mocked.assert_called_once_with(mock.ANY,
                                         expected_image["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected_image,
                                          data=None, purge_props=True)

    self.assertEqual({'metadata': {'key1': 'value1', 'key7': 'value7'}},
                     res)
test_image_metadata.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_update_all(self, get_mocked, update_mocked, quota_mocked):
    """Replace all metadata of image 123 and verify the calls made.

    The three ``*_mocked`` arguments are mock objects (presumably injected
    by patch decorators that are outside this view). After the PUT the
    image properties are expected to contain only the submitted key.
    """
    body = {"metadata": {"key9": "value9"}}
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata')
    req.method = 'PUT'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    res = self.controller.update_all(req, '123', body=body)

    get_mocked.assert_called_once_with(mock.ANY, '123')

    expected_image = copy.deepcopy(get_image_123())
    expected_image['properties'] = {
        'key9': 'value9',  # replace meta
    }
    quota_mocked.assert_called_once_with(mock.ANY,
                                         expected_image["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected_image,
                                          data=None, purge_props=True)

    self.assertEqual({'metadata': {'key9': 'value9'}}, res)
test_image_metadata.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_update_item(self, _get_mocked, update_mocked, quota_mocked):
    """Update a single metadata key of image 123 and verify the calls made.

    The ``*_mocked`` arguments are mock objects (presumably injected by
    patch decorators that are outside this view); ``_get_mocked`` is not
    inspected. After the PUT the image properties are expected to hold the
    new value for ``key1`` and the controller must echo the ``meta`` body.
    """
    req = fakes.HTTPRequest.blank('/v2/fake/images/123/metadata/key1')
    req.method = 'PUT'
    body = {"meta": {"key1": "zz"}}
    req.body = jsonutils.dump_as_bytes(body)
    req.headers["content-type"] = "application/json"

    res = self.controller.update(req, '123', 'key1', body=body)

    expected = copy.deepcopy(get_image_123())
    expected['properties'] = {
        'key1': 'zz',  # changed meta
    }
    quota_mocked.assert_called_once_with(mock.ANY, expected["properties"])
    update_mocked.assert_called_once_with(mock.ANY, '123', expected,
                                          data=None, purge_props=True)

    expected_output = {'meta': {'key1': 'zz'}}
    # Expected value first, matching the sibling image-metadata tests so a
    # failure report labels the reference and actual values correctly.
    self.assertEqual(expected_output, res)
test_volumes.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def test_volume_create(self):
    """POST a volume to os-volumes and check the echoed attributes.

    ``cinder.API.create`` is replaced by ``fakes.stub_volume_create``. The
    response must be 200 and contain a ``volume`` dict whose camelCase
    fields match the snake_case fields that were submitted.
    """
    self.stubs.Set(cinder.API, "create", fakes.stub_volume_create)

    vol = {
        "size": 100,
        "display_name": "Volume Test Name",
        "display_description": "Volume Test Desc",
        "availability_zone": "zone1:host1",
    }
    body = {"volume": vol}

    req = fakes.HTTPRequest.blank(self.url_prefix + '/os-volumes')
    req.method = 'POST'
    req.body = jsonutils.dump_as_bytes(body)
    req.headers['content-type'] = 'application/json'

    resp = req.get_response(self.app)
    self.assertEqual(200, resp.status_int)

    resp_dict = jsonutils.loads(resp.body)
    self.assertIn('volume', resp_dict)

    # (request key, response key) pairs, checked in this order.
    field_pairs = (
        ('size', 'size'),
        ('display_name', 'displayName'),
        ('display_description', 'displayDescription'),
        ('availability_zone', 'availabilityZone'),
    )
    for request_key, response_key in field_pairs:
        self.assertEqual(vol[request_key],
                         resp_dict['volume'][response_key])
def test_request_no_enrollment(self, mock_get_image):
    """A join request with ``ipa_enroll`` set to "False" yields ``{}``.

    ``mock_get_image`` is a mock (presumably injected by a patch decorator
    outside this view) configured to return a ``FakeImageService``.
    """
    mock_get_image.return_value = FakeImageService()

    body = {
        "metadata": {"ipa_enroll": "False"},
        "instance-id": fake.INSTANCE_ID,
        "project-id": fake.PROJECT_ID,
        "image-id": fake.IMAGE_ID,
        "hostname": "test",
    }

    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    res_dict = self.join_controller.create(req, body)
    self.assertEqual({}, res_dict)
def test_valid_request(self, mock_get_domain, mock_get_image,
                       mock_get_instance, mock_conf_parser):
    """A join request with ``ipa_enroll`` "True" returns host/realm data.

    The ``mock_*`` arguments are mocks (presumably injected by patch
    decorators outside this view). The config-parser mock answers two
    successive ``get`` calls with "novajoin" and "REALM". The result must
    report hostname ``test.test`` and realm ``REALM``; an OTP, if present,
    must be 32 lowercase-alphanumeric characters.
    """
    mock_get_image.return_value = FakeImageService()
    mock_get_instance.return_value = fake.fake_instance
    mock_get_domain.return_value = "test"

    mock_conf_parser_instance = mock.MagicMock()
    mock_conf_parser_instance.get = mock.Mock(
        side_effect=["novajoin", "REALM"])
    mock_conf_parser.return_value = mock_conf_parser_instance

    body = {"metadata": {"ipa_enroll": "True"},
            "instance-id": fake.INSTANCE_ID,
            "project-id": fake.PROJECT_ID,
            "image-id": fake.IMAGE_ID,
            "hostname": "test"}
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    res_dict = self.join_controller.create(req, body)

    # There should be no OTP because IPA shouldn't be
    # configured, but we'll handle both cases.
    otp = res_dict.get('ipaotp')
    if otp:
        self.assertThat(otp, MatchesRegex('^[a-z0-9]{32}'))
        # Look the OTP up once; the former ``.get('ipaotp', 0)`` default
        # could never be used here and ``len(0)`` would raise TypeError.
        self.assertEqual(32, len(otp))

    # Expected value first, as in test_request_no_enrollment.
    self.assertEqual('test.test', res_dict.get('hostname'))
    self.assertEqual('REALM', res_dict.get('krb_realm'))
# Note that on failures this will print a Krb5Error to stdout
# because in all likelihood the keytab cannot be read (and
# probably doesn't exist). This can be ignored.
def test_valid_hostclass_request(self, mock_get_domain, mock_get_image,
                                 mock_get_instance,
                                 mock_get_project_name,
                                 mock_conf_parser):
    """Join request with ``ipa_enroll`` "True" and a mocked project name.

    The ``mock_*`` arguments are mocks (presumably injected by patch
    decorators outside this view). The config-parser mock answers two
    successive ``get`` calls with "novajoin" and "REALM". The result must
    report hostname ``test.test`` and realm ``REALM``; an OTP, if present,
    must be 32 lowercase-alphanumeric characters.
    """
    mock_get_image.return_value = FakeImageService()
    mock_get_instance.return_value = fake.fake_instance
    mock_get_domain.return_value = "test"
    mock_get_project_name.return_value = "test"

    mock_conf_parser_instance = mock.MagicMock()
    mock_conf_parser_instance.get = mock.Mock(
        side_effect=["novajoin", "REALM"])
    mock_conf_parser.return_value = mock_conf_parser_instance

    body = {"metadata": {"ipa_enroll": "True"},
            "instance-id": fake.INSTANCE_ID,
            "project-id": fake.PROJECT_ID,
            "image-id": fake.IMAGE_ID,
            "hostname": "test"}
    req = fakes.HTTPRequest.blank('/v1')
    req.method = 'POST'
    req.content_type = "application/json"
    req.body = jsonutils.dump_as_bytes(body)

    res_dict = self.join_controller.create(req, body)

    # There should be no OTP because IPA shouldn't be
    # configured, but we'll handle both cases.
    otp = res_dict.get('ipaotp')
    if otp:
        self.assertThat(otp, MatchesRegex('^[a-z0-9]{32}'))
        # Look the OTP up once; the former ``.get('ipaotp', 0)`` default
        # could never be used here and ``len(0)`` would raise TypeError.
        self.assertEqual(32, len(otp))

    # Expected value first, as in test_request_no_enrollment.
    self.assertEqual('test.test', res_dict.get('hostname'))
    self.assertEqual('REALM', res_dict.get('krb_realm'))
# Note that on failures this will print a Krb5Error to stdout
# because in all likelihood the keytab cannot be read (and
# probably doesn't exist). This can be ignored.
def default(self, data):
    """Serialize ``data`` with ``jsonutils.dump_as_bytes`` and return it."""
    serialized = jsonutils.dump_as_bytes(data)
    return serialized
def process_bind_param(self, value, dialect):
    """Serialize ``value`` for storage after checking its type.

    ``None`` is replaced by a fresh ``self.type()`` instance before
    serialization. Any other value must be an instance of ``self.type``.
    ``dialect`` is accepted but not used here.

    NOTE(review): ``json`` must be a module exposing ``dump_as_bytes``
    (the stdlib ``json`` does not) -- presumably an alias of oslo's
    jsonutils; confirm against the file's imports.

    :raises TypeError: if ``value`` is neither None nor a ``self.type``.
    """
    if value is None:
        # Save default value according to current type to keep the
        # interface consistent.
        return json.dump_as_bytes(self.type())

    if not isinstance(value, self.type):
        raise TypeError("%s supposes to store %s objects, but %s given"
                        % (self.__class__.__name__,
                           self.type.__name__,
                           type(value).__name__))

    return json.dump_as_bytes(value)
def update_container(self, context, container_uuid, values):
    """Merge ``values`` into the stored container record and write it back.

    :param context: passed to the name-uniqueness check and the lookup.
    :param container_uuid: identifier used to look the container up.
    :param values: dict of fields to update; must not contain ``uuid``.
    :returns: the result of ``translate_etcd_result`` for the updated
        record.
    :raises exception.InvalidParameterValue: if ``values`` has ``uuid``.
    :raises exception.ContainerNotFound: on ``etcd.EtcdKeyNotFound``.

    Any other exception is logged and re-raised unchanged.
    """
    # NOTE(yuywz): Update would fail if any other client
    # write '/containers/$CONTAINER_UUID' in the meanwhile
    if 'uuid' in values:
        raise exception.InvalidParameterValue(
            err=_("Cannot overwrite UUID for an existing Container."))

    if 'name' in values:
        self._validate_unique_container_name(context, values['name'])

    try:
        existing = self.get_container_by_uuid(context, container_uuid)
        record = self.client.read('/containers/' + existing.uuid)
        merged = json.loads(record.value)
        merged.update(values)
        record.value = json.dump_as_bytes(merged)
        self.client.update(record)
    except etcd.EtcdKeyNotFound:
        raise exception.ContainerNotFound(container=container_uuid)
    except Exception as err:
        LOG.error('Error occurred while updating container: %s',
                  six.text_type(err))
        raise

    return translate_etcd_result(record, 'container')
def update_zun_service(self, host, binary, values):
    """Merge ``values`` into the stored service record and write it back.

    The record is read from ``/zun_services/<host>_<binary>``. An
    ``updated_at`` ISO timestamp is added to ``values`` (the caller's dict
    is modified in place) before merging.

    :raises exception.ZunServiceNotFound: on ``etcd.EtcdKeyNotFound``.

    Any other exception is logged and re-raised unchanged.
    """
    try:
        service_key = '/zun_services/' + host + '_' + binary
        record = self.client.read(service_key)
        merged = json.loads(record.value)
        values['updated_at'] = datetime.isoformat(timeutils.utcnow())
        merged.update(values)
        record.value = json.dump_as_bytes(merged)
        self.client.update(record)
    except etcd.EtcdKeyNotFound:
        raise exception.ZunServiceNotFound(host=host, binary=binary)
    except Exception as err:
        LOG.error('Error occurred while updating service: %s',
                  six.text_type(err))
        raise
def save(self, session=None):
    """Write this object's dict form to etcd under its UUID path.

    :param session: object exposing a ``client``; when None, one is
        obtained from ``db.api.get_connection()``.
    :raises exception.ResourceExists: if the path already exists.
    """
    connection = db.api.get_connection() if session is None else session
    etcd_client = connection.client
    key = self.etcd_path(self.uuid)

    if self.path_already_exist(etcd_client, key):
        raise exception.ResourceExists(name=self.__class__)

    etcd_client.write(key, json.dump_as_bytes(self.as_dict()))
def save(self, session=None):
    """Write this object's dict form to etcd under ``<host>_<binary>``.

    :param session: object exposing a ``client``; when None, one is
        obtained from ``db.api.get_connection()``.
    :raises exception.ZunServiceAlreadyExists: if the path already exists.
    """
    connection = db.api.get_connection() if session is None else session
    etcd_client = connection.client
    key = self.etcd_path(self.host + '_' + self.binary)

    if self.path_already_exist(etcd_client, key):
        raise exception.ZunServiceAlreadyExists(host=self.host,
                                                binary=self.binary)

    etcd_client.write(key, json.dump_as_bytes(self.as_dict()))
def save(self, session=None):
    """Write this object's dict form to etcd under its UUID path.

    :param session: object exposing a ``client``; when None, one is
        obtained from ``db.api.get_connection()``.
    :raises exception.ComputeNodeAlreadyExists: if the path already
        exists.
    """
    connection = db.api.get_connection() if session is None else session
    etcd_client = connection.client
    key = self.etcd_path(self.uuid)

    if self.path_already_exist(etcd_client, key):
        raise exception.ComputeNodeAlreadyExists(
            field='UUID', value=self.uuid)

    etcd_client.write(key, json.dump_as_bytes(self.as_dict()))
def __init__(self, value):
    """Store the ``json.dump_as_bytes`` serialization of ``value``."""
    serialized = json.dump_as_bytes(value)
    self.value = serialized
def to_primitive(self, obj, attr, value):
    """Return ``value`` serialized with ``json.dump_as_bytes``.

    ``obj`` and ``attr`` are accepted but not used here.
    """
    primitive = json.dump_as_bytes(value)
    return primitive