python类__stderr__()的实例源码

test_runner_redirection.py 文件源码 项目:cauldron 作者: sernst 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_restore(self):
        """ should restore from an enabled state """

        support.create_project(self, 'percy')
        support.add_step(self)

        project = cd.project.internal_project
        step = project.steps[0]

        redirection.enable(step)
        self.assertIsInstance(sys.stdout, redirection.RedirectBuffer)
        self.assertIsInstance(sys.stderr, redirection.RedirectBuffer)

        redirection.restore_default_configuration()
        self.assertNotIsInstance(sys.stdout, redirection.RedirectBuffer)
        self.assertNotIsInstance(sys.stderr, redirection.RedirectBuffer)
        self.assertEqual(sys.stdout, sys.__stdout__)
        self.assertEqual(sys.stderr, sys.__stderr__)
test_runner_redirection.py 文件源码 项目:cauldron 作者: sernst 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_enable_disable(self):
        """ should properly enable and disable redirection """

        support.create_project(self, 'tonks')
        support.add_step(self)

        project = cd.project.internal_project
        step = project.steps[0]

        redirection.enable(step)
        self.assertIsInstance(sys.stdout, redirection.RedirectBuffer)
        self.assertIsInstance(sys.stderr, redirection.RedirectBuffer)

        redirection.disable(step)
        self.assertNotIsInstance(sys.stdout, redirection.RedirectBuffer)
        self.assertNotIsInstance(sys.stderr, redirection.RedirectBuffer)
        self.assertEqual(sys.stdout, sys.__stdout__)
        self.assertEqual(sys.stderr, sys.__stderr__)
scheme_test.py 文件源码 项目:Charon 作者: forrestchang 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def run_tests(src_file='tests.scm'):
    """Run a read-eval loop that reads from src_file and collects outputs."""
    sys.stderr = sys.stdout = io.StringIO() # Collect output to stdout and stderr
    reader = None
    try:
        reader = TestReader(open(src_file).readlines(), sys.stdout)
        src = Buffer(tokenize_lines(reader))
        def next_line():
            src.current()
            return src
        read_eval_print_loop(next_line, create_global_frame())
    except BaseException as exc:
        sys.stderr = sys.__stderr__
        if reader:
            print("Tests terminated due to unhandled exception "
                  "after line {0}:\n>>>".format(reader.line_number),
                  file=sys.stderr)
        raise
    finally:
        sys.stdout = sys.__stdout__  # Revert stdout
        sys.stderr = sys.__stderr__  # Revert stderr
    summarize(reader.output, reader.expected_output)
managers.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def serve_forever(self):
        '''
        Run the server forever
        '''
        self.stop_event = threading.Event()
        process.current_process()._manager_server = self
        try:
            accepter = threading.Thread(target=self.accepter)
            accepter.daemon = True
            accepter.start()
            try:
                while not self.stop_event.is_set():
                    self.stop_event.wait(1)
            except (KeyboardInterrupt, SystemExit):
                pass
        finally:
            if sys.stdout != sys.__stdout__:
                util.debug('resetting stdout, stderr')
                sys.stdout = sys.__stdout__
                sys.stderr = sys.__stderr__
            sys.exit(0)
__init__.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def _rmtree(path):
        def _rmtree_inner(path):
            for name in os.listdir(path):
                fullname = os.path.join(path, name)
                try:
                    mode = os.lstat(fullname).st_mode
                except OSError as exc:
                    print("support.rmtree(): os.lstat(%r) failed with %s" % (fullname, exc),
                          file=sys.__stderr__)
                    mode = 0
                if stat.S_ISDIR(mode):
                    _waitfor(_rmtree_inner, fullname, waitall=True)
                    os.rmdir(fullname)
                else:
                    os.unlink(fullname)
        _waitfor(_rmtree_inner, path, waitall=True)
        _waitfor(os.rmdir, path)
test_faulthandler.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_is_enabled(self):
        orig_stderr = sys.stderr
        try:
            # regrtest may replace sys.stderr by io.StringIO object, but
            # faulthandler.enable() requires that sys.stderr has a fileno()
            # method
            sys.stderr = sys.__stderr__

            was_enabled = faulthandler.is_enabled()
            try:
                faulthandler.enable()
                self.assertTrue(faulthandler.is_enabled())
                faulthandler.disable()
                self.assertFalse(faulthandler.is_enabled())
            finally:
                if was_enabled:
                    faulthandler.enable()
                else:
                    faulthandler.disable()
        finally:
            sys.stderr = orig_stderr
PyShell.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def idle_showwarning(
        message, category, filename, lineno, file=None, line=None):
    """Show Idle-format warning (after replacing warnings.showwarning).

    The differences are the formatter called, the file=None replacement,
    which can be None, the capture of the consequence AttributeError,
    and the output of a hard-coded prompt.
    """
    if file is None:
        file = warning_stream
    try:
        file.write(idle_formatwarning(
                message, category, filename, lineno, line=line))
        file.write(">>> ")
    except (AttributeError, OSError):
        pass  # if file (probably __stderr__) is invalid, skip warning.
