python类EIO的实例源码

unix_events.py 文件源码 项目:annotated-py-asyncio 作者: hhstore 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _fatal_error(self, exc, message='Fatal error on pipe transport'):
        """Report a fatal error on this transport, then close it.

        Should be called by an exception handler only.

        An ``OSError`` whose errno is ``EIO`` is not forwarded to the loop's
        exception handler; it is only logged at debug level, and only when
        the loop is in debug mode. Every other exception is passed to
        ``self._loop.call_exception_handler``. In all cases the transport is
        closed with ``self._close(exc)`` afterwards.
        """
        is_eio = isinstance(exc, OSError) and exc.errno == errno.EIO
        if not is_eio:
            context = {
                'message': message,
                'exception': exc,
                'transport': self,
                'protocol': self._protocol,
            }
            self._loop.call_exception_handler(context)
        elif self._loop.get_debug():
            logger.debug("%r: %s", self, message, exc_info=True)
        self._close(exc)
pty_spawn.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __interact_copy(self, escape_character=None,
            input_filter=None, output_filter=None):

        '''Copy data between the child and this process's stdin/stdout.

        This is used by the interact() method. The loop runs while the child
        is alive and stops early on child EOF or when ``escape_character``
        shows up in the (filtered) stdin data; in the latter case only the
        data before the last occurrence of the escape character is sent.
        '''

        while self.isalive():
            readable, _, _ = select_ignore_interrupts(
                [self.child_fd, self.STDIN_FILENO], [], [])

            if self.child_fd in readable:
                try:
                    chunk = self.__interact_read(self.child_fd)
                except OSError as err:
                    if err.args[0] != errno.EIO:
                        raise
                    # Linux-style EOF
                    break
                if chunk == b'':
                    # BSD-style EOF
                    break
                if output_filter:
                    chunk = output_filter(chunk)
                self._log(chunk, 'read')
                os.write(self.STDOUT_FILENO, chunk)

            if self.STDIN_FILENO in readable:
                typed = self.__interact_read(self.STDIN_FILENO)
                if input_filter:
                    typed = input_filter(typed)
                if escape_character is None:
                    cut = -1
                else:
                    cut = typed.rfind(escape_character)
                escaped = cut != -1
                if escaped:
                    typed = typed[:cut]
                # After an escape, an empty remainder is not logged, but it
                # is still handed to the writer.
                if typed or not escaped:
                    self._log(typed, 'send')
                self.__interact_writen(self.child_fd, typed)
                if escaped:
                    break
async.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def connection_lost(self, exc):
        """Dispatch a lost connection to ``eof_received`` or ``error``.

        A clean close (``exc`` is None) does nothing. An ``OSError`` with
        errno ``EIO`` is treated as end-of-file, because we may get here
        without eof_received being called, e.g. on Linux. Any other
        exception is passed to ``self.error``.
        """
        if exc is None:
            return
        if isinstance(exc, OSError) and exc.errno == errno.EIO:
            self.eof_received()
            return
        self.error(exc)
ptyprocess.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def read(self, size=1024):
        """Read and return at most ``size`` bytes from the pty.

        Can block if there is nothing to read. Raises :exc:`EOFError` if the
        terminal was closed.

        Unlike Pexpect's ``read_nonblocking`` method, this doesn't try to deal
        with the vagaries of EOF on platforms that do strange things, like IRIX
        or older Solaris systems. It handles the errno=EIO pattern used on
        Linux, and the empty-string return used on BSD platforms and (seemingly)
        on recent Solaris.

        Any other ``OSError``/``IOError`` from the underlying file object is
        re-raised unchanged. ``self.flag_eof`` is set to True before
        :exc:`EOFError` is raised.
        """
        try:
            s = self.fileobj.read(size)
        except (OSError, IOError) as err:
            # Compare the errno attribute rather than args[0]: an error
            # constructed without arguments has an empty ``args`` tuple, and
            # indexing it would replace the real error with an IndexError.
            if err.errno == errno.EIO:
                # Linux-style EOF
                self.flag_eof = True
                raise EOFError('End Of File (EOF). Exception style platform.')
            raise
        if s == b'':
            # BSD-style EOF (also appears to work on recent Solaris (OpenIndiana))
            self.flag_eof = True
            raise EOFError('End Of File (EOF). Empty string style platform.')

        return s
