python类dup()的实例源码

connection.py 文件源码 项目:kinect-2-libras 作者: inessadl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
recipe-576967.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, fh, map=None, maxdata=None, ignore_broken_pipe=False, logger=None, **obsopt):
        """Wrap a dispatcher around the passed filehandle.

        If `ignore_broken_pipe` is `True`, an `EPIPE` or `EBADF` error will
        call `handle_close()` instead of `handle_expt()`. Useful when broken
        pipes should be handled quietly.

        `logger` is a logger which will be used to log unusual exceptions;
        otherwise, they will be printed to stderr.
        """
        self.maxdata = maxdata if maxdata else self.pipe_maxdata
        self.__logger = logger
        if ignore_broken_pipe:
            self.__ignore_errno = [EPIPE, EBADF]
        else:
            self.__ignore_errno = []
        self.__filehandle = fh
        # Check for overduplication of the file descriptor and close the extra
        fddup = os.dup(fh.fileno())
        file_dispatcher.__init__(self, fddup, map=map)
        if (self._fileno != fddup): os.close (fddup)
        Observable.__init__(self, **obsopt)
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
        """ save targetfd descriptor, and open a new
            temporary file there.  If no tmpfile is
            specified a tempfile.Tempfile() will be opened
            in text mode.
        """
        self.targetfd = targetfd
        if tmpfile is None and targetfd != 0:
            f = tempfile.TemporaryFile('wb+')
            tmpfile = dupfile(f, encoding="UTF-8")
            f.close()
        self.tmpfile = tmpfile
        self._savefd = os.dup(self.targetfd)
        if patchsys:
            self._oldsys = getattr(sys, patchsysdict[targetfd])
        if now:
            self.start()
capture.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
    """ return a open text file object that's a duplicate of f on the
        FD-level if possible.
    """
    encoding = getattr(f, "encoding", None)
    try:
        fd = f.fileno()
    except Exception:
        if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
            # we seem to have a text stream, let's just use it
            return f
    else:
        newfd = os.dup(fd)
        if "b" not in mode:
            mode += "b"
        f = os.fdopen(newfd, mode, 0)  # no buffering
    return EncodedFile(f, encoding or default_encoding)
connection.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
update.py 文件源码 项目:nojs 作者: chrisdickinson 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
  if sys.platform in ['win32', 'cygwin']:
    return 0

  # This script is called by gclient. gclient opens its hooks subprocesses with
  # (stdout=subprocess.PIPE, stderr=subprocess.STDOUT) and then does custom
  # output processing that breaks printing '\r' characters for single-line
  # updating status messages as printed by curl and wget.
  # Work around this by setting stderr of the update.sh process to stdin (!):
  # gclient doesn't redirect stdin, and while stdin itself is read-only, a
  # dup()ed sys.stdin is writable, try
  #   fd2 = os.dup(sys.stdin.fileno()); os.write(fd2, 'hi')
  # TODO: Fix gclient instead, http://crbug.com/95350
  return subprocess.call(
      [os.path.join(os.path.dirname(__file__), 'update.sh')] +  sys.argv[1:],
      stderr=os.fdopen(
        os.dup(
            sys.stdin.fileno())))
master.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def create_child(*args):
    parentfp, childfp = socket.socketpair()
    pid = os.fork()
    if not pid:
        mitogen.core.set_block(childfp.fileno())
        os.dup2(childfp.fileno(), 0)
        os.dup2(childfp.fileno(), 1)
        childfp.close()
        parentfp.close()
        os.execvp(args[0], args)

    childfp.close()
    # Decouple the socket from the lifetime of the Python socket object.
    fd = os.dup(parentfp.fileno())
    parentfp.close()

    LOG.debug('create_child() child %d fd %d, parent %d, cmd: %s',
              pid, fd, os.getpid(), Argv(args))
    return pid, fd
