python类get_errno()的实例源码

inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def watch(self, path):
    """Start watching the file or directory ``path`` unless it is already watched.

    The path is canonicalised with ``realpath`` first. A directory-only
    watch is attempted first; when that fails with ``ENOTDIR`` the path is
    watched as a regular file instead, additionally listening for content
    and attribute changes. Every other failure is delegated to
    ``self.handle_error()``.
    """
    path = realpath(path)
    with self.lock:
        if path in self.watches:
            return
        encoded = path if isinstance(path, bytes) else path.encode(self.fenc)
        base_mask = self.MOVE_SELF | self.DELETE_SELF
        c_path = ctypes.c_char_p(encoded)
        # Try watching path as a directory
        descriptor = self._add_watch(
            self._inotify_fd, c_path, base_mask | self.ONLYDIR)
        if descriptor == -1:
            if ctypes.get_errno() != errno.ENOTDIR:
                self.handle_error()
            # Try watching path as a file
            file_mask = base_mask | (self.MODIFY | self.ATTRIB)
            descriptor = self._add_watch(self._inotify_fd, c_path, file_mask)
            if descriptor == -1:
                self.handle_error()
        self.watches[path] = descriptor
        self.modified[path] = False
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def add_watch(self, path):
    """Register an inotify watch on the directory ``path``.

    Symlinks are not followed and only directories are accepted.

    Returns ``True`` once the watch is recorded in ``watched_dirs`` and
    ``watched_rmap``, or ``False`` when the call fails with ``ENOTDIR``.
    Any other failure raises ``OSError``.
    """
    encoded = path if isinstance(path, bytes) else path.encode(self.fenc)
    event_mask = (
        # Ignore symlinks and watch only directories
        self.DONT_FOLLOW | self.ONLYDIR
        | self.MODIFY | self.CREATE | self.DELETE
        | self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO
        | self.ATTRIB | self.DELETE_SELF
    )
    descriptor = self._add_watch(
        self._inotify_fd, ctypes.c_char_p(encoded), event_mask)
    if descriptor != -1:
        self.watched_dirs[path] = descriptor
        self.watched_rmap[descriptor] = path
        return True
    code = ctypes.get_errno()
    if code == errno.ENOTDIR:
        return False
    raise OSError(code, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(code)))
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, cloexec=True, nonblock=True):
    """Load the inotify bindings and open a new inotify descriptor.

    ``cloexec`` and ``nonblock`` select whether ``self.CLOEXEC`` and
    ``self.NONBLOCK`` are passed to the init call. Raises ``INotifyError``
    when that call returns -1.
    """
    (self._init1, self._add_watch,
     self._rm_watch, self._read) = load_inotify()
    init_flags = 0 | (self.CLOEXEC if cloexec else 0)
    init_flags |= self.NONBLOCK if nonblock else 0
    self._inotify_fd = self._init1(init_flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    # Scratch buffer handed to the read binding on every call
    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    # Layout used to unpack each event header: one int, three unsigned ints
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def read(self, get_name=True):
    """Drain the inotify descriptor and dispatch every event found.

    Data is read repeatedly into ``self._buf`` until the read binding
    returns 0 or fails with ``EAGAIN``; ``EINTR`` is retried and any other
    error raises ``OSError``. Each event is then unpacked with ``self.hdr``
    and passed to ``self.process_event(wd, mask, cookie, name)``. ``name``
    is the NUL-stripped name bytes, or ``None`` when ``get_name`` is false.
    """
    chunks = []
    while True:
        count = self._read(self._inotify_fd, self._buf, len(self._buf))
        if count > 0:
            chunks.append(self._buf.raw[:count])
            continue
        if count == 0:
            break
        code = ctypes.get_errno()
        if code == errno.EAGAIN:
            break  # No more data
        if code != errno.EINTR:
            raise OSError(code, self.os.strerror(code))
        # Interrupted, try again
    data = b''.join(chunks)
    total = len(data)
    offset = 0
    while total - offset >= self.hdr.size:
        wd, mask, cookie, name_len = self.hdr.unpack_from(data, offset)
        offset += self.hdr.size
        name_end = offset + name_len
        name = data[offset:name_end].rstrip(b'\0') if get_name else None
        offset = name_end
        self.process_event(wd, mask, cookie, name)
inotify.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def inotify_init(flags=0):
    """Create a new inotify instance and return the file descriptor
    associated with its event queue.

    :param ``INInitFlags`` flags:
        Optional flag to control the inotify_init behavior.
    :raises ``OSError``:
        If the underlying call returns a negative value.
    """
    fileno = _INOTIFY_INIT1(flags)
    if fileno >= 0:
        return fileno
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code), 'inotify_init1(%r)' % flags)