util_test.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 38 收藏 0 点赞 0 评论 0
def test_failure(self, mock_fdopen):
        """An EIO ``OSError`` raised by ``mock_fdopen`` escapes ``_call``."""
        io_failure = OSError("whoops")
        io_failure.errno = errno.EIO
        mock_fdopen.side_effect = io_failure
        with self.assertRaises(OSError):
            self._call("wow")
util_test.py 文件源码 项目:TCP-IP 作者: JackZ0 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_subsequent_failure(self, mock_fdopen):
        """A first ``_call`` succeeds; a second one fails once fdopen raises EIO."""
        # First call runs before the mock is told to fail.
        self._call("wow")

        io_failure = OSError("whoops")
        io_failure.errno = errno.EIO
        mock_fdopen.side_effect = io_failure
        with self.assertRaises(OSError):
            self._call("wow")
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def access(self, path, mode):
        """Check that ``path`` exists on the server with an HTTP HEAD request.

        The request is made while holding ``self._lock``. ``mode`` is not
        used. Returns 0 on HTTP 200; raises ``FuseOSError(ENOENT)`` on 404
        and ``FuseOSError(EIO)`` on any other status.
        """
        with self._lock:
            response = requests.head(self._url + path)

        status = response.status_code
        if status == 200:
            return 0
        raise FuseOSError(errno.ENOENT if status == 404 else errno.EIO)
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def getattr(self, path, fh):
        """Fetch the attributes of ``path`` from the server.

        Sends a GET with ``op=getattr`` while holding ``self._lock``. ``fh``
        is not used. Returns the decoded JSON body on HTTP 200; raises
        ``FuseOSError(ENOENT)`` on 404 and ``FuseOSError(EIO)`` on any other
        status.
        """
        query = {'op': 'getattr'}
        with self._lock:
            response = requests.get(self._url + path, params=query)

        status = response.status_code
        if status == 404:
            raise FuseOSError(errno.ENOENT)
        if status != 200:
            raise FuseOSError(errno.EIO)
        return response.json()
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def readdir(self, path, fh):
        """List the entries of the directory at ``path``.

        Sends a GET with ``op=readdir`` while holding ``self._lock``. ``fh``
        is not used. Returns the ``'files'`` value of the JSON body on HTTP
        200; raises ``FuseOSError(ENOENT)`` on 404 and ``FuseOSError(EIO)``
        on any other status.
        """
        query = {'op': 'readdir'}
        with self._lock:
            response = requests.get(self._url + path, params=query)

        status = response.status_code
        if status == 404:
            raise FuseOSError(errno.ENOENT)
        if status != 200:
            raise FuseOSError(errno.EIO)
        listing = response.json()
        return listing['files']
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def read(self, path, size, offset, fh):
        """Read file data for ``path`` from the server.

        Sends a GET with ``op=read`` plus ``size`` and ``offset`` while
        holding ``self._lock``. ``fh`` is not used. On HTTP 200 the
        ``'data'`` field of the JSON body is base64-decoded and returned;
        404 raises ``FuseOSError(ENOENT)`` and any other status raises
        ``FuseOSError(EIO)``.
        """
        query = {'op': 'read', 'size': size, 'offset': offset}
        with self._lock:
            response = requests.get(self._url + path, params=query)

        status = response.status_code
        if status == 404:
            raise FuseOSError(errno.ENOENT)
        if status != 200:
            raise FuseOSError(errno.EIO)
        encoded = response.json()['data']
        return base64.b64decode(encoded)
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def create(self, path, mode):
        """Create ``path`` on the server.

        Sends a POST with ``op=create`` while holding ``self._lock``.
        ``mode`` is not used. Returns 0 on HTTP 200; any other status raises
        ``FuseOSError(EIO)``.
        """
        query = {'op': 'create'}
        with self._lock:
            response = requests.post(self._url + path, params=query)

        if response.status_code == 200:
            return 0
        raise FuseOSError(errno.EIO)
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def write(self, path, data, offset, fh):
        """Write ``data`` to ``path`` on the server at ``offset``.

        Sends a PUT whose JSON body carries the base64-encoded data and the
        offset, while holding ``self._lock``. ``fh`` is not used. On HTTP
        200 the ``'count'`` field of the JSON reply is returned; 404 raises
        ``FuseOSError(ENOENT)`` and any other status raises
        ``FuseOSError(EIO)``.
        """
        with self._lock:
            payload = {
                'data': base64.b64encode(data).decode(),
                'offset': offset,
            }
            response = requests.put(self._url + path, json=payload)

        status = response.status_code
        if status == 404:
            raise FuseOSError(errno.ENOENT)
        if status != 200:
            raise FuseOSError(errno.EIO)
        reply = response.json()
        return reply['count']
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def unlink(self, path):
        """Remove the file at ``path`` on the server.

        Sends a DELETE with ``op=unlink`` while holding ``self._lock``.
        Returns None on HTTP 200; raises ``FuseOSError(ENOENT)`` on 404 and
        ``FuseOSError(EIO)`` on any other status.
        """
        query = {'op': 'unlink'}
        with self._lock:
            response = requests.delete(self._url + path, params=query)

        status = response.status_code
        if status == 200:
            return
        raise FuseOSError(errno.ENOENT if status == 404 else errno.EIO)
