Python strerror() 函数的实例源码

timeout.py 文件源码 项目:open-nti 作者: Juniper 项目源码 文件源码 阅读 40 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``signal.alarm`` and a ``SIGALRM`` handler
    that is installed for the duration of each call.  The handler that was
    active before the call is put back afterwards so the decorator does not
    leak its handler into unrelated code.

    :param seconds: delay handed to ``signal.alarm``.
    :param error_message: text carried by the raised ``TimeoutError``; the
        default is computed once, when this function is defined.
    :returns: a decorator; the decorated function returns whatever the
        wrapped function returns.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, even when func raised.
                signal.alarm(0)
                # signal.signal() reports None for a handler that was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
ceph.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-swift-proxy 作者: openstack 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-heat 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 50 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-keystone 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-nova-cloud-controller 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
simple_httpclient_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_connection_refused(self):
        """A fetch against a refusing port yields code 599 and an error
        that mentions the connection-refused errno and its message."""
        release_port, port = refusing_port()
        self.addCleanup(release_port)
        with ExpectLog(gen_log, ".*", required=False):
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
            response = self.wait()
        self.assertEqual(599, response.code)

        if sys.platform == 'cygwin':
            # cygwin returns EPERM instead of ECONNREFUSED here
            return

        error_text = str(response.error)
        # Accept either the POSIX code or, where the platform defines it,
        # the Winsock one.
        accepted_codes = [errno.ECONNREFUSED]
        if hasattr(errno, "WSAECONNREFUSED"):
            accepted_codes.append(errno.WSAECONNREFUSED)
        contains_errno = any(str(code) in error_text
                             for code in accepted_codes)
        self.assertTrue(contains_errno, response.error)
        # This is usually "Connection refused".
        # On windows, strerror is broken and returns "Unknown error".
        expected_message = os.strerror(errno.ECONNREFUSED)
        self.assertTrue(expected_message in error_text,
                        response.error)
iostream.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False
simple_httpclient_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_connection_refused(self):
        """A fetch against a refusing port yields code 599 and an error
        that mentions the connection-refused errno and its message."""
        release_port, port = refusing_port()
        self.addCleanup(release_port)
        with ExpectLog(gen_log, ".*", required=False):
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
            response = self.wait()
        self.assertEqual(599, response.code)

        if sys.platform == 'cygwin':
            # cygwin returns EPERM instead of ECONNREFUSED here
            return

        error_text = str(response.error)
        # Accept either the POSIX code or, where the platform defines it,
        # the Winsock one.
        accepted_codes = [errno.ECONNREFUSED]
        if hasattr(errno, "WSAECONNREFUSED"):
            accepted_codes.append(errno.WSAECONNREFUSED)
        contains_errno = any(str(code) in error_text
                             for code in accepted_codes)
        self.assertTrue(contains_errno, response.error)
        # This is usually "Connection refused".
        # On windows, strerror is broken and returns "Unknown error".
        expected_message = os.strerror(errno.ECONNREFUSED)
        self.assertTrue(expected_message in error_text,
                        response.error)
simple_httpclient_test.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_connection_refused(self):
        """A fetch against a refusing port yields code 599 and an error
        that mentions the connection-refused errno and its message."""
        release_port, port = refusing_port()
        self.addCleanup(release_port)
        with ExpectLog(gen_log, ".*", required=False):
            self.http_client.fetch("http://127.0.0.1:%d/" % port, self.stop)
            response = self.wait()
        self.assertEqual(599, response.code)

        if sys.platform == 'cygwin':
            # cygwin returns EPERM instead of ECONNREFUSED here
            return

        error_text = str(response.error)
        # Accept either the POSIX code or, where the platform defines it,
        # the Winsock one.
        accepted_codes = [errno.ECONNREFUSED]
        if hasattr(errno, "WSAECONNREFUSED"):
            accepted_codes.append(errno.WSAECONNREFUSED)
        contains_errno = any(str(code) in error_text
                             for code in accepted_codes)
        self.assertTrue(contains_errno, response.error)
        # This is usually "Connection refused".
        # On windows, strerror is broken and returns "Unknown error".
        expected_message = os.strerror(errno.ECONNREFUSED)
        self.assertTrue(expected_message in error_text,
                        response.error)
