python类RawIOBase()的实例源码

test_unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a FIFO."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFIFO.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFIFO)
test_unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a socket."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFSOCK.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
ymodem.py 文件源码 项目:device-updater 作者: spark 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def wait_until_ready(self, channel:RawIOBase, timeout=60):
        """
        Probe the device until it acknowledges that it is ready for ymodem.

        Pending input is drained first. A single space (b' ') is then written
        repeatedly and the reply read back; the method returns once two
        replies starting with the ACK byte have been received (they do not
        have to be consecutive).

        :param channel: stream used to talk to the device. Its ``read()`` and
            ``readline()`` are assumed to return a falsy value when nothing
            is available instead of blocking forever -- confirm against the
            channel implementation in use.
        :param timeout: maximum time in seconds to keep probing, or ``None``
            to probe without a limit.
        :return: ``None`` once the device has acknowledged twice.
        :raises TimeoutError: if the device has not acknowledged twice before
            ``timeout`` seconds have elapsed.
        """
        import time  # local import so the block does not depend on file-level imports

        required_acks = 2
        deadline = None if timeout is None else time.monotonic() + timeout

        while channel.readline():  # flush any existing data
            pass

        success_count = 0
        while success_count < required_acks:
            channel.write(b' ')
            result = channel.read()
            if result and result[0]==LightYModemProtocol.ack:
                success_count += 1
            elif deadline is not None and time.monotonic() >= deadline:
                # Previously `timeout` was ignored and this loop could spin
                # forever against a silent device.
                raise TimeoutError(
                    "device did not become ready for ymodem within %s seconds" % timeout)
_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, fileno, mode='r', closefd=True):
        """
        Wrap the descriptor ``fileno`` and create hub I/O watchers for it.

        :param fileno: descriptor to wrap; it is handed to
            ``make_nonblocking`` before any watcher is created.
        :param mode: the object is readable when this contains ``'r'`` and
            writable when it contains ``'w'``.
        :param closefd: stored as ``_closefd`` for later use.
        """
        RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called
        # Bookkeeping attributes are recorded before make_nonblocking() runs.
        self._closed = False
        self._closefd = closefd
        self._fileno = fileno
        make_nonblocking(fileno)
        self._readable = 'r' in mode
        self._writable = 'w' in mode
        self.hub = get_hub()

        # One watcher per enabled direction. The second argument is the
        # event mask: 1 for the read watcher, 2 for the write watcher
        # (presumably the loop's READ/WRITE flags -- TODO confirm).
        io_watcher = self.hub.loop.io
        if self._readable:
            self._read_event = io_watcher(fileno, 1)

        if self._writable:
            self._write_event = io_watcher(fileno, 2)

        # NOTE(review): looks like a lazily filled cache for seekable() -- confirm.
        self._seekable = None
_fileobjectposix.py 文件源码 项目:RealtimePythonChat 作者: quangtqag 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def close(self):
        """
        Close this object; calling it again once closed does nothing.

        The object is flushed, marked closed, any hub wait on the read/write
        watchers is cancelled with ``cancel_wait_ex``, and the descriptor is
        closed with ``os.close`` when ``_closefd`` is true.
        """
        if not self._closed:
            self.flush()
            self._closed = True
            if self._readable:
                self.hub.cancel_wait(self._read_event, cancel_wait_ex)
            if self._writable:
                self.hub.cancel_wait(self._write_event, cancel_wait_ex)
            # Read the descriptor before clearing it so it can still be closed.
            fd = self._fileno
            if self._closefd:
                self._fileno = None
                os.close(fd)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675)