client.py 文件源码 项目:micropayment-storage 作者: mitchoneill 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def rmdir(self, path):
        """Remove the directory at ``path`` on the server.

        Sends a DELETE with ``op=rmdir`` while holding ``self._lock``.
        Returns None on HTTP 200; raises ``FuseOSError(ENOENT)`` on 404 and
        ``FuseOSError(EIO)`` on any other status.
        """
        query = {'op': 'rmdir'}
        with self._lock:
            response = requests.delete(self._url + path, params=query)

        status = response.status_code
        if status == 200:
            return
        raise FuseOSError(errno.ENOENT if status == 404 else errno.EIO)
core.py 文件源码 项目:mitogen 作者: dw 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def io_op(func, *args):
    """Call ``func(*args)``, translating disconnect-style errors into a flag.

    Returns a ``(result, disconnected)`` tuple. On success this is
    ``(func(*args), False)``. If ``func`` raises an ``OSError`` whose errno
    is ``EIO``, ``ECONNRESET`` or ``EPIPE``, the error is logged at debug
    level and ``(None, True)`` is returned. Any other ``OSError`` is logged
    and re-raised; exceptions of other types propagate untouched.
    """
    try:
        return func(*args), False
    # ``except OSError, e`` is Python 2-only syntax and a SyntaxError on
    # Python 3; the ``as`` form works on Python 2.6+ and 3.
    except OSError as e:
        IOLOG.debug('io_op(%r) -> OSError: %s', func, e)
        if e.errno not in (errno.EIO, errno.ECONNRESET, errno.EPIPE):
            raise
        return None, True