connection.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 72 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
test_os.py 文件源码 项目:zippy 作者: securesystemslab 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")
test_facade.py 文件源码 项目:landscape-client 作者: CanonicalLtd 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _mock_output_restore(self):
        """
        Mock methods to ensure that stdout and stderr are restored,
        after they have been captured.

        Return the path to the tempfile that was used to capture the output.
        """
        old_stdout = os.dup(1)
        old_stderr = os.dup(2)
        fd, outfile = tempfile.mkstemp()
        mkstemp_patcher = mock.patch("tempfile.mkstemp")
        mock_mkstemp = mkstemp_patcher.start()
        self.addCleanup(mkstemp_patcher.stop)
        mock_mkstemp.return_value = (fd, outfile)

        dup_patcher = mock.patch("os.dup")
        mock_dup = dup_patcher.start()
        self.addCleanup(dup_patcher.stop)
        mock_dup.side_effect = lambda fd: {1: old_stdout, 2: old_stderr}[fd]

        dup2_patcher = mock.patch("os.dup2", wraps=os.dup2)
        mock_dup2 = dup2_patcher.start()
        self.addCleanup(dup2_patcher.stop)
        return outfile, mock_dup2
wurlitzer.py 文件源码 项目:ipybind 作者: aldanor 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _setup_pipe(self, name):
        from fcntl import fcntl, F_GETFL, F_SETFL

        real_fd = getattr(sys, '__%s__' % name).fileno()
        save_fd = os.dup(real_fd)
        self._save_fds[name] = save_fd

        pipe_out, pipe_in = os.pipe()
        os.dup2(pipe_in, real_fd)
        os.close(pipe_in)
        self._real_fds[name] = real_fd

        # make pipe_out non-blocking
        flags = fcntl(pipe_out, F_GETFL)
        fcntl(pipe_out, F_SETFL, flags | os.O_NONBLOCK)
        return pipe_out
solvers.py 文件源码 项目:MatchingMarkets.py 作者: QuantEcon 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def callSolver(self, lp, callback = None):
            """Solves the problem with yaposib
            """
            if self.msg == 0:
                #close stdout to get rid of messages
                tempfile = open(mktemp(),'w')
                savestdout = os.dup(1)
                os.close(1)
                if os.dup(tempfile.fileno()) != 1:
                    raise PulpSolverError("couldn't redirect stdout - dup() error")
            self.solveTime = -clock()
            lp.solverModel.solve(self.mip)
            self.solveTime += clock()
            if self.msg == 0:
                #reopen stdout
                os.close(1)
                os.dup(savestdout)
                os.close(savestdout)
pager.py 文件源码 项目:gerrymanderer 作者: emusical 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def start_pager():
    if not sys.stdout.isatty():
        return

    pager = get_pager()
    if not pager:
        return

    if "LESS" not in os.environ:
        os.environ["LESS"] = "FRSX"

    oldstdout = os.dup(1)
    global pagerproc
    pagerproc = subprocess.Popen([pager],
                                 stdin=subprocess.PIPE,
                                 stdout=oldstdout,
                                 close_fds=True)

    os.close(oldstdout)
    os.dup2(pagerproc.stdin.fileno(), 1)
setupext.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def stdchannel_redirected(stdchannel, dest_filename):
    """
    A context manager to temporarily redirect stdout or stderr

    e.g.:

    with stdchannel_redirected(sys.stderr, os.devnull):
        if compiler.has_function('clock_gettime', libraries=['rt']):
            libraries.append('rt')

    Code adapted from https://stackoverflow.com/a/17752455/1382869
    """

    try:
        oldstdchannel = os.dup(stdchannel.fileno())
        dest_file = open(dest_filename, 'w')
        os.dup2(dest_file.fileno(), stdchannel.fileno())

        yield
    finally:
        if oldstdchannel is not None:
            os.dup2(oldstdchannel, stdchannel.fileno())
        if dest_file is not None:
            dest_file.close()