test_unix_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a FIFO."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFIFO.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFIFO)
test_unix_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a socket."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFSOCK.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
_fileobjectposix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, fileno, mode='r', closefd=True):
        """
        Wrap the descriptor ``fileno`` and create hub I/O watchers for it.

        :param fileno: descriptor to wrap; it is handed to
            ``make_nonblocking`` before any watcher is created.
        :param mode: the object is readable when this contains ``'r'`` and
            writable when it contains ``'w'``.
        :param closefd: stored as ``_closefd`` for later use.
        """
        RawIOBase.__init__(self) # Python 2: pylint:disable=no-member,non-parent-init-called
        # Bookkeeping attributes are recorded before make_nonblocking() runs.
        self._closed = False
        self._closefd = closefd
        self._fileno = fileno
        make_nonblocking(fileno)
        self._readable = 'r' in mode
        self._writable = 'w' in mode
        self.hub = get_hub()

        # One watcher per enabled direction. The second argument is the
        # event mask: 1 for the read watcher, 2 for the write watcher
        # (presumably the loop's READ/WRITE flags -- TODO confirm).
        io_watcher = self.hub.loop.io
        if self._readable:
            self._read_event = io_watcher(fileno, 1)

        if self._writable:
            self._write_event = io_watcher(fileno, 2)

        # NOTE(review): looks like a lazily filled cache for seekable() -- confirm.
        self._seekable = None
_fileobjectposix.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def close(self):
        """
        Close this object; calling it again once closed does nothing.

        The object is flushed, marked closed, any hub wait on the read/write
        watchers is cancelled with ``cancel_wait_ex``, and the descriptor is
        closed with ``os.close`` when ``_closefd`` is true.
        """
        if not self._closed:
            self.flush()
            self._closed = True
            if self._readable:
                self.hub.cancel_wait(self._read_event, cancel_wait_ex)
            if self._writable:
                self.hub.cancel_wait(self._write_event, cancel_wait_ex)
            # Read the descriptor before clearing it so it can still be closed.
            fd = self._fileno
            if self._closefd:
                self._fileno = None
                os.close(fd)

    # RawIOBase provides a 'read' method that will call readall() if
    # the `size` was missing or -1 and otherwise call readinto(). We
    # want to take advantage of this to avoid single byte reads when
    # possible. This is highlighted by a bug in BufferedIOReader that
    # calls read() in a loop when its readall() method is invoked;
    # this was fixed in Python 3.3. See
    # https://github.com/gevent/gevent/issues/675)
test_unix_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a FIFO."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.Protocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFIFO.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFIFO)
test_unix_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def setUp(self):
        """Build the fixture: test loop, mock protocol and a mock pipe seen as a socket."""
        self.loop = self.new_test_loop()
        self.protocol = test_utils.make_test_protocol(asyncio.BaseProtocol)

        # spec_set limits the mock to the attributes io.RawIOBase defines.
        self.pipe = mock.Mock(spec_set=io.RawIOBase)
        self.pipe.fileno.return_value = 5

        # Swap asyncio.unix_events._set_nonblocking for a mock until cleanup.
        nonblocking_patch = mock.patch('asyncio.unix_events._set_nonblocking')
        nonblocking_patch.start()
        self.addCleanup(nonblocking_patch.stop)

        # os.fstat() is mocked to return a result whose st_mode is S_IFSOCK.
        fstat_patch = mock.patch('os.fstat')
        fstat_mock = fstat_patch.start()
        self.addCleanup(fstat_patch.stop)
        fstat_mock.return_value = mock.Mock(st_mode=stat.S_IFSOCK)
_winconsole.py 文件源码 项目:swjtu-pyscraper 作者: Desgard 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def isatty(self):
        """Always report this stream as a TTY.

        The base-class method runs first so that a closed stream raises
        ``ValueError`` instead of answering.
        """
        # Called only for its closed-stream check; its result is discarded.
        io.RawIOBase.isatty(self)
        return True