run.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def manage_socket(address):
    for i in range(3):
        time.sleep(i)
        try:
            server = MyRPCServer(address, MyHandler)
            break
        except OSError as err:
            print("IDLE Subprocess: OSError: " + err.args[1] +
                  ", retrying....", file=sys.__stderr__)
            socket_error = err
    else:
        print("IDLE Subprocess: Connection to "
              "IDLE GUI failed, exiting.", file=sys.__stderr__)
        show_socket_error(socket_error, address)
        global exit_now
        exit_now = True
        return
    server.handle_request() # A single request only
rpc.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def handle_error(self, request, client_address):
        """Override TCPServer method

        Error message goes to __stderr__.  No error message if exiting
        normally or socket raised EOF.  Other exceptions not handled in
        server code will cause os._exit.

        """
        try:
            raise
        except SystemExit:
            raise
        except:
            erf = sys.__stderr__
            print('\n' + '-'*40, file=erf)
            print('Unhandled server exception!', file=erf)
            print('Thread: %s' % threading.current_thread().name, file=erf)
            print('Client Address: ', client_address, file=erf)
            print('Request: ', repr(request), file=erf)
            traceback.print_exc(file=erf)
            print('\n*** Unrecoverable, server exiting!', file=erf)
            print('-'*40, file=erf)
            os._exit(0)

#----------------- end class RPCServer --------------------
main.py 文件源码 项目:urh 作者: jopohl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def fix_windows_stdout_stderr():
    """
    Processes can't write to stdout/stderr on frozen windows apps because they do not exist here
    if process tries it anyway we get a nasty dialog window popping up, so we redirect the streams to a dummy
    see https://github.com/jopohl/urh/issues/370
    """

    if hasattr(sys, "frozen") and sys.platform == "win32":
        try:
            sys.stdout.write("\n")
            sys.stdout.flush()
        except:
            class DummyStream(object):
                def __init__(self): pass

                def write(self, data): pass

                def read(self, data): pass

                def flush(self): pass

                def close(self): pass

            sys.stdout, sys.stderr, sys.stdin = DummyStream(), DummyStream(), DummyStream()
            sys.__stdout__, sys.__stderr__, sys.__stdin__ = DummyStream(), DummyStream(), DummyStream()
mock.py 文件源码 项目:case 作者: celery 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def mute():
    """Redirect `sys.stdout` and `sys.stderr` to /dev/null, silencent them.
    Decorator example::
        @mock.mute
        def test_foo(self):
            something()
    Context example::
        with mock.mute():
            something()
    """
    prev_out, prev_err = sys.stdout, sys.stderr
    prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
    devnull = open(os.devnull, 'w')
    mystdout, mystderr = devnull, devnull
    sys.stdout = sys.__stdout__ = mystdout
    sys.stderr = sys.__stderr__ = mystderr

    try:
        yield
    finally:
        sys.stdout = prev_out
        sys.stderr = prev_err
        sys.__stdout__ = prev_rout
        sys.__stderr__ = prev_rerr
scheme_test.py 文件源码 项目:SchemeInterpreter 作者: GrantHiggins16 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_tests(src_file='tests.scm'):
    """Run a read-eval loop that reads from src_file and collects outputs."""
    sys.stderr = sys.stdout = io.StringIO() # Collect output to stdout and stderr
    reader = None
    try:
        reader = TestReader(open(src_file).readlines(), sys.stdout)
        src = Buffer(tokenize_lines(reader))
        def next_line():
            src.current()
            return src
        read_eval_print_loop(next_line, create_global_frame())
    except BaseException as exc:
        sys.stderr = sys.__stderr__
        if reader:
            print("Tests terminated due to unhandled exception "
                  "after line {0}:\n>>>".format(reader.line_number),
                  file=sys.stderr)
        raise
    finally:
        sys.stdout = sys.__stdout__  # Revert stdout
        sys.stderr = sys.__stderr__  # Revert stderr
    summarize(reader.output, reader.expected_output)
benefits.py 文件源码 项目:DjanGoat 作者: Contrast-Security-OSS 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def silence_streams(func):
        def wrapper(*args, **kwargs):
            # save stderr
            save_streams = sys.__stderr__
            save_streams.flush()
            # file descriptor for stderr is 2
            save_stderr = os.dup(2)
            # silence
            null_fd = os.open(os.devnull, os.O_RDWR)
            os.dup2(null_fd, 2)
            # uncomment the line to test if stderr is silenced
            # 1/0
            try:
                bak_file_path = func(*args, **kwargs)
                sys.stderr = save_streams
                os.dup2(save_stderr, 2)
                return bak_file_path
            except:
                sys.stderr = save_streams
                os.dup2(save_stderr, 2)
        return wrapper
