python类dup()的实例源码

py2.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def __init__(self, f, mode='r', bufsize=-1):
        """Build the pipe from a path, a file object or a raw descriptor.

        :param f: a file name (opened here, unbuffered, with ``mode``), an
            open ``file`` object (its descriptor is duplicated and the
            object itself is closed), or an integer descriptor (used as-is).
        :param mode: mode string; must equal ``f.mode`` when ``f`` is a file.
        :param bufsize: forwarded to the parent constructor.
        :raises TypeError: if ``f`` is not a string, an int or a file.
        :raises ValueError: if ``f`` is a file whose mode differs from ``mode``.
        """
        if not isinstance(f, six.string_types + (int, file)):
            raise TypeError('f(ile) should be int, str, unicode or file, not %r' % f)

        if isinstance(f, six.string_types):
            f = open(f, mode, 0)

        if isinstance(f, int):
            fileno = f
            self._name = "<fd:%d>" % fileno
        else:
            # Validate before duplicating: raising after os.dup() would leak
            # the freshly duplicated descriptor.
            if f.mode != mode:
                raise ValueError('file.mode %r does not match mode parameter %r' % (f.mode, mode))
            fileno = os.dup(f.fileno())
            self._name = f.name
            f.close()

        super(GreenPipe, self).__init__(_SocketDuckForFd(fileno), mode, bufsize)
        set_nonblocking(self)
        self.softspace = 0
py3.py 文件源码 项目:remoteControlPPT 作者: htwenning 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, name, mode='r', closefd=True, opener=None):
        """Set up the object around an integer descriptor or a named file.

        An ``int`` is taken as an already-open descriptor. Any other value
        must be a string path: the file is opened with ``mode``, its
        descriptor is duplicated, and the temporary file object is closed.
        ``closefd`` and ``opener`` are accepted but not used in this body.
        """
        if not isinstance(name, int):
            assert isinstance(name, six.string_types)
            with open(name, mode) as opened:
                self._name = opened.name
                # Keep a private copy; the ``with`` block closes the original.
                fileno = _original_os.dup(opened.fileno())
        else:
            fileno = name
            self._name = "<fd:%d>" % fileno

        notify_opened(fileno)
        self._fileno, self._mode, self._closed = fileno, mode, False
        set_nonblocking(self)
        self._seekable = None
test_os.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_closerange(self):
        """Check os.closerange() over one open and one already-closed fd."""
        low = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        # The two descriptors have to be adjacent; closing a wider range
        # could hit unrelated descriptors (possibly even the standard three).
        high = os.dup(low)
        try:
            attempts = 0
            while high - low != 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                # Slide the window up: the old upper fd becomes the lower one.
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # The range now covers one open descriptor and one closed descriptor.
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, "a")
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_closerange(self):
        """Check os.closerange() over one open and one already-closed fd."""
        low = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # The two descriptors have to be adjacent; closing a wider range
        # could hit unrelated descriptors (possibly even the standard three).
        high = os.dup(low)
        try:
            attempts = 0
            while high - low != 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                # Slide the window up: the old upper fd becomes the lower one.
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # The range now covers one open descriptor and one closed descriptor.
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, b"a")
test_subprocess.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_small_errpipe_write_fd(self):
        """Issue #15798: Popen should work when stdio fds are available."""
        saved_stdin = os.dup(0)
        saved_stdout = os.dup(1)
        # Side test: if errpipe_write fails to have its CLOEXEC
        # flag set this should cause the parent to think the exec
        # failed.  Extremely unlikely: everyone supports CLOEXEC.
        argv = [sys.executable, "-c",
                "print('AssertionError:0:CLOEXEC failure.')"]
        try:
            # Free descriptors 0 and 1 before spawning the child.
            os.close(0)
            os.close(1)

            child = subprocess.Popen(argv)
            child.wait()
        finally:
            # Put the original stdin and stdout back, then drop the copies.
            for saved, target in ((saved_stdin, 0), (saved_stdout, 1)):
                os.dup2(saved, target)
            for saved in (saved_stdin, saved_stdout):
                os.close(saved)
test_subprocess.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_close_fds_after_preexec(self):
        """A descriptor re-targeted by preexec_fn must not stay open in the child."""
        status_script = support.findfile("fd_status.py", subdir="subprocessdata")

        # preexec_fn uses this descriptor as its dup2() target; with
        # close_fds=True it should still end up closed in the child.
        fd = os.dup(1)
        self.addCleanup(os.close, fd)

        def retarget():
            return os.dup2(1, fd)

        child = subprocess.Popen([sys.executable, status_script],
                                 stdout=subprocess.PIPE, close_fds=True,
                                 preexec_fn=retarget)
        stdout_data = child.communicate()[0]

        # The child's output is parsed as a comma-separated list of integers.
        open_fds = {int(token) for token in stdout_data.split(b',')}

        self.assertNotIn(fd, open_fds)
