def test_env_only_calls_get():
    """env step invokes only env_get when context contains envGet alone."""
    payload = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envGet': {
            'key2': 'ARB_GET_ME1',
            'key4': 'ARB_GET_ME2'
        }
    }
    with patch.multiple('pypyr.steps.env',
                        env_get=DEFAULT,
                        env_set=DEFAULT,
                        env_unset=DEFAULT) as mock_env:
        pypyr.steps.env.run_step(Context(payload))

    mock_env['env_get'].assert_called_once()
    mock_env['env_set'].assert_not_called()
    mock_env['env_unset'].assert_not_called()
# --------------------- patch.multiple() usage examples ---------------------#
def test_env_only_calls_set():
    """env step invokes only env_set when context contains envSet alone."""
    payload = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    with patch.multiple('pypyr.steps.env',
                        env_get=DEFAULT,
                        env_set=DEFAULT,
                        env_unset=DEFAULT) as mock_env:
        pypyr.steps.env.run_step(Context(payload))

    mock_env['env_get'].assert_not_called()
    mock_env['env_set'].assert_called_once()
    mock_env['env_unset'].assert_not_called()
def test_env_only_calls_unset():
    """env step invokes only env_unset when context contains envUnset alone."""
    payload = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ]
    }
    with patch.multiple('pypyr.steps.env',
                        env_get=DEFAULT,
                        env_set=DEFAULT,
                        env_unset=DEFAULT) as mock_env:
        pypyr.steps.env.run_step(Context(payload))

    mock_env['env_get'].assert_not_called()
    mock_env['env_set'].assert_not_called()
    mock_env['env_unset'].assert_called_once()
def test_env_only_calls_set_unset():
    """env step invokes env_set and env_unset, but not env_get, when both
    envSet and envUnset are in context."""
    payload = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'envUnset': [
            'ARB_DELETE_ME1',
            'ARB_DELETE_ME2'
        ],
        'envSet': {
            'ARB_SET_ME1': 'key2',
            'ARB_SET_ME2': 'key1'
        }
    }
    with patch.multiple('pypyr.steps.env',
                        env_get=DEFAULT,
                        env_set=DEFAULT,
                        env_unset=DEFAULT) as mock_env:
        pypyr.steps.env.run_step(Context(payload))

    mock_env['env_get'].assert_not_called()
    mock_env['env_set'].assert_called_once()
    mock_env['env_unset'].assert_called_once()
def test_tar_only_calls_extract():
    """tar step runs only tar_extract when only tarExtract is specified."""
    payload = {
        'key1': 'value1',
        'key2': 'value2',
        'key3': 'value3',
        'tarExtract': [
            {'in': 'key2',
             'out': 'ARB_GET_ME1'},
            {'in': 'key4',
             'out': 'ARB_GET_ME2'}
        ]
    }
    with patch.multiple('pypyr.steps.tar',
                        tar_archive=DEFAULT,
                        tar_extract=DEFAULT) as mock_tar:
        pypyr.steps.tar.run_step(Context(payload))

    mock_tar['tar_extract'].assert_called_once()
    mock_tar['tar_archive'].assert_not_called()
def test_tar_calls_archive_and_extract():
    """tar step runs both tar_extract and tar_archive when both specified."""
    payload = {
        'key2': 'value2',
        'key1': 'value1',
        'key3': 'value3',
        'tarArchive': [
            {'in': 'key2',
             'out': 'ARB_GET_ME1'},
            {'in': 'key4',
             'out': 'ARB_GET_ME2'}
        ],
        'tarExtract': [
            {'in': 'key2',
             'out': 'ARB_GET_ME1'},
            {'in': 'key4',
             'out': 'ARB_GET_ME2'}
        ]
    }
    with patch.multiple('pypyr.steps.tar',
                        tar_archive=DEFAULT,
                        tar_extract=DEFAULT) as mock_tar:
        pypyr.steps.tar.run_step(Context(payload))

    mock_tar['tar_extract'].assert_called_once()
    mock_tar['tar_archive'].assert_called_once()
