def test_env_only_calls_get():
    """Run the env step with only 'envGet' present in context.

    env_get must be called exactly once; env_set and env_unset must not be
    called at all.
    """
    get_mapping = {
        'key2': 'ARB_GET_ME1',
        'key4': 'ARB_GET_ME2',
    }
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envGet': get_mapping,
    })

    # Swap all three env helpers for mocks so the call pattern can be checked.
    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_called_once()
    patched['env_set'].assert_not_called()
    patched['env_unset'].assert_not_called()
# Example source code using the Python mock `DEFAULT` sentinel
def test_env_only_calls_set():
    """Run the env step with only 'envSet' present in context.

    env_set must be called exactly once; env_get and env_unset must not be
    called at all.
    """
    set_mapping = {
        'ARB_SET_ME1': 'key2',
        'ARB_SET_ME2': 'key1',
    }
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envSet': set_mapping,
    })

    # Swap all three env helpers for mocks so the call pattern can be checked.
    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_not_called()
    patched['env_set'].assert_called_once()
    patched['env_unset'].assert_not_called()
def test_env_only_calls_unset():
    """Run the env step with only 'envUnset' present in context.

    env_unset must be called exactly once; env_get and env_set must not be
    called at all.
    """
    unset_names = ['ARB_DELETE_ME1', 'ARB_DELETE_ME2']
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': unset_names,
    })

    # Swap all three env helpers for mocks so the call pattern can be checked.
    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_not_called()
    patched['env_set'].assert_not_called()
    patched['env_unset'].assert_called_once()
def test_env_only_calls_set_unset():
    """Run the env step with 'envUnset' and 'envSet' but no 'envGet'.

    env_set and env_unset must each be called exactly once; env_get must
    not be called.
    """
    unset_names = ['ARB_DELETE_ME1', 'ARB_DELETE_ME2']
    set_mapping = {
        'ARB_SET_ME1': 'key2',
        'ARB_SET_ME2': 'key1',
    }
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': unset_names,
        'envSet': set_mapping,
    })

    # Swap all three env helpers for mocks so the call pattern can be checked.
    env_patcher = patch.multiple(
        'pypyr.steps.env',
        env_get=DEFAULT,
        env_set=DEFAULT,
        env_unset=DEFAULT,
    )
    with env_patcher as patched:
        pypyr.steps.env.run_step(context)

    patched['env_get'].assert_not_called()
    patched['env_set'].assert_called_once()
    patched['env_unset'].assert_called_once()
def test_tar_only_calls_extract():
    """Only calls extract if only extract specified.

    tar_extract must be called exactly once and tar_archive never.
    """
    extract_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    context = Context({
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'tarExtract': extract_items,
    })

    # Mock both tar helpers so no real archive work is attempted.
    tar_patcher = patch.multiple(
        'pypyr.steps.tar',
        tar_archive=DEFAULT,
        tar_extract=DEFAULT,
    )
    with tar_patcher as patched:
        pypyr.steps.tar.run_step(context)

    patched['tar_extract'].assert_called_once()
    patched['tar_archive'].assert_not_called()
def test_tar_calls_archive_and_extract():
    """Calls both extract and archive when both specified.

    tar_extract and tar_archive must each be called exactly once.
    """
    archive_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    extract_items = [
        {'in': 'key2', 'out': 'ARB_GET_ME1'},
        {'in': 'key4', 'out': 'ARB_GET_ME2'},
    ]
    context = Context({
        'key2': 'value2',
        'key1': 'value1',
        'key3': 'value3',
        'tarArchive': archive_items,
        'tarExtract': extract_items,
    })

    # Mock both tar helpers so no real archive work is attempted.
    tar_patcher = patch.multiple(
        'pypyr.steps.tar',
        tar_archive=DEFAULT,
        tar_extract=DEFAULT,
    )
    with tar_patcher as patched:
        pypyr.steps.tar.run_step(context)

    patched['tar_extract'].assert_called_once()
    patched['tar_archive'].assert_called_once()