iostream.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def _handle_connect(self):
        err = self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if err != 0:
            self.error = socket.error(err, os.strerror(err))
            # IOLoop implementations may vary: some of them return
            # an error state before the socket becomes writable, so
            # in that case a connection failure would be handled by the
            # error path in _handle_events instead of here.
            if self._connect_future is None:
                gen_log.warning("Connect error on fd %s: %s",
                                self.socket.fileno(), errno.errorcode[err])
            self.close()
            return
        if self._connect_callback is not None:
            callback = self._connect_callback
            self._connect_callback = None
            self._run_callback(callback)
        if self._connect_future is not None:
            future = self._connect_future
            self._connect_future = None
            future.set_result(self)
        self._connecting = False
libusb0.py 文件源码 项目:ipwndfu 作者: axi0mX 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _check(ret):
    if ret is None:
        errmsg = _lib.usb_strerror()
    else:
        if hasattr(ret, 'value'):
            ret = ret.value

        if ret < 0:
            errmsg = _lib.usb_strerror()
            # No error means that we need to get the error
            # message from the return code
            # Thanks to Nicholas Wheeler to point out the problem...
            # Also see issue #2860940
            if errmsg.lower() == 'no error':
                errmsg = os.strerror(-ret)
        else:
            return ret
    raise USBError(errmsg, ret)
ptrace_syscall.py 文件源码 项目:darkc0de-old-stuff 作者: tuwid 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def formatExit(self, process):
        """Read the syscall result register and return it as display text.

        The register name depends on which ``CPU_*`` flag is set; the raw
        value is stored on ``self.result`` (converted with ``ulong2long``
        for non-pointer results).

        Raises NotImplementedError when none of the known CPU flags is set.
        """
        if CPU_I386:
            register = "eax"
        elif CPU_X86_64:
            register = "rax"
        elif CPU_PPC:
            register = "result"
        else:
            raise NotImplementedError()
        self.result = process.getreg(register)

        # Result types ending in "*" are rendered as addresses.
        if self.restype.endswith("*"):
            return formatAddress(self.result)

        unsigned_value = self.result
        self.result = ulong2long(self.result)
        if self.result < 0:
            # Negative results are shown with the matching strerror text.
            return "%s (%s)" % (self.result, strerror(-self.result))
        if 0 <= self.result <= 9:
            return str(self.result)
        return "%s (%s)" % (self.result, formatWordHex(unsigned_value))
