def testRunnerRegistersResult(self):
    # Verifies that running a TextTestRunner calls
    # unittest2.runner.registerResult exactly once, with the result object
    # the runner produced.
    class Test(unittest2.TestCase):
        def testFoo(self):
            pass

    # Put the genuine registerResult back once this test is finished.
    self.addCleanup(setattr, unittest2.runner, 'registerResult',
                    unittest2.runner.registerResult)

    expected_result = unittest2.TestResult()
    text_runner = unittest2.TextTestRunner(stream=StringIO())

    # Force the runner to hand out our own result object.
    def use_expected_result():
        return expected_result
    text_runner._makeResult = use_expected_result

    self.wasRegistered = 0

    def record_registration(candidate):
        self.wasRegistered = self.wasRegistered + 1
        self.assertEqual(candidate, expected_result)

    unittest2.runner.registerResult = record_registration

    text_runner.run(unittest2.TestSuite())
    self.assertEqual(self.wasRegistered, 1)
python类StringIO()的实例源码
def test_startTestRun_stopTestRun_called(self):
    # Running an empty suite must still record 'startTestRun' followed by
    # 'stopTestRun' in the shared event list.
    class LoggingTextResult(LoggingResult):
        separator2 = ''

        def printErrors(self):
            pass

    class LoggingRunner(unittest2.TextTestRunner):
        def __init__(self, events):
            super(LoggingRunner, self).__init__(StringIO())
            self._events = events

        def _makeResult(self):
            # Every result writes into the list given to the runner.
            return LoggingTextResult(self._events)

    recorded = []
    LoggingRunner(recorded).run(unittest2.TestSuite())
    self.assertEqual(recorded, ['startTestRun', 'stopTestRun'])
def SerializeFaultDetail(val, info=None, version=None, nsMap=None, encoding=None):
    """Serialize a fault's detail with SoapSerializer and return the text.

    When ``version`` is not given it is taken from ``val._version``; a value
    that is not a MethodFault raises TypeError, and an AttributeError raised
    while resolving the version falls back to BASE_VERSION. When ``info`` is
    not given a generic ``Object`` descriptor is built for that version.
    """
    if version is None:
        try:
            if isinstance(val, MethodFault):
                version = val._version
            else:
                raise TypeError('{0} is not a MethodFault'.format(str(val)))
        except AttributeError:
            version = BASE_VERSION

    if info is None:
        info = Object(name="object", type=object, version=version, flags=0)

    buf = StringIO()
    serializer = SoapSerializer(buf, version, nsMap, encoding)
    serializer.SerializeFaultDetail(val, info)
    return buf.getvalue()
## SOAP serializer
#
def formatException(self, exc_info, record=None):
    """Format exception output with CONF.logging_exception_prefix.

    :param exc_info: ``(type, value, traceback)`` triple describing the
                     exception to render.
    :param record: log record whose ``__dict__`` is interpolated into the
                   prefix template. When falsy, formatting is delegated to
                   ``logging.Formatter.formatException``.
    :returns: the traceback text with the rendered prefix prepended to
              every line.
    """
    if not record:
        return logging.Formatter.formatException(self, exc_info)

    # format_exception() returns exactly the pieces print_exception() would
    # write to a file object, so no intermediate buffer is required.
    text = ''.join(traceback.format_exception(exc_info[0], exc_info[1],
                                              exc_info[2]))
    lines = text.split('\n')

    # The prefix template may reference asctime, which is only set on the
    # record once a formatter has computed it.
    if '%(asctime)' in CONF.logging_exception_prefix:
        record.asctime = self.formatTime(record, self.datefmt)

    # The prefix depends only on the record, so render it once instead of
    # once per traceback line.
    prefix = CONF.logging_exception_prefix % record.__dict__
    return '\n'.join('%s%s' % (prefix, line) for line in lines)
def __str__(self):
    """Render the node and its aliasing details, one labelled line each."""
    op = self.node.op
    rows = [
        (" node:", self.node),
        (" node.inputs:", [(str(i), id(i)) for i in self.node.inputs]),
        (" node.outputs:", [(str(i), id(i)) for i in self.node.outputs]),
        (" view_map:", getattr(op, 'view_map', {})),
        (" destroy_map:", getattr(op, 'destroy_map', {})),
        (" aliased output:", self.output_idx),
        (" aliased output storage:", self.out_storage),
    ]
    # The alias lines are only reported when they carry something.
    if self.in_alias_idx:
        rows.append((" aliased to inputs:", self.in_alias_idx))
    if self.out_alias_idx:
        rows.append((" aliased to outputs:", self.out_alias_idx))

    buf = StringIO()
    for label, value in rows:
        print(label, value, file=buf)
    return buf.getvalue()
