def test_invalid_word_map(self):
error_template = '>> \033[95m[ERROR] {}\033[0m'
warning_template = '>> \033[93m[WARNING] {}\033[0m'
with StringIO() as buffer, redirect_stdout(buffer):
self.invalid_study.Clinical.validate(validate.WARNING)
messages = [line.strip("'") for line in buffer.getvalue().splitlines() if line != '']
assert len(messages) == 4, "Messages length is {} instead of 4".format(len(messages))
missing_file_error = "The file {} isn't included in the column map".format('Not_present_file.txt')
self.assertEqual(messages[1], error_template.format(missing_file_error))
column_index_error = "File {} doesn't has {} columns, but {} columns".format('Cell-line_clinical.txt', 10, 9)
self.assertEqual(messages[2], error_template.format(column_index_error))
unmapped_warning = "Value {} is mapped at column {} in file {}. " \
"However the value is not present in the column".format("Not_present", 8,
'Cell-line_clinical.txt')
self.assertEqual(messages[3], warning_template.format(unmapped_warning))
python类redirect_stdout()的实例源码
def process_module(self, node):
style_checker = pycodestyle.Checker(node.stream().name)
# catch the output of check_all() in pycodestyle
with io.StringIO() as buf, redirect_stdout(buf):
style_checker.check_all()
output = buf.getvalue()
# Handle the case of multiple error messages
lst = output.split('\n')
for line in lst:
if line != '':
line = line.split(':')
self.add_message('pep8-errors', line=line[1],
args=line[3])
def run_command_inline(argv, funct, *args, **kwargs):
argv_save = sys.argv
environ_save = EnvironmentSnapshot()
captured_stdout = StringIO()
captured_stderr = StringIO()
sys.argv = argv
exitcode = None
with redirect_stdout(captured_stdout):
with redirect_stderr(captured_stderr):
try:
exitcode = funct(*args, **kwargs)
except SystemExit as e:
exitcode = e.code
finally:
# restore environment, command-line arguments, and the native
# modules system
environ_save.load()
sys.argv = argv_save
fixtures.init_native_modules_system()
return (exitcode,
captured_stdout.getvalue(),
captured_stderr.getvalue())
def run_spec_nostdout(depth, p):
stdout = io.StringIO()
with contextlib.redirect_stdout(stdout):
s = run_spec(depth, p)
assert stdout.getvalue() == ''
return s
def eval_(self, ctx, *, code: cleanup_code):
"""Alternative to `debug` that executes code inside a coroutine.
Allows multiple lines and `await`ing.
This is a modified version of RoboDanny's latest `eval` command.
"""
env = get_env(ctx)
to_compile = 'async def _func():\n%s' % textwrap.indent(code, ' ')
stdout = io.StringIO()
try:
exec(to_compile, env)
except SyntaxError as e:
await ctx.send(self.eval_output('\n'.join(get_syntax_error(e).splitlines()[1:-1])))
return
func = env['_func']
try:
with redirect_stdout(stdout):
ret = await func()
except Exception as e:
value = stdout.getvalue()
exc = traceback.format_exc().replace(UPPER_PATH, '...').splitlines()
exc = '\n'.join([exc[0], *exc[3:]])
await ctx.send(self.eval_output(f'{value}{exc}'))
else:
value = stdout.getvalue()
if isinstance(ret, discord.Embed):
await ctx.send(self.eval_output(value if value else None), embed=ret)
else:
await ctx.send(self.eval_output(value if ret is None else f'{value}{rep(ret)}'))
def test_get_oauth_verifier_no_browser():
stream = io.StringIO()
url = "https://api.twitter.com/oauth/authorize?oauth_token=abc"
with patch.object(oauth_dance.asyncio, 'sleep', side_effect=dummy):
with patch.object(oauth_dance.webbrowser, 'open', return_value=False):
with patch.object(oauth_dance, 'input', return_value="12345"):
with redirect_stdout(stream):
await oauth_dance.get_oauth_verifier("abc")
stream.seek(0)
assert url in stream.read()
def test_get_oauth_verifier_browser_error():
def error(url):
raise RuntimeError
stream = io.StringIO()
url = "https://api.twitter.com/oauth/authorize?oauth_token=abc"
with patch.object(oauth_dance.asyncio, 'sleep', side_effect=dummy):
with patch.object(oauth_dance.webbrowser, 'open', side_effect=error):
with patch.object(oauth_dance, 'input', return_value="12345"):
with redirect_stdout(stream):
await oauth_dance.get_oauth_verifier("abc")
stream.seek(0)
assert url in stream.read()
def _printed_lines(self, function, year):
"""call the function, and count the number of lines on its output"""
output = io.StringIO()
with contextlib.redirect_stdout(output):
function(year)
txt = output.getvalue()
return len(txt.splitlines())
def say_at_width(width, msg):
buf = StringIO()
with patch('adventurelib.get_terminal_size', return_value=(width, 24)):
with redirect_stdout(buf):
say(msg)
return buf.getvalue()
def test_say_room():
"""saw() will format input as strings."""
r = Room('You are standing in a hallway.')
buf = StringIO()
with redirect_stdout(buf):
say(r)
assert buf.getvalue() == 'You are standing in a hallway.\n'
def function_9():
"""function with context manager"""
with contextlib.redirect_stdout(sys.stderr):
pass
with contextlib.redirect_stdout(sys.stderr) as spam: # type: object
pass
def redirect_stdout(target):
original = sys.stdout
sys.stdout = target
yield
sys.stdout = original
# To deactivate warnings: https://github.com/tensorflow/tensorflow/issues/7778
def test_main_list(self):
"""Test that the list subcommand returns True when complete and outputs the expected number of characters."""
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.list())
self.assertTrue(self.output.getvalue().startswith("Here is a list of available modules that apply to the curr"))
self.assertTrue(self.output.getvalue().endswith("=MODULEa ... MODULEx] [--only-domains=DOMAINa ... DOMAINx]\n"))
self.assertEqual(len(self.output.getvalue()), 14533)
def test_main_list_nonapplicable_modules(self):
"""
Disable the "xennetrocket" module then test that the list subcommand returns True when complete and outputs
the expected number of characters.
"""
for module_obj in self.ec2rl._modules:
if module_obj.name == "xennetrocket":
module_obj.applicable = False
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.list())
self.assertTrue(self.output.getvalue().startswith("Here is a list of available modules that apply to the curr"))
self.assertTrue(self.output.getvalue().endswith("=MODULEa ... MODULEx] [--only-domains=DOMAINa ... DOMAINx]\n"))
self.assertFalse(re.search(r"xennetrocket", self.output.getvalue()))
self.assertEqual(len(self.output.getvalue()), 14410)
def test_main_help(self):
"""Test that help returns True and get_help's output matches the expected length."""
sys.argv = ["ec2rl", "help"]
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.help())
self.assertTrue(self.output.getvalue().startswith("ec2rl: A framework for executing diagnostic and troublesh"))
self.assertTrue(self.output.getvalue().endswith("- enables debug level logging\n\n"))
self.assertEqual(len(self.output.getvalue()), 8438)
def test_main_help_module(self):
"""Test output from a single module."""
self.ec2rl.options.global_args["onlymodules"] = "aptlog"
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.help())
self.assertEqual(self.output.getvalue(), "aptlog:\nCollect apt log files\nRequires sudo: True\n")
del self.ec2rl.options.global_args["onlymodules"]
def test_main_help_subcommand(self):
"""Test help output for the 'run' subcommand."""
sys.argv = ["ec2rl", "help", "run"]
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.help())
# Check that the length of the help message matches the expected value
self.assertEqual(len(self.output.getvalue()), 2541)
self.assertTrue(self.output.getvalue().startswith("run:\n SYNOPSIS:\n ec2rl run [--only-modules=MOD"))
self.assertTrue(self.output.getvalue().endswith("to run in parallel. The default is 10.\n\n\n"))
def test_main_help_module_subcommand_and_invalid(self):
"""
Test help output for an arg that does not match any condition as well as the 'list' subcommand
and the 'arpcache' module.
"""
sys.argv = ["ec2rl", "help", "doesnotmatchanything", "arpcache", "list", ]
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.help())
# Check that the length of the help message matches the expected value
self.assertEqual(len(self.output.getvalue()), 595)
self.assertTrue(self.output.getvalue().startswith("arpcache:\nDetermines if aggressive arp caching is enabled"))
self.assertTrue(self.output.getvalue().endswith(" specified comma delimited list\n\n\n"))
def test_main_version(self):
"""Test output from the version subcommand."""
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.version())
# Check that the length of the version message matches the expected value
self.assertEqual(len(self.output.getvalue()), 272 + len(self.PROGRAM_VERSION))
self.assertTrue(self.output.getvalue().startswith("ec2rl {}\nCopyright 201".format(self.PROGRAM_VERSION)))
self.assertTrue(self.output.getvalue().endswith("TIES OR CONDITIONS OF ANY KIND, either express or implied.\n"))
def test_main_bugreport(self):
"""Test output from the bugreport subcommand."""
with contextlib.redirect_stdout(self.output):
self.assertTrue(self.ec2rl.bug_report())
# Example output:
# ec2rl 1.0.0
# ubuntu, 4.4.0-83-generic
# Python 3.5.2, /usr/bin/python3
regex_str = r"^ec2rl\ [0-9]+\.[0-9]+\.[0-9]+.*\n(ubuntu|suse|rhel|alami),\ [0-9]+\.[0-9]+\.[0-9]+.*\n" \
r"Python\ [0-9]+\.[0-9]+\.[0-9]+.*,\ /.*\n$"
self.assertTrue(re.match(regex_str, self.output.getvalue()))