saxutils.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    """Wrap *out* in an ``_UnbufferedTextIOWrapper`` that encodes to *encoding*.

    ``out`` may be any object with a ``write`` method; ``None`` selects
    ``sys.stdout``. The wrapper is created with
    ``errors='xmlcharrefreplace'`` and ``newline='\\n'``.
    """
    if out is None:
        import sys
        out = sys.stdout

    # io.BufferedIOBase is only an interface: its constructor ignores any
    # argument and its write() is unsupported, so io.BufferedIOBase(out)
    # never reached `out`. Build an empty shell and forward the calls the
    # text wrapper needs, for raw streams and duck-typed writers alike.
    buffer = io.BufferedIOBase()
    buffer.writable = lambda: True
    buffer.write = out.write
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    try:
        # TextIOWrapper uses these methods to determine
        # if BOM (for UTF-16, etc) should be added
        buffer.seekable = out.seekable
        buffer.tell = out.tell
    except AttributeError:
        # `out` only offers write(); leave the shell's defaults in place.
        pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
_winconsole.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def isatty(self):
        """Always report this stream as a TTY.

        The base-class method runs first so that a closed stream raises
        ``ValueError`` instead of answering.
        """
        # Called only for its closed-stream check; its result is discarded.
        io.RawIOBase.isatty(self)
        return True
_winconsole.py 文件源码 项目:Sci-Finder 作者: snverse 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def isatty(self):
        """Always report this stream as a TTY.

        The base-class method runs first so that a closed stream raises
        ``ValueError`` instead of answering.
        """
        # Called only for its closed-stream check; its result is discarded.
        io.RawIOBase.isatty(self)
        return True
saxutils.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    """Wrap *out* in an ``_UnbufferedTextIOWrapper`` that encodes to *encoding*.

    ``out`` may be any object with a ``write`` method; ``None`` selects
    ``sys.stdout``. The wrapper is created with
    ``errors='xmlcharrefreplace'`` and ``newline='\\n'``.
    """
    if out is None:
        import sys
        out = sys.stdout

    # io.BufferedIOBase is only an interface: its constructor ignores any
    # argument and its write() is unsupported, so io.BufferedIOBase(out)
    # never reached `out`. Build an empty shell and forward the calls the
    # text wrapper needs, for raw streams and duck-typed writers alike.
    buffer = io.BufferedIOBase()
    buffer.writable = lambda: True
    buffer.write = out.write
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    try:
        # TextIOWrapper uses these methods to determine
        # if BOM (for UTF-16, etc) should be added
        buffer.seekable = out.seekable
        buffer.tell = out.tell
    except AttributeError:
        # `out` only offers write(); leave the shell's defaults in place.
        pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
_socketio.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, sock, mode):
        """Set up raw I/O over ``sock``.

        ``mode`` must be one of "r", "w", "rw", "rb", "wb" or "rwb"; anything
        else raises ``ValueError``. The stored mode always carries a "b".
        """
        if mode not in ("r", "w", "rw", "rb", "wb", "rwb"):
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # Normalise to the binary spelling without touching the r/w letters.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
_socketio.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def close(self):
        """Close the SocketIO object; a second call is a no-op.

        The underlying socket is not closed here: it is only told to drop
        this object's reference, and is closed once all references to it
        have disappeared.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            self._sock._decref_socketios()
            self._sock = None
_socketio.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, sock, mode):
        """Set up raw I/O over ``sock``.

        ``mode`` must be one of "r", "w", "rw", "rb", "wb" or "rwb"; anything
        else raises ``ValueError``. The stored mode always carries a "b".
        """
        if mode not in ("r", "w", "rw", "rb", "wb", "rwb"):
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # Normalise to the binary spelling without touching the r/w letters.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
_socketio.py 文件源码 项目:touch-pay-client 作者: HackPucBemobi 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def close(self):
        """Close the SocketIO object; a second call is a no-op.

        The underlying socket is not closed here: it is only told to drop
        this object's reference, and is closed once all references to it
        have disappeared.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            self._sock._decref_socketios()
            self._sock = None