_win.py 文件源码 项目:azure-event-hubs-python 作者: Azure 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def open(cls, owner=None):
        """Build a ``Pipe`` from a connected pair of loopback TCP sockets.

        A listening socket is bound to the first free port in the range
        25672..35671 on 127.0.0.1. A client socket connects to it, the
        connection is accepted, and the client sends the bytes of
        ``generate_uuid()``; the bytes obtained from
        ``Pipe._recvall(server, len(code))`` must match them.

        :param owner: Used only as a prefix in log messages.
        :returns: A ``Pipe`` with ``sink`` set to the client socket and
            ``source`` set to the accepted server socket.
        :raises socket.error: If ``bind`` fails for a reason other than the
            address being in use, or if every port in the range is in use.
        :raises IOError: With ``errno.EIO`` if the handshake bytes differ.
        """
        listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        client = None
        server = None
        # The bind loop sits inside the try so the listener is closed on
        # every exit path, including a failed bind.
        try:
            for port in range(25672, 35672):
                try:
                    listener.bind(("127.0.0.1", port))
                    break
                except socket.error as err:
                    if err.errno != errno.EADDRINUSE:
                        log.error("%s: pipe socket bind failed %s", owner, err)
                        raise
            else:
                # No port could be bound. Listening on the unbound socket
                # would leave accept() waiting for a connection that was
                # made to some other port, so fail explicitly instead.
                raise socket.error(
                    errno.EADDRINUSE,
                    "No free port available for pipe socket")

            listener.listen(1)
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # Start the connect without blocking so accept() below can
            # complete it from this same thread.
            client.setblocking(False)
            client.connect_ex(("127.0.0.1", port))
            server, address = listener.accept()
            log.info("%s: pipe accepted socket from %s", owner, address)
            client.setblocking(True)
            code = generate_uuid().bytes
            client.sendall(code)
            code2 = Pipe._recvall(server, len(code))
            if code != code2:
                raise IOError(errno.EIO, "Pipe handshake failed")

            pipe = Pipe()
            pipe.sink = client
            pipe.source = server
            return pipe
        except BaseException:
            # Release whichever ends were created, then let the error go on.
            if client is not None:
                client.close()
            if server is not None:
                server.close()
            raise
        finally:
            listener.close()
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_command_with_rsp_inout_error(self, chipset):
        """An EIO raised by the second transport read escapes ``command``."""
        expected_frame = HEX('0000ff 05fb d4 00 313233 96 00')
        read_failure = IOError(errno.EIO, os.strerror(errno.EIO))
        transport = chipset.transport
        # First read yields the ACK, second read raises.
        transport.read.side_effect = [ACK(), read_failure]

        with pytest.raises(IOError) as excinfo:
            chipset.command(0, b'123', 1.0)

        raised = excinfo.value
        assert raised.errno == errno.EIO
        assert chipset.transport.read.mock_calls == [call(100), call(1000)]
        assert chipset.transport.write.mock_calls == [call(expected_frame)]
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_command_std_frame_length_check_error(self, chipset):
        """``command`` raises EIO for this response frame.

        The response differs from a well-formed one in its length byte
        (``04`` paired with ``fb``), which the test name identifies as a
        length-check error.
        """
        expected_frame = HEX('0000ff 05fb d4 00 313233 96 00')
        bad_response = HEX('0000ff 04fb d5 01 343536 8b 00')
        transport = chipset.transport
        transport.read.side_effect = [ACK(), bad_response]

        with pytest.raises(IOError) as excinfo:
            chipset.command(0, b'123', 1.0)

        raised = excinfo.value
        assert raised.errno == errno.EIO
        assert chipset.transport.read.mock_calls == [call(100), call(1000)]
        assert chipset.transport.write.mock_calls == [call(expected_frame)]
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_command_std_frame_length_value_error(self, chipset):
        """``command`` raises EIO for this response frame.

        The response carries one data byte fewer (``3435``) than the other
        standard-frame tests use, which the test name identifies as a
        length-value error.
        """
        expected_frame = HEX('0000ff 05fb d4 00 313233 96 00')
        bad_response = HEX('0000ff 05fb d5 01 3435 8b 00')
        transport = chipset.transport
        transport.read.side_effect = [ACK(), bad_response]

        with pytest.raises(IOError) as excinfo:
            chipset.command(0, b'123', 1.0)

        raised = excinfo.value
        assert raised.errno == errno.EIO
        assert chipset.transport.read.mock_calls == [call(100), call(1000)]
        assert chipset.transport.write.mock_calls == [call(expected_frame)]
base_clf_pn53x.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_command_ext_frame_length_check_error(self, chipset):
        """``command`` raises EIO for this extended-format response frame.

        The test does nothing unless the chipset reports a
        ``host_command_frame_max_size`` of at least 256.
        """
        if not (chipset.host_command_frame_max_size >= 256):
            return

        cmd_data = b'123' + bytearray(256)
        rsp_data = b'456' + bytearray(256)
        expected_frame = HEX('0000ffffff 0105fa d400') + cmd_data + HEX('9600')
        bad_response = HEX('0000ffffff 0104fa d501') + rsp_data + HEX('8b00')
        transport = chipset.transport
        transport.read.side_effect = [ACK(), bad_response]

        with pytest.raises(IOError) as excinfo:
            chipset.command(0, cmd_data, 1.0)

        raised = excinfo.value
        assert raised.errno == errno.EIO
        assert chipset.transport.read.mock_calls == [call(100), call(1000)]
        assert chipset.transport.write.mock_calls == [call(expected_frame)]


问题


面经


文章

微信
公众号

扫码关注公众号