def test_pprint_compact(self):
with patch('sys.stdout', new_callable=StringIO) as mock_stdout:
# - test compact pprint-ing with 80x25 terminal
with patch.object(pythonrc.subprocess, 'check_output',
return_value='25 80'):
sys.displayhook(list(range(22)))
self.assertIn('20, 21]', sys.stdout.getvalue())
sys.displayhook(list(range(23)))
self.assertIn('21,\n 22]', sys.stdout.getvalue())
# - test compact pprint-ing with resized 100x25 terminal
with patch.object(pythonrc.subprocess, 'check_output',
return_value=('25 100')):
sys.displayhook(list(range(23)))
self.assertIn('21, 22]', sys.stdout.getvalue())
python类object()的实例源码
def test_completer(self):
completer = self.pymp.improved_rlcompleter()
rl = pythonrc.readline
# - no leading characters
with patch.object(rl, 'get_line_buffer', return_value='\t'):
self.assertEqual(completer('\t', 0), ' ')
# - keyword completion
with patch.object(rl, 'get_line_buffer', return_value='imp\t'):
self.assertEqual(completer('imp', 0), 'import ')
# - module name completion
with patch.object(rl, 'get_line_buffer', return_value='from '):
self.assertIn(completer('th', 0), ('this', 'threading'))
self.assertIn(completer('th', 1), ('this', 'threading'))
# - pathname completion
with patch.object(rl, 'get_line_buffer', return_value='./p'):
self.assertEqual(completer('./py', 0), './pythonrc.py')
def test_remove_all_report_tpr_success(caplog):
mock = MagicMock()
mock.remove.return_value = 0
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)), \
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(third_party.ThirdPartyResourceType,
'create',
MagicMock(return_value=mock)):
uninstall.remove_all_report()
caplog_tuple = caplog.record_tuples
assert caplog_tuple[-1][2] == "\"Reconcilereport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
assert caplog_tuple[-3][2] == "\"Nodereport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
def test_remove_all_report_crd_success(caplog):
mock = MagicMock()
mock.remove.return_value = 0
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)), \
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(custom_resource.CustomResourceDefinitionType,
'create',
MagicMock(return_value=mock)):
uninstall.remove_all_report()
caplog_tuple = caplog.record_tuples
assert caplog_tuple[-1][2] == "\"cmk-reconcilereport\" for node " \
"\"{}\" removed."\
.format(os.getenv("NODE_NAME"))
assert caplog_tuple[-3][2] == "\"cmk-nodereport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
def test_remove_report_tpr_success(caplog):
fake_tpr_report = MagicMock()
fake_tpr_report.remove.return_value = 0
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)),\
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(third_party.ThirdPartyResourceType, 'create',
MagicMock(return_value=fake_tpr_report)):
uninstall.remove_report_tpr("NodeReport")
caplog_tuple = caplog.record_tuples
assert caplog_tuple[-1][2] == "\"NodeReport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
# Remove success due to not existing report
def test_remove_report_tpr_success2(caplog):
fake_http_resp = FakeHTTPResponse(500, "{\"message\":\"fake message\"}",
"{\"reason\":\"NotFound\"}")
fake_api_exception = K8sApiException(http_resp=fake_http_resp)
fake_tpr_report = MagicMock()
fake_tpr_report.remove.side_effect = fake_api_exception
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)),\
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(third_party.ThirdPartyResourceType, 'create',
MagicMock(return_value=fake_tpr_report)):
uninstall.remove_report_tpr("NodeReport")
caplog_tuple = caplog.record_tuples
assert \
caplog_tuple[-2][2] == "\"NodeReport\" for node \"{}\" does" \
" not exist.".format(os.getenv("NODE_NAME"))
assert \
caplog_tuple[-1][2] == "\"NodeReport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
def test_remove_report_tpr_failure(caplog):
fake_http_resp = FakeHTTPResponse(500, "{\"message\":\"fake message\"}",
"{\"reason\":\"WrongReason\"}")
fake_api_exception = K8sApiException(http_resp=fake_http_resp)
fake_tpr_report = MagicMock()
fake_tpr_report.remove.side_effect = fake_api_exception
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)),\
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(third_party.ThirdPartyResourceType, 'create',
MagicMock(return_value=fake_tpr_report)):
with pytest.raises(SystemExit):
uninstall.remove_report_tpr("NodeReport")
caplog_tuple = caplog.record_tuples
exp_err = "Aborting uninstall: " \
"Exception when removing third party resource \"NodeReport\""
exp_log_err = get_expected_log_error(exp_err, fake_http_resp)
assert caplog_tuple[-1][2] == exp_log_err
def test_remove_report_crd_success2(caplog):
fake_http_resp = FakeHTTPResponse(500, "{\"message\":\"fake message\"}",
"{\"reason\":\"NotFound\"}")
fake_api_exception = K8sApiException(http_resp=fake_http_resp)
fake_crd_report = MagicMock()
fake_crd_report.remove.side_effect = fake_api_exception
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)),\
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(custom_resource.CustomResourceDefinitionType,
'create',
MagicMock(return_value=fake_crd_report)):
uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])
caplog_tuple = caplog.record_tuples
assert \
caplog_tuple[-2][2] == "\"cmk-nodereport\" for node \"{}\" does "\
"not exist.".format(os.getenv("NODE_NAME"))
assert \
caplog_tuple[-1][2] == "\"cmk-nodereport\" for node \"{}\" " \
"removed.".format(os.getenv("NODE_NAME"))
def test_remove_report_crd_failure(caplog):
fake_http_resp = FakeHTTPResponse(500, "{\"message\":\"fake message\"}",
"{\"reason\":\"WrongReason\"}")
fake_api_exception = K8sApiException(http_resp=fake_http_resp)
fake_crd_report = MagicMock()
fake_crd_report.remove.side_effect = fake_api_exception
with patch('kubernetes.config.load_incluster_config',
MagicMock(return_value=0)),\
patch('kubernetes.client.ExtensionsV1beta1Api',
MagicMock(return_value=0)), \
patch.object(custom_resource.CustomResourceDefinitionType,
'create',
MagicMock(return_value=fake_crd_report)):
with pytest.raises(SystemExit):
uninstall.remove_report_crd("cmk-nodereport", ["cmk-nr"])
caplog_tuple = caplog.record_tuples
exp_err = "Aborting uninstall: " \
"Exception when removing custom resource definition " \
"\"cmk-nodereport\""
exp_log_err = get_expected_log_error(exp_err, fake_http_resp)
assert caplog_tuple[-1][2] == exp_log_err
def mock_crypto_passthrough():
count = 0
def mocked_urandom(size):
nonlocal count
count += 1
if size > 16:
template = '<dummy-key-{:0>%s}>' % (size - 12)
else:
template = '{:0>%s}' % size
return template.format(count).encode()
with patch.object(RSAPublicKey, 'encrypt', new=lambda _, txt: txt), \
patch.object(RSAPublicKey, 'verify', new=lambda _, sign, txt: None), \
patch.object(RSAPrivateKey, 'decrypt', new=lambda _, txt: txt), \
patch.object(RSAPrivateKey, 'sign', new=lambda _, txt: '<mock-signature>'), \
patch.object(RSAPrivateKey, 'export',
new=lambda _, pwd: ('<mock-exported-key with password %s>' % pwd).encode()), \
patch.object(AESKey, 'encrypt', new=lambda _, txt: txt), \
patch.object(AESKey, 'decrypt', new=lambda _, txt: txt), \
patch('parsec.crypto.urandom', new=mocked_urandom), \
patch('parsec.crypto.encrypt_with_password', new=lambda p, s, t: t), \
patch('parsec.crypto.decrypt_with_password', new=lambda p, s, c: c):
yield
def mock_crypto_passthrough():
count = 0
def mocked_urandom(size):
nonlocal count
count += 1
if size > 16:
template = '<dummy-key-{:0>%s}>' % (size - 12)
else:
template = '{:0>%s}' % size
return template.format(count).encode()
with patch.object(RSAPublicKey, 'encrypt', new=lambda _, txt: txt), \
patch.object(RSAPublicKey, 'verify', new=lambda _, sign, txt: None), \
patch.object(RSAPrivateKey, 'decrypt', new=lambda _, txt: txt), \
patch.object(RSAPrivateKey, 'sign', new=lambda _, txt: '<mock-signature>'), \
patch.object(RSAPrivateKey, 'export',
new=lambda _, pwd: ('<mock-exported-key with password %s>' % pwd).encode()), \
patch.object(AESKey, 'encrypt', new=lambda _, txt: txt), \
patch.object(AESKey, 'decrypt', new=lambda _, txt: txt), \
patch('parsec.crypto.urandom', new=mocked_urandom), \
patch('parsec.crypto.encrypt_with_password', new=lambda p, s, t: t), \
patch('parsec.crypto.decrypt_with_password', new=lambda p, s, c: c):
yield
def test_pype_use_parent_context_with_swallow(mock_run_pipeline):
"""pype swallowing error in child pipeline."""
context = Context({
'pype': {
'name': 'pipe name',
'pipeArg': 'argument here',
'useParentContext': True,
'skipParse': True,
'raiseError': False
}
})
logger = logging.getLogger('pypyr.steps.pype')
with patch.object(logger, 'error') as mock_logger_error:
pype.run_step(context)
mock_run_pipeline.assert_called_once_with(
pipeline_name='pipe name',
pipeline_context_input='argument here',
context=context,
parse_input=False)
mock_logger_error.assert_called_once_with(
'Something went wrong pyping pipe name. RuntimeError: whoops')
# ------------------------ run_step --------------------------------------
def test_run_failure_step_group_swallows():
"""Failure step group runner swallows errors."""
logger = pypyr.log.logger.get_logger('pypyr.stepsrunner')
with patch('pypyr.stepsrunner.run_step_group') as mock_run_group:
with patch.object(logger, 'error') as mock_logger_error:
mock_run_group.side_effect = ContextError('arb error')
pypyr.stepsrunner.run_failure_step_group(
{'pipe': 'val'}, Context())
mock_logger_error.assert_any_call(
"Failure handler also failed. Swallowing.")
mock_run_group.assert_called_once_with(pipeline_definition={'pipe': 'val'},
step_group_name='on_failure',
context=Context())
# ------------------------- run_failure_step_group----------------------------#
# ------------------------- run_pipeline_step---------------------------------#
def test_get_current_version(self):
sample_data = [
"## Unreleased\n",
"---\n",
"\n",
"### New\n",
"\n",
"### Fixes\n",
"\n",
"### Breaks\n",
"\n",
"\n",
"## 0.3.2 - (2017-06-09)\n",
"---\n",
]
with patch.object(ChangelogUtils, 'get_changelog_data', return_value=sample_data) as mock_read:
CL = ChangelogUtils()
result = CL.get_current_version()
self.assertEqual(result, '0.3.2')
def test_get_changes(self):
sample_data = [
"## Unreleased\n",
"---\n",
"\n",
"### New\n",
"* added feature x\n",
"\n",
"### Fixes\n",
"* fixed bug 1\n",
"\n",
"### Breaks\n",
"\n",
"\n",
"## 0.3.2 - (2017-06-09)\n",
"---\n",
]
with patch.object(ChangelogUtils, 'get_changelog_data', return_value=sample_data) as mock_read:
CL = ChangelogUtils()
result = CL.get_changes()
self.assertTrue('new' in result)
self.assertTrue('fix' in result)
def doctest_reset_del():
"""Test that resetting doesn't cause errors in __del__ methods.
In [2]: class A(object):
...: def __del__(self):
...: print str("Hi")
...:
In [3]: a = A()
In [4]: get_ipython().reset()
Hi
In [5]: 1+1
Out[5]: 2
"""
# For some tests, it will be handy to organize them in a class with a common
# setup that makes a temp file
def test_obj_del(self):
"""Test that object's __del__ methods are called on exit."""
if sys.platform == 'win32':
try:
import win32api
except ImportError:
raise SkipTest("Test requires pywin32")
src = ("class A(object):\n"
" def __del__(self):\n"
" print 'object A deleted'\n"
"a = A()\n")
self.mktmp(py3compat.doctest_refactor_print(src))
if dec.module_not_available('sqlite3'):
err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
else:
err = None
tt.ipexec_validate(self.fname, 'object A deleted', err)
def test_tclass(self):
mydir = os.path.dirname(__file__)
tc = os.path.join(mydir, 'tclass')
src = ("%%run '%s' C-first\n"
"%%run '%s' C-second\n"
"%%run '%s' C-third\n") % (tc, tc, tc)
self.mktmp(src, '.ipy')
out = """\
ARGV 1-: ['C-first']
ARGV 1-: ['C-second']
tclass.py: deleting object: C-first
ARGV 1-: ['C-third']
tclass.py: deleting object: C-second
tclass.py: deleting object: C-third
"""
if dec.module_not_available('sqlite3'):
err = 'WARNING: IPython History requires SQLite, your history will not be saved\n'
else:
err = None
tt.ipexec_validate(self.fname, out, err)
def test_get_content_ids__returns_cached_ids_if_enough_in_cache(self, mock_queryset):
stream = FollowedStream(user=self.user)
stream.paginate_by = 1
with patch.object(stream, "get_queryset") as mock_queryset, \
patch.object(stream, "get_cached_content_ids") as mock_cached:
mock_cached.return_value = [self.content1.id], {self.content1.id: self.content1.id}
stream.get_content_ids()
self.assertEqual(mock_queryset.call_count, 0)
def test_get_content_ids__uses_cached_ids(self):
with patch.object(self.stream, "get_cached_content_ids", return_value=([], {})) as mock_cached:
self.stream.get_content_ids()
mock_cached.assert_called_once_with()