###############################################################################
# Constants copied from sys/inotify.h
#
# See man inotify(7) for more details.
#
mount.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def mount(source, target, fs_type, mnt_flags=()):
    """Mount ``source`` on ``target`` with filesystem type ``fs_type`` and
    mount flags ``mnt_flags``.

    NOTE: Mount data argument is not supported.

    Returns the (non-negative) result of the underlying call and raises
    ``OSError`` when that result is negative.
    """
    res = _MOUNT(source, target, fs_type, mnt_flags, 0)
    if res >= 0:
        return res

    code = ctypes.get_errno()
    call_repr = 'mount(%r, %r, %r, %r)' % (source, target, fs_type, mnt_flags)
    raise OSError(code, os.strerror(code), call_repr)


# int umount(const char *target);
eventfd.py 文件源码 项目:treadmill 作者: Morgan-Stanley 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def eventfd(initval, flags):
    """Create a file descriptor for event notification.

    ``initval`` must lie in ``[0, 2**64 - 1]``, otherwise ``ValueError`` is
    raised before any call is made. Raises ``OSError`` when the underlying
    call returns a negative value; otherwise returns the new descriptor.
    """
    if initval > (2**64 - 1) or initval < 0:
        raise ValueError('Invalid initval: %r' % initval)

    fileno = _EVENTFD(initval, flags)
    if fileno >= 0:
        return fileno
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code),
                  'eventfd(%r, %r)' % (initval, flags))


###############################################################################
# Constants copied from sys/eventfd.h
#
# See man eventfd(2) for more details.
#
inotify_file_watcher.py 文件源码 项目:Deploy_XXNET_Server 作者: jzp820927 项目源码 文件源码 阅读 39 收藏 0 点赞 0 评论 0
def _remove_watch_for_path(self, path):
    """Remove the inotify watch on ``path`` and, recursively, on every
    subdirectory recorded for it, then drop ``path`` from the bookkeeping
    dictionaries.

    Must be called with _inotify_fd_lock held.
    """
    logging.debug('_remove_watch_for_path(%r)', path)
    wd = self._directory_to_watch_descriptor[path]

    rm_status = _libc.inotify_rm_watch(self._inotify_fd, wd)
    if rm_status < 0:
        # If the directory is deleted then the watch will be removed
        # automatically and inotify_rm_watch will fail. Just log the error.
        failure = ctypes.get_errno()
        logging.debug('inotify_rm_watch failed for %r: %d [%r]',
                      path, failure, errno.errorcode[failure])

    parent = os.path.dirname(path)
    if parent in self._directory_to_subdirs:
        self._directory_to_subdirs[parent].remove(path)

    # Iterate over a frozen snapshot: the recursive call mutates
    # _directory_to_subdirs[path] by removing each child from it.
    children = frozenset(self._directory_to_subdirs[path])
    for child in children:
        self._remove_watch_for_path(child)

    del self._watch_to_directory[wd]
    del self._directory_to_watch_descriptor[path]
    del self._directory_to_subdirs[path]
inotify.py 文件源码 项目:dotfiles 作者: gbraad 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def watch(self, path):
    """Start watching the file or directory ``path`` unless it is already watched.

    The path is canonicalised with ``realpath`` first. A directory-only
    watch is attempted first; when that fails with ``ENOTDIR`` the path is
    watched as a regular file instead, additionally listening for content
    and attribute changes. Every other failure is delegated to
    ``self.handle_error()``.
    """
    path = realpath(path)
    with self.lock:
        if path in self.watches:
            return
        encoded = path if isinstance(path, bytes) else path.encode(self.fenc)
        base_mask = self.MOVE_SELF | self.DELETE_SELF
        c_path = ctypes.c_char_p(encoded)
        # Try watching path as a directory
        descriptor = self._add_watch(
            self._inotify_fd, c_path, base_mask | self.ONLYDIR)
        if descriptor == -1:
            if ctypes.get_errno() != errno.ENOTDIR:
                self.handle_error()
            # Try watching path as a file
            file_mask = base_mask | (self.MODIFY | self.ATTRIB)
            descriptor = self._add_watch(self._inotify_fd, c_path, file_mask)
            if descriptor == -1:
                self.handle_error()
        self.watches[path] = descriptor
        self.modified[path] = False