ymodem.py 文件源码 项目:device-updater 作者: spark 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def transfer(self, file, channel:RawIOBase, progress:ProgressSpan):
        """
        Transfers a single file to the device and closes the session, so the device places the data in the
        target location.

        The file is always closed before this method returns or raises,
        including when sizing the file or sending a packet fails.

        file: the file to transfer via ymodem; must be seekable
        channel: the ymodem endpoint
        progress: notification of progress; its min/max are set to 0 and the file size
        returns: True when the device answered the last packet with EOT
        raises: LightYModemException when the device ends the transfer with any other response
        """
        self.seq = 0
        self.channel = channel
        self.progress = progress

        try:
            # Measure the file by seeking to its end, then rewind for sending.
            file.seek(0, os.SEEK_END)
            size = file.tell()
            file.seek(0, os.SEEK_SET)
            progress.min = 0
            progress.max = size
            progress.update(0)

            response = self.send_filename_header(self.default_file_name, size)
            # Keep sending packets for as long as the device acknowledges them.
            while response == LightYModemClient.ack:
                response = self.send_packet(file)
        finally:
            # Previously skipped when any call above raised, leaking the handle.
            file.close()
        if response != LightYModemClient.eot:
            # %s instead of %d: a non-integer response (e.g. None) must not
            # turn this error into a TypeError from string formatting.
            raise LightYModemException("Unable to transfer the file to the device. Code=%s" % response)
        self._send_close()
        return True
_winconsole.py 文件源码 项目:RPoint 作者: george17-meet 项目源码 文件源码 阅读 45 收藏 0 点赞 0 评论 0
def isatty(self):
        """Always report this stream as a TTY.

        The base-class method runs first so that a closed stream raises
        ``ValueError`` instead of answering.
        """
        # Called only for its closed-stream check; its result is discarded.
        io.RawIOBase.isatty(self)
        return True
saxutils.py 文件源码 项目:Intranet-Penetration 作者: yuxiaokui 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    """Wrap *out* in an ``_UnbufferedTextIOWrapper`` that encodes to *encoding*.

    ``out`` may be any object with a ``write`` method; ``None`` selects
    ``sys.stdout``. The wrapper is created with
    ``errors='xmlcharrefreplace'`` and ``newline='\\n'``.
    """
    if out is None:
        import sys
        out = sys.stdout

    # io.BufferedIOBase is only an interface: its constructor ignores any
    # argument and its write() is unsupported, so io.BufferedIOBase(out)
    # never reached `out`. Build an empty shell and forward the calls the
    # text wrapper needs, for raw streams and duck-typed writers alike.
    buffer = io.BufferedIOBase()
    buffer.writable = lambda: True
    buffer.write = out.write
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    try:
        # TextIOWrapper uses these methods to determine
        # if BOM (for UTF-16, etc) should be added
        buffer.seekable = out.seekable
        buffer.tell = out.tell
    except AttributeError:
        # `out` only offers write(); leave the shell's defaults in place.
        pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
saxutils.py 文件源码 项目:MKFQ 作者: maojingios 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    """Wrap *out* in an ``_UnbufferedTextIOWrapper`` that encodes to *encoding*.

    ``out`` may be any object with a ``write`` method; ``None`` selects
    ``sys.stdout``. The wrapper is created with
    ``errors='xmlcharrefreplace'`` and ``newline='\\n'``.
    """
    if out is None:
        import sys
        out = sys.stdout

    # io.BufferedIOBase is only an interface: its constructor ignores any
    # argument and its write() is unsupported, so io.BufferedIOBase(out)
    # never reached `out`. Build an empty shell and forward the calls the
    # text wrapper needs, for raw streams and duck-typed writers alike.
    buffer = io.BufferedIOBase()
    buffer.writable = lambda: True
    buffer.write = out.write
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    try:
        # TextIOWrapper uses these methods to determine
        # if BOM (for UTF-16, etc) should be added
        buffer.seekable = out.seekable
        buffer.tell = out.tell
    except AttributeError:
        # `out` only offers write(); leave the shell's defaults in place.
        pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