# ------------------------- tar base ---------------------------------------#
#
# ------------------------- tar extract -------------------------------------#
def test_run(self):
    """run() with no TLS factory: builds the Site, listens on plain TCP,
    schedules the update loop and starts the reactor."""
    self.cls.reactor = Mock(spec_set=reactor)
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as cls_mocks:
            cls_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()
            # the initially-discovered active node is recorded
            assert self.cls.active_node_ip_port == 'consul:1234'
            assert mod_mocks['logger'].mock_calls == [
                call.warning('Initial Vault active node: %s', 'consul:1234'),
                call.warning('Starting Twisted reactor (event loop)')
            ]
            # site is wrapped: VaultRedirectorSite(self) -> Site(...)
            assert mod_mocks['VaultRedirectorSite'].mock_calls == [
                call(self.cls)
            ]
            assert mod_mocks['Site'].mock_calls == [
                call(mod_mocks['VaultRedirectorSite'].return_value)
            ]
            # reactor itself is untouched; helpers do the listening/looping
            assert self.cls.reactor.mock_calls == []
            assert cls_mocks['run_reactor'].mock_calls == [call()]
            assert mod_mocks['LoopingCall'].mock_calls == []
            # plain TCP listener used; TLS path never taken
            assert cls_mocks['listentcp'].mock_calls == [
                call(mod_mocks['Site'].return_value)
            ]
            assert cls_mocks['add_update_loop'].mock_calls == [call()]
            assert cls_mocks['listentls'].mock_calls == []
def test_run_tls(self):
    """run() with a TLS factory present: listens via listentls instead of
    plain TCP; everything else matches the non-TLS path."""
    self.cls.reactor = Mock(spec_set=reactor)
    self.cls.tls_factory = Mock()
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT,
            listentls=DEFAULT
        ) as cls_mocks:
            cls_mocks['get_active_node'].return_value = 'consul:1234'
            self.cls.run()
            assert self.cls.active_node_ip_port == 'consul:1234'
            assert mod_mocks['logger'].mock_calls == [
                call.warning('Initial Vault active node: %s', 'consul:1234'),
                call.warning('Starting Twisted reactor (event loop)')
            ]
            assert mod_mocks['VaultRedirectorSite'].mock_calls == [
                call(self.cls)
            ]
            assert mod_mocks['Site'].mock_calls == [
                call(mod_mocks['VaultRedirectorSite'].return_value)
            ]
            assert self.cls.reactor.mock_calls == []
            assert cls_mocks['run_reactor'].mock_calls == [call()]
            assert mod_mocks['LoopingCall'].mock_calls == []
            # TLS listener used; plain TCP path never taken
            assert cls_mocks['listentls'].mock_calls == [
                call(mod_mocks['Site'].return_value)
            ]
            assert cls_mocks['add_update_loop'].mock_calls == [call()]
            assert cls_mocks['listentcp'].mock_calls == []
def test_run_error(self):
    """run() exits with code 3 (and starts nothing) when no active vault
    node can be discovered from Consul."""
    self.cls.reactor = Mock(spec_set=reactor)
    with patch.multiple(
        pbm,
        logger=DEFAULT,
        Site=DEFAULT,
        LoopingCall=DEFAULT,
        VaultRedirectorSite=DEFAULT
    ) as mod_mocks:
        with patch.multiple(
            pb,
            get_active_node=DEFAULT,
            run_reactor=DEFAULT,
            listentcp=DEFAULT,
            add_update_loop=DEFAULT
        ) as cls_mocks:
            # simulate discovery failure
            cls_mocks['get_active_node'].return_value = None
            with pytest.raises(SystemExit) as excinfo:
                self.cls.run()
            assert excinfo.value.code == 3
            assert mod_mocks['logger'].mock_calls == [
                call.critical("ERROR: Could not get active vault node from "
                              "Consul. Exiting.")
            ]
            # nothing else was set up before the exit
            assert mod_mocks['VaultRedirectorSite'].mock_calls == []
            assert mod_mocks['Site'].mock_calls == []
            assert self.cls.reactor.mock_calls == []
            assert cls_mocks['run_reactor'].mock_calls == []
            assert mod_mocks['LoopingCall'].mock_calls == []