inotify.py 文件源码 项目:dotfiles 作者: gbraad 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def add_watch(self, path):
    """Register an inotify watch on the directory ``path``.

    Symlinks are not followed and only directories are accepted.

    Returns ``True`` once the watch is recorded in ``watched_dirs`` and
    ``watched_rmap``, or ``False`` when the call fails with ``ENOTDIR``.
    Any other failure raises ``OSError``.
    """
    encoded = path if isinstance(path, bytes) else path.encode(self.fenc)
    event_mask = (
        # Ignore symlinks and watch only directories
        self.DONT_FOLLOW | self.ONLYDIR
        | self.MODIFY | self.CREATE | self.DELETE
        | self.MOVE_SELF | self.MOVED_FROM | self.MOVED_TO
        | self.ATTRIB | self.DELETE_SELF
    )
    descriptor = self._add_watch(
        self._inotify_fd, ctypes.c_char_p(encoded), event_mask)
    if descriptor != -1:
        self.watched_dirs[path] = descriptor
        self.watched_rmap[descriptor] = path
        return True
    code = ctypes.get_errno()
    if code == errno.ENOTDIR:
        return False
    raise OSError(code, 'Failed to add watch for: {0}: {1}'.format(path, self.os.strerror(code)))
inotify.py 文件源码 项目:dotfiles 作者: gbraad 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self, cloexec=True, nonblock=True):
    """Load the inotify bindings and open a new inotify descriptor.

    ``cloexec`` and ``nonblock`` select whether ``self.CLOEXEC`` and
    ``self.NONBLOCK`` are passed to the init call. Raises ``INotifyError``
    when that call returns -1.
    """
    (self._init1, self._add_watch,
     self._rm_watch, self._read) = load_inotify()
    init_flags = 0 | (self.CLOEXEC if cloexec else 0)
    init_flags |= self.NONBLOCK if nonblock else 0
    self._inotify_fd = self._init1(init_flags)
    if self._inotify_fd == -1:
        raise INotifyError(os.strerror(ctypes.get_errno()))

    # Scratch buffer handed to the read binding on every call
    self._buf = ctypes.create_string_buffer(5000)
    self.fenc = get_preferred_file_name_encoding()
    # Layout used to unpack each event header: one int, three unsigned ints
    self.hdr = struct.Struct(b'iIII')
    # We keep a reference to os to prevent it from being deleted
    # during interpreter shutdown, which would lead to errors in the
    # __del__ method
    self.os = os