# ------------------------- tar base ---------------------------------------#
#
# ------------------------- tar extract---------------------------------------#
def patch(
    target, new=mock.DEFAULT, spec=None, create=False,
    mocksignature=False, spec_set=None, autospec=False,
    new_callable=None, **kwargs
):
    """Mock an async function.

    Intended as a drop-in replacement for mock.patch that handles async
    automatically. The .asynq attribute is created automatically and
    shouldn't be used when accessing data on the mock.

    The target is resolved through _get_target into a getter and an
    attribute name; every option is then forwarded positionally to
    _make_patch_async, with the extra keyword arguments passed along as a
    single dict.
    """
    getter, attribute = _get_target(target)
    patch_args = (
        getter, attribute, new, spec, create, mocksignature, spec_set,
        autospec, new_callable, kwargs,
    )
    return _make_patch_async(*patch_args)
def test_run(self, mCreateSession, mSample):
    """Run an activity once and check the client and handler call records.

    The task handler is a MagicMock whose side effect clears
    ``activity.polling`` the first time it is invoked.
    """
    mSample.return_value = 'XXX'
    session = MockSession()
    mCreateSession.return_value = (session, '123456')

    sfn_client = session.client('stepfunctions')
    sfn_client.list_activities.return_value = {
        'activities': [
            {'name': 'name', 'activityArn': 'XXX'},
        ]
    }
    sfn_client.get_activity_task.return_value = {
        'taskToken': 'YYY',
        'input': '{}',
    }

    target = mock.MagicMock()
    activity = ActivityMixin(handle_task = target)

    def stop_loop(*args, **kwargs):
        # Clear the polling flag (presumably what ends activity.run), then
        # return DEFAULT so the mock falls back to its normal return_value.
        activity.polling = False
        return mock.DEFAULT

    target.side_effect = stop_loop

    activity.run('name')

    expected_client_calls = [
        mock.call.list_activities(),
        mock.call.get_activity_task(activityArn = 'XXX',
                                    workerName = 'name-XXX'),
    ]
    self.assertEqual(sfn_client.mock_calls, expected_client_calls)

    expected_target_calls = [
        mock.call('YYY', {}),
        mock.call().start(),
    ]
    self.assertEqual(target.mock_calls, expected_target_calls)
def test_run(self):
    """Check run() when no TLS factory is assigned by this test.

    With get_active_node returning 'consul:1234', run() is expected to
    store that value, log two warnings, build the site, listen through
    listentcp only, add the update loop and start the reactor.
    """
    self.cls.reactor = Mock(spec_set=reactor)

    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT,
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT,
        listentls=DEFAULT,
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = 'consul:1234'
        self.cls.run()

    site_mock = mod_mocks['Site']
    redirector_site_mock = mod_mocks['VaultRedirectorSite']
    expected_log_calls = [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)'),
    ]

    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert redirector_site_mock.mock_calls == [call(self.cls)]
    assert site_mock.mock_calls == [call(redirector_site_mock.return_value)]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentcp'].mock_calls == [
        call(site_mock.return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self):
    """Check run() when a TLS factory is set on the instance.

    Same expectations as the plain run test, except the site must be
    handed to listentls and listentcp must stay unused.
    """
    self.cls.reactor = Mock(spec_set=reactor)
    self.cls.tls_factory = Mock()

    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT,
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT,
        listentls=DEFAULT,
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = 'consul:1234'
        self.cls.run()

    site_mock = mod_mocks['Site']
    redirector_site_mock = mod_mocks['VaultRedirectorSite']
    expected_log_calls = [
        call.warning('Initial Vault active node: %s', 'consul:1234'),
        call.warning('Starting Twisted reactor (event loop)'),
    ]

    assert self.cls.active_node_ip_port == 'consul:1234'
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert redirector_site_mock.mock_calls == [call(self.cls)]
    assert site_mock.mock_calls == [call(redirector_site_mock.return_value)]
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert mod_mocks['LoopingCall'].mock_calls == []
    assert cls_mocks['listentls'].mock_calls == [
        call(site_mock.return_value)
    ]
    assert cls_mocks['add_update_loop'].mock_calls == [call()]
    assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self):
    """Check run() when get_active_node returns None.

    run() must raise SystemExit with code 3, log one critical message and
    touch none of the site, reactor or looping-call mocks.
    """
    self.cls.reactor = Mock(spec_set=reactor)

    module_patcher = patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT,
    )
    class_patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        listentcp=DEFAULT,
        add_update_loop=DEFAULT,
    )
    with module_patcher as mod_mocks, class_patcher as cls_mocks:
        cls_mocks['get_active_node'].return_value = None
        with pytest.raises(SystemExit) as excinfo:
            self.cls.run()

    expected_log_calls = [
        call.critical("ERROR: Could not get active vault node from "
                      "Consul. Exiting.")
    ]

    assert excinfo.value.code == 3
    assert mod_mocks['logger'].mock_calls == expected_log_calls
    assert mod_mocks['VaultRedirectorSite'].mock_calls == []
    assert mod_mocks['Site'].mock_calls == []
    assert self.cls.reactor.mock_calls == []
    assert cls_mocks['run_reactor'].mock_calls == []
    assert mod_mocks['LoopingCall'].mock_calls == []