def test_cached(self):
    """_data returns the cached dict untouched and computes nothing when
    _data_cache is already populated."""
    mock_t = Mock()
    mock_std = Mock()
    mock_stpp = Mock()
    mock_stm = Mock()
    mock_mct = Mock()
    mock_mbs = Mock()
    mock_mos = Mock()
    with patch.multiple(
        pb,
        autospec=True,
        _transactions=DEFAULT,
        _scheduled_transactions_date=DEFAULT,
        _scheduled_transactions_per_period=DEFAULT,
        _scheduled_transactions_monthly=DEFAULT,
        _make_combined_transactions=DEFAULT,
        _make_budget_sums=DEFAULT,
        _make_overall_sums=DEFAULT
    ) as mocks:
        # wire up return values even though none should be consulted
        mocks['_transactions'].return_value.all.return_value = mock_t
        mocks['_scheduled_transactions_date'].return_value\
            .all.return_value = mock_std
        mocks['_scheduled_transactions_per_period'].return_value\
            .all.return_value = mock_stpp
        mocks['_scheduled_transactions_monthly'].return_value\
            .all.return_value = mock_stm
        mocks['_make_combined_transactions'].return_value = mock_mct
        mocks['_make_budget_sums'].return_value = mock_mbs
        mocks['_make_overall_sums'].return_value = mock_mos
        # pre-populate the cache; _data should short-circuit to it
        self.cls._data_cache = {'foo': 'bar'}
        res = self.cls._data
        assert res == {'foo': 'bar'}
        # none of the expensive builders ran
        assert mocks['_transactions'].mock_calls == []
        assert mocks['_scheduled_transactions_date'].mock_calls == []
        assert mocks['_scheduled_transactions_per_period'].mock_calls == []
        assert mocks['_scheduled_transactions_monthly'].mock_calls == []
        assert mocks['_make_combined_transactions'].mock_calls == []
        assert mocks['_make_budget_sums'].mock_calls == []
        assert mocks['_make_overall_sums'].mock_calls == []
def test_upload_journal_logs(self):
    """upload_journal_logs() builds a reader over the journal, filters
    messages via retain_message, groups them, and uploads each group to
    its own stream."""
    tskey = '__REALTIME_TIMESTAMP'
    journal_reader = systemd.journal.Reader(path=os.getcwd())
    with patch('systemd.journal.Reader',
               return_value=journal_reader) as mock_reader_cls:
        with patch('main.JournaldClient',
                   MagicMock(autospec=True)) as mock_client_cls:
            mock_client_cls.return_value.__iter__.return_value = [
                sentinel.msg1, sentinel.msg2, sentinel.msg3, sentinel.msg4]
            with patch.multiple(self.client,
                                retain_message=DEFAULT,
                                group_messages=DEFAULT):
                group_one = Mock()
                group_two = Mock()
                # msg2 is dropped by the retention filter
                self.client.retain_message.side_effect = [
                    True, False, True, True]
                self.client.group_messages.return_value = [
                    ((group_one, 'stream1'), [sentinel.msg1]),
                    ((group_two, 'stream2'),
                     [sentinel.msg3, sentinel.msg4]),
                ]
                self.client.upload_journal_logs(os.getcwd())
                # creates reader around the journald handle + saved cursor
                mock_client_cls.assert_called_once_with(
                    mock_reader_cls.return_value, self.CURSOR_CONTENT)
                # uploads log messages per (group, stream) pair
                group_one.log_messages.assert_called_once_with(
                    'stream1', [sentinel.msg1])
                group_two.log_messages.assert_called_once_with(
                    'stream2', [sentinel.msg3, sentinel.msg4])