inotify.py 文件源码 项目:dotfiles 作者: gbraad 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def read(self, get_name=True):
    """Drain the inotify descriptor and dispatch every event found.

    Data is read repeatedly into ``self._buf`` until the read binding
    returns 0 or fails with ``EAGAIN``; ``EINTR`` is retried and any other
    error raises ``OSError``. Each event is then unpacked with ``self.hdr``
    and passed to ``self.process_event(wd, mask, cookie, name)``. ``name``
    is the NUL-stripped name bytes, or ``None`` when ``get_name`` is false.
    """
    chunks = []
    while True:
        count = self._read(self._inotify_fd, self._buf, len(self._buf))
        if count > 0:
            chunks.append(self._buf.raw[:count])
            continue
        if count == 0:
            break
        code = ctypes.get_errno()
        if code == errno.EAGAIN:
            break  # No more data
        if code != errno.EINTR:
            raise OSError(code, self.os.strerror(code))
        # Interrupted, try again
    data = b''.join(chunks)
    total = len(data)
    offset = 0
    while total - offset >= self.hdr.size:
        wd, mask, cookie, name_len = self.hdr.unpack_from(data, offset)
        offset += self.hdr.size
        name_end = offset + name_len
        name = data[offset:name_end].rstrip(b'\0') if get_name else None
        offset = name_end
        self.process_event(wd, mask, cookie, name)
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fallocate(self, mode, offset, length):
    """
    Manipulate the disk space allocated to this file for the byte range
    that starts at offset and spans length bytes.

    This is a Linux-specific sys call, unlike posix_fallocate().

    :param mode: Operation to be performed on the given range
    :param offset: Starting offset
    :param length: Size in bytes, starting at offset
    :raises OSError: if glfs_fallocate returns a negative value
    """
    status = api.client.glfs_fallocate(self.fd, mode, offset, length)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def fgetxattr(self, key, size=0):
    """
    Retrieve the value of the extended attribute identified by key
    for the file.

    :param key: Key of extended attribute
    :param size: If size is specified as zero, we first determine the
                 size of xattr and then allocate a buffer accordingly.
                 If size is non-zero, it is assumed the caller knows
                 the size of xattr.
    :returns: Value of extended attribute corresponding to key specified.
    :raises OSError: if either glfs_fgetxattr call returns a negative value
    """
    if size == 0:
        # A call with no buffer is used to query the required size.
        size = api.glfs_fgetxattr(self.fd, key, None, size)
        if size < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    buf = ctypes.create_string_buffer(size)
    rc = api.glfs_fgetxattr(self.fd, key, buf, size)
    if rc < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    # Use .raw rather than .value: .value stops at the first NUL byte and
    # would silently truncate attribute values that contain binary data.
    return buf.raw[:rc]
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def fsetxattr(self, key, value, flags=0):
    """
    Set an extended attribute on this file.

    :param key: The key of extended attribute.
    :param value: The value of extended attribute.
    :param flags: Possible values are 0 (default), 1 and 2
                  0: xattr will be created if it does not exist, or the
                     value will be replaced if the xattr exists.
                  1: Perform a pure create, which fails if the named
                     attribute already exists.
                  2: Perform a pure replace operation, which fails if the
                     named attribute does not already exist.
    :raises OSError: if glfs_fsetxattr returns a negative value
    """
    status = api.glfs_fsetxattr(self.fd, key, value, len(value), flags)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def lseek(self, pos, how):
    """
    Move the read/write offset of this file to 'pos', interpreted
    relative to 'how'.

    :param pos: sets new offset position according to 'how'
    :param how: SEEK_SET, sets offset position 'pos' bytes relative to
                beginning of file, SEEK_CUR, the position is set relative
                to the current position and SEEK_END sets the position
                relative to the end of the file.
    :returns: the new offset position
    :raises OSError: if glfs_lseek returns a negative value
    """
    new_offset = api.glfs_lseek(self.fd, pos, how)
    if new_offset >= 0:
        return new_offset
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def read(self, size=-1):
    """
    Read at most size bytes from the file.

    :param size: length of read buffer. If less than 0, then whole
                 file is read. Default is -1.
    :returns: the bytes read (at most 'size' of them), or None when
              glfs_read reports that zero bytes were read
    :raises OSError: if glfs_read returns a negative value
    """
    if size < 0:
        size = self.fgetsize()
    rbuf = ctypes.create_string_buffer(size)
    ret = api.glfs_read(self.fd, rbuf, size, 0)
    if ret > 0:
        # In python 2.x, read() always returns a string. It's really upto
        # the consumer to decode this string into whatever encoding it was
        # written with.
        # Use .raw rather than .value: .value stops at the first NUL byte
        # and would silently truncate binary file contents.
        return rbuf.raw[:ret]
    elif ret < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def umount(self):
    """
    Unmount a mounted GlusterFS volume.

    Lets callers release resources explicitly instead of waiting for the
    python garbage collector to call __del__() at some later point. Does
    nothing when ``self.fs`` is falsy.

    :raises LibgfapiException: if glfs_fini returns a negative value
    """
    if not self.fs:
        return
    status = self._api.glfs_fini(self.fs)
    if status < 0:
        code = ctypes.get_errno()
        raise LibgfapiException("glfs_fini(%s) failed: %s" %
                                (self.fs, os.strerror(code)))
    # Succeeded. Protect against multiple umount() calls.
    self._mounted = False
    self.fs = None
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def getxattr(self, path, key, size=0):
    """
    Retrieve the value of the extended attribute identified by key
    for path specified.

    :param path: Path to file or directory
    :param key: Key of extended attribute
    :param size: If size is specified as zero, we first determine the
                 size of xattr and then allocate a buffer accordingly.
                 If size is non-zero, it is assumed the caller knows
                 the size of xattr.
    :returns: Value of extended attribute corresponding to key specified.
    :raises OSError: if either glfs_getxattr call returns a negative value
    """
    if size == 0:
        # A call with no buffer is used to query the required size.
        size = api.glfs_getxattr(self.fs, path, key, None, 0)
        if size < 0:
            err = ctypes.get_errno()
            raise OSError(err, os.strerror(err))

    buf = ctypes.create_string_buffer(size)
    rc = api.glfs_getxattr(self.fs, path, key, buf, size)
    if rc < 0:
        err = ctypes.get_errno()
        raise OSError(err, os.strerror(err))
    # Use .raw rather than .value: .value stops at the first NUL byte and
    # would silently truncate attribute values that contain binary data.
    return buf.raw[:rc]
