def test_show_anon_private_dataset(self):
user = factories.User()
org = factories.Organization()
dataset = factories.Dataset(
owner_org=org['id'], resources=[factories.Resource()],
private=True)
context = {
'user': user['name'],
'model': model
}
assert_raises(t.NotAuthorized,
call_auth, 'resource_validation_run', context=context,
resource_id=dataset['resources'][0]['id'])
python类assert_raises()的实例源码
def test_validation_fails_on_upload(self, mock_open):
invalid_file = StringIO.StringIO()
invalid_file.write(INVALID_CSV)
mock_upload = MockFieldStorage(invalid_file, 'invalid.csv')
dataset = factories.Dataset()
invalid_stream = io.BufferedReader(io.BytesIO(INVALID_CSV))
with mock.patch('io.open', return_value=invalid_stream):
with assert_raises(t.ValidationError) as e:
call_action(
'resource_create',
package_id=dataset['id'],
format='CSV',
upload=mock_upload
)
assert 'validation' in e.exception.error_dict
assert 'missing-value' in str(e.exception)
assert 'Row 2 has a missing value in column 4' in str(e.exception)
def test_boot_local_lun_001(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# case1:
# Add two local_lun without type
devices = [
{"device_name": "local_lun",
"device_order": "1",
},
{"device_name": "local_lun",
"device_order": "2",
},
]
expected_error_message = "_local_lun_add failed, error: Instance of Local Lun already added at order '1'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_local_lun_002(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# case2:
# Add first local_lun without type
# Add next local_lun with type
devices = [
{"device_name": "local_lun",
"device_order": "1",
},
{"device_name": "local_lun",
"device_order": "2",
"type": "primary",
"lun_name": "test_primary"
},
]
expected_error_message = "_local_lun_add failed, error: Instance of Local Lun already added at order '1'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_local_lun_003(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# case3:
# Add first local_lun with type
# Add next local_lun with type
devices = [
{"device_name": "local_lun",
"device_order": "1",
"type": "primary",
"lun_name": "primary"
},
{"device_name": "local_lun",
"device_order": "2",
},
]
expected_error_message = "_local_lun_add failed, error: Required parameter 'lun_name' or 'type' missing."
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_local_jbod_002(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# local_jbod_case2
# Add local_jbod without slot_number
devices = [
{"device_name": "local_jbod",
"device_order": "1",
},
]
expected_error_message = "_local_jbod_add() takes exactly 3 arguments (2 given)"
with assert_raises(TypeError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_embedded_disk_001(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# local_embedded_disk_case1:
# Add two local_embedded_disk without type
devices = [
{"device_name": "embedded_disk",
"device_order": "1",
},
{"device_name": "embedded_disk",
"device_order": "2",
},
]
expected_error_message = "_local_embedded_disk_add failed, error: Instance of Local Embedded Disk already added at order '1'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_embedded_disk_002(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# local_embedded_disk_case2:
# Add first local_embedded_disk without type
# Add next local_embedded_disk with type
devices = [
{"device_name": "embedded_disk",
"device_order": "1",
},
{"device_name": "embedded_disk",
"device_order": "2",
"type": "primary",
"slot_number": "1"
},
]
expected_error_message = "_local_embedded_disk_add failed, error: Instance of Local Embedded Disk already added at order '1'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_embedded_disk_003(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# local_embedded_disk_case3:
# Add first local_embedded_disk with type
# Add next local_embedded_disk without type
devices = [
{"device_name": "embedded_disk",
"device_order": "1",
"type": "primary",
"slot_number": "1"
},
{"device_name": "embedded_disk",
"device_order": "2",
},
]
expected_error_message = "_local_embedded_disk_add failed, error: Required parameter 'slot_number' or 'type' missing."
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_lan_001(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# lan_case1:
# Add first lan without vnic_name and with type
devices = [
{"device_name": "lan",
"device_order": "1",
},
]
expected_error_message = "_lan_device_add() takes exactly 3 arguments (2 given)"
with assert_raises(TypeError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_iscsi_001(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# iscsi_case1:
# Add first iscsi without vnic_name and with type
devices = [
{"device_name": "iscsi",
"device_order": "1",
"type": "primary",
},
]
expected_error_message = "_iscsi_device_add() got an unexpected keyword argument 'type'"
with assert_raises(TypeError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_iscsi_002(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# iscsi_case2:
# Add first iscsi without vnic_name and type
devices = [
{"device_name": "iscsi",
"device_order": "1",
},
]
expected_error_message = "_iscsi_device_add() takes exactly 3 arguments (2 given)"
with assert_raises(TypeError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_special_case_01(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# special_case1:
# Add first local_disk
# Add another local_device
devices = [
{"device_name": "local_disk",
"device_order": "1",
},
{"device_name": "sdcard",
"device_order": "2",
},
]
expected_error_message = "_device_add failed, error: local_disk cannot be added with other local devices."
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_special_case_02(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# special_case2:
# Add first cd_dvd
# Add another cd_dvd_local
devices = [
{"device_name": "cd_dvd",
"device_order": "1",
},
{"device_name": "cd_dvd_local",
"device_order": "2",
},
]
expected_error_message = "_device_add failed, error: 'cd_dvd' or 'cd_dvd_local, cd_dvd_remote'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_special_case_03(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# special_case3:
# Add first floppy
# Add another floppy_local
devices = [
{"device_name": "floppy",
"device_order": "1",
},
{"device_name": "floppy_local",
"device_order": "2",
},
]
expected_error_message = "_device_add failed, error: 'floppy' or 'floppy_local, floppy_remote'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_boot_special_case_05(mock_login, mock_query_dn, mock_order_clear):
mock_login.return_value = True
bp = LsbootPolicy("org-root", name="test")
mock_query_dn.return_value = bp
mock_order_clear.return_value = None
# special_case4:
# Add virtual_drive
# Add virtual_drive
devices = [
{"device_name": "virtual_drive",
"device_order": "1",
},
{"device_name": "virtual_drive",
"device_order": "2",
},
]
expected_error_message = "_vmedia_device_add failed, error: Device 'virtual_drive' already exist at order '1'"
with assert_raises(UcsOperationError) as error:
boot_policy_order_set(handle, boot_policy_dn, devices)
assert_equal(error.exception.message, expected_error_message)
def test_cluster_proxy_pool():
with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
pool = rpc.ClusterRpcProxyPool(dict(), pool_size=2)
pool.start()
assert pool.queue.qsize() == 2
with pool.next() as client:
assert pool.queue.qsize() == 1
client.foo.bar()
assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls
with pool.next():
assert pool.queue.qsize() == 0
tools.assert_raises(queue_six.Empty, pool.next, timeout=1)
assert pool.queue.qsize() == 1
assert pool.queue.qsize() == 2
pool.stop()
def test_powerview_create_title_missing(self):
'''Missing title raises ValidationError.'''
sysadmin = Sysadmin()
data_dict = self._make_create_data_dict()
# remove title key
del data_dict['title']
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=data_dict
)
error_dict = cm.exception.error_dict['title']
nosetools.assert_true("Missing value"
in error_dict,
"Expected string not in exception message.")
def test_powerview_create_view_type_missing(self):
'''Missing view_type raises ValidationError.'''
sysadmin = Sysadmin()
data_dict = self._make_create_data_dict()
# remove title key
del data_dict['view_type']
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=data_dict
)
error_dict = cm.exception.error_dict['view_type']
nosetools.assert_true("Missing value"
in error_dict,
"Expected string not in exception message.")
def test_powerview_create_config_must_be_json(self):
'''If present, config must be a json string.'''
sysadmin = Sysadmin()
data_dict = self._make_create_data_dict()
# replace config with non-json string
data_dict['config'] = "I'm not json."
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_create')(
context={'user': sysadmin['name']},
data_dict=data_dict
)
error_dict = cm.exception.error_dict['config']
nosetools.assert_true("Could not parse as valid JSON"
in error_dict,
"Expected string not in exception message.")
def test_powerview_update_no_id(self):
'''Updating with a missing powerview id raises error.'''
sysadmin = Sysadmin()
powerview_dict = factories.PowerView()
del powerview_dict['id']
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_update')(
context={'user': sysadmin['name']},
data_dict=powerview_dict
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Missing value"
in error_dict,
"Expected string not in exception message.")
def test_powerview_update_nonexisting_id(self):
'''Updating with a non-existing powerview id raises error.'''
sysadmin = Sysadmin()
powerview_dict = factories.PowerView()
powerview_dict['id'] = 'non-existing-id'
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_update')(
context={'user': sysadmin['name']},
data_dict=powerview_dict
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Not found: PowerView"
in error_dict,
"Expected string not in exception message.")
def test_powerview_show_no_id(self):
'''Calling powerview show with a missing id raises error.'''
sysadmin = Sysadmin()
factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_show')(
context={'user': sysadmin['name']},
data_dict={}
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Missing value"
in error_dict,
"Expected string not in exception message.")
def test_powerview_show_nonexisting_id(self):
'''Calling powerview show with a non-existing id raises error.'''
sysadmin = Sysadmin()
factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_show')(
context={'user': sysadmin['name']},
data_dict={'id': 'non-existin-id'}
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Not found: PowerView"
in error_dict,
"Expected string not in exception message.")
def test_powerview_resource_list_nonexisting_id(self):
'''Calling powerview resource list with a non-existing id raises
error.'''
sysadmin = Sysadmin()
factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_resource_list')(
context={'user': sysadmin['name']},
data_dict={'id': 'non-existin-id'}
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Not found: PowerView"
in error_dict,
"Expected string not in exception message.")
def test_powerview_delete_no_id(self):
'''Calling powerview delete with a missing id raises error.'''
sysadmin = Sysadmin()
factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_delete')(
context={'user': sysadmin['name']},
data_dict={}
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Missing value"
in error_dict,
"Expected string not in exception message.")
def test_powerview_delete_nonexisting_id(self):
'''Calling powerview delete with a non-existing id raises error.'''
sysadmin = Sysadmin()
factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_delete')(
context={'user': sysadmin['name']},
data_dict={'id': 'non-existin-id'}
)
error_dict = cm.exception.error_dict['id']
nosetools.assert_true("Not found: PowerView"
in error_dict,
"Expected string not in exception message.")
def test_powerview_add_resource_nonexisting_resource_id(self):
'''Calling powerview_add_resource with nonexisting resource id raises
ValidationError.'''
sysadmin = Sysadmin()
powerview = factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_add_resource')(
context={'user': sysadmin['name']},
data_dict={'id': powerview['id'],
'resource_id': 'non-existing-id'}
)
error_dict = cm.exception.error_dict['resource_id']
nosetools.assert_true("Not found: Resource"
in error_dict,
"Expected string not in exception message.")
def test_powerview_remove_resource_nonexisting_resource_id(self):
'''Calling powerview_remove_resource with nonexisting resource id raises
ValidationError.'''
sysadmin = Sysadmin()
powerview = factories.PowerView()
with nosetools.assert_raises(ValidationError) as cm:
toolkit.get_action('powerview_remove_resource')(
context={'user': sysadmin['name']},
data_dict={'id': powerview['id'],
'resource_id': 'non-existing-id'}
)
error_dict = cm.exception.error_dict['resource_id']
nosetools.assert_true("Not found: Resource"
in error_dict,
"Expected string not in exception message.")
def test_powerview_create_normal_user_allow_create_private_resources(self):
'''
Calling powerview create with normal user when allow_user_create is
true, with unauthorized resources, raises NotAuthorized.
'''
org = factories.Organization()
dataset = factories.Dataset(owner_org=org['id'], private="true")
r1 = factories.Resource(package_id=dataset['id'])
r2 = factories.Resource(package_id=dataset['id'])
a_user = factories.User()
pv_dict = self._make_create_data_dict(resources=[r1['id'], r2['id']])
context = {'user': a_user['name'], 'model': model}
nosetools.assert_raises(NotAuthorized, helpers.call_auth,
'ckanext_powerview_create', context=context,
**pv_dict)