def testDEFAULT(self):
    """Check that the DEFAULT name is the very same object as sentinel.DEFAULT."""
    # assertIs reports both operands on failure, unlike assertTrue(a is b).
    self.assertIs(DEFAULT, sentinel.DEFAULT)
def test_cached(self):
    """Check that _data returns the existing _data_cache untouched.

    With _data_cache pre-populated, reading _data must return that dict
    and must not call any of the patched query or summary helpers.
    """
    (mock_t, mock_std, mock_stpp, mock_stm,
     mock_mct, mock_mbs, mock_mos) = (Mock() for _ in range(7))

    # Helpers whose result is read through ``.all()``.
    query_results = (
        ('_transactions', mock_t),
        ('_scheduled_transactions_date', mock_std),
        ('_scheduled_transactions_per_period', mock_stpp),
        ('_scheduled_transactions_monthly', mock_stm),
    )
    # Helpers whose return value is used directly.
    direct_results = (
        ('_make_combined_transactions', mock_mct),
        ('_make_budget_sums', mock_mbs),
        ('_make_overall_sums', mock_mos),
    )
    helper_names = [name for name, _ in query_results + direct_results]
    targets = dict.fromkeys(helper_names, DEFAULT)

    with patch.multiple(pb, autospec=True, **targets) as mocks:
        for name, value in query_results:
            mocks[name].return_value.all.return_value = value
        for name, value in direct_results:
            mocks[name].return_value = value
        self.cls._data_cache = {'foo': 'bar'}
        res = self.cls._data

    assert res == {'foo': 'bar'}
    for name in helper_names:
        assert mocks[name].mock_calls == []
def test_upload_journal_logs(self):
    ''' test upload_journal_logs()

    Patches the journal Reader class and JournaldClient, feeds four
    sentinel messages through the client, and checks that each mocked log
    group receives the messages returned for it by group_messages.
    '''
    # Kept under its own name: the original rebound ``journal`` to the
    # patch mock via ``as journal``, hiding the real Reader instance.
    real_reader = systemd.journal.Reader(path=os.getcwd())
    messages = [sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4]

    with patch('systemd.journal.Reader', return_value=real_reader) as reader_cls:
        # NOTE(review): MagicMock(autospec=True) only sets an ``autospec``
        # attribute on the mock; it does not autospec JournaldClient.
        # Left as-is to preserve the test's behaviour.
        with patch('main.JournaldClient', MagicMock(autospec=True)) as reader:
            reader.return_value.__iter__.return_value = messages
            with patch.multiple(self.client, retain_message=DEFAULT, group_messages=DEFAULT):
                log_group1 = Mock()
                log_group2 = Mock()
                self.client.retain_message.side_effect = [True, False, True, True]
                self.client.group_messages.return_value = [
                    ((log_group1, 'stream1'), [sentinel.msg1]),
                    ((log_group2, 'stream2'), [sentinel.msg3, sentinel.msg4]),
                ]
                self.client.upload_journal_logs(os.getcwd())

    # creates reader
    reader.assert_called_once_with(reader_cls.return_value, self.CURSOR_CONTENT)
    # uploads log messages
    log_group1.log_messages.assert_called_once_with('stream1', [sentinel.msg1])
    log_group2.log_messages.assert_called_once_with('stream2', [sentinel.msg3, sentinel.msg4])
def testDEFAULT(self):
    """DEFAULT must be the identical object to sentinel.DEFAULT."""
    expected = sentinel.DEFAULT
    self.assertIs(DEFAULT, expected)
def testDEFAULT(self):
    """DEFAULT must be the identical object to sentinel.DEFAULT."""
    expected = sentinel.DEFAULT
    self.assertIs(DEFAULT, expected)