gfapi.py 文件源码 项目:gkube 作者: guohongze 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def setxattr(self, path, key, value, flags=0):
    """
    Set an extended attribute on the given path.

    :param path: Path to file or directory.
    :param key: The key of extended attribute.
    :param value: The value of extended attribute.
    :param flags: Possible values are 0 (default), 1 and 2
                  0: xattr will be created if it does not exist, or the
                     value will be replaced if the xattr exists.
                  1: Perform a pure create, which fails if the named
                     attribute already exists.
                  2: Perform a pure replace operation, which fails if the
                     named attribute does not already exist.
    :raises OSError: if glfs_setxattr returns a negative value
    """
    status = api.glfs_setxattr(self.fs, path, key, value, len(value), flags)
    if status >= 0:
        return
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))
func.py 文件源码 项目:python-ptrace 作者: vstinner 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def ptrace(command, pid=0, arg1=0, arg2=0, check_errno=False):
    """Invoke ptrace and convert failures into ``PtraceError``.

    ``command``, ``pid``, ``arg1`` and ``arg2`` are forwarded to ``_ptrace``.
    ``check_errno`` should be true for requests (such as peeks) whose
    legitimate result may be -1: the call is then treated as failed only
    when errno is non-zero. For all other requests a -1 result is always an
    error.

    Returns the raw result of ``_ptrace``; raises ``PtraceError`` carrying
    the errno and pid on failure.
    """
    if HAS_CPTRACE:
        try:
            set_errno(0)
            result = _ptrace(command, pid, arg1, arg2, check_errno)
        except ValueError as errobj:
            message = str(errobj)
            errno = get_errno()
            raise PtraceError(message, errno=errno, pid=pid)
    else:
        # Clear errno before the call, as the other branch does. Otherwise
        # a stale value left by an earlier failure would make a successful
        # request that legitimately returns -1 look like an error when
        # check_errno is set.
        set_errno(0)
        result = _ptrace(command, pid, arg1, arg2)
        result_signed = c_long(result).value
        if result_signed == -1:
            errno = get_errno()
            # peek operations may returns -1 with errno=0:
            # it's not an error. For other operations, -1
            # is always an error
            if not(check_errno) or errno:
                message = "ptrace(cmd=%s, pid=%s, %r, %r) error #%s: %s" % (
                    command, pid, arg1, arg2,
                    errno, strerror(errno))
                raise PtraceError(message, errno=errno, pid=pid)
    return result
backend_ctypes.py 文件源码 项目:noc-orchestrator 作者: DirceuSilvaLabs 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current
usb1.py 文件源码 项目:sc-controller 作者: kozec 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_errno():
    """Stand-in for ``get_errno`` that always raises ``NotImplementedError``."""
    message = 'Your python version does not support errno/last_error'
    raise NotImplementedError(message)