_test_multiprocessing.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_high_socket_fd(self):
        """Return a detached socket handle; off WIN32 its number is at least 50."""
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()

        # We want a socket whose fd is high enough that a freshly created
        # child process will not have any fds as high.
        current = socket.socket().detach()
        low_fds = []
        while current < 50:
            # Hold on to the low copy so the next dup() cannot reuse its number.
            low_fds.append(current)
            current = os.dup(current)
        for low in low_fds:
            os.close(low)
        return current
connection.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Create a pipe and return the connection objects for its two ends.

        With duplex true the pair is built on a socket pair; otherwise it is
        built on os.pipe(), with the first connection non-writable and the
        second non-readable.
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            c1 = _multiprocessing.Connection(read_fd, writable=False)
            c2 = _multiprocessing.Connection(write_fd, readable=False)
            return c1, c2

        left, right = socket.socketpair()
        left.setblocking(True)
        right.setblocking(True)
        # Each connection gets its own duplicate descriptor, so the socket
        # objects can be closed straight afterwards.
        c1 = _multiprocessing.Connection(os.dup(left.fileno()))
        c2 = _multiprocessing.Connection(os.dup(right.fileno()))
        left.close()
        right.close()
        return c1, c2
test_os.py 文件源码 项目:ndk-python 作者: gittor 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_closerange(self):
        """Check os.closerange() over one open and one already-closed fd."""
        low = os.open(test_support.TESTFN, os.O_CREAT | os.O_RDWR)
        # The two descriptors have to be adjacent; closing a wider range
        # could hit unrelated descriptors (possibly even the standard three).
        high = os.dup(low)
        try:
            attempts = 0
            while high - low != 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                # Slide the window up: the old upper fd becomes the lower one.
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # The range now covers one open descriptor and one closed descriptor.
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, "a")
stream.py 文件源码 项目:kafe 作者: dsavoiu 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def redirect_stdout_to(stream_with_fd):
    """Generator that points stdout's descriptor at another stream.

    Yields exactly once. While suspended at the yield, the descriptor behind
    ``sys.stdout`` refers to ``stream_with_fd``; on resumption (normal or by
    exception) the original descriptor is put back.

    No redirection happens when ``stream_with_fd`` is ``None`` or when either
    stream cannot provide a file descriptor.

    :param stream_with_fd: object exposing ``fileno()``, or ``None``.
    """
    if stream_with_fd is None:
        # no redirect when stream is 'None'
        yield
        return

    # do not redirect, if streams do not have file descriptors!
    try:
        stdout_fd = sys.stdout.fileno()
        target_fd = stream_with_fd.fileno()
    except Exception:
        # Only ordinary errors mean "no descriptor"; KeyboardInterrupt and
        # SystemExit are deliberately left to propagate.
        redirectable = False
    else:
        redirectable = True

    if not redirectable:
        yield
        return

    # Text buffered before the swap belongs to the original destination.
    sys.stdout.flush()
    saved_fd = os.dup(stdout_fd)
    try:
        os.dup2(target_fd, stdout_fd)
        yield
    finally:
        try:
            # Text buffered during the redirect belongs to the target.
            sys.stdout.flush()
        finally:
            # restore the previous output stream
            os.dup2(saved_fd, stdout_fd)
            os.close(saved_fd)
weakaudio.py 文件源码 项目:weakmon 作者: rtmrtmrtmrtm 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def pya():
    """Return the shared PyAudio instance, creating it on first use.

    While the instance is being constructed, descriptor 2 (stderr) is
    pointed at /dev/null to suppress Jack and ALSA error messages on Linux.
    Descriptor 2 is restored and the temporary descriptors are closed even
    if construction raises.
    """
    global global_pya
    import pyaudio
    if global_pya is None:
        nullfd = os.open("/dev/null", os.O_WRONLY)
        try:
            oerr = os.dup(2)
            try:
                os.dup2(nullfd, 2)
                global_pya = pyaudio.PyAudio()
            finally:
                # Always give stderr back, otherwise later errors vanish.
                os.dup2(oerr, 2)
                os.close(oerr)
        finally:
            os.close(nullfd)
    return global_pya

