def test_init(self, mock_get_env_var, mock_get_dir_path):
"""Test initialization of config object"""
# set return values of project utility functions
mock_get_env_var.side_effect = lambda *a, **k: ':0.0' if a and a[0] == 'display' else mock.DEFAULT
mock_get_dir_path.side_effect = lambda *a, **k: 'es_path' if a and a[0] == 'es_root' else mock.DEFAULT
# create mock config object
mock_config_object = mock.MagicMock(name='ConfigObject_instance')
settings = {}
mock_config_object.__getitem__ = lambda s, k: settings.__getitem__(k)
mock_config_object.__setitem__ = lambda s, k, v: settings.__setitem__(k, v)
mock_config_object.get = settings.get
# call init method with mock variables
CONFIG_VARS.update([('sec1', ['var1']), ('sec2', ['var2', 'var3'])])
CONFIG_DEFAULTS.update(var1='foo', var3=42)
ConfigObject.__init__(mock_config_object)
# check values of settings variables
exp_settings = dict(var1='foo', var2=None, var3=42, batchMode=False, esRoot='es_path',
resultsDir='es_path/results', dataDir='es_path/data', macrosDir='es_path/tutorials',
templatesDir='es_path/templates', configDir='es_path/config')
self.assertDictEqual(settings, exp_settings, 'unexpected resulting settings dictionary')
python类DEFAULT的实例源码
def test_sends_notification_if_resubmitted(self, run_check):
# Include whimsical set to True to avoid error in the False code branch:
config = {
'duration': 1.0,
'host': 'fakehost.herokuapp.com',
'whimsical': False
}
fake_assignment = mock.Mock(AssignmentStatus='Submitted')
mturk = mock.Mock(**{'get_assignment.return_value': [fake_assignment]})
participants = [self.a.participant()]
session = None
# Move the clock forward so assignment is overdue:
reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
mock_messager = mock.Mock(spec=NullHITMessager)
with mock.patch.multiple('dallinger.heroku.clock',
requests=mock.DEFAULT,
NullHITMessager=mock.DEFAULT) as mocks:
mocks['NullHITMessager'].return_value = mock_messager
run_check(config, mturk, participants, session, reference_time)
mock_messager.send_resubmitted_msg.assert_called()
def test_no_assignement_on_mturk_sends_hit_cancelled_message(self, run_check):
# Include whimsical set to True to avoid error in the False code branch:
config = {
'duration': 1.0,
'host': 'fakehost.herokuapp.com',
'whimsical': False
}
mturk = mock.Mock(**{'get_assignment.return_value': []})
participants = [self.a.participant()]
session = None
# Move the clock forward so assignment is overdue:
reference_time = datetime.datetime.now() + datetime.timedelta(hours=6)
mock_messager = mock.Mock(spec=NullHITMessager)
with mock.patch.multiple('dallinger.heroku.clock',
requests=mock.DEFAULT,
NullHITMessager=mock.DEFAULT) as mocks:
mocks['NullHITMessager'].return_value = mock_messager
run_check(config, mturk, participants, session, reference_time)
mock_messager.send_hit_cancelled_msg.assert_called()
def test_retry(self, mock_exc_to_code, mock_time):
mock_exc_to_code.side_effect = lambda e: e.code
to_attempt = 3
retry = RetryOptions(
[_FAKE_STATUS_CODE_1],
BackoffSettings(0, 0, 0, 0, 0, 0, 1))
# Succeeds on the to_attempt'th call, and never again afterward
mock_call = mock.Mock()
mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
(to_attempt - 1) + [mock.DEFAULT])
mock_call.return_value = 1729
mock_time.return_value = 0
settings = _CallSettings(timeout=0, retry=retry)
my_callable = api_callable.create_api_call(mock_call, settings)
self.assertEqual(my_callable(None), 1729)
self.assertEqual(mock_call.call_count, to_attempt)
def test_retryable_without_timeout(self, mock_time, mock_exc_to_code):
mock_time.return_value = 0
mock_exc_to_code.side_effect = lambda e: e.code
to_attempt = 3
mock_call = mock.Mock()
mock_call.side_effect = ([CustomException('', _FAKE_STATUS_CODE_1)] *
(to_attempt - 1) + [mock.DEFAULT])
mock_call.return_value = 1729
retry_options = RetryOptions(
[_FAKE_STATUS_CODE_1],
BackoffSettings(0, 0, 0, None, None, None, None))
my_callable = retry.retryable(mock_call, retry_options)
self.assertEqual(my_callable(None), 1729)
self.assertEqual(to_attempt, mock_call.call_count)
def test_retryable_with_timeout(self, mock_time, mock_exc_to_code):
mock_time.return_value = 1
mock_exc_to_code.side_effect = lambda e: e.code
mock_call = mock.Mock()
mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
mock.DEFAULT]
mock_call.return_value = 1729
retry_options = RetryOptions(
[_FAKE_STATUS_CODE_1],
BackoffSettings(0, 0, 0, 0, 0, 0, 0))
my_callable = retry.retryable(mock_call, retry_options)
self.assertRaises(errors.RetryError, my_callable)
self.assertEqual(0, mock_call.call_count)
def test_retryable_when_no_codes(self, mock_time, mock_exc_to_code):
mock_time.return_value = 0
mock_exc_to_code.side_effect = lambda e: e.code
mock_call = mock.Mock()
mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_1),
mock.DEFAULT]
mock_call.return_value = 1729
retry_options = RetryOptions(
[],
BackoffSettings(0, 0, 0, 0, 0, 0, 1))
my_callable = retry.retryable(mock_call, retry_options)
try:
my_callable(None)
self.fail('Should not have been reached')
except errors.RetryError as exc:
self.assertIsInstance(exc.cause, CustomException)
self.assertEqual(1, mock_call.call_count)
def test_retryable_aborts_on_unexpected_exception(
self, mock_time, mock_exc_to_code):
mock_time.return_value = 0
mock_exc_to_code.side_effect = lambda e: e.code
mock_call = mock.Mock()
mock_call.side_effect = [CustomException('', _FAKE_STATUS_CODE_2),
mock.DEFAULT]
mock_call.return_value = 1729
retry_options = RetryOptions(
[_FAKE_STATUS_CODE_1],
BackoffSettings(0, 0, 0, 0, 0, 0, 1))
my_callable = retry.retryable(mock_call, retry_options)
try:
my_callable(None)
self.fail('Should not have been reached')
except errors.RetryError as exc:
self.assertIsInstance(exc.cause, CustomException)
self.assertEqual(1, mock_call.call_count)
def test_nrpe_dependency_installed(self, mock_config):
config = copy.deepcopy(CHARM_CONFIG)
mock_config.side_effect = lambda key: config[key]
with patch.multiple(ceph_hooks,
apt_install=DEFAULT,
rsync=DEFAULT,
log=DEFAULT,
write_file=DEFAULT,
nrpe=DEFAULT) as mocks:
ceph_hooks.update_nrpe_config()
mocks["apt_install"].assert_called_once_with(
["python-dbus", "lockfile-progs"])
def test_train_1(self):
j = 2
ret_train = np.zeros((6, 3, N_CLASSES))
ret_dev = np.zeros((6, 3, N_CLASSES))
func_ret = np.zeros((1, N_CLASSES))
func_ret[0, j] = 1.
with patch.multiple(self.wb,
_gs=True,
_generate_ts=lambda *x: (self.TRAIN_X, Y),
_extract_features=MagicMock(
return_value=FEATS),
_model=MOCK_DEFAULT):
with patch("dsenser.wang.wangbase.GridSearchCV"):
self.wb._model.decision_function = \
MagicMock(return_value=func_ret)
self.wb._model.classes_ = CLASSES_
self.wb.train(([(0, REL1)], [PARSE1]),
([(0, REL1)], [PARSE1]),
1, 1, ret_train, ret_dev)
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(self):
with patch.multiple(
ceph_hooks,
apt_install=DEFAULT,
rsync=DEFAULT,
log=DEFAULT,
write_file=DEFAULT,
nrpe=DEFAULT,
emit_cephconf=DEFAULT,
mon_relation_joined=DEFAULT,
is_relation_made=DEFAULT) as mocks, patch(
"charmhelpers.contrib.hardening.harden.config"):
mocks["is_relation_made"].return_value = True
ceph_hooks.upgrade_charm()
mocks["apt_install"].assert_called_with(
["python-dbus", "lockfile-progs"])
def test__allocate_sockets(self):
"""Test allocating sockets.
"""
# access protected module _allocate_sockets
# pylint: disable=w0212
socket.socket.bind.side_effect = [
socket.error(errno.EADDRINUSE, 'In use'),
mock.DEFAULT,
mock.DEFAULT,
mock.DEFAULT
]
sockets = treadmill.runtime._allocate_sockets(
'prod', '0.0.0.0', socket.SOCK_STREAM, 3
)
self.assertEqual(3, len(sockets))
def patch(
target, new=mock.DEFAULT, spec=None, create=False,
mocksignature=False, spec_set=None, autospec=False,
new_callable=None, **kwargs
):
"""Mocks an async function.
Should be a drop-in replacement for mock.patch that handles async automatically. The .asynq
attribute is automatically created and shouldn't be used when accessing data on
the mock.
"""
getter, attribute = _get_target(target)
return _make_patch_async(
getter, attribute, new, spec, create, mocksignature, spec_set, autospec, new_callable,
kwargs
)
test_luks.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_attach_volume_fail(self, mock_execute):
self.encryptor._get_key = mock.MagicMock()
self.encryptor._get_key.return_value = \
test_cryptsetup.fake__get_key(None)
mock_execute.side_effect = [
processutils.ProcessExecutionError(exit_code=1), # luksOpen
mock.DEFAULT, # isLuks
]
self.assertRaises(processutils.ProcessExecutionError,
self.encryptor.attach_volume, None)
mock_execute.assert_has_calls([
mock.call('cryptsetup', 'luksOpen', '--key-file=-', self.dev_path,
self.dev_name, process_input='0' * 32,
run_as_root=True, check_exit_code=True),
mock.call('cryptsetup', 'isLuks', '--verbose', self.dev_path,
run_as_root=True, check_exit_code=True),
], any_order=False)
self.assertEqual(2, mock_execute.call_count)
def test_deploy_no_callback(self, power_mock, get_ip_mock):
self.config(group='ansible', use_ramdisk_callback=False)
with mock.patch.multiple(self.driver,
_ansible_deploy=mock.DEFAULT,
reboot_to_instance=mock.DEFAULT) as moks:
with task_manager.acquire(
self.context, self.node['uuid'], shared=False) as task:
driver_return = self.driver.deploy(task)
self.assertEqual(driver_return, states.DEPLOYDONE)
power_mock.assert_called_once_with(task, states.REBOOT)
get_ip_mock.assert_called_once_with(task)
moks['_ansible_deploy'].assert_called_once_with(task,
'127.0.0.1')
moks['reboot_to_instance'].assert_called_once_with(task)
def test_continue_deploy(self, getip_mock):
self.node.provision_state = states.DEPLOYWAIT
self.node.target_provision_state = states.ACTIVE
self.node.save()
with task_manager.acquire(self.context, self.node.uuid) as task:
with mock.patch.multiple(self.driver, autospec=True,
_ansible_deploy=mock.DEFAULT,
reboot_to_instance=mock.DEFAULT):
self.driver.continue_deploy(task)
getip_mock.assert_called_once_with(task)
self.driver._ansible_deploy.assert_called_once_with(
task, '1.2.3.4')
self.driver.reboot_to_instance.assert_called_once_with(task)
self.assertEqual(states.ACTIVE, task.node.target_provision_state)
self.assertEqual(states.DEPLOYING, task.node.provision_state)
def test_run(self, mCreateSession, mSample):
mSample.return_value = 'XXX'
iSession = MockSession()
mCreateSession.return_value = (iSession, '123456')
client = iSession.client('stepfunctions')
client.list_activities.return_value = {
'activities':[{
'name': 'name',
'activityArn': 'XXX'
}]
}
client.get_activity_task.return_value = {
'taskToken': 'YYY',
'input': '{}'
}
target = mock.MagicMock()
activity = ActivityMixin(handle_task = target)
def stop_loop(*args, **kwargs):
activity.polling = False
return mock.DEFAULT
target.side_effect = stop_loop
activity.run('name')
calls = [
mock.call.list_activities(),
mock.call.get_activity_task(activityArn = 'XXX',
workerName = 'name-XXX')
]
self.assertEqual(client.mock_calls, calls)
calls = [
mock.call('YYY', {}),
mock.call().start()
]
self.assertEqual(target.mock_calls, calls)
def patchobject(self, target, attribute, new=mock.DEFAULT, autospec=True):
"""Convenient wrapper around `mock.patch.object`
Returns a started mock that will be automatically stopped after the
test ran.
"""
p = mock.patch.object(target, attribute, new, autospec=autospec)
m = p.start()
self.addCleanup(p.stop)
return m
def test_check_db_for_missing_notifications_assembles_resources(self, run_check):
# Can't import until after config is loaded:
from dallinger.heroku.clock import check_db_for_missing_notifications
with mock.patch.multiple('dallinger.heroku.clock',
run_check=mock.DEFAULT,
MTurkConnection=mock.DEFAULT) as mocks:
mocks['MTurkConnection'].return_value = 'fake connection'
check_db_for_missing_notifications()
mocks['run_check'].assert_called()
def recruiter(self):
from dallinger.recruiters import BotRecruiter
with mock.patch.multiple('dallinger.recruiters',
_get_queue=mock.DEFAULT,
get_base_url=mock.DEFAULT) as mocks:
mocks['get_base_url'].return_value = 'fake_base_url'
r = BotRecruiter()
r._get_bot_factory = mock.Mock()
yield r
def recruiter(self, active_config):
from dallinger.mturk import MTurkService
from dallinger.recruiters import MTurkRecruiter
with mock.patch.multiple('dallinger.recruiters',
os=mock.DEFAULT,
get_base_url=mock.DEFAULT) as mocks:
mocks['get_base_url'].return_value = 'http://fake-domain'
mocks['os'].getenv.return_value = 'fake-host-domain'
mockservice = mock.create_autospec(MTurkService)
active_config.extend({'mode': u'sandbox'})
r = MTurkRecruiter()
r.mturkservice = mockservice('fake key', 'fake secret')
r.mturkservice.check_credentials.return_value = True
r.mturkservice.create_hit.return_value = {'type_id': 'fake type id'}
return r
def faster(tempdir):
with mock.patch.multiple('dallinger.command_line',
time=mock.DEFAULT,
setup_experiment=mock.DEFAULT) as mocks:
mocks['setup_experiment'].return_value = ('fake-uid', tempdir)
yield mocks
def testDEFAULT(self):
self.assertIs(DEFAULT, sentinel.DEFAULT)
def test_upgrade_charm_with_nrpe_relation_installs_dependencies(
self,
mock_config):
config = copy.deepcopy(CHARM_CONFIG)
mock_config.side_effect = lambda key: config[key]
with patch.multiple(
ceph_hooks,
apt_install=DEFAULT,
rsync=DEFAULT,
log=DEFAULT,
write_file=DEFAULT,
nrpe=DEFAULT,
emit_cephconf=DEFAULT,
mon_relation_joined=DEFAULT,
is_relation_made=DEFAULT) as mocks, patch(
"charmhelpers.contrib.hardening.harden.config"):
mocks["is_relation_made"].return_value = True
ceph_hooks.upgrade_charm()
mocks["apt_install"].assert_called_with(
["python-dbus", "lockfile-progs"])
def test_send_request(self):
""" Test the execution of a deferred Supervisor request. """
from supvisors.mainloop import SupvisorsMainLoop
from supvisors.utils import DeferredRequestHeaders
main_loop = SupvisorsMainLoop(self.supvisors)
# patch main loop subscriber
with patch.multiple(main_loop, check_address=DEFAULT,
start_process=DEFAULT, stop_process=DEFAULT,
restart=DEFAULT, shutdown=DEFAULT) as mocked_loop:
# test check address
self.check_call(main_loop, mocked_loop, 'check_address',
DeferredRequestHeaders.CHECK_ADDRESS,
('10.0.0.2', ))
# test start process
self.check_call(main_loop, mocked_loop, 'start_process',
DeferredRequestHeaders.START_PROCESS,
('10.0.0.2', 'dummy_process', 'extra args'))
# test stop process
self.check_call(main_loop, mocked_loop, 'stop_process',
DeferredRequestHeaders.STOP_PROCESS,
('10.0.0.2', 'dummy_process'))
# test restart
self.check_call(main_loop, mocked_loop, 'restart',
DeferredRequestHeaders.RESTART,
('10.0.0.2', ))
# test shutdown
self.check_call(main_loop, mocked_loop, 'shutdown',
DeferredRequestHeaders.SHUTDOWN,
('10.0.0.2', ))
def test_on_remote_event(self):
""" Test the reception of a Supervisor remote comm event. """
from supvisors.listener import SupervisorListener
listener = SupervisorListener(self.supvisors)
# add patches for what is tested just above
with patch.multiple(listener, unstack_event=DEFAULT,
unstack_info=DEFAULT, authorization=DEFAULT):
# test unknown type
event = Mock(type='unknown', data='')
listener.on_remote_event(event)
self.assertFalse(listener.unstack_event.called)
self.assertFalse(listener.unstack_info.called)
self.assertFalse(listener.authorization.called)
# test event
event = Mock(type='event', data={'state': 'RUNNING'})
listener.on_remote_event(event)
self.assertEqual([call({'state': 'RUNNING'})],
listener.unstack_event.call_args_list)
self.assertFalse(listener.unstack_info.called)
self.assertFalse(listener.authorization.called)
listener.unstack_event.reset_mock()
# test info
event = Mock(type='info', data={'name': 'dummy_process'})
listener.on_remote_event(event)
self.assertFalse(listener.unstack_event.called)
self.assertEqual([call({'name': 'dummy_process'})],
listener.unstack_info.call_args_list)
self.assertFalse(listener.authorization.called)
listener.unstack_info.reset_mock()
# test authorization
event = Mock(type='auth', data=('10.0.0.1', True))
listener.on_remote_event(event)
self.assertFalse(listener.unstack_event.called)
self.assertFalse(listener.unstack_info.called)
self.assertEqual([call(('10.0.0.1', True))],
listener.authorization.call_args_list)
def test_run(self):
self.cls.reactor = Mock(spec_set=reactor)
with patch.multiple(
pbm,
logger=DEFAULT,
Site=DEFAULT,
LoopingCall=DEFAULT,
VaultRedirectorSite=DEFAULT
) as mod_mocks:
with patch.multiple(
pb,
get_active_node=DEFAULT,
run_reactor=DEFAULT,
listentcp=DEFAULT,
add_update_loop=DEFAULT,
listentls=DEFAULT
) as cls_mocks:
cls_mocks['get_active_node'].return_value = 'consul:1234'
self.cls.run()
assert self.cls.active_node_ip_port == 'consul:1234'
assert mod_mocks['logger'].mock_calls == [
call.warning('Initial Vault active node: %s', 'consul:1234'),
call.warning('Starting Twisted reactor (event loop)')
]
assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
assert mod_mocks['Site'].mock_calls == [
call(mod_mocks['VaultRedirectorSite'].return_value)
]
assert self.cls.reactor.mock_calls == []
assert cls_mocks['run_reactor'].mock_calls == [call()]
assert mod_mocks['LoopingCall'].mock_calls == []
assert cls_mocks['listentcp'].mock_calls == [
call(mod_mocks['Site'].return_value)
]
assert cls_mocks['add_update_loop'].mock_calls == [call()]
assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self):
self.cls.reactor = Mock(spec_set=reactor)
self.cls.tls_factory = Mock()
with patch.multiple(
pbm,
logger=DEFAULT,
Site=DEFAULT,
LoopingCall=DEFAULT,
VaultRedirectorSite=DEFAULT
) as mod_mocks:
with patch.multiple(
pb,
get_active_node=DEFAULT,
run_reactor=DEFAULT,
listentcp=DEFAULT,
add_update_loop=DEFAULT,
listentls=DEFAULT
) as cls_mocks:
cls_mocks['get_active_node'].return_value = 'consul:1234'
self.cls.run()
assert self.cls.active_node_ip_port == 'consul:1234'
assert mod_mocks['logger'].mock_calls == [
call.warning('Initial Vault active node: %s', 'consul:1234'),
call.warning('Starting Twisted reactor (event loop)')
]
assert mod_mocks['VaultRedirectorSite'].mock_calls == [call(self.cls)]
assert mod_mocks['Site'].mock_calls == [
call(mod_mocks['VaultRedirectorSite'].return_value)
]
assert self.cls.reactor.mock_calls == []
assert cls_mocks['run_reactor'].mock_calls == [call()]
assert mod_mocks['LoopingCall'].mock_calls == []
assert cls_mocks['listentls'].mock_calls == [
call(mod_mocks['Site'].return_value)
]
assert cls_mocks['add_update_loop'].mock_calls == [call()]
assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self):
self.cls.reactor = Mock(spec_set=reactor)
with patch.multiple(
pbm,
logger=DEFAULT,
Site=DEFAULT,
LoopingCall=DEFAULT,
VaultRedirectorSite=DEFAULT
) as mod_mocks:
with patch.multiple(
pb,
get_active_node=DEFAULT,
run_reactor=DEFAULT,
listentcp=DEFAULT,
add_update_loop=DEFAULT
) as cls_mocks:
cls_mocks['get_active_node'].return_value = None
with pytest.raises(SystemExit) as excinfo:
self.cls.run()
assert excinfo.value.code == 3
assert mod_mocks['logger'].mock_calls == [
call.critical("ERROR: Could not get active vault node from "
"Consul. Exiting.")
]
assert mod_mocks['VaultRedirectorSite'].mock_calls == []
assert mod_mocks['Site'].mock_calls == []
assert self.cls.reactor.mock_calls == []
assert cls_mocks['run_reactor'].mock_calls == []
assert mod_mocks['LoopingCall'].mock_calls == []
def run_and_exit(cli_args=None, prompt_commands=None):
"""Run http-prompt executable, execute some prompt commands, and exit."""
if cli_args is None:
cli_args = []
# Make sure last command is 'exit'
if prompt_commands is None:
prompt_commands = ['exit']
else:
prompt_commands += ['exit']
# Fool cli() so that it believes we're running from CLI instead of pytest.
# We will restore it at the end of the function.
orig_argv = sys.argv
sys.argv = ['http-prompt'] + cli_args
try:
with patch.multiple('http_prompt.cli',
prompt=DEFAULT, execute=DEFAULT) as mocks:
mocks['execute'].side_effect = execute
# prompt() is mocked to return the command in 'prompt_commands' in
# sequence, i.e., prompt() returns prompt_commands[i-1] when it is
# called for the ith time
mocks['prompt'].side_effect = prompt_commands
result = CliRunner().invoke(cli, cli_args)
context = mocks['execute'].call_args[0][1]
return result, context
finally:
sys.argv = orig_argv