usb1.py 文件源码 项目:sc-controller 作者: kozec 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def getPollFDList(self):
    """
    Return the file descriptors to poll for USB events, as a list of
    ``(fd, events)`` tuples.
    You should not have to call this method, unless you are integrating
    this class with a polling mechanism.

    Raises ``OSError`` when libusb returns no list and errno is set, and
    ``NotImplementedError`` when it returns no list with errno unset.
    """
    pollfd_p_p = libusb1.libusb_get_pollfds(self.__context_p)
    if not pollfd_p_p:
        errno = get_errno()
        if not errno:
            # Assume not implemented
            raise NotImplementedError(
                'Your libusb does not seem to implement pollable FDs')
        raise OSError(errno)
    try:
        result = []
        index = 0
        # The array is walked until the first false (NULL) entry.
        while pollfd_p_p[index]:
            pollfd = pollfd_p_p[index].contents
            result.append((pollfd.fd, pollfd.events))
            index += 1
    finally:
        # Release the list whether or not the walk succeeded.
        _free(pollfd_p_p)
    return result
monotonic.py 文件源码 项目:GAMADV-XTD 作者: taers232c 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def monotonic():
    """Monotonic clock, cannot go backward.

    Returns the CLOCK_MONOTONIC reading as float seconds; raises
    ``OSError`` when ``clock_gettime`` returns a non-zero status.
    """
    ts = timespec()
    status = clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(ts))
    if not status:
        return ts.tv_sec + ts.tv_nsec / 1.0e9
    code = ctypes.get_errno()
    raise OSError(code, os.strerror(code))

        # Perform a sanity-check.
backend_ctypes.py 文件源码 项目:SwiftKitten 作者: johncsnyder 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current
inotify.py 文件源码 项目:centos-base-consul 作者: zeroc0d3lab 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def handle_error(self):
    """Raise ``OSError`` for the errno currently reported by ctypes.

    The message is the ``strerror`` text for that errno. For ``ENOSPC`` a
    hint about raising the inotify limits is appended as a separate
    sentence.
    """
    eno = ctypes.get_errno()
    message = self.os.strerror(eno)
    if eno == errno.ENOSPC:
        # Separate the hint from the strerror text; without the separator
        # the two sentences run together in the final message.
        message += '. You may need to increase the inotify limits on your system, via /proc/sys/fs/inotify/max_user_*'
    raise OSError(eno, message)
_ffi.py 文件源码 项目:pyShadowsocks 作者: FTwOoO 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def errno():
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current
asm_hooks.py 文件源码 项目:python-doublescript 作者: fdintino 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def mprotect_libc(addr, size, flags):
    """Call libc ``mprotect`` on the pages covering ``[addr, addr + size)``.

    The start address is rounded down and the end address rounded up to a
    multiple of ``PAGE_SIZE`` before the call.

    :param addr: start address of the region (integer)
    :param size: length of the region in bytes
    :param flags: protection flags passed through to ``mprotect``
    :raises OSError: if ``mprotect`` returns -1; the message contains the
        symbolic errno name followed by its ``strerror`` description
    """
    libc = ctypes.CDLL(ctypes.util.find_library('libc'), use_errno=True)
    libc.mprotect.argtypes = [c_void_p, c_size_t, c_int]
    libc.mprotect.restype = c_int
    page_mask = ~(PAGE_SIZE - 1)
    addr_align = addr & page_mask
    mem_end = (addr + size) & page_mask
    if (addr + size) > mem_end:
        # The region ends inside a page: include that whole page.
        mem_end += PAGE_SIZE
    memlen = mem_end - addr_align
    ret = libc.mprotect(addr_align, memlen, flags)
    if ret == -1:
        e = ctypes.get_errno()
        # errno.errorcode maps errno values to their symbolic names; fall
        # back to the number for values the table does not know.
        symbol = errno.errorcode.get(e, 'errno %d' % e)
        raise OSError(e, '{0}: {1}'.format(symbol, os.strerror(e)))
backend_ctypes.py 文件源码 项目:aws-cfn-plex 作者: lordmuffin 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def get_errno(self):
    """Return the value reported by ``ctypes.get_errno()``."""
    current = ctypes.get_errno()
    return current


问题


面经


文章

微信
公众号

扫码关注公众号