common.py 文件源码 项目:geppetto 作者: datosio 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def decorator(minutes=1, error_message=os.strerror(errno.ETIME)):
        """Build a decorator that aborts the wrapped call after *minutes*.

        When ``minutes`` is positive a ``SIGALRM`` handler is installed and
        an alarm of ``int(minutes * 60)`` seconds is armed around each call.
        The handler that was active before the call is put back afterwards
        so this decorator does not leak its handler into unrelated code.
        A non-positive ``minutes`` arms nothing.

        :param minutes: time limit in minutes; ``<= 0`` disables the limit.
        :param error_message: text recorded via ``add_test_note`` and carried
            by the raised ``TimeoutException``; the default is computed once,
            when this function is defined.
        :returns: a decorator; the decorated function returns whatever the
            wrapped function returns.
        """
        def dec(func):
            def _handle_timeout(signum, frame):
                msg = 'Timeout Error: %s' % (error_message)
                add_test_note(msg)
                raise TimeoutException(error_message)

            @wraps(func)
            def wrapper(*args, **kwargs):
                previous_handler = None
                if minutes > 0:
                    previous_handler = signal.signal(signal.SIGALRM,
                                                     _handle_timeout)
                    signal.alarm(int(minutes * 60))
                try:
                    return func(*args, **kwargs)
                finally:
                    # Kept unconditional to match the long-standing
                    # behaviour of always clearing any pending alarm.
                    signal.alarm(0)
                    # None means either nothing was installed here or the
                    # earlier handler was not set from Python.
                    if previous_handler is not None:
                        signal.signal(signal.SIGALRM, previous_handler)

            return wrapper

        return dec
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, cloexec=True, nonblock=True):
        """Create the inotify descriptor and the buffers used to read it.

        ``cloexec`` and ``nonblock`` select whether ``self.CLOEXEC`` and
        ``self.NONBLOCK`` are included in the flags given to the init call.

        Raises INotifyError when the init call returns -1.
        """
        (self._init1, self._add_watch,
         self._rm_watch, self._read) = load_inotify()

        flags = ((self.CLOEXEC if cloexec else 0) |
                 (self.NONBLOCK if nonblock else 0))
        self._inotify_fd = self._init1(flags)
        if self._inotify_fd == -1:
            raise INotifyError(os.strerror(ctypes.get_errno()))

        self._buf = ctypes.create_string_buffer(5000)
        self.fenc = get_preferred_file_name_encoding()
        self.hdr = struct.Struct(b'iIII')
        # We keep a reference to os to prevent it from being deleted
        # during interpreter shutdown, which would lead to errors in the
        # __del__ method
        self.os = os
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def read(self, get_name=True):
        """Drain the inotify descriptor and dispatch every decoded event.

        Reads until the read call returns 0 or fails with EAGAIN, retrying
        on EINTR, then unpacks each header with ``self.hdr`` and calls
        ``self.process_event(wd, mask, cookie, name)``.  ``name`` is the
        NUL-stripped name bytes, or ``None`` when ``get_name`` is false.

        Raises OSError for any other read failure.
        """
        chunks = []
        while True:
            count = self._read(self._inotify_fd, self._buf, len(self._buf))
            if count == 0:
                break
            if count < 0:
                err = ctypes.get_errno()
                if err == errno.EAGAIN:
                    break  # No more data
                if err == errno.EINTR:
                    continue  # Interrupted, try again
                raise OSError(err, self.os.strerror(err))
            chunks.append(self._buf.raw[:count])

        data = b''.join(chunks)
        total = len(data)
        offset = 0
        # Stop as soon as fewer bytes remain than one full header.
        while total - offset >= self.hdr.size:
            wd, mask, cookie, name_len = self.hdr.unpack_from(data, offset)
            offset += self.hdr.size
            if get_name:
                name = data[offset:offset + name_len].rstrip(b'\0')
            else:
                name = None
            offset += name_len
            self.process_event(wd, mask, cookie, name)
base_test.py 文件源码 项目:sshaolin 作者: bucknerns 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``signal.alarm`` and a ``SIGALRM`` handler
    that is installed for the duration of each call.  The handler that was
    active before the call is put back afterwards so the decorator does not
    leak its handler into unrelated code.

    :param seconds: delay handed to ``signal.alarm``.
    :param error_message: text carried by the raised ``TimeoutError``; the
        default is computed once, when this function is defined.
    :returns: a decorator; the decorated function returns whatever the
        wrapped function returns.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, even when func raised.
                signal.alarm(0)
                # signal.signal() reports None for a handler that was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator


# special test case for running ssh commands at module level
utils.py 文件源码 项目:steth 作者: openstack 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``signal.alarm`` and a ``SIGALRM`` handler
    that is installed for the duration of each call.  The handler that was
    active before the call is put back afterwards so the decorator does not
    leak its handler into unrelated code.

    :param seconds: delay handed to ``signal.alarm``.
    :param error_message: text carried by the raised ``TimeoutError``; the
        default is computed once, when this function is defined.
    :returns: a decorator; the decorated function returns whatever the
        wrapped function returns.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, even when func raised.
                signal.alarm(0)
                # signal.signal() reports None for a handler that was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
sftp_client.py 文件源码 项目:watchmen 作者: lycclsltt 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def chdir(self, path):
        """
        Set the emulated "current directory" of this SFTP session.

        The working directory is tracked on this object: once set, it is
        stored (normalized and UTF-8 encoded) in C{self._cwd}.  Passing
        C{None} clears it again.

        @param path: new current working directory, or C{None} to clear it
        @type path: str

        @raise SFTPError: with C{errno.ENOTDIR} if C{path} is not a
            directory according to C{self.stat}

        @since: 1.4
        """
        if path is None:
            self._cwd = None
            return

        mode = self.stat(path).st_mode
        if not stat.S_ISDIR(mode):
            message = "%s: %s" % (os.strerror(errno.ENOTDIR), path)
            raise SFTPError(errno.ENOTDIR, message)
        self._cwd = self.normalize(path).encode('utf-8')
