python类S_ISCHR的实例源码

unix_events.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False

        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or
                stat.S_ISSOCK(mode) or
                stat.S_ISCHR(mode)):
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")

        _set_nonblocking(self._fileno)

        self._loop.call_soon(self._protocol.connection_made, self)
        # only start reading when connection_made() has been called
        self._loop.call_soon(self._loop._add_reader,
                             self._fileno, self._read_ready)
        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
unix_events.py 文件源码 项目:golightan 作者: shirou 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")

        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop._add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
pathlib.py 文件源码 项目:click-configfile 作者: jenisys 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno != ENOENT:
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
misc.py 文件源码 项目:flux_line_bot 作者: blesscat 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def is_serial_port(filepath):
    if os.path.exists(filepath):
        st_mode = os.stat(filepath).st_mode
        return S_ISCHR(st_mode)
    else:
        return False
ops.py 文件源码 项目:tfhfs 作者: fingon 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def mknod(self, parent_inode, name, mode, rdev, ctx):
        assert self._initialized
        fd, a = self.create(parent_inode, name, mode,
                            os.O_TRUNC | os.O_CREAT, ctx)
        inode = self.forest.inodes.get_by_value(a.st_ino)
        if stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
            inode.direntry.set_data('st_rdev', rdev)
        self.release(fd)
        return self._inode_attributes(inode)
unix_events.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra, loop)
        self._extra['pipe'] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        self._loop.call_soon(self._protocol.connection_made, self)

        # On AIX, the reader trick (to be notified when the read end of the
        # socket is closed) only works for sockets. On other platforms it
        # works for pipes and sockets. (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            # only start reading when connection_made() has been called
            self._loop.call_soon(self._loop.add_reader,
                                 self._fileno, self._read_ready)

        if waiter is not None:
            # only wake up the waiter when connection_made() has been called
            self._loop.call_soon(futures._set_result_unless_cancelled,
                                 waiter, None)
pathlib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno not in (ENOENT, ENOTDIR):
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
utils.py 文件源码 项目:jepsen-training-vpc 作者: bloomberg 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
path.py 文件源码 项目:alicloud-duplicity 作者: aliyun 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def set_from_stat(self):
        """Set the value of self.type, self.mode from self.stat"""
        if not self.stat:
            self.type = None

        st_mode = self.stat.st_mode
        if stat.S_ISREG(st_mode):
            self.type = "reg"
        elif stat.S_ISDIR(st_mode):
            self.type = "dir"
        elif stat.S_ISLNK(st_mode):
            self.type = "sym"
        elif stat.S_ISFIFO(st_mode):
            self.type = "fifo"
        elif stat.S_ISSOCK(st_mode):
            raise PathException(util.ufn(self.get_relative_path()) +
                                u"is a socket, unsupported by tar")
            self.type = "sock"
        elif stat.S_ISCHR(st_mode):
            self.type = "chr"
        elif stat.S_ISBLK(st_mode):
            self.type = "blk"
        else:
            raise PathException("Unknown type")

        self.mode = stat.S_IMODE(st_mode)
        if self.type in ("chr", "blk"):
            try:
                self.devnums = (os.major(self.stat.st_rdev),
                                os.minor(self.stat.st_rdev))
            except:
                log.Warn(_("Warning: %s invalid devnums (0x%X), treating as (0, 0).")
                         % (util.ufn(self.get_relative_path()), self.stat.st_rdev))
                self.devnums = (0, 0)
unix_events.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        super().__init__(extra)
        self._extra['pipe'] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        mode = os.fstat(self._fileno).st_mode
        is_socket = stat.S_ISSOCK(mode)
        if not (is_socket or
                stat.S_ISFIFO(mode) or
                stat.S_ISCHR(mode)):
            raise ValueError("Pipe transport is only for "
                             "pipes, sockets and character devices")
        _set_nonblocking(self._fileno)
        self._protocol = protocol
        self._buffer = []
        self._conn_lost = 0
        self._closing = False  # Set when close() or write_eof() called.

        # On AIX, the reader trick only works for sockets.
        # On other platforms it works for pipes and sockets.
        # (Exception: OS X 10.4?  Issue #19294.)
        if is_socket or not sys.platform.startswith("aix"):
            self._loop.add_reader(self._fileno, self._read_ready)

        self._loop.call_soon(self._protocol.connection_made, self)
        if waiter is not None:
            # wait until protocol.connection_made() has been called
            self._loop.call_soon(waiter._set_result_unless_cancelled, None)
pathlib.py 文件源码 项目:kbe_server 作者: xiaohaoppy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def is_char_device(self):
        """
        Whether this path is a character device.
        """
        try:
            return S_ISCHR(self.stat().st_mode)
        except OSError as e:
            if e.errno != ENOENT:
                raise
            # Path doesn't exist or is a broken symlink
            # (see https://bitbucket.org/pitrou/pathlib/issue/12/)
            return False