def test_ioctl_fn_ptr_r(self, ioctl_mock):
    """The function built by ioctl_fn_ptr_r returns the value written via the pointer.

    The fake ioctl checks fd, request and pointer type, stores 42 through
    the pointer, and the built function called with fd 12 must return 42.
    """
    def fake_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        # Write the value the built function is expected to hand back.
        int_ptr.contents.value = 42
        return mock.DEFAULT

    ioctl_mock.side_effect = fake_ioctl

    read_fn = ioctl.ioctl_fn_ptr_r(32, ctypes.c_int)
    result = read_fn(12)
    assert result == 42
def test_ioctl_fn_ptr_w(self, ioctl_mock):
    """The function built by ioctl_fn_ptr_w passes its value through a pointer.

    The fake ioctl checks fd, request, pointer type and that the pointed-to
    value is the 42 supplied by the caller.
    """
    def fake_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        assert int_ptr.contents.value == 42
        return mock.DEFAULT

    ioctl_mock.side_effect = fake_ioctl

    write_fn = ioctl.ioctl_fn_ptr_w(32, ctypes.c_int)
    write_fn(12, 42)
def test_ioctl_fn_ptr_wr(self, ioctl_mock):
    """The function built by ioctl_fn_ptr_wr sends a value in and reads one back.

    The fake ioctl checks that the pointer holds the caller's 24, then
    overwrites it with 42; the built function must return 42.
    """
    def fake_ioctl(fd, request, int_ptr):
        assert fd == 12
        assert request == 32
        assert type(int_ptr) == ctypes.POINTER(ctypes.c_int)
        assert int_ptr.contents.value == 24
        # Replace the input with the value expected as the result.
        int_ptr.contents.value = 42
        return mock.DEFAULT

    ioctl_mock.side_effect = fake_ioctl

    write_read_fn = ioctl.ioctl_fn_ptr_wr(32, ctypes.c_int)
    result = write_read_fn(12, 24)
    assert result == 42
def test_ioctl_fn_w(self, ioctl_mock):
    """The function built by ioctl_fn_w passes its value as a ctypes.c_int.

    The fake ioctl checks fd, request, the argument's exact type and that
    it carries the 42 supplied by the caller.
    """
    def fake_ioctl(fd, request, int_val):
        assert fd == 12
        assert request == 32
        assert type(int_val) == ctypes.c_int
        assert int_val.value == 42
        return mock.DEFAULT

    ioctl_mock.side_effect = fake_ioctl

    write_fn = ioctl.ioctl_fn_w(32, ctypes.c_int)
    write_fn(12, 42)
def _patch_object(
    target, attribute, new=mock.DEFAULT, spec=None,
    create=False, mocksignature=False, spec_set=None, autospec=False,
    new_callable=None, **kwargs
):
    """Build an async-aware patch for ``attribute`` on an already-resolved object.

    ``target`` is wrapped in a zero-argument getter and every option is
    forwarded positionally to _make_patch_async, with the extra keyword
    arguments passed along as a single dict.
    """
    patch_args = (
        lambda: target, attribute, new, spec, create, mocksignature,
        spec_set, autospec, new_callable, kwargs,
    )
    return _make_patch_async(*patch_args)