# find the lowest supported input rate >= rate.
# needed on Linux but not the Mac (which converts as needed).
__init__.py 文件源码 项目:docker-network-capture 作者: shaded-enmity 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def _run_pcap_parser(self, rx_tcpdump, interface):
        """Fork a child that runs the pcap parser over ``rx_tcpdump``.

        The parent records the child's pid in ``self.child_parser`` and the
        read end of a new pipe in ``self.rx``. The child drops privileges and
        calls ``start_pcap_parser`` with the duplicated descriptors.
        """
        rx, tx = pipe()
        # Duplicate the descriptors meant for the child, then release the
        # originals.
        tcpdump = dup(rx_tcpdump)
        out = dup(tx)
        close(tx)
        close(rx_tcpdump)
        pid = fork()
        if pid == 0:
            # Child: get rid of unnecessary privileges, then start parsing.
            drop_privileges()
            start_pcap_parser(tcpdump, out, interface)
        else:
            # Parent: the duplicates are only needed by the child.
            close(out)
            close(tcpdump)
            self.child_parser = pid
            self.rx = rx
connection.py 文件源码 项目:empyrion-python-api 作者: huhlig 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def Pipe(duplex=True):
        '''
        Create a pipe and return the connection objects for its two ends.

        With duplex true the pair is built on a socket pair; otherwise it is
        built on os.pipe(), with the first connection non-writable and the
        second non-readable.
        '''
        if not duplex:
            read_fd, write_fd = os.pipe()
            c1 = _multiprocessing.Connection(read_fd, writable=False)
            c2 = _multiprocessing.Connection(write_fd, readable=False)
            return c1, c2

        left, right = socket.socketpair()
        left.setblocking(True)
        right.setblocking(True)
        # Each connection gets its own duplicate descriptor, so the socket
        # objects can be closed straight afterwards.
        c1 = _multiprocessing.Connection(os.dup(left.fileno()))
        c2 = _multiprocessing.Connection(os.dup(right.fileno()))
        left.close()
        right.close()
        return c1, c2
build_datasets.py 文件源码 项目:TrackML 作者: tboser 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def generateHits(self, cont):
        """Generate ``<outdir>/hits.csv`` and shuffle its lines.

        Descriptor 1 (stdout) is temporarily pointed at the output file while
        ``cont.printallHits()`` runs, so whatever that call writes to stdout
        ends up in the file. Stdout is restored afterwards, even if the call
        raises. The file's lines are then shuffled in place.

        :param cont: object providing ``printallHits()``.
        """
        hitf = self.outdir + "/hits.csv"
        sys.stdout.flush()
        saved_stdout = os.dup(1)
        try:
            # O_TRUNC: start from an empty file so content left by an earlier
            # run cannot survive past the newly written data.
            out_fd = os.open(hitf, os.O_WRONLY | os.O_CREAT | os.O_TRUNC)
            try:
                # dup2 targets descriptor 1 explicitly instead of relying on
                # the lowest free descriptor happening to be 1.
                os.dup2(out_fd, 1)
            finally:
                os.close(out_fd)
            try:
                cont.printallHits()
            finally:
                # Push buffered text into the file before stdout is restored.
                sys.stdout.flush()
        finally:
            os.dup2(saved_stdout, 1)
            os.close(saved_stdout)

        with open(hitf) as src:
            lines = src.readlines()
        random.shuffle(lines)
        with open(hitf, 'w') as dst:
            dst.writelines(lines)
buffer_manager.py 文件源码 项目:ci_edit 作者: google 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def readStdin(self):
    """Read everything from the original stdin into a new text buffer.

    The original stdin descriptor is duplicated for reading and the stdin
    descriptor itself is re-pointed at /dev/tty. Read errors are logged,
    not raised.

    Returns the text buffer from self.newTextBuffer(); it is returned even
    when reading failed.
    """
    app.log.info('reading from stdin')
    # Create a new input stream for the file data.
    # Fd is short for file descriptor. os.dup and os.dup2 will duplicate file
    # descriptors.
    stdinFd = sys.stdin.fileno()
    newFd = os.dup(stdinFd)
    # After dup2 the stdin descriptor refers to the tty on its own, so the
    # temporary file object can be closed right away instead of lingering.
    with open("/dev/tty") as newStdin:
      os.dup2(newStdin.fileno(), stdinFd)
    # Create a text buffer to read from alternate stream.
    textBuffer = self.newTextBuffer()
    try:
      with io.open(newFd, "r") as fileInput:
        textBuffer.fileFilter(fileInput.read())
    except Exception as e:
      app.log.exception(e)
    app.log.info('finished reading from stdin')
    return textBuffer