utils.py 文件源码 项目:s3transfer 作者: boto 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
show_plugin.py 文件源码 项目:obnam 作者: obnam-mirror 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def show_item_kdirstat(self, gen_id, filename):
        mode = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_MODE)
        size = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_SIZE)
        mtime_sec = self.repo.get_file_key(
            gen_id, filename, obnamlib.REPO_FILE_MTIME_SEC)

        if stat.S_ISREG(mode):
            mode_str = "F\t"
        elif stat.S_ISDIR(mode):
            mode_str = "D "
        elif stat.S_ISLNK(mode):
            mode_str = "L\t"
        elif stat.S_ISBLK(mode):
            mode_str = "BlockDev\t"
        elif stat.S_ISCHR(mode):
            mode_str = "CharDev\t"
        elif stat.S_ISFIFO(mode):
            mode_str = "FIFO\t"
        elif stat.S_ISSOCK(mode):
            mode_str = "Socket\t"
        else:
            # Unhandled, make it look like a comment
            mode_str = "#UNHANDLED\t"

        enc_filename = filename.replace("%", "%25")
        enc_filename = enc_filename.replace(" ", "%20")
        enc_filename = enc_filename.replace("\t", "%09")

        self.app.output.write(
            "%s%s\t%d\t%#x\n" %
            (mode_str, enc_filename, size, mtime_sec))
restore_plugin.py 文件源码 项目:obnam 作者: obnam-mirror 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def restore_first_link(self, gen, filename, metadata):
        if stat.S_ISREG(metadata.st_mode):
            self.restore_regular_file(gen, filename, metadata)
        elif stat.S_ISFIFO(metadata.st_mode):
            self.restore_fifo(gen, filename, metadata)
        elif stat.S_ISSOCK(metadata.st_mode):
            self.restore_socket(gen, filename, metadata)
        elif stat.S_ISBLK(metadata.st_mode) or stat.S_ISCHR(metadata.st_mode):
            self.restore_device(gen, filename, metadata)
        else:
            msg = ('Unknown file type: %s (%o)' %
                   (filename, metadata.st_mode))
            logging.error(msg)
            self.app.ts.notify(msg)
binary.py 文件源码 项目:diffoscope 作者: ReproducibleBuilds 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def is_device(self):
        mode = os.lstat(self._name).st_mode
        return stat.S_ISCHR(mode) or stat.S_ISBLK(mode)
device.py 文件源码 项目:diffoscope 作者: ReproducibleBuilds 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def format_device(mode, major, minor):
    if stat.S_ISCHR(mode):
        kind = 'character'
    elif stat.S_ISBLK(mode):
        kind = 'block'
    else:
        kind = 'weird'
    return 'device:%s\nmajor: %d\nminor: %d\n' % (kind, major, minor)
utils.py 文件源码 项目:tf_aws_ecs_instance_draining_on_scale_in 作者: terraform-community-modules 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def is_special_file(cls, filename):
        """Checks to see if a file is a special UNIX file.

        It checks if the file is a character special device, block special
        device, FIFO, or socket.

        :param filename: Name of the file

        :returns: True if the file is a special file. False, if is not.
        """
        # If it does not exist, it must be a new file so it cannot be
        # a special file.
        if not os.path.exists(filename):
            return False
        mode = os.stat(filename).st_mode
        # Character special device.
        if stat.S_ISCHR(mode):
            return True
        # Block special device
        if stat.S_ISBLK(mode):
            return True
        # Named pipe / FIFO
        if stat.S_ISFIFO(mode):
            return True
        # Socket.
        if stat.S_ISSOCK(mode):
            return True
        return False
ls.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def lsLine(name, s):
    mode = s.st_mode
    perms = array.array('c', '-'*10)
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISCHR(ft): perms[0] = 'c'
    elif stat.S_ISBLK(ft): perms[0] = 'b'
    elif stat.S_ISREG(ft): perms[0] = '-'
    elif stat.S_ISFIFO(ft): perms[0] = 'f'
    elif stat.S_ISLNK(ft): perms[0] = 'l'
    elif stat.S_ISSOCK(ft): perms[0] = 's'
    else: perms[0] = '!'
    # user
    if mode&stat.S_IRUSR:perms[1] = 'r'
    if mode&stat.S_IWUSR:perms[2] = 'w'
    if mode&stat.S_IXUSR:perms[3] = 'x'
    # group
    if mode&stat.S_IRGRP:perms[4] = 'r'
    if mode&stat.S_IWGRP:perms[5] = 'w'
    if mode&stat.S_IXGRP:perms[6] = 'x'
    # other
    if mode&stat.S_IROTH:perms[7] = 'r'
    if mode&stat.S_IWOTH:perms[8] = 'w'
    if mode&stat.S_IXOTH:perms[9] = 'x'
    # suid/sgid
    if mode&stat.S_ISUID:
        if perms[3] == 'x': perms[3] = 's'
        else: perms[3] = 'S'
    if mode&stat.S_ISGID:
        if perms[6] == 'x': perms[6] = 's'
        else: perms[6] = 'S'
    l = perms.tostring()
    l += str(s.st_nlink).rjust(5) + ' '
    un = str(s.st_uid)
    l += un.ljust(9)
    gr = str(s.st_gid)
    l += gr.ljust(9)
    sz = str(s.st_size)
    l += sz.rjust(8)
    l += ' '
    sixmo = 60 * 60 * 24 * 7 * 26
    if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
        l += time.strftime("%b %2d  %Y ", time.localtime(s.st_mtime))
    else:
        l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
    l += name
    return l