def _maybe_wrap_new(new):
"""If the mock replacement cannot have attributes set on it, wraps it in a function.
Also, if the replacement object is a method, applies the async() decorator.
This is needed so that we support patch(..., x.method) where x.method is an instancemethod
object, because instancemethods do not support attribute assignment.
"""
if new is mock.DEFAULT:
return new
if inspect.isfunction(new) or isinstance(new, (classmethod, staticmethod)):
return asynq(sync_fn=new)(new)
elif not callable(new):
return new
try:
new._maybe_wrap_new_test_attribute = None
del new._maybe_wrap_new_test_attribute
except (AttributeError, TypeError):
# setting something on a bound method raises AttributeError, setting something on a
# Cythonized class raises TypeError
should_wrap = True
else:
should_wrap = False
if should_wrap:
# we can't just use a lambda because that overrides __get__ and creates bound methods we
# don't want, so we make a wrapper class that overrides __call__
class Wrapper(object):
def __call__(self, *args, **kwargs):
return new(*args, **kwargs)
return Wrapper()
else:
return new
def test_acceptance(self):
    """Run a VaultRedirector through run() and validate the recorded HTTP responses.

    get_active_node, run_reactor and update_active_node are patched. After
    run() returns, the JSON held in ``self.response`` is checked for the
    redirect path and for the health-check path.
    """
    logger.debug('starting acceptance test')
    patcher = patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        update_active_node=DEFAULT,
    )
    with patcher as cls_mocks:
        # setup some return values
        cls_mocks['run_reactor'].side_effect = self.se_run_reactor
        cls_mocks['get_active_node'].return_value = 'foo:1234'
        cls_mocks['update_active_node'].side_effect = self.se_update_active
        # instantiate class
        self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
        self.cls.log_enabled = False
        # make sure active is None (starting state)
        assert self.cls.active_node_ip_port is None
        self.cls.bind_port = self.get_open_port()
        self.cls.log_enabled = True
        # setup an async task to make the HTTP request
        self.cls.reactor.callLater(2.0, self.se_requester)
        # do this in case the callLater for self.stop_reactor fails...
        signal.signal(signal.SIGALRM, self.stop_reactor)
        signal.alarm(20)  # send SIGALRM in 20 seconds, to stop runaway loop
        self.cls.run()
        signal.alarm(0)  # disable SIGALRM

    assert self.cls.active_node_ip_port == 'bar:5678'  # from update_active
    assert self.update_active_called is True
    assert cls_mocks['update_active_node'].mock_calls[0] == call()
    assert cls_mocks['run_reactor'].mock_calls == [call()]
    assert cls_mocks['get_active_node'].mock_calls == [call()]

    # HTTP response checks
    resp = json.loads(self.response)

    # /bar/baz
    actual_server = resp['/bar/baz']['headers']['Server']
    expected_server = "vault-redirector/%s/TwistedWeb/%s" % (
        _VERSION, twisted_version.short()
    )
    assert actual_server == expected_server
    assert resp['/bar/baz']['headers'][
        'Location'] == 'http://bar:5678/bar/baz'
    assert resp['/bar/baz']['status_code'] == 307

    # /vault-redirector-health
    assert resp['/vault-redirector-health']['status_code'] == 200
    health_info = json.loads(resp['/vault-redirector-health']['text'])
    assert resp['/vault-redirector-health']['headers'][
        'Content-Type'] == 'application/json'
    assert health_info['healthy'] is True
    expected_health = (
        ('application', 'vault-redirector'),
        ('version', _VERSION),
        ('source', _PROJECT_URL),
        ('consul_host_port', 'consul:1234'),
        ('active_vault', 'bar:5678'),
    )
    for key, expected in expected_health:
        assert health_info[key] == expected
def test_initial(self):
    """Check that _data assembles its dict from all seven patched helpers.

    Each helper must be called once with the instance; the four query
    helpers must additionally have ``.all()`` called on their result.
    """
    (mock_t, mock_std, mock_stpp, mock_stm,
     mock_mct, mock_mbs, mock_mos) = (Mock() for _ in range(7))

    # Helpers whose result is read through ``.all()``.
    query_results = (
        ('_transactions', mock_t),
        ('_scheduled_transactions_date', mock_std),
        ('_scheduled_transactions_per_period', mock_stpp),
        ('_scheduled_transactions_monthly', mock_stm),
    )
    # Helpers whose return value is used directly.
    direct_results = (
        ('_make_combined_transactions', mock_mct),
        ('_make_budget_sums', mock_mbs),
        ('_make_overall_sums', mock_mos),
    )
    helper_names = [name for name, _ in query_results + direct_results]
    targets = dict.fromkeys(helper_names, DEFAULT)

    with patch.multiple(pb, autospec=True, **targets) as mocks:
        for name, value in query_results:
            mocks[name].return_value.all.return_value = value
        for name, value in direct_results:
            mocks[name].return_value = value
        res = self.cls._data

    expected = {
        'transactions': mock_t,
        'st_date': mock_std,
        'st_per_period': mock_stpp,
        'st_monthly': mock_stm,
        'all_trans_list': mock_mct,
        'budget_sums': mock_mbs,
        'overall_sums': mock_mos,
    }
    assert res == expected
    for name, _ in query_results:
        assert mocks[name].mock_calls == [
            call(self.cls), call().all()
        ]
    for name, _ in direct_results:
        assert mocks[name].mock_calls == [
            call(self.cls)
        ]