def test_connectionbase():
    """ConnectionBase cannot be instantiated directly, but with its ABC
    checks disabled it parses plain and namespaced project names."""
    # Temporarily disable ABC checks so the base class is constructible
    with patch.multiple(ConnectionBase, __abstractmethods__=set()):
        with pytest.raises(TypeError):
            ConnectionBase()

        plain = ConnectionBase('project')
        assert plain.project_name == 'project'
        assert not plain.project_namespace
        assert plain.project_qualname == 'project'

        namespaced = ConnectionBase('ns/project')
        assert namespaced.project_name == 'project'
        assert namespaced.project_namespace == 'ns'
        assert namespaced.project_qualname == 'ns/project'
def test_acceptance(self):
    """End-to-end: run the redirector against a live reactor on an open
    port, hit it over HTTP, and verify redirect + health responses."""
    logger.debug('starting acceptance test')
    with patch.multiple(
        pb,
        get_active_node=DEFAULT,
        run_reactor=DEFAULT,
        update_active_node=DEFAULT
    ) as cls_mocks:
        # wire mocks to the test's side-effect helpers
        cls_mocks['run_reactor'].side_effect = self.se_run_reactor
        cls_mocks['get_active_node'].return_value = 'foo:1234'
        cls_mocks['update_active_node'].side_effect = self.se_update_active
        # instantiate class under test
        self.cls = VaultRedirector('consul:1234', poll_interval=0.5)
        self.cls.log_enabled = False
        # starting state: no active node known yet
        assert self.cls.active_node_ip_port is None
        self.cls.bind_port = self.get_open_port()
        self.cls.log_enabled = True
        # async task to issue the HTTP requests once the reactor is up
        self.cls.reactor.callLater(2.0, self.se_requester)
        # safety net in case the callLater for self.stop_reactor fails...
        signal.signal(signal.SIGALRM, self.stop_reactor)
        signal.alarm(20)  # send SIGALRM in 20s to stop a runaway loop
        self.cls.run()
        signal.alarm(0)  # disable SIGALRM
        # update_active_node switched us to bar:5678
        assert self.cls.active_node_ip_port == 'bar:5678'
        assert self.update_active_called is True
        assert cls_mocks['update_active_node'].mock_calls[0] == call()
        assert cls_mocks['run_reactor'].mock_calls == [call()]
        assert cls_mocks['get_active_node'].mock_calls == [call()]
        # HTTP response checks
        resp = json.loads(self.response)
        # /bar/baz -> 307 redirect to the active node
        assert resp['/bar/baz']['headers'][
            'Server'] == "vault-redirector/%s/TwistedWeb/%s" % (
            _VERSION, twisted_version.short()
        )
        assert resp['/bar/baz']['headers'][
            'Location'] == 'http://bar:5678/bar/baz'
        assert resp['/bar/baz']['status_code'] == 307
        # /vault-redirector-health -> 200 with JSON body
        assert resp['/vault-redirector-health']['status_code'] == 200
        health_info = json.loads(resp['/vault-redirector-health']['text'])
        assert resp['/vault-redirector-health']['headers'][
            'Content-Type'] == 'application/json'
        assert health_info['healthy'] is True
        assert health_info['application'] == 'vault-redirector'
        assert health_info['version'] == _VERSION
        assert health_info['source'] == _PROJECT_URL
        assert health_info['consul_host_port'] == 'consul:1234'
        assert health_info['active_vault'] == 'bar:5678'