ls.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def lsLine(name, s):
    mode = s.st_mode
    perms = array.array('c', '-'*10)
    ft = stat.S_IFMT(mode)
    if stat.S_ISDIR(ft): perms[0] = 'd'
    elif stat.S_ISCHR(ft): perms[0] = 'c'
    elif stat.S_ISBLK(ft): perms[0] = 'b'
    elif stat.S_ISREG(ft): perms[0] = '-'
    elif stat.S_ISFIFO(ft): perms[0] = 'f'
    elif stat.S_ISLNK(ft): perms[0] = 'l'
    elif stat.S_ISSOCK(ft): perms[0] = 's'
    else: perms[0] = '!'
    # user
    if mode&stat.S_IRUSR:perms[1] = 'r'
    if mode&stat.S_IWUSR:perms[2] = 'w'
    if mode&stat.S_IXUSR:perms[3] = 'x'
    # group
    if mode&stat.S_IRGRP:perms[4] = 'r'
    if mode&stat.S_IWGRP:perms[5] = 'w'
    if mode&stat.S_IXGRP:perms[6] = 'x'
    # other
    if mode&stat.S_IROTH:perms[7] = 'r'
    if mode&stat.S_IWOTH:perms[8] = 'w'
    if mode&stat.S_IXOTH:perms[9] = 'x'
    # suid/sgid
    if mode&stat.S_ISUID:
        if perms[3] == 'x': perms[3] = 's'
        else: perms[3] = 'S'
    if mode&stat.S_ISGID:
        if perms[6] == 'x': perms[6] = 's'
        else: perms[6] = 'S'
    l = perms.tostring()
    l += str(s.st_nlink).rjust(5) + ' '
    un = str(s.st_uid)
    l += un.ljust(9)
    gr = str(s.st_gid)
    l += gr.ljust(9)
    sz = str(s.st_size)
    l += sz.rjust(8)
    l += ' '
    sixmo = 60 * 60 * 24 * 7 * 26
    if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
        l += time.strftime("%b %2d  %Y ", time.localtime(s.st_mtime))
    else:
        l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
    l += name
    return l
find.py 文件源码 项目:unitils 作者: iLoveTux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def find(path=".", name=None, iname=None, ftype="*"):
    """Search for files in a directory heirarchy.

    This is dramatically different from the GNU version of
    find. There is no Domain Specific language.

    :param path: The directory to start in
    :param name: The name spec (glob pattern) to search for
    :param iname: The case-insensitive name spec (glob pattern) to search for
    :param ftype: The type of file to search for must be one of b, c, d, p, f, k, s or *
    :type ftype: str
    :type iname: str
    :type name: str
    :type path: str
    """
    if ftype not in "bcdpfls*" or len(ftype) != 1:
        raise NotImplementedError(
            "Introspection for {} not implemented".format(ftype)
        )
    ftype_mapping = {
        "b": stat.S_ISBLK, "c": stat.S_ISCHR,
        "d": stat.S_ISDIR, "p": stat.S_ISFIFO,
        "f": stat.S_ISREG, "l": stat.S_ISLNK,
        "s": stat.S_ISSOCK,
        "*": lambda *args, **kwargs: True,
    }
    type_test = ftype_mapping[ftype]

    if name is not None:
        regex = re.compile(fnmatch.translate(name))
    elif iname is not None:
        regex = re.compile(fnmatch.translate(iname), flags=re.IGNORECASE)
    else:
        regex = re.compile(fnmatch.translate("*"))

    if regex.match(path) and type_test(os.stat(path).st_mode):
        yield os.path.relpath(path)

    for root, dirs, files in os.walk(path):
        for n in files + dirs:
            filename = os.path.join(root, n)
            _stat = os.stat(filename)
            if regex.match(n) and type_test(_stat.st_mode):
                yield filename


问题


面经


文章

微信
公众号

扫码关注公众号