test_os.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_closerange(self):
        """Check os.closerange() over one open and one already-closed fd."""
        low = os.open(support.TESTFN, os.O_CREAT | os.O_RDWR)
        # The two descriptors have to be adjacent; closing a wider range
        # could hit unrelated descriptors (possibly even the standard three).
        high = os.dup(low)
        try:
            attempts = 0
            while high - low != 1:
                os.close(low)
                attempts += 1
                if attempts > 10:
                    # XXX test skipped
                    self.skipTest("couldn't allocate two consecutive fds")
                # Slide the window up: the old upper fd becomes the lower one.
                low, high = high, os.dup(high)
        finally:
            os.close(high)
        # The range now covers one open descriptor and one closed descriptor.
        os.closerange(low, low + 2)
        self.assertRaises(OSError, os.write, low, b"a")
test_subprocess.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_small_errpipe_write_fd(self):
        """Issue #15798: Popen should work when stdio fds are available."""
        saved_stdin = os.dup(0)
        saved_stdout = os.dup(1)
        # Side test: if errpipe_write fails to have its CLOEXEC
        # flag set this should cause the parent to think the exec
        # failed.  Extremely unlikely: everyone supports CLOEXEC.
        argv = [sys.executable, "-c",
                "print('AssertionError:0:CLOEXEC failure.')"]
        try:
            # Free descriptors 0 and 1 before spawning the child.
            os.close(0)
            os.close(1)

            child = subprocess.Popen(argv)
            child.wait()
        finally:
            # Put the original stdin and stdout back, then drop the copies.
            for saved, target in ((saved_stdin, 0), (saved_stdout, 1)):
                os.dup2(saved, target)
            for saved in (saved_stdin, saved_stdout):
                os.close(saved)
test_subprocess.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_close_fds_after_preexec(self):
        """A descriptor re-targeted by preexec_fn must not stay open in the child."""
        status_script = support.findfile("fd_status.py", subdir="subprocessdata")

        # preexec_fn uses this descriptor as its dup2() target; with
        # close_fds=True it should still end up closed in the child.
        fd = os.dup(1)
        self.addCleanup(os.close, fd)

        def retarget():
            return os.dup2(1, fd)

        child = subprocess.Popen([sys.executable, status_script],
                                 stdout=subprocess.PIPE, close_fds=True,
                                 preexec_fn=retarget)
        stdout_data = child.communicate()[0]

        # The child's output is parsed as a comma-separated list of integers.
        open_fds = {int(token) for token in stdout_data.split(b',')}

        self.assertNotIn(fd, open_fds)
_test_multiprocessing.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def get_high_socket_fd(self):
        """Return a detached socket handle; off WIN32 its number is at least 50."""
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()

        # We want a socket whose fd is high enough that a freshly created
        # child process will not have any fds as high.
        current = socket.socket().detach()
        low_fds = []
        while current < 50:
            # Hold on to the low copy so the next dup() cannot reuse its number.
            low_fds.append(current)
            current = os.dup(current)
        for low in low_fds:
            os.close(low)
        return current
helpers.py 文件源码 项目:shakedown 作者: dcos 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def stdchannel_redirected(stdchannel, dest_filename):
    """ Generator that temporarily redirects a stream's file descriptor

        Yields once with the descriptor behind ``stdchannel`` pointing at
        ``dest_filename`` (opened for writing). On resumption the original
        descriptor is restored and every temporary descriptor is closed.
        If setup fails, the original error propagates unchanged.

        :param stdchannel: the channel to redirect; must provide ``fileno()``
        :type stdchannel: sys.std
        :param dest_filename: the filename to redirect output to
        :type dest_filename: os.devnull

        :return: nothing
        :rtype: None
    """

    # Pre-bind both names so the cleanup below can tell which setup steps
    # actually happened; otherwise a failure in os.dup()/open() would be
    # masked by an UnboundLocalError raised from the finally block.
    oldstdchannel = None
    dest_file = None
    try:
        oldstdchannel = os.dup(stdchannel.fileno())
        dest_file = open(dest_filename, 'w')
        os.dup2(dest_file.fileno(), stdchannel.fileno())

        yield
    finally:
        try:
            if oldstdchannel is not None:
                try:
                    os.dup2(oldstdchannel, stdchannel.fileno())
                finally:
                    # The saved copy is no longer needed; closing it stops
                    # one descriptor leaking per use.
                    os.close(oldstdchannel)
        finally:
            if dest_file is not None:
                dest_file.close()


问题


面经


文章

微信
公众号

扫码关注公众号