def test_initial(self):
    """_data with an empty cache: runs each query/builder exactly once and
    assembles the result dict from their outputs."""
    mock_t = Mock()
    mock_std = Mock()
    mock_stpp = Mock()
    mock_stm = Mock()
    mock_mct = Mock()
    mock_mbs = Mock()
    mock_mos = Mock()
    with patch.multiple(
        pb,
        autospec=True,
        _transactions=DEFAULT,
        _scheduled_transactions_date=DEFAULT,
        _scheduled_transactions_per_period=DEFAULT,
        _scheduled_transactions_monthly=DEFAULT,
        _make_combined_transactions=DEFAULT,
        _make_budget_sums=DEFAULT,
        _make_overall_sums=DEFAULT
    ) as mocks:
        # queries return objects whose .all() yields the fixtures
        mocks['_transactions'].return_value.all.return_value = mock_t
        mocks['_scheduled_transactions_date'].return_value\
            .all.return_value = mock_std
        mocks['_scheduled_transactions_per_period'].return_value\
            .all.return_value = mock_stpp
        mocks['_scheduled_transactions_monthly'].return_value\
            .all.return_value = mock_stm
        mocks['_make_combined_transactions'].return_value = mock_mct
        mocks['_make_budget_sums'].return_value = mock_mbs
        mocks['_make_overall_sums'].return_value = mock_mos
        res = self.cls._data
        assert res == {
            'transactions': mock_t,
            'st_date': mock_std,
            'st_per_period': mock_stpp,
            'st_monthly': mock_stm,
            'all_trans_list': mock_mct,
            'budget_sums': mock_mbs,
            'overall_sums': mock_mos
        }
        # each query ran once and was drained via .all()
        assert mocks['_transactions'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_date'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_per_period'].mock_calls == [
            call(self.cls), call().all()
        ]
        assert mocks['_scheduled_transactions_monthly'].mock_calls == [
            call(self.cls), call().all()
        ]
        # each builder ran exactly once
        assert mocks['_make_combined_transactions'].mock_calls == [
            call(self.cls)
        ]
        assert mocks['_make_budget_sums'].mock_calls == [
            call(self.cls)
        ]
        assert mocks['_make_overall_sums'].mock_calls == [
            call(self.cls)
        ]
def patch_jupyter_dirs():
    """
    Patch jupyter paths to use temporary directories.

    This just creates the patches and directories; the caller is still
    responsible for starting & stopping the patches, and for removing the
    temp dir when appropriate (via the returned cleanup callable).
    """
    test_dir = tempfile.mkdtemp(prefix='jupyter_')
    jupyter_dirs = {name: make_dirs(test_dir, name) for name in (
        'user_home', 'env_vars', 'system', 'sys_prefix', 'custom', 'server')}
    jupyter_dirs['root'] = test_dir

    # server needs its own notebook/runtime dirs created up front
    for subdir in ('notebook', 'runtime'):
        dirpath = jupyter_dirs['server'][subdir] = os.path.join(
            test_dir, 'server', subdir)
        if not os.path.exists(dirpath):
            os.makedirs(dirpath)

    jupyter_patches = []
    # patch relevant environment variables
    jupyter_patches.append(
        patch.dict('os.environ', stringify_env({
            'HOME': jupyter_dirs['user_home']['root'],
            'JUPYTER_CONFIG_DIR': jupyter_dirs['env_vars']['conf'],
            'JUPYTER_DATA_DIR': jupyter_dirs['env_vars']['data'],
            'JUPYTER_RUNTIME_DIR': jupyter_dirs['server']['runtime'],
        })))

    # patch jupyter path variables in various modules.
    # Find the appropriate modules to patch according to compat; should
    # include either
    #   notebook.nbextensions
    # or
    #   jupyter_contrib_core.notebook_compat._compat.nbextensions
    modules_to_patch = set([
        jupyter_core.paths,
        sys.modules[nbextensions._get_config_dir.__module__],
        sys.modules[nbextensions._get_nbextension_dir.__module__],
    ])
    path_patches = dict(
        SYSTEM_CONFIG_PATH=[jupyter_dirs['system']['conf']],
        ENV_CONFIG_PATH=[jupyter_dirs['sys_prefix']['conf']],
        SYSTEM_JUPYTER_PATH=[jupyter_dirs['system']['data']],
        ENV_JUPYTER_PATH=[jupyter_dirs['sys_prefix']['data']],
    )
    for mod in modules_to_patch:
        # only patch the path attributes this module actually defines
        applicable_patches = {
            attrname: newval for attrname, newval in path_patches.items()
            if hasattr(mod, attrname)}
        jupyter_patches.append(
            patch.multiple(mod, **applicable_patches))

    def remove_jupyter_dirs():
        """Remove all temporary directories created."""
        shutil.rmtree(test_dir)

    return jupyter_patches, jupyter_dirs, remove_jupyter_dirs