python类set_blocking()的实例源码

subprocess.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, args, **kwargs):
        if 'universal_newlines' in kwargs:
            raise RuntimeError('universal_newlines argument not supported')

        # If stdin has been given and it's set to a curio FileStream object,
        # then we need to flip it to blocking.
        if 'stdin' in kwargs:
            stdin = kwargs['stdin']
            if isinstance(stdin, FileStream):
                # At hell's heart I stab thy coroutine attempting to read from a stream
                # that's been used as a pipe input to a subprocess.  Must set back to
                # blocking or all hell breaks loose in the child.
                os.set_blocking(stdin.fileno(), True)

        self._popen = subprocess.Popen(args, **kwargs)

        if self._popen.stdin:
            self.stdin = FileStream(self._popen.stdin)
        if self._popen.stdout:
            self.stdout = FileStream(self._popen.stdout)
        if self._popen.stderr:
            self.stderr = FileStream(self._popen.stderr)
unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        os.set_blocking(fd, False)
reopen-pipe.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def check_reopen(r1, w):
    try:
        print("Reopening read end")
        r2 = os.open("/proc/self/fd/{}".format(r1), os.O_RDONLY)

        print("r1 is {}, r2 is {}".format(r1, r2))

        print("checking they both can receive from w...")

        os.write(w, b"a")
        assert os.read(r1, 1) == b"a"

        os.write(w, b"b")
        assert os.read(r2, 1) == b"b"

        print("...ok")

        print("setting r2 to non-blocking")
        os.set_blocking(r2, False)

        print("os.get_blocking(r1) ==", os.get_blocking(r1))
        print("os.get_blocking(r2) ==", os.get_blocking(r2))

        # Check r2 is really truly non-blocking
        try:
            os.read(r2, 1)
        except BlockingIOError:
            print("r2 definitely seems to be in non-blocking mode")

        # Check that r1 is really truly still in blocking mode
        def sleep_then_write():
            time.sleep(1)
            os.write(w, b"c")
        threading.Thread(target=sleep_then_write, daemon=True).start()
        assert os.read(r1, 1) == b"c"
        print("r1 definitely seems to be in blocking mode")
    except Exception as exc:
        print("ERROR: {!r}".format(exc))
unix_events.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        os.set_blocking(fd, False)
unix_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        os.set_blocking(fd, False)
asyncore.py 文件源码 项目:sdk-samples 作者: cradlepoint 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, fd, map=None):
            dispatcher.__init__(self, None, map)
            self.connected = True
            try:
                fd = fd.fileno()
            except AttributeError:
                pass
            self.set_file(fd)
            # set it to non-blocking mode
            os.set_blocking(fd, False)
io.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, fileobj):
        assert not isinstance(fileobj, io.TextIOBase), 'Only binary mode files allowed'
        super().__init__(fileobj)
        os.set_blocking(int(self._fileno), False)

        # Common bound methods
        self._file_read = fileobj.read
        self._file_write = fileobj.write
io.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 44 收藏 0 点赞 0 评论 0
def blocking(self):
        '''
        Allow temporary access to the underlying file in blocking mode
        '''
        if self._buffer:
            raise IOError('There is unread buffered data.')
        try:
            os.set_blocking(int(self._fileno), True)
            yield self._file
        finally:
            os.set_blocking(int(self._fileno), False)
unix_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _set_nonblocking(fd):
        os.set_blocking(fd, False)


问题


面经


文章

微信
公众号

扫码关注公众号