connection.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
test_os.py 文件源码 项目:oil 作者: oilshell 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_closerange(self):
        first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, "a")
connection.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
test_os.py 文件源码 项目:python2-tracer 作者: extremecoders-re 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def test_closerange(self):
        first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, "a")
capture.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
        """ save targetfd descriptor, and open a new
            temporary file there.  If no tmpfile is
            specified a tempfile.Tempfile() will be opened
            in text mode.
        """
        self.targetfd = targetfd
        if tmpfile is None and targetfd != 0:
            f = tempfile.TemporaryFile('wb+')
            tmpfile = dupfile(f, encoding="UTF-8")
            f.close()
        self.tmpfile = tmpfile
        self._savefd = os.dup(self.targetfd)
        if patchsys:
            self._oldsys = getattr(sys, patchsysdict[targetfd])
        if now:
            self.start()
capture.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
    """ return a open text file object that's a duplicate of f on the
        FD-level if possible.
    """
    encoding = getattr(f, "encoding", None)
    try:
        fd = f.fileno()
    except Exception:
        if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
            # we seem to have a text stream, let's just use it
            return f
    else:
        newfd = os.dup(fd)
        if "b" not in mode:
            mode += "b"
        f = os.fdopen(newfd, mode, 0)  # no buffering
    return EncodedFile(f, encoding or default_encoding)