test_dask.py 文件源码 项目:knit 作者: dask 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def timeout(seconds=10, error_message=os.strerror(errno.ETIME)):
    """Build a decorator that aborts the wrapped call after *seconds*.

    The limit is enforced with ``signal.alarm`` and a ``SIGALRM`` handler
    that is installed for the duration of each call.  The handler that was
    active before the call is put back afterwards so the decorator does not
    leak its handler into unrelated code.

    :param seconds: delay handed to ``signal.alarm``.
    :param error_message: text carried by the raised ``TimeoutError``; the
        default is computed once, when this function is defined.
    :returns: a decorator; the decorated function returns whatever the
        wrapped function returns.
    :raises TimeoutError: from inside the wrapped call when the alarm fires.
    """
    def decorator(func):
        def _handle_timeout(signum, frame):
            raise TimeoutError(error_message)

        @wraps(func)
        def wrapper(*args, **kwargs):
            previous_handler = signal.signal(signal.SIGALRM, _handle_timeout)
            signal.alarm(seconds)
            try:
                return func(*args, **kwargs)
            finally:
                # Always cancel the pending alarm, even when func raised.
                signal.alarm(0)
                # signal.signal() reports None for a handler that was not
                # installed from Python; that value cannot be re-installed.
                if previous_handler is not None:
                    signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
ceph.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
ceph.py 文件源码 项目:charm-nova-compute 作者: openstack 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def make_filesystem(blk_device, fstype='ext4', timeout=10):
    """Make a new filesystem on the specified block device.

    Polls once per second until ``blk_device`` exists, then runs
    ``mkfs -t <fstype> <blk_device>``.

    :param blk_device: path of the block device to format.
    :param fstype: filesystem type passed to ``mkfs -t``.
    :param timeout: number of one-second polls to make before giving up.
    :raises IOError: with errno ``ENOENT`` when the device has not appeared
        after ``timeout`` polls.
    """
    # ``os.errno`` is an undocumented alias that is missing on Python 3.7+;
    # use the real module so the function works on every interpreter.
    import errno

    count = 0
    while not os.path.exists(blk_device):
        if count >= timeout:
            log('Gave up waiting on block device %s' % blk_device,
                level=ERROR)
            raise IOError(errno.ENOENT, os.strerror(errno.ENOENT),
                          blk_device)

        log('Waiting for block device %s to appear' % blk_device,
            level=DEBUG)
        count += 1
        time.sleep(1)

    # Reached only once the device exists (the loop never breaks early).
    log('Formatting block device %s as filesystem %s.' %
        (blk_device, fstype), level=INFO)
    check_call(['mkfs', '-t', fstype, blk_device])
acr122.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def ccid_xfr_block(self, data, timeout=0.1):
        """Encapsulate host command *data* into an PC/SC Escape command to
        send to the device and extract the chip response if received
        within *timeout* seconds.

        The request is a 10-byte header (message type 0x6F, little-endian
        payload length, five zero bytes) followed by *data*.  The response
        must be at least 10 bytes long, start with 0x80 and carry a
        little-endian length at offset 1 that matches the payload size.

        Returns the response payload (everything after the 10-byte header).
        Raises IOError(errno.EIO) when the response is short, has the wrong
        message type, or its length field does not match.
        """
        frame = struct.pack("<BI5B", 0x6F, len(data), 0, 0, 0, 0, 0) + data
        self.transport.write(bytearray(frame))
        # The transport is given the timeout in milliseconds.
        frame = self.transport.read(int(timeout * 1000))
        if not frame or len(frame) < 10:
            log.error("insufficient data for decoding ccid response")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        if frame[0] != 0x80:
            log.error("expected a RDR_to_PC_DataBlock")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        # unpack_from reads the length field in place; the Python-2-only
        # buffer() builtin it replaces does not exist on Python 3.
        if len(frame) != 10 + struct.unpack_from("<I", frame, 1)[0]:
            log.error("RDR_to_PC_DataBlock length mismatch")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        return frame[10:]