def _test_progress_bar(backend, len, increment):
    # Drives a DialogUI progress bar through 11 steps, checking the text
    # written to the stream after each step.
    # NOTE(review): the source had no indentation; the per-step checks are
    # assumed to live inside the loop -- confirm against the original file.
    sink = StringIO()
    filler = ('123456890' * (len//10))[:len]
    bar = DialogUI(sink).get_progressbar('label', filler, maxval=10, backend=backend)
    bar.start()
    # we can't increment 11 times
    for x in range(11):
        if not increment:
            # absolute mode: report the current position
            bar.update(x, increment=increment)
        elif x != 0:
            # incremental mode: do not increment on 0
            bar.update(1, increment=increment)
        sink.flush()  # needed atm
        pstr = sink.getvalue()
        ok_startswith(pstr.lstrip('\r'), 'label:')
        assert_re_in(r'.*\b%d%%.*' % (10*x), pstr)
        if backend == 'progressbar':
            assert_in('ETA', pstr)
    bar.finish()
    ok_endswith(sink.getvalue(), '\n')
def test_deployment(self):
    # Builds an APIGatewayDeployer from the in-memory swagger document and
    # issues the stage + domain deployment pair twice in a row.
    # NOTE(review): presumably the repetition checks that redeploying is
    # safe -- confirm the intent.
    swagger_stream = StringIO(json.dumps(self.swagger_json))
    deployer = APIGatewayDeployer(
        api_name='Sample',
        region_name=self.region_name,
        aws_access_key_id=self.aws_access_key_id,
        aws_secret_access_key=self.aws_secret_access_key,
        swagger_file=swagger_stream,
        domain_name="example.com"
    )
    for _ in range(2):
        deployer.deploy_stage(stage='development',
                              lambda_function_name='api_lambda',
                              lambda_version='development')
        deployer.deploy_domain(stage='development', base_path='v1',
                               certificate_body="",
                               certificate_private_key="",
                               certificate_chain='')
def test_iter_chunks(self):
    # Exercises iter_chunks over lists, ranges, a generator, strings, a real
    # file and a StringIO, including an uneven final chunk.
    self.assertEqual(list(iter_chunks([], 100)), [])
    self.assertEqual(list(iter_chunks(list(range(5)), 1)), [[0], [1], [2], [3], [4]])
    self.assertEqual(list(iter_chunks(list(range(5)), 2)), [[0, 1], [2, 3], [4]])
    self.assertEqual(list(iter_chunks(list(range(5)), 5)), [[0, 1, 2, 3, 4]])
    self.assertEqual(list(iter_chunks(list(range(6)), 2)), [[0, 1], [2, 3], [4, 5]])

    self.assertEqual(list(iter_chunks(range(5), 2)), [[0, 1], [2, 3], [4]])
    self.assertEqual(list(iter_chunks(range(6), 2)), [[0, 1], [2, 3], [4, 5]])
    self.assertEqual(list(iter_chunks(range(1, 6), 2)), [[1, 2], [3, 4], [5]])
    self.assertEqual(list(iter_chunks(range(1, 7), 2)), [[1, 2], [3, 4], [5, 6]])

    def gen(num):
        for i in range(num):
            yield i+1

    self.assertEqual(list(iter_chunks(gen(5), 2)), [[1, 2], [3, 4], [5]])

    self.assertEqual(list(iter_chunks("01234", 2)), ["01", "23", "4"])
    self.assertEqual(list(iter_chunks("012345", 2)), ["01", "23", "45"])

    # The fixture file sits next to this test module; open it in a ``with``
    # block so the handle is closed even when the assertion fails.
    chunks_path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                               "chunks_file")
    with open(chunks_path, "r") as file_obj:
        self.assertEqual(list(iter_chunks(file_obj, 11)), (10 * ["1234567890\n"]) + ["\n"])

    string_io = StringIO((10 * "1234567890\n") + "\n")
    self.assertEqual(list(iter_chunks(string_io, 11)), (10 * ["1234567890\n"]) + ["\n"])
def formatException(self, exc_info, record=None):
    """Format exception output with CONF.logging_exception_prefix.

    :param exc_info: ``(type, value, traceback)`` triple describing the
                     exception to render.
    :param record: log record whose ``__dict__`` is interpolated into the
                   prefix template. When falsy, formatting is delegated to
                   ``logging.Formatter.formatException``.
    :returns: the traceback text with the rendered prefix prepended to
              every line.
    """
    if not record:
        return logging.Formatter.formatException(self, exc_info)

    # format_exception() returns exactly the pieces print_exception() would
    # write to a file object, so no intermediate buffer is required.
    text = ''.join(traceback.format_exception(exc_info[0], exc_info[1],
                                              exc_info[2]))
    lines = text.split('\n')

    # The prefix template may reference asctime, which is only set on the
    # record once a formatter has computed it.
    if '%(asctime)' in CONF.logging_exception_prefix:
        record.asctime = self.formatTime(record, self.datefmt)

    # The prefix depends only on the record, so render it once instead of
    # once per traceback line.
    prefix = CONF.logging_exception_prefix % record.__dict__
    return '\n'.join('%s%s' % (prefix, line) for line in lines)
test_nova_manage.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 27
收藏 0
点赞 0
评论 0
def test_list_without_host(self):
    # Runs ``commands.list()`` with InstanceList.get_by_filters patched and
    # checks the captured output for the header, flavor name and host.
    output = StringIO()
    # Remember the stream that is installed right now (a test runner may
    # already have replaced sys.stdout) and always put it back, even when
    # the command raises.
    original_stdout = sys.stdout
    sys.stdout = output
    try:
        with mock.patch.object(objects.InstanceList, 'get_by_filters') as get:
            get.return_value = objects.InstanceList(
                objects=[fake_instance.fake_instance_obj(
                    context.get_admin_context(), host='foo-host',
                    flavor=self.fake_flavor,
                    system_metadata={})])
            self.commands.list()
    finally:
        sys.stdout = original_stdout

    result = output.getvalue()

    self.assertIn('node', result)  # check the header line
    self.assertIn('m1.tiny', result)  # flavor.name
    self.assertIn('foo-host', result)
test_nova_manage.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def test_list_with_host(self):
    # Runs ``commands.list(host='fake-host')`` with InstanceList.get_by_host
    # patched and checks the captured output for the header, flavor name
    # and host.
    output = StringIO()
    # Remember the stream that is installed right now (a test runner may
    # already have replaced sys.stdout) and always put it back, even when
    # the command raises.
    original_stdout = sys.stdout
    sys.stdout = output
    try:
        with mock.patch.object(objects.InstanceList, 'get_by_host') as get:
            get.return_value = objects.InstanceList(
                objects=[fake_instance.fake_instance_obj(
                    context.get_admin_context(),
                    flavor=self.fake_flavor,
                    system_metadata={})])
            self.commands.list(host='fake-host')
    finally:
        sys.stdout = original_stdout

    result = output.getvalue()

    self.assertIn('node', result)  # check the header line
    self.assertIn('m1.tiny', result)  # flavor.name
    self.assertIn('fake-host', result)
test_nova_manage.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 31
收藏 0
点赞 0
评论 0
def _test_archive_deleted_rows(self, mock_db_archive, verbose=False):
    # Shared body for the archive_deleted_rows tests: stdout is swapped for
    # a StringIO through a fixture, the command is run for 20 rows, and the
    # printed output is compared with what the verbosity level should give.
    self.useFixture(fixtures.MonkeyPatch('sys.stdout', StringIO()))
    self.commands.archive_deleted_rows(20, verbose=verbose)
    mock_db_archive.assert_called_once_with(20)
    output = sys.stdout.getvalue()

    if not verbose:
        # Quiet mode must not print anything.
        self.assertEqual(0, len(output))
        return

    # NOTE(review): the rows below are narrower than the border lines, so
    # the column padding may have been lost in extraction -- verify against
    # the real command output.
    expected = '''\
+-----------+-------------------------+
| Table | Number of Rows Archived |
+-----------+-------------------------+
| consoles | 5 |
| instances | 10 |
+-----------+-------------------------+
'''
    self.assertEqual(expected, output)
test_glance.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 46
收藏 0
点赞 0
评论 0
def test_download_data_dest_path_write_fails(self, show_mock, open_mock):
    # A write failure on the destination file-like object must propagate
    # out of GlanceImageService.download.
    class FakeDiskException(Exception):
        pass

    # NOTE(mikal): data is a file like object, which in our case always
    # raises an exception when we attempt to write to the file.
    class Exceptionator(StringIO):
        def write(self, _):
            raise FakeDiskException('Disk full!')

    glance_client = mock.MagicMock()
    glance_client.call.return_value = [1, 2, 3]
    image_service = glance.GlanceImageService(glance_client)

    with self.assertRaises(FakeDiskException):
        image_service.download(mock.sentinel.ctx, mock.sentinel.image_id,
                               data=Exceptionator())
def preview_sql(self, url, step, **args):
    """Run the script against a mocked engine and return the SQL it issued.

    The engine is requested with the ``mock`` strategy and an executor that
    appends every statement to an in-memory buffer, then
    :meth:`PythonScript.run <migrate.versioning.script.py.PythonScript.run>`
    is invoked.

    :returns: SQL file
    """
    captured_sql = StringIO()

    def record_statement(s, p=''):
        # Append the statement text plus the optional trailing piece.
        return captured_sql.write(str(s) + p)

    args['engine_arg_strategy'] = 'mock'
    args['engine_arg_executor'] = record_statement

    @with_engine
    def go(url, step, **kw):
        self.run(kw.pop('engine'), step)
        return captured_sql.getvalue()

    return go(url, step, **args)
def serialize(self, items):
    """Does the inverse of config parsing by taking parsed values and
    converting them back to a string representing config file contents.

    :param items: mapping of key to either a nested ``OrderedDict`` (written
                  as a ``[key]`` section, recursively) or a
                  ``(value, help)`` pair (written as ``key = value``,
                  preceded by a ``; help`` comment line when help is truthy).
    :returns: the config file text.
    """
    r = StringIO()
    for key, value in items.items():
        # isinstance() rather than an exact type comparison, so OrderedDict
        # subclasses are also treated as sections.
        if isinstance(value, OrderedDict):
            r.write('\n[%s]\n' % key)
            r.write(self.serialize(value))
        else:
            value, help_text = value
            if help_text:
                r.write('; %s\n' % help_text)
            r.write('%s = %s\n' % (key, value))
    return r.getvalue()
def startTest(self, test):
    "Count the test that is about to run and, when buffering, capture stdout/stderr"
    self.testsRun += 1
    self._mirrorOutput = False
    if not self.buffer:
        return
    if self._stderr_buffer is None:
        # Both buffers are created together the first time they are needed.
        self._stderr_buffer = StringIO()
        self._stdout_buffer = StringIO()
    sys.stdout, sys.stderr = self._stdout_buffer, self._stderr_buffer
def captured_output(stream_name):
    """Return a context manager used by captured_stdout/stdin/stderr
    that temporarily replaces the sys stream *stream_name* with a StringIO.

    The ``with`` target is the replacement StringIO; the stream that was
    installed beforehand is put back on exit, even if the body raises.
    """
    # Imported locally so this block does not rely on a module-level import.
    import contextlib

    # A bare generator is not usable in a ``with`` statement; wrapping it in
    # contextmanager provides what the docstring promises.
    @contextlib.contextmanager
    def _replace_stream():
        original_stream = getattr(sys, stream_name)
        setattr(sys, stream_name, StringIO())
        try:
            yield getattr(sys, stream_name)
        finally:
            setattr(sys, stream_name, original_stream)

    return _replace_stream()
def test_new_runner_old_case(self):
    # A unittest2 runner must be able to run a plain unittest.TestCase.
    class Test(unittest.TestCase):
        def testOne(self):
            pass

    new_runner = unittest2.TextTestRunner(resultclass=resultFactory,
                                          stream=StringIO())
    outcome = new_runner.run(unittest2.TestSuite((Test('testOne'),)))
    self.assertEqual(outcome.testsRun, 1)
    self.assertEqual(len(outcome.errors), 0)
def test_old_runner_new_case(self):
    # A plain unittest runner must be able to run a unittest2.TestCase.
    class Test(unittest2.TestCase):
        def testOne(self):
            self.assertDictEqual({}, {})

    old_runner = unittest.TextTestRunner(stream=StringIO())
    outcome = old_runner.run(unittest.TestSuite((Test('testOne'),)))
    self.assertEqual(outcome.testsRun, 1)
    self.assertEqual(len(outcome.errors), 0)
def testFailFastSetByRunner(self):
    # The runner must hand the callable a result whose failfast flag is set.
    self.testRan = False

    def fake_test(result):
        self.testRan = True
        self.assertTrue(result.failfast)

    failfast_runner = unittest2.TextTestRunner(stream=StringIO(),
                                               failfast=True)
    failfast_runner.run(fake_test)
    self.assertTrue(self.testRan)
def test_NonExit(self):
    # With exit=False, main() returns a program object exposing ``result``.
    quiet_runner = unittest2.TextTestRunner(stream=StringIO())
    program = unittest2.main(
        exit=False,
        argv=["foobar"],
        testRunner=quiet_runner,
        testLoader=self.FooBarLoader(),
    )
    self.assertTrue(hasattr(program, 'result'))
def test_Exit(self):
    # With exit=True, main() must raise SystemExit.
    main_kwargs = dict(
        argv=["foobar"],
        testRunner=unittest2.TextTestRunner(stream=StringIO()),
        exit=True,
        testLoader=self.FooBarLoader(),
    )
    self.assertRaises(SystemExit, unittest2.main, **main_kwargs)
def test_ExitAsDefault(self):
    # When ``exit`` is not passed, main() must still raise SystemExit.
    main_kwargs = dict(
        argv=["foobar"],
        testRunner=unittest2.TextTestRunner(stream=StringIO()),
        testLoader=self.FooBarLoader(),
    )
    self.assertRaises(SystemExit, unittest2.main, **main_kwargs)
def testRunner(self):
    # Creating a TextTestRunner with the appropriate argument should
    # register the TextTestResult it creates
    quiet_runner = unittest2.TextTestRunner(stream=StringIO())
    outcome = quiet_runner.run(unittest2.TestSuite())
    registered = unittest2.signals._results
    self.assertIn(outcome, registered)
def test_locals(self):
    # tb_locals=True given to the runner must show up on the result.
    sink = io.StringIO()
    locals_runner = unittest.TextTestRunner(stream=sink, tb_locals=True)
    outcome = locals_runner.run(unittest.TestSuite())
    self.assertEqual(True, outcome.tb_locals)
def test_works_with_result_without_startTestRun_stopTestRun(self):
    # Running an empty suite with a result class derived from OldTestResult
    # must complete without raising.
    class OldTextResult(OldTestResult):
        separator2 = ''

        def __init__(self, *_):
            # Whatever the runner passes in is ignored.
            super(OldTextResult, self).__init__()

        def printErrors(self):
            pass

    legacy_runner = unittest2.TextTestRunner(stream=StringIO(),
                                             resultclass=OldTextResult)
    legacy_runner.run(unittest2.TestSuite())
def test_pickle_unpickle(self):
    # Issue #7197: a TextTestRunner should be (un)pickleable. This is
    # required by test_multiprocessing under Windows (in verbose mode).
    stream = StringIO(u("foo"))
    runner = unittest2.TextTestRunner(stream)
    protocol = 2
    while protocol <= pickle.HIGHEST_PROTOCOL:
        restored = pickle.loads(pickle.dumps(runner, protocol=protocol))
        # StringIO objects never compare equal, a cheap test instead.
        self.assertEqual(restored.stream.getvalue(), stream.getvalue())
        protocol += 1
def getRunner(self):
    """Build a TextTestRunner that uses resultFactory and writes to a StringIO."""
    sink = StringIO()
    return unittest2.TextTestRunner(stream=sink, resultclass=resultFactory)
def __enter__(self):
    """Swap sys.stdout for a fresh StringIO, remembering the previous stream.

    The previous stream is kept on ``self.real_stdout`` and the capture
    buffer on ``self.stringio``; the object itself is returned.
    """
    self.real_stdout = sys.stdout
    capture = moves.StringIO()
    self.stringio = capture
    sys.stdout = capture
    return self
def testDuckTyping(self):
    # We want to support arbitrary classes that implement the stream
    # interface.
    class StringPassThrough(object):
        def __init__(self, stream):
            self.stream = stream

        def read(self, *args, **kwargs):
            # Forward straight to the wrapped stream.
            return self.stream.read(*args, **kwargs)

    wrapped = StringPassThrough(StringIO('2014 January 19'))
    expected = datetime(2014, 1, 19)
    self.assertEqual(parse(wrapped), expected)