python类redirect_stdout()的实例源码

train.py 文件源码 项目:enhance 作者: cdiazbas 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def define_network(self, l2_reg):
        print("Setting up network...")

        if (self.network_type == 'encdec'):
            self.model = nn_model.encdec(self.nx, self.ny, self.noise, self.depth, activation=self.activation)

        if (self.network_type == 'keepsize'):
            self.model = nn_model.keepsize(self.nx, self.ny, self.noise, self.depth, activation=self.activation)


        json_string = self.model.to_json()
        f = open('{0}_{1}_model.json'.format(self.root, self.depth), 'w')
        f.write(json_string)
        f.close()

        with open('{0}_{1}_summary.txt'.format(self.root, self.depth), 'w') as f:
            with redirect_stdout(f):
                self.model.summary()

        plot_model(self.model, to_file='{0}_{1}_model.png'.format(self.root, self.depth), show_shapes=True)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def setUp(self, main_log_handler_mock, debug_log_handler_mock, mkdir_mock):
        sys.argv = ["test/modules/not_a_real_file", "run", "--abc=def"]
        os.chdir(self.callpath)
        os.environ["EC2RL_SUDO"] = "False"
        os.environ["EC2RL_DISTRO"] = "alami"
        os.environ["EC2RL_NET_DRIVER"] = "ixgbevf"
        os.environ["EC2RL_VIRT_TYPE"] = "hvm"
        with contextlib.redirect_stdout(StringIO()):
            self.ec2rl = ec2rlcore.main.Main(debug=True, full_init=True)
            self.ec2rl.full_init()

        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)

        self.output = StringIO()
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_main_help_class(self, main_log_handler_mock, mkdir_mock):
        """Test help output for a class of modules."""
        path_to_ec2rl = os.path.abspath("ec2rl")
        test_path = os.path.sep.join([os.path.split(path_to_ec2rl)[0], "test", "modules", "ec2rl"])
        sys.argv = [test_path, "help", "--only-classes=diagnose,asdf"]
        ec2rl = ec2rlcore.main.Main(full_init=True)

        with contextlib.redirect_stdout(self.output):
            self.assertTrue(ec2rl.help())

        # Check that the length of the help message matches the expected value
        self.assertEqual(len(self.output.getvalue()), 1777)
        self.assertTrue(self.output.getvalue().startswith("arpcache:\nDetermines if aggressive arp caching is enabled"))
        self.assertTrue(self.output.getvalue().endswith("ackets to drop due to discarded skbs\nRequires sudo: False\n"))
        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_main_menu_config_global_module_removal(self, write_config_mock):
        """Test removal of a module named Global."""
        original_length = len(self.ec2rl._modules)
        global_mod = ec2rlcore.module.get_module("test/modules/bad_mod.d/global.yaml")
        self.ec2rl._modules.append(global_mod)
        self.assertNotEqual(len(self.ec2rl._modules), original_length)
        curses.initscr()
        curses.ungetch("\n")
        curses.ungetch(curses.KEY_RIGHT)
        with contextlib.redirect_stdout(self.output):
            self.assertTrue(self.ec2rl.menu_config())
        self.assertEqual(len(self.output.getvalue()), 129)
        self.assertTrue(re.match(r"^\n----------\[Configuration File\]----------\n\nConfiguration file saved:\n"
                                 r"/var/tmp/ec2rl/[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}_[0-9]{2}_[0-9]{2}.[0-9]{6}"
                                 r"/configuration.cfg\n$",
                                 self.output.getvalue()))
        self.assertEqual(len(self.ec2rl._modules), original_length)
        self.assertTrue("Global" not in self.ec2rl._modules)
        self.assertTrue(write_config_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_main__run_prediagnostics_metadata_fail(self,
                                                    verify_metadata_mock,
                                                    get_net_driver_mock,
                                                    get_distro_mock,
                                                    check_root_mock,
                                                    main_log_handler_mock,
                                                    debug_log_handler_mock,
                                                    mkdir_mock):
        """Test that _run_prediagnostics() raises MainPrediagnosticFailure when the metadata server is inaccessible."""
        ec2rl_prediag_test = ec2rlcore.main.Main(debug=True, full_init=True)
        module_path = os.path.join(self.callpath, "test/modules/pre.d")
        ec2rl_prediag_test._prediags = ec2rlcore.moduledir.ModuleDir(module_path)
        with self.assertRaises(ec2rlcore.main.MainPrediagnosticFailure):
            with contextlib.redirect_stdout(self.output):
                ec2rl_prediag_test._run_prediagnostics()
        self.assertEqual(self.output.getvalue(), "prediagnostic/verify_metadata: cannot reach metadata server\n")

        self.assertTrue(verify_metadata_mock.called)
        self.assertTrue(get_net_driver_mock.called)
        self.assertTrue(get_distro_mock.called)
        self.assertTrue(check_root_mock.called)
        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_main__run_backup_ami(self, main_log_handler_mock, debug_log_handler_mock, mkdir_mock):
        """Test that _run_backup() runs correctly when ami is specified."""
        instanceid = self.setup_ec2()
        responses.add(responses.GET, "http://169.254.169.254/latest/meta-data/placement/availability-zone",
                      body="us-east-1a", status=200)
        responses.add(responses.GET, "http://169.254.169.254/latest/meta-data/instance-id", body=instanceid,
                      status=200)
        ec2rl_prediag_test = ec2rlcore.main.Main(debug=True, full_init=True)
        ec2rl_prediag_test.options.global_args["backup"] = "ami"
        with contextlib.redirect_stdout(self.output):
            self.assertTrue(ec2rl_prediag_test._run_backup())
        self.assertTrue(re.match(r"^\n-----------\[Backup\s{2}Creation\]-----------\n\nCreating AMI "
                                 r"ami-[a-z0-9]{8} for i-[a-z0-9]{8}\n$",
                                 self.output.getvalue(), re.M))
        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_main__run_backup_empty_backup_value(self,
                                                 main_log_handler_mock,
                                                 debug_log_handler_mock,
                                                 mkdir_mock):
        """Test that an invalid backup value raise an MainInvalidVolumeSpecificationError exception."""
        ec2rl_prediag_test = ec2rlcore.main.Main(debug=True, full_init=True)
        ec2rl_prediag_test.options.global_args["backup"] = ""
        with self.assertRaises(ec2rlcore.main.MainInvalidVolumeSpecificationError):
            with contextlib.redirect_stdout(self.output):
                ec2rl_prediag_test._run_backup()
        self.assertEqual(self.output.getvalue(), "\n-----------[Backup  Creation]-----------\n\nImproper specification"
                                                 " of volumes. Please verify you have specified a volume"
                                                 " such as vol-xxxxx.\n")

        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_main__run_backup_invalid_ebs_volumeid_value(self,
                                                         main_log_handler_mock,
                                                         debug_log_handler_mock,
                                                         mkdir_mock):
        """Test that an invalid EBS volume name raise a ClientError exception."""
        instanceid = self.setup_ec2()
        responses.add(responses.GET, "http://169.254.169.254/latest/meta-data/placement/availability-zone",
                      body="us-east-1a", status=200)
        responses.add(responses.GET, "http://169.254.169.254/latest/meta-data/instance-id", body=instanceid,
                      status=200)
        ec2rl_prediag_test = ec2rlcore.main.Main(debug=True, full_init=True)
        ec2rl_prediag_test.options.global_args["backup"] = "vol-1"
        with self.assertRaises(ec2rlcore.backup.BackupClientError):
            with contextlib.redirect_stdout(self.output):
                ec2rl_prediag_test._run_backup()
        self.assertEqual(self.output.getvalue(), "\n-----------[Backup  Creation]-----------\n\n")

        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
test_main.py 文件源码 项目:aws-ec2rescue-linux 作者: awslabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_main_main_software_check_no_missing_software(self,
                                                          main_log_handler_mock,
                                                          debug_log_handler_mock,
                                                          mkdir_mock,
                                                          which_mock):
        """Test that software_check returns the expected list of software."""
        path_to_ec2rl = os.path.abspath("ec2rl")
        test_path = os.path.sep.join([os.path.split(path_to_ec2rl)[0], "test", "modules", "ec2rl"])
        sys.argv = [test_path, "run"]
        ec2rl_softwarecheck_test = ec2rlcore.main.Main(debug=True, full_init=True)
        module_path = os.path.join(self.callpath, "test/modules/test_main_multi_run_prunemodules_fakeexecutable/")
        ec2rl_softwarecheck_test._modules = ec2rlcore.moduledir.ModuleDir(module_path)
        ec2rl_softwarecheck_test._modules.validate_constraints_have_args(options=ec2rl_softwarecheck_test.options,
                                                                         constraint=ec2rl_softwarecheck_test.constraint,
                                                                         without_keys=["software", "distro", "sudo"])
        with contextlib.redirect_stdout(self.output):
            self.assertTrue(ec2rl_softwarecheck_test.software_check())
        self.assertEqual("All test software requirements have been met.\n", self.output.getvalue())

        self.assertTrue(main_log_handler_mock.called)
        self.assertTrue(debug_log_handler_mock.called)
        self.assertTrue(mkdir_mock.called)
        self.assertTrue(which_mock.called)
prompt.py 文件源码 项目:Gilgamesh 作者: AndreaOrru 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run(self):
        """Run the command interpreter."""
        readline.parse_and_bind('')

        while True:
            line = input('>>> ').strip()

            # Redirect output to a file:
            try:
                separator_index = line.index('>')
                command = line[:separator_index]
                redirect_file = line[separator_index + 1:].strip()
            # Output to stdout:
            except ValueError:
                command = line
                redirect_file = None
            command = [x.strip() for x in command.split()]

            if redirect_file:
                with open(redirect_file, 'w') as f:
                    with redirect_stdout(f):
                        self._dispatch_command(command[0], command[1:])
            else:
                self._dispatch_command(command[0], command[1:])
main_tests.py 文件源码 项目:jira_reporting_scripts 作者: andrew-hamlin-sp 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_write_to_file(self):
        f, path = tempfile.mkstemp(suffix='csv')
        self.json_response = {
            'total': 1,
            'issues': [test_data.singleSprintStory()]
        }
        lines = None
        try:
            with redirect_stderr(self.std_err):
                with redirect_stdout(self.std_out):
                    prog.main(['-w', 'blah', 'cycletime', '--no-progress', '-o', path, 'TEST'])
            with open(path, 'r') as o:
                lines = o.readlines()
        finally:
            os.unlink(path)
        self.assertEqual(2, len(lines))
test_question12.py 文件源码 项目:qlcoder 作者: L1nwatch 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_count_upper_and_lower(self):
        test_data = "AAaaBBbb123CCcccd"
        right_answer = "upper=6,lower=8\n"
        with io.StringIO() as buf, redirect_stdout(buf):
            self.test_solve.count_upper_and_lower(test_data)
            my_answer = buf.getvalue()
        self.assertEqual(right_answer, my_answer)

        for i in range(10 ** 2):
            test_data = str()
            length = random.randint(1, 10 ** 3)
            for j in range(length):
                test_data += random.choice(string.ascii_letters + string.digits)
            right_answer = self.count_upper_and_lower(test_data)

            with io.StringIO() as buf, redirect_stdout(buf):
                self.test_solve.count_upper_and_lower(test_data)
                my_answer = buf.getvalue()
            self.assertEqual(right_answer, my_answer)
test_model_selection.py 文件源码 项目:mlens 作者: flennerhag 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_w_prep_fit():
    """[Model Selection] Test run with preprocessing, single step."""
    evl = Evaluator(mape_scorer, cv=5, shuffle=False, random_state=100,
                    verbose=True)

    with open(os.devnull, 'w') as f, redirect_stdout(f):

        evl.fit(X, y,
                estimators=[OLS()],
                param_dicts={'ols': {'offset': randint(1, 10)}},
                preprocessing={'pr': [Scale()], 'no': []},
                n_iter=3)

    np.testing.assert_approx_equal(
            evl.results['test_score-m']['no.ols'],
            -24.903229451043195)

    np.testing.assert_approx_equal(
            evl.results['test_score-m']['pr.ols'],
            -26.510708862278072, 1)

    assert evl.results['params']['no.ols']['offset'] == 4
    assert evl.results['params']['pr.ols']['offset'] == 4
test_model_selection.py 文件源码 项目:mlens 作者: flennerhag 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_w_prep_list_fit():
    """[Model Selection] Test run with preprocessing as list."""
    evl = Evaluator(
        mape_scorer, cv=5, shuffle=False, random_state=100, verbose=2)

    with open(os.devnull, 'w') as f, redirect_stdout(f):

        evl.fit(X, y,
                estimators=[OLS()],
                param_dicts={'ols': {'offset': randint(1, 10)}},
                preprocessing=[Scale()], n_iter=3)

    np.testing.assert_approx_equal(
            evl.results['test_score-m']['pr.ols'],
            -26.510708862278072)

    assert evl.results['params']['pr.ols']['offset'] == 4
test_b4_layer_push_pop.py 文件源码 项目:mlens 作者: flennerhag 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_push_2():
    """[Parallel | Layer] Test double push"""

    layer.push(g2)
    assert not layer.__fitted__

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        a = run(layer, 'fit', X, y, refit=False, return_preds=True)

    assert layer.__fitted__

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        b = run(layer, 'fit', X, y, refit=False, return_preds=True)

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        c = run(layer, 'transform', X, return_preds=True)

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        d = run(layer, 'fit', X, y, refit=True, return_preds=True)

    np.testing.assert_array_equal(a, b)
    np.testing.assert_array_equal(a, c)
    np.testing.assert_array_equal(a, d)
test_b4_layer_push_pop.py 文件源码 项目:mlens 作者: flennerhag 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_clone():
    """[Parallel | Layer] Test cloning"""
    lyr = clone(layer)

    assert lyr.__stack__
    assert not lyr.__fitted__

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        F = run(layer, 'fit', X, y, refit=False, return_preds=True)
        H = run(lyr, 'fit', X, y, return_preds=True)
    np.testing.assert_array_equal(F, H)

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        F = run(layer, 'transform', X)
        H = run(lyr, 'transform', X)
    np.testing.assert_array_equal(F, H)

    with open(os.devnull, 'w') as f, redirect_stdout(f):
        F = run(layer, 'predict', X)
        H = run(lyr, 'predict', X)
    np.testing.assert_array_equal(F, H)
test_gpustat.py 文件源码 项目:gpustat 作者: wookayin 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_args_endtoend(self, N, Process):
        """
        End-to-end testing given command line args.
        """
        _configure_mock(N, Process)

        def capture_output(*args):
            f = StringIO()
            import contextlib

            with contextlib.redirect_stdout(f):  # requires python 3.4+
                try:
                    gpustat.main(*args)
                except SystemExit:
                    raise AssertionError("Argparse failed (see above error message)")
            return f.getvalue()

        s = capture_output('gpustat', )
        unescaped = remove_ansi_codes(s)
        unescaped = '\n'.join(unescaped.split('\n')[1:])   # remove first line (header)
        self.maxDiff = 4096
        self.assertEqual(unescaped, MOCK_EXPECTED_OUTPUT_DEFAULT)

        s = capture_output('gpustat', '--no-header')
        self.assertIn("[0]", s.split('\n')[0])
sniff_ui_threading.py 文件源码 项目:Mac-Python-3.X 作者: L1nwatch 项目源码 文件源码 阅读 61 收藏 0 点赞 0 评论 0
def _print_packet_contents(self, event, listbox):
        packet = self.l_packets[listbox.curselection()[0]]
        hex_contents = MyUI._hexdump(packet)

        ## ?????? Start
        # state="normal",??????????, ?????? state ? disabled ???????
        self.hex_text.configure(state="normal")
        self.hex_text.delete("1.0", tkinter.END)  # ???????
        self.hex_text.insert(tkinter.END, hex_contents)
        self.hex_text.configure(state="disabled")
        ## ?????? End

        ## ??????? Start
        # ? show ?? print ??????, ??? stdout ????
        with io.StringIO() as buf, redirect_stdout(buf):
            packet.show()
            show_str = buf.getvalue()

        self.contents_text.configure(state="normal")
        self.contents_text.delete("1.0", tkinter.END)  # ???????
        self.contents_text.insert(tkinter.END, show_str)
        self.contents_text.configure(state="disabled")
        ## ??????? End
nyx.py 文件源码 项目:nyx 作者: Cappycot 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def loadstring(self, code, ctx):
        """Remote execute code from the Discord client or other sources for
        debugging. This returns true if the code to execute runs completely
        without error. This function returns a string with output.

        Arguments:
        code - the Python 3 code to run within self
        """
        if ctx is None:
            return "No context to run the code in!"
        with closing(StringIO()) as log:
            with redirect_stdout(log):
                try:
                    exec(code)
                # Screw your warnings, PyCharm!
                except:
                    error = sys.exc_info()
                    for e in error:
                        print(e)
            return log.getvalue()
test_functional.py 文件源码 项目:kinto-wizard 作者: Kinto 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_round_trip(self):
        cmd = 'kinto-wizard {} --server={} --auth={}'
        load_cmd = cmd.format("load {}".format(self.file),
                              self.server, self.auth)
        sys.argv = load_cmd.split(" ")
        main()

        dump_cmd = cmd.format("dump", self.server, self.auth)
        sys.argv = dump_cmd.split(" ")
        output = io.StringIO()
        with redirect_stdout(output):
            main()
        output.flush()

        # Check that identical to original file.
        generated = output.getvalue()
        with open(self.file) as f:
            assert f.read() == generated
test_functional.py 文件源码 项目:kinto-wizard 作者: Kinto 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def test_round_trip(self):
        # Load some data
        cmd = 'kinto-wizard {} --server={} --auth={}'
        load_cmd = cmd.format("load {}".format(self.file),
                              self.server, self.auth)
        sys.argv = load_cmd.split(" ")
        main()

        cmd = 'kinto-wizard {} --server={} --auth={} --full'
        load_cmd = cmd.format("dump", self.server, self.auth)
        sys.argv = load_cmd.split(" ")
        output = io.StringIO()
        with redirect_stdout(output):
            main()
        output.flush()

        # Check that identical to original file.
        generated = output.getvalue()
        with open(self.file) as f:
            assert f.read() == generated
test_functional.py 文件源码 项目:kinto-wizard 作者: Kinto 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_round_trip(self):
        # Load some data
        cmd = 'kinto-wizard {} --server={} --auth={}'
        load_cmd = cmd.format("load {}".format(self.file),
                              self.server, self.auth)
        sys.argv = load_cmd.split(" ")
        main()

        cmd = 'kinto-wizard {} --server={} --auth={} --data --records'
        load_cmd = cmd.format("dump", self.server, self.auth)
        sys.argv = load_cmd.split(" ")
        output = io.StringIO()
        with redirect_stdout(output):
            main()
        output.flush()

        # Check that identical to original file.
        generated = output.getvalue()
        with open(self.file) as f:
            assert f.read() == generated
test_functional.py 文件源码 项目:kinto-wizard 作者: Kinto 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def dump(self, bucket=None, collection=None):
        cmd = 'kinto-wizard {} --server={} --auth={}'
        dump_cmd = cmd.format("dump --full", self.server, self.auth)

        if bucket:
            dump_cmd += ' --bucket={}'.format(bucket)

        if collection:
            dump_cmd += ' --collection={}'.format(collection)

        sys.argv = dump_cmd.split(" ")
        output = io.StringIO()
        with redirect_stdout(output):
            main()
        output.flush()

        # Check that identical to original file.
        return output.getvalue()
test_main.py 文件源码 项目:yam 作者: trichter 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def out(self, cmd, text=None):
        """Test if text is in output of command"""
        # for TRAVIS use maximal two cores
        if (self.njobs and '--njobs' not in cmd and
                cmd.split()[0] in ('correlate', 'stretch')):
            cmd = cmd + ' --njobs ' + self.njobs
        # disabling the logger is necessary, because the logging
        # configuration cannot be changed easily on subsequent calls
        # of yam in this test suite
        if self.verbose and cmd.split()[0] in ('correlate', 'stack', 'stretch'):
            if '-v' not in cmd:
                cmd = cmd + ' -vvv'
            logging.getLogger('yam').disabled = False
        elif self.verbose:
            logging.getLogger('yam').disabled = True
        if self.verbose:
            tqdm.tqdm.write('> yam ' + cmd)
        # catching all output, print only if tests are run with -v
        try:
            with io.StringIO() as f:
                with redirect_stdout(f), redirect_stderr(f):
                    try:
                        self.script(cmd.split())
                    except SystemExit:
                        pass
                output = f.getvalue()
            if self.verbose:
                tqdm.tqdm.write(output)
        finally:
            self.pbar.update(1)
        if text is not None:
            self.assertIn(text, output)
        return output
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_command_without_stdout_ok(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(self.cmd_ok, False, False)
        res = f.getvalue()
        self.assertEqual('.', res[:1])

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_ok, False, False)
        res = f.getvalue()
        self.assertEqual('', res)
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_command_with_stdout_ok(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(self.cmd_ok, True, False)
        res = f.getvalue()
        self.assertEqual('coucou\n\n', res)

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_ok, True, False)
        res = f.getvalue()
        self.assertEqual('', res)
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_command_with_stderr_no_stdout_ok(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(self.cmd_ok, False, True)
        res = f.getvalue()
        self.assertEqual('.', res[:1])

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_ok, False, True)
        res = f.getvalue()
        self.assertEqual('', res)
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_command_without_stderr_and_stdout_err(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(self.cmd_nook, False, False)
        res = f.getvalue()
        self.assertEqual('\n', res)

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_nook, False, False)
        res = f.getvalue()
        self.assertEqual('', res)
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_command_without_stderr_but_stdout_err(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(self.cmd_nook, True, False)
        res = f.getvalue()
        self.assertEqual('\n', res)

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_nook, True, False)
        res = f.getvalue()
        self.assertEqual('', res)
command_test.py 文件源码 项目:stakkr 作者: edyan 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_command_with_stderr_no_stdout_err_loop(self):
        # TODO make it work under windows
        if os.name == 'nt':
            return

        f = io.StringIO()
        with redirect_stdout(f):
            launch_cmd_displays_output(['wget', '--debug', '--tries', '3', 'http://doesnotexist'], False, True)
        res = f.getvalue()
        expected = re.compile('.*\.\.\. and more.*', re.MULTILINE)
        self.assertRegex(res, expected)

        try:
            from contextlib import redirect_stderr
        except Exception:
            return

        f = io.StringIO()
        with redirect_stderr(f):
            launch_cmd_displays_output(self.cmd_nook, False, True)
        res = f.getvalue()
        self.assertEqual('', res)


问题


面经


文章

微信
公众号

扫码关注公众号