acr122.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def command(self, cmd_code, cmd_data, timeout):
        """Send a host command and return the chip response.

        The command is framed as ``D4 <cmd_code> <cmd_data>`` behind a
        five-byte ``FF 00 00 00 <len>`` prefix and sent through
        ``ccid_xfr_block``.  A valid response starts with
        ``D5 <cmd_code + 1>`` and ends with ``90 00``; the bytes in between
        are returned.

        Raises IOError(errno.EIO) for a short, mismatched or error-status
        response.
        """
        log.log(logging.DEBUG-1, self.CMD[cmd_code]+" "+hexlify(cmd_data))

        payload = bytearray([0xD4, cmd_code]) + bytearray(cmd_data)
        apdu = bytearray([0xFF, 0x00, 0x00, 0x00, len(payload)]) + payload

        response = self.ccid_xfr_block(apdu, timeout)
        if not response or len(response) < 4:
            log.error("insufficient data for decoding chip response")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        if response[0] != 0xD5 or response[1] != cmd_code + 1:
            log.error("received invalid chip response")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        if response[-2] != 0x90 or response[-1] != 0x00:
            log.error("received pseudo apdu with error status")
            raise IOError(errno.EIO, os.strerror(errno.EIO))
        # Strip the two-byte response header and the two status bytes.
        return response[2:-2]
transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def read(self, timeout):
        """Read one frame from the serial port and return it as a bytearray.

        *timeout* is in milliseconds; the port timeout is set to that value
        in seconds, but never below 0.05.  Six bytes are read first; if they
        are exactly ``00 00 FF 00 FF 00`` they are returned as is.  Otherwise
        byte 3 is taken as the length, or, when it is 0xFF, three more bytes
        are read and the length is taken from bytes 5 and 6 (big-endian).
        ``length + 1`` further bytes are then appended.

        Returns None when no tty is open.
        Raises IOError(errno.ETIMEDOUT) when the first read yields no data.
        """
        if self.tty is not None:
            self.tty.timeout = max(timeout/1E3, 0.05)
            frame = bytearray(self.tty.read(6))
            # bytearray() never yields None, so emptiness is the only case.
            if not frame:
                raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
            if frame.startswith(b"\x00\x00\xff\x00\xff\x00"):
                # hexlify matches the logging below and, unlike the "hex"
                # str codec, also exists on Python 3.
                log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
                return frame
            LEN = frame[3]
            if LEN == 0xFF:
                frame += self.tty.read(3)
                LEN = frame[5] << 8 | frame[6]
            frame += self.tty.read(LEN + 1)
            log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
            return frame
transport.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def read(self, timeout=0):
        """Bulk-read one frame from the USB IN endpoint as a bytearray.

        Up to 300 bytes are requested from the endpoint address reported by
        ``self.usb_inp``, with *timeout* passed through to ``bulkRead``.

        Returns None when no IN endpoint is set.
        Raises IOError with ETIMEDOUT, ENODEV or EIO for the corresponding
        libusb errors, and EIO when the read returns no data.
        """
        if self.usb_inp is None:
            return None

        try:
            endpoint = self.usb_inp.getAddress()
            data = self.usb_dev.bulkRead(endpoint, 300, timeout)
        except libusb.USBErrorTimeout:
            raise IOError(errno.ETIMEDOUT, os.strerror(errno.ETIMEDOUT))
        except libusb.USBErrorNoDevice:
            raise IOError(errno.ENODEV, os.strerror(errno.ENODEV))
        except libusb.USBError as error:
            log.error("%r", error)
            raise IOError(errno.EIO, os.strerror(errno.EIO))

        if len(data) == 0:
            log.error("bulk read returned zero data")
            raise IOError(errno.EIO, os.strerror(errno.EIO))

        frame = bytearray(data)
        log.log(logging.DEBUG-1, "<<< %s", hexlify(frame))
        return frame


问题


面经


文章

微信
公众号

扫码关注公众号