managers.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def shutdown(self, c):
        '''
        Shutdown this process
        '''
        try:
            try:
                util.debug('manager received shutdown message')
                c.send(('#RETURN', None))

                if sys.stdout != sys.__stdout__:
                    util.debug('resetting stdout, stderr')
                    sys.stdout = sys.__stdout__
                    sys.stderr = sys.__stderr__

                util._run_finalizers(0)

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.terminate()

                for p in active_children():
                    util.debug('terminating a child process of manager')
                    p.join()

                util._run_finalizers()
                util.info('manager exiting with exitcode 0')
            except:
                import traceback
                traceback.print_exc()
        finally:
            exit(0)
ws_client_ single_callback_L0.py 文件源码 项目:CEX.IO-Client-Python3.5 作者: cexioltd 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def _force_disconnect():
        while True:
            try:
                await asyncio.sleep(random.randrange(24, 64))
                print('test > Force disconnect')
                await client.ws.close()
            except Exception as ex:
                print("Exception at exit: {}".format(ex), sys.__stderr__)
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    def wrap(stream):
        if stream.isatty():
            return ansiterm.AnsiTerm(stream)
        return stream
    sys.stdout = log_to_file(wrap(sys.__stdout__), fileobj, filename)
    sys.stderr = log_to_file(wrap(sys.__stderr__), fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
build_logs.py 文件源码 项目:SoCFoundationFlow 作者: mattaw 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def init(ctx):
    global LOGFILE
    filename = os.path.abspath(LOGFILE)
    try:
        os.makedirs(os.path.dirname(os.path.abspath(filename)))
    except OSError:
        pass

    if hasattr(os, 'O_NOINHERIT'):
        fd = os.open(LOGFILE, os.O_CREAT | os.O_TRUNC | os.O_WRONLY | os.O_NOINHERIT)
        fileobj = os.fdopen(fd, 'w')
    else:
        fileobj = open(LOGFILE, 'w')
    old_stderr = sys.stderr

    # sys.stdout has already been replaced, so __stdout__ will be faster
    #sys.stdout = log_to_file(sys.stdout, fileobj, filename)
    #sys.stderr = log_to_file(sys.stderr, fileobj, filename)
    sys.stdout = log_to_file(sys.__stdout__, fileobj, filename)
    sys.stderr = log_to_file(sys.__stderr__, fileobj, filename)

    # now mess with the logging module...
    for x in Logs.log.handlers:
        try:
            stream = x.stream
        except AttributeError:
            pass
        else:
            if id(stream) == id(old_stderr):
                x.stream = sys.stderr
scriptcwl.py 文件源码 项目:scriptcwl 作者: NLeSC 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def quiet():
    # save stdout/stderr
    # Jupyter doesn't support setting it back to
    # sys.__stdout__ and sys.__stderr__
    _sys_stdout = sys.stdout
    _sys_stderr = sys.stderr
    # Divert stdout and stderr to devnull
    sys.stdout = sys.stderr = open(os.devnull, "w")
    try:
        yield
    finally:
        # Revert back to standard stdout/stderr
        sys.stdout = _sys_stdout
        sys.stderr = _sys_stderr
snpmatch.py 文件源码 项目:SNPmatch 作者: Gregor-Mendel-Institute 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def readVcf(inFile, logDebug):
    log.info("reading the VCF file")
    ## We read only one sample from the VCF file
    if logDebug:
        vcf = allel.read_vcf(inFile, samples = [0], fields = '*')
    else:
        sys.stderr = StringIO.StringIO()
        vcf = allel.read_vcf(inFile, samples = [0], fields = '*')
        #vcf = vcfnp.variants(inFile, cache=False).view(np.recarray)
        #vcfD = vcfnp.calldata_2d(inFile, cache=False).view(np.recarray)
        sys.stderr = sys.__stderr__
    (snpCHR, snpsREQ) = parseChrName(vcf['variants/CHROM'])
    try:
        snpGT = allel.GenotypeArray(vcf['calldata/GT']).to_gt()[snpsREQ, 0]
    except AttributeError:
        die("input VCF file doesnt have required GT field")
    snpsREQ = snpsREQ[np.where(snpGT != './.')[0]]
    snpGT = allel.GenotypeArray(vcf['calldata/GT']).to_gt()[snpsREQ, 0]
    if 'calldata/PL' in sorted(vcf.keys()):
        snpWEI = np.copy(vcf['calldata/PL'][snpsREQ, 0]).astype('float')
        snpWEI = snpWEI/(-10)
        snpWEI = np.exp(snpWEI)
    else:
        snpBinary = parseGT(snpGT)
        snpWEI = np.ones((len(snpsREQ), 3))  ## for homo and het
        snpWEI[np.where(snpBinary != 0),0] = 0
        snpWEI[np.where(snpBinary != 1),2] = 0
        snpWEI[np.where(snpBinary != 2),1] = 0
    snpCHR = snpCHR[snpsREQ]
    DPmean = np.mean(vcf['calldata/DP'][snpsREQ,0])
    snpPOS = np.array(vcf['variants/POS'][snpsREQ])
    return (DPmean, snpCHR, snpPOS, snpGT, snpWEI)


问题


面经


文章

微信
公众号

扫码关注公众号