python类open_osfhandle()的实例源码

asyncfile.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwargs):
            self.stdin = self.stdout = self.stderr = None

            stdin_rh = stdin_wh = stdout_rh = stdout_wh = stderr_rh = stderr_wh = None

            if stdin == subprocess.PIPE:
                stdin_rh, stdin_wh = pipe()
                stdin_rfd = msvcrt.open_osfhandle(stdin_rh.Detach(), os.O_RDONLY)
                self.stdin_rh = stdin_rh
            else:
                stdin_rfd = stdin
                self.stdin_rh = None

            if stdout == subprocess.PIPE:
                stdout_rh, stdout_wh = pipe()
                stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
            else:
                stdout_wfd = stdout

            if stderr == subprocess.PIPE:
                stderr_rh, stderr_wh = pipe()
                stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
            elif stderr == subprocess.STDOUT:
                stderr_wfd = stdout_wfd
            else:
                stderr_wfd = stderr

            try:
                super(Popen, self).__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                                            stderr=stderr_wfd, **kwargs)
            except:
                for handle in (stdin_rh, stdin_wh, stdout_rh, stdout_wh, stderr_rh, stderr_wh):
                    if handle is not None:
                        win32file.CloseHandle(handle)
                raise
            else:
                if stdin_wh is not None:
                    self.stdin = AsyncFile(stdin_wh, mode='w')
                if stdout_rh is not None:
                    self.stdout = AsyncFile(stdout_rh, mode='r')
                if stderr_rh is not None:
                    self.stderr = AsyncFile(stderr_rh, mode='r')
            finally:
                if stdin == subprocess.PIPE:
                    os.close(stdin_rfd)
                if stdout == subprocess.PIPE:
                    os.close(stdout_wfd)
                if stderr == subprocess.PIPE:
                    os.close(stderr_wfd)
windows_utils.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
        assert not kwds.get('universal_newlines')
        assert kwds.get('bufsize', 0) == 0
        stdin_rfd = stdout_wfd = stderr_wfd = None
        stdin_wh = stdout_rh = stderr_rh = None
        if stdin == PIPE:
            stdin_rh, stdin_wh = pipe(overlapped=(False, True), duplex=True)
            stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
        else:
            stdin_rfd = stdin
        if stdout == PIPE:
            stdout_rh, stdout_wh = pipe(overlapped=(True, False))
            stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
        else:
            stdout_wfd = stdout
        if stderr == PIPE:
            stderr_rh, stderr_wh = pipe(overlapped=(True, False))
            stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
        elif stderr == STDOUT:
            stderr_wfd = stdout_wfd
        else:
            stderr_wfd = stderr
        try:
            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, **kwds)
        except:
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = PipeHandle(stdin_wh)
            if stdout_rh is not None:
                self.stdout = PipeHandle(stdout_rh)
            if stderr_rh is not None:
                self.stderr = PipeHandle(stderr_rh)
        finally:
            if stdin == PIPE:
                os.close(stdin_rfd)
            if stdout == PIPE:
                os.close(stdout_wfd)
            if stderr == PIPE:
                os.close(stderr_wfd)
windows_utils.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
        assert not kwds.get('universal_newlines')
        assert kwds.get('bufsize', 0) == 0
        stdin_rfd = stdout_wfd = stderr_wfd = None
        stdin_wh = stdout_rh = stderr_rh = None
        if stdin == PIPE:
            stdin_rh, stdin_wh = pipe(overlapped=(False, True), duplex=True)
            stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
        else:
            stdin_rfd = stdin
        if stdout == PIPE:
            stdout_rh, stdout_wh = pipe(overlapped=(True, False))
            stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
        else:
            stdout_wfd = stdout
        if stderr == PIPE:
            stderr_rh, stderr_wh = pipe(overlapped=(True, False))
            stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
        elif stderr == STDOUT:
            stderr_wfd = stdout_wfd
        else:
            stderr_wfd = stderr
        try:
            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, **kwds)
        except:
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = PipeHandle(stdin_wh)
            if stdout_rh is not None:
                self.stdout = PipeHandle(stdout_rh)
            if stderr_rh is not None:
                self.stderr = PipeHandle(stderr_rh)
        finally:
            if stdin == PIPE:
                os.close(stdin_rfd)
            if stdout == PIPE:
                os.close(stdout_wfd)
            if stderr == PIPE:
                os.close(stderr_wfd)
windows_utils.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
        assert not kwds.get('universal_newlines')
        assert kwds.get('bufsize', 0) == 0
        stdin_rfd = stdout_wfd = stderr_wfd = None
        stdin_wh = stdout_rh = stderr_rh = None
        if stdin == PIPE:
            stdin_rh, stdin_wh = pipe(overlapped=(False, True), duplex=True)
            stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
        else:
            stdin_rfd = stdin
        if stdout == PIPE:
            stdout_rh, stdout_wh = pipe(overlapped=(True, False))
            stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
        else:
            stdout_wfd = stdout
        if stderr == PIPE:
            stderr_rh, stderr_wh = pipe(overlapped=(True, False))
            stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
        elif stderr == STDOUT:
            stderr_wfd = stdout_wfd
        else:
            stderr_wfd = stderr
        try:
            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, **kwds)
        except:
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = PipeHandle(stdin_wh)
            if stdout_rh is not None:
                self.stdout = PipeHandle(stdout_rh)
            if stderr_rh is not None:
                self.stderr = PipeHandle(stderr_rh)
        finally:
            if stdin == PIPE:
                os.close(stdin_rfd)
            if stdout == PIPE:
                os.close(stdout_wfd)
            if stderr == PIPE:
                os.close(stderr_wfd)
windows_utils.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, args, stdin=None, stdout=None, stderr=None, **kwds):
        assert not kwds.get('universal_newlines')
        assert kwds.get('bufsize', 0) == 0
        stdin_rfd = stdout_wfd = stderr_wfd = None
        stdin_wh = stdout_rh = stderr_rh = None
        if stdin == PIPE:
            stdin_rh, stdin_wh = pipe(overlapped=(False, True), duplex=True)
            stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
        else:
            stdin_rfd = stdin
        if stdout == PIPE:
            stdout_rh, stdout_wh = pipe(overlapped=(True, False))
            stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
        else:
            stdout_wfd = stdout
        if stderr == PIPE:
            stderr_rh, stderr_wh = pipe(overlapped=(True, False))
            stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
        elif stderr == STDOUT:
            stderr_wfd = stdout_wfd
        else:
            stderr_wfd = stderr
        try:
            super().__init__(args, stdin=stdin_rfd, stdout=stdout_wfd,
                             stderr=stderr_wfd, **kwds)
        except:
            for h in (stdin_wh, stdout_rh, stderr_rh):
                if h is not None:
                    _winapi.CloseHandle(h)
            raise
        else:
            if stdin_wh is not None:
                self.stdin = PipeHandle(stdin_wh)
            if stdout_rh is not None:
                self.stdout = PipeHandle(stdout_rh)
            if stderr_rh is not None:
                self.stderr = PipeHandle(stderr_rh)
        finally:
            if stdin == PIPE:
                os.close(stdin_rfd)
            if stdout == PIPE:
                os.close(stdout_wfd)
            if stderr == PIPE:
                os.close(stderr_wfd)


问题


面经


文章

微信
公众号

扫码关注公众号