connection.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Returns pair of connection objects at either end of a pipe
        '''
        if duplex:
            s1, s2 = socket.socketpair()
            s1.setblocking(True)
            s2.setblocking(True)
            c1 = _multiprocessing.Connection(os.dup(s1.fileno()))
            c2 = _multiprocessing.Connection(os.dup(s2.fileno()))
            s1.close()
            s2.close()
        else:
            fd1, fd2 = os.pipe()
            c1 = _multiprocessing.Connection(fd1, writable=False)
            c2 = _multiprocessing.Connection(fd2, readable=False)

        return c1, c2
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    # Open file to save stream to
    cap_fd = os.open(self._filename,
                     os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                     0600)

    # Send stream to this file
    self._stream.flush()
    os.dup2(cap_fd, self._fd)
    os.close(cap_fd)
basetest.py 文件源码 项目:protoc-gen-lua-bin 作者: u0u0 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, stream, filename):
    self._stream = stream
    self._fd = stream.fileno()
    self._filename = filename

    # Keep original stream for later
    self._uncaptured_fd = os.dup(self._fd)

    # Open file to save stream to
    cap_fd = os.open(self._filename,
                     os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                     0600)

    # Send stream to this file
    self._stream.flush()
    os.dup2(cap_fd, self._fd)
    os.close(cap_fd)
test_os.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_closerange(self):
        first = os.open(support.TESTFN, os.O_CREAT|os.O_RDWR)
        # We must allocate two consecutive file descriptors, otherwise
        # it will mess up other file descriptors (perhaps even the three
        # standard ones).
        second = os.dup(first)
        try:
            retries = 0
            while second != first + 1:
                os.close(first)
                retries += 1
                if retries > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                first, second = second, os.dup(second)
        finally:
            os.close(second)
        # close a fd that is open, and one that isn't
        os.closerange(first, first + 2)
        self.assertRaises(OSError, os.write, first, b"a")
testutils.py 文件源码 项目:karton 作者: karton 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __enter__(self):
        assert self._original_fds is None, 'You can only use this object once'

        # Save the original FDs and file objects.
        # FIXME: How about stdin?
        self._original_fds = [(fd, os.dup(fd)) for fd in (1, 2)]
        self._old_stdout = sys.stdout
        self._old_stderr = sys.stderr

        # Redirect the Python file objects.
        throwaway_fileno, self._redirected_path = tempfile.mkstemp()
        # We need to reopen the file or Python and non-Python will have two different
        # positions in the file (and we can't build a file object from a fileno).
        os.close(throwaway_fileno)
        self._redirected_file = open(self._redirected_path, 'w')
        sys.stdout = self._redirected_file
        sys.stderr = self._redirected_file

        # Redirect the standard FDs.
        os.dup2(self._redirected_file.fileno(), 1)
        os.dup2(self._redirected_file.fileno(), 2)

        return self
capture.py 文件源码 项目:godot-python 作者: touilleMan 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
        """ save targetfd descriptor, and open a new
            temporary file there.  If no tmpfile is
            specified a tempfile.Tempfile() will be opened
            in text mode.
        """
        self.targetfd = targetfd
        if tmpfile is None and targetfd != 0:
            f = tempfile.TemporaryFile('wb+')
            tmpfile = dupfile(f, encoding="UTF-8")
            f.close()
        self.tmpfile = tmpfile
        self._savefd = os.dup(self.targetfd)
        if patchsys:
            self._oldsys = getattr(sys, patchsysdict[targetfd])
        if now:
            self.start()
capture.py 文件源码 项目:godot-python 作者: touilleMan 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
    """ return a open text file object that's a duplicate of f on the
        FD-level if possible.
    """
    encoding = getattr(f, "encoding", None)
    try:
        fd = f.fileno()
    except Exception:
        if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
            # we seem to have a text stream, let's just use it
            return f
    else:
        newfd = os.dup(fd)
        if "b" not in mode:
            mode += "b"
        f = os.fdopen(newfd, mode, 0)  # no buffering
    return EncodedFile(f, encoding or default_encoding)
capture.py 文件源码 项目:godot-python 作者: touilleMan 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, targetfd, tmpfile=None, now=True, patchsys=False):
        """ save targetfd descriptor, and open a new
            temporary file there.  If no tmpfile is
            specified a tempfile.Tempfile() will be opened
            in text mode.
        """
        self.targetfd = targetfd
        if tmpfile is None and targetfd != 0:
            f = tempfile.TemporaryFile('wb+')
            tmpfile = dupfile(f, encoding="UTF-8")
            f.close()
        self.tmpfile = tmpfile
        self._savefd = os.dup(self.targetfd)
        if patchsys:
            self._oldsys = getattr(sys, patchsysdict[targetfd])
        if now:
            self.start()
capture.py 文件源码 项目:godot-python 作者: touilleMan 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def safe_text_dupfile(f, mode, default_encoding="UTF8"):
    """ return a open text file object that's a duplicate of f on the
        FD-level if possible.
    """
    encoding = getattr(f, "encoding", None)
    try:
        fd = f.fileno()
    except Exception:
        if "b" not in getattr(f, "mode", "") and hasattr(f, "encoding"):
            # we seem to have a text stream, let's just use it
            return f
    else:
        newfd = os.dup(fd)
        if "b" not in mode:
            mode += "b"
        f = os.fdopen(newfd, mode, 0)  # no buffering
    return EncodedFile(f, encoding or default_encoding)
silence.py 文件源码 项目:smt 作者: SMTorg 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __enter__(self):
        self.sys = sys
        # save previous stdout/stderr
        self.saved_streams = saved_streams = sys.__stdout__, sys.__stderr__
        self.fds = fds = [s.fileno() for s in saved_streams]
        self.saved_fds = map(os.dup, fds)
        # flush any pending output
        for s in saved_streams: s.flush()

        # open surrogate files
        if self.combine:
            null_streams = [open(self.outfiles[0], self.mode, 0)] * 2
            if self.outfiles[0] != os.devnull:
                # disable buffering so output is merged immediately
                sys.stdout, sys.stderr = map(os.fdopen, fds, ['w']*2, [0]*2)
        else: null_streams = [open(f, self.mode, 0) for f in self.outfiles]
        self.null_fds = null_fds = [s.fileno() for s in null_streams]
        self.null_streams = null_streams

        # overwrite file objects and low-level file descriptors
        map(os.dup2, null_fds, fds)


问题


面经


文章

微信
公众号

扫码关注公众号