rarfile.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def readall(self):
        """Return whatever a single argument-less ``read()`` call yields."""
        # Overridden so that RawIOBase's default readall() is not used.
        remaining = self.read()
        return remaining
socket.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, sock, mode):
        """Set up raw I/O over ``sock``.

        ``mode`` must be one of "r", "w", "rw", "rb", "wb" or "rwb"; anything
        else raises ``ValueError``. The stored mode always carries a "b".
        """
        if mode not in ("r", "w", "rw", "rb", "wb", "rwb"):
            raise ValueError("invalid mode: %r" % mode)
        io.RawIOBase.__init__(self)
        self._sock = sock
        # Normalise to the binary spelling without touching the r/w letters.
        self._mode = mode if "b" in mode else mode + "b"
        self._reading = "r" in mode
        self._writing = "w" in mode
        self._timeout_occurred = False
socket.py 文件源码 项目:hakkuframework 作者: 4shadoww 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def close(self):
        """Close the SocketIO object; a second call is a no-op.

        The underlying socket is not closed here: it is only told to drop
        this object's reference, and is closed once all references to it
        have disappeared.
        """
        if not self.closed:
            io.RawIOBase.close(self)
            self._sock._decref_socketios()
            self._sock = None
test_file_io.py 文件源码 项目:trio 作者: python-trio 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_unsupported_not_forwarded():
    """An attribute that exists only on the wrapped file is not exposed by the wrapper."""
    class FakeFile(io.RawIOBase):
        def unsupported_attr(self):  # pragma: no cover
            pass

    wrapper = trio.wrap_file(FakeFile())

    # The wrapped object itself does have the attribute...
    assert hasattr(wrapper.wrapped, 'unsupported_attr')

    # ...but looking it up on the async wrapper must fail.
    with pytest.raises(AttributeError):
        getattr(wrapper, 'unsupported_attr')
saxutils.py 文件源码 项目:pupy 作者: ru-faraon 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _gettextwriter(out, encoding):
    """Wrap *out* in an ``_UnbufferedTextIOWrapper`` that encodes to *encoding*.

    ``out`` may be any object with a ``write`` method; ``None`` selects
    ``sys.stdout``. The wrapper is created with
    ``errors='xmlcharrefreplace'`` and ``newline='\\n'``.
    """
    if out is None:
        import sys
        out = sys.stdout

    # io.BufferedIOBase is only an interface: its constructor ignores any
    # argument and its write() is unsupported, so io.BufferedIOBase(out)
    # never reached `out`. Build an empty shell and forward the calls the
    # text wrapper needs, for raw streams and duck-typed writers alike.
    buffer = io.BufferedIOBase()
    buffer.writable = lambda: True
    buffer.write = out.write
    if isinstance(out, io.RawIOBase):
        # Keep the original file open when the TextIOWrapper is
        # destroyed
        buffer.close = lambda: None
    try:
        # TextIOWrapper uses these methods to determine
        # if BOM (for UTF-16, etc) should be added
        buffer.seekable = out.seekable
        buffer.tell = out.tell
    except AttributeError:
        # `out` only offers write(); leave the shell's defaults in place.
        pass
    # wrap a binary writer with TextIOWrapper
    return _UnbufferedTextIOWrapper(buffer, encoding=encoding,
                                   errors='xmlcharrefreplace',
                                   newline='\n')
_winconsole.py 文件源码 项目:oa_qian 作者: sunqb 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def isatty(self):
        """Always report this stream as a TTY.

        The base-class method runs first so that a closed stream raises
        ``ValueError`` instead of answering.
        """
        # Called only for its closed-stream check; its result is discarded.
        io.RawIOBase.isatty(self)
        return True


问题


面经


文章

微信
公众号

扫码关注公众号