def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
super().__init__(extra)
self._extra['pipe'] = pipe
self._loop = loop
self._pipe = pipe
self._fileno = pipe.fileno()
self._protocol = protocol
self._closing = False
mode = os.fstat(self._fileno).st_mode
if not (stat.S_ISFIFO(mode) or
stat.S_ISSOCK(mode) or
stat.S_ISCHR(mode)):
self._pipe = None
self._fileno = None
self._protocol = None
raise ValueError("Pipe transport is for pipes/sockets only.")
_set_nonblocking(self._fileno)
self._loop.call_soon(self._protocol.connection_made, self)
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
python类S_ISCHR的实例源码
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
super().__init__(extra, loop)
self._extra['pipe'] = pipe
self._pipe = pipe
self._fileno = pipe.fileno()
self._protocol = protocol
self._buffer = bytearray()
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
mode = os.fstat(self._fileno).st_mode
is_char = stat.S_ISCHR(mode)
is_fifo = stat.S_ISFIFO(mode)
is_socket = stat.S_ISSOCK(mode)
if not (is_char or is_fifo or is_socket):
self._pipe = None
self._fileno = None
self._protocol = None
raise ValueError("Pipe transport is only for "
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
self._loop.call_soon(self._protocol.connection_made, self)
# On AIX, the reader trick (to be notified when the read end of the
# socket is closed) only works for sockets. On other platforms it
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or (is_fifo and not sys.platform.startswith("aix")):
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop._add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def is_char_device(self):
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except OSError as e:
if e.errno != ENOENT:
raise
# Path doesn't exist or is a broken symlink
# (see https://bitbucket.org/pitrou/pathlib/issue/12/)
return False
def is_serial_port(filepath):
if os.path.exists(filepath):
st_mode = os.stat(filepath).st_mode
return S_ISCHR(st_mode)
else:
return False
def mknod(self, parent_inode, name, mode, rdev, ctx):
assert self._initialized
fd, a = self.create(parent_inode, name, mode,
os.O_TRUNC | os.O_CREAT, ctx)
inode = self.forest.inodes.get_by_value(a.st_ino)
if stat.S_ISCHR(mode) or stat.S_ISBLK(mode):
inode.direntry.set_data('st_rdev', rdev)
self.release(fd)
return self._inode_attributes(inode)
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
super().__init__(extra, loop)
self._extra['pipe'] = pipe
self._pipe = pipe
self._fileno = pipe.fileno()
mode = os.fstat(self._fileno).st_mode
is_socket = stat.S_ISSOCK(mode)
if not (is_socket or
stat.S_ISFIFO(mode) or
stat.S_ISCHR(mode)):
raise ValueError("Pipe transport is only for "
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._buffer = []
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
self._loop.call_soon(self._protocol.connection_made, self)
# On AIX, the reader trick (to be notified when the read end of the
# socket is closed) only works for sockets. On other platforms it
# works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
# only start reading when connection_made() has been called
self._loop.call_soon(self._loop.add_reader,
self._fileno, self._read_ready)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
self._loop.call_soon(futures._set_result_unless_cancelled,
waiter, None)
def is_char_device(self):
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except OSError as e:
if e.errno not in (ENOENT, ENOTDIR):
raise
# Path doesn't exist or is a broken symlink
# (see https://bitbucket.org/pitrou/pathlib/issue/12/)
return False
def is_special_file(cls, filename):
"""Checks to see if a file is a special UNIX file.
It checks if the file is a character special device, block special
device, FIFO, or socket.
:param filename: Name of the file
:returns: True if the file is a special file. False, if is not.
"""
# If it does not exist, it must be a new file so it cannot be
# a special file.
if not os.path.exists(filename):
return False
mode = os.stat(filename).st_mode
# Character special device.
if stat.S_ISCHR(mode):
return True
# Block special device
if stat.S_ISBLK(mode):
return True
# Named pipe / FIFO
if stat.S_ISFIFO(mode):
return True
# Socket.
if stat.S_ISSOCK(mode):
return True
return False
def set_from_stat(self):
"""Set the value of self.type, self.mode from self.stat"""
if not self.stat:
self.type = None
st_mode = self.stat.st_mode
if stat.S_ISREG(st_mode):
self.type = "reg"
elif stat.S_ISDIR(st_mode):
self.type = "dir"
elif stat.S_ISLNK(st_mode):
self.type = "sym"
elif stat.S_ISFIFO(st_mode):
self.type = "fifo"
elif stat.S_ISSOCK(st_mode):
raise PathException(util.ufn(self.get_relative_path()) +
u"is a socket, unsupported by tar")
self.type = "sock"
elif stat.S_ISCHR(st_mode):
self.type = "chr"
elif stat.S_ISBLK(st_mode):
self.type = "blk"
else:
raise PathException("Unknown type")
self.mode = stat.S_IMODE(st_mode)
if self.type in ("chr", "blk"):
try:
self.devnums = (os.major(self.stat.st_rdev),
os.minor(self.stat.st_rdev))
except:
log.Warn(_("Warning: %s invalid devnums (0x%X), treating as (0, 0).")
% (util.ufn(self.get_relative_path()), self.stat.st_rdev))
self.devnums = (0, 0)
def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
super().__init__(extra)
self._extra['pipe'] = pipe
self._loop = loop
self._pipe = pipe
self._fileno = pipe.fileno()
mode = os.fstat(self._fileno).st_mode
is_socket = stat.S_ISSOCK(mode)
if not (is_socket or
stat.S_ISFIFO(mode) or
stat.S_ISCHR(mode)):
raise ValueError("Pipe transport is only for "
"pipes, sockets and character devices")
_set_nonblocking(self._fileno)
self._protocol = protocol
self._buffer = []
self._conn_lost = 0
self._closing = False # Set when close() or write_eof() called.
# On AIX, the reader trick only works for sockets.
# On other platforms it works for pipes and sockets.
# (Exception: OS X 10.4? Issue #19294.)
if is_socket or not sys.platform.startswith("aix"):
self._loop.add_reader(self._fileno, self._read_ready)
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
# wait until protocol.connection_made() has been called
self._loop.call_soon(waiter._set_result_unless_cancelled, None)
def is_char_device(self):
"""
Whether this path is a character device.
"""
try:
return S_ISCHR(self.stat().st_mode)
except OSError as e:
if e.errno != ENOENT:
raise
# Path doesn't exist or is a broken symlink
# (see https://bitbucket.org/pitrou/pathlib/issue/12/)
return False
def is_special_file(cls, filename):
"""Checks to see if a file is a special UNIX file.
It checks if the file is a character special device, block special
device, FIFO, or socket.
:param filename: Name of the file
:returns: True if the file is a special file. False, if is not.
"""
# If it does not exist, it must be a new file so it cannot be
# a special file.
if not os.path.exists(filename):
return False
mode = os.stat(filename).st_mode
# Character special device.
if stat.S_ISCHR(mode):
return True
# Block special device
if stat.S_ISBLK(mode):
return True
# Named pipe / FIFO
if stat.S_ISFIFO(mode):
return True
# Socket.
if stat.S_ISSOCK(mode):
return True
return False
def show_item_kdirstat(self, gen_id, filename):
mode = self.repo.get_file_key(
gen_id, filename, obnamlib.REPO_FILE_MODE)
size = self.repo.get_file_key(
gen_id, filename, obnamlib.REPO_FILE_SIZE)
mtime_sec = self.repo.get_file_key(
gen_id, filename, obnamlib.REPO_FILE_MTIME_SEC)
if stat.S_ISREG(mode):
mode_str = "F\t"
elif stat.S_ISDIR(mode):
mode_str = "D "
elif stat.S_ISLNK(mode):
mode_str = "L\t"
elif stat.S_ISBLK(mode):
mode_str = "BlockDev\t"
elif stat.S_ISCHR(mode):
mode_str = "CharDev\t"
elif stat.S_ISFIFO(mode):
mode_str = "FIFO\t"
elif stat.S_ISSOCK(mode):
mode_str = "Socket\t"
else:
# Unhandled, make it look like a comment
mode_str = "#UNHANDLED\t"
enc_filename = filename.replace("%", "%25")
enc_filename = enc_filename.replace(" ", "%20")
enc_filename = enc_filename.replace("\t", "%09")
self.app.output.write(
"%s%s\t%d\t%#x\n" %
(mode_str, enc_filename, size, mtime_sec))
def restore_first_link(self, gen, filename, metadata):
if stat.S_ISREG(metadata.st_mode):
self.restore_regular_file(gen, filename, metadata)
elif stat.S_ISFIFO(metadata.st_mode):
self.restore_fifo(gen, filename, metadata)
elif stat.S_ISSOCK(metadata.st_mode):
self.restore_socket(gen, filename, metadata)
elif stat.S_ISBLK(metadata.st_mode) or stat.S_ISCHR(metadata.st_mode):
self.restore_device(gen, filename, metadata)
else:
msg = ('Unknown file type: %s (%o)' %
(filename, metadata.st_mode))
logging.error(msg)
self.app.ts.notify(msg)
def is_device(self):
mode = os.lstat(self._name).st_mode
return stat.S_ISCHR(mode) or stat.S_ISBLK(mode)
def format_device(mode, major, minor):
if stat.S_ISCHR(mode):
kind = 'character'
elif stat.S_ISBLK(mode):
kind = 'block'
else:
kind = 'weird'
return 'device:%s\nmajor: %d\nminor: %d\n' % (kind, major, minor)
utils.py 文件源码
项目:tf_aws_ecs_instance_draining_on_scale_in
作者: terraform-community-modules
项目源码
文件源码
阅读 16
收藏 0
点赞 0
评论 0
def is_special_file(cls, filename):
"""Checks to see if a file is a special UNIX file.
It checks if the file is a character special device, block special
device, FIFO, or socket.
:param filename: Name of the file
:returns: True if the file is a special file. False, if is not.
"""
# If it does not exist, it must be a new file so it cannot be
# a special file.
if not os.path.exists(filename):
return False
mode = os.stat(filename).st_mode
# Character special device.
if stat.S_ISCHR(mode):
return True
# Block special device
if stat.S_ISBLK(mode):
return True
# Named pipe / FIFO
if stat.S_ISFIFO(mode):
return True
# Socket.
if stat.S_ISSOCK(mode):
return True
return False
def lsLine(name, s):
mode = s.st_mode
perms = array.array('c', '-'*10)
ft = stat.S_IFMT(mode)
if stat.S_ISDIR(ft): perms[0] = 'd'
elif stat.S_ISCHR(ft): perms[0] = 'c'
elif stat.S_ISBLK(ft): perms[0] = 'b'
elif stat.S_ISREG(ft): perms[0] = '-'
elif stat.S_ISFIFO(ft): perms[0] = 'f'
elif stat.S_ISLNK(ft): perms[0] = 'l'
elif stat.S_ISSOCK(ft): perms[0] = 's'
else: perms[0] = '!'
# user
if mode&stat.S_IRUSR:perms[1] = 'r'
if mode&stat.S_IWUSR:perms[2] = 'w'
if mode&stat.S_IXUSR:perms[3] = 'x'
# group
if mode&stat.S_IRGRP:perms[4] = 'r'
if mode&stat.S_IWGRP:perms[5] = 'w'
if mode&stat.S_IXGRP:perms[6] = 'x'
# other
if mode&stat.S_IROTH:perms[7] = 'r'
if mode&stat.S_IWOTH:perms[8] = 'w'
if mode&stat.S_IXOTH:perms[9] = 'x'
# suid/sgid
if mode&stat.S_ISUID:
if perms[3] == 'x': perms[3] = 's'
else: perms[3] = 'S'
if mode&stat.S_ISGID:
if perms[6] == 'x': perms[6] = 's'
else: perms[6] = 'S'
l = perms.tostring()
l += str(s.st_nlink).rjust(5) + ' '
un = str(s.st_uid)
l += un.ljust(9)
gr = str(s.st_gid)
l += gr.ljust(9)
sz = str(s.st_size)
l += sz.rjust(8)
l += ' '
sixmo = 60 * 60 * 24 * 7 * 26
if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
l += time.strftime("%b %2d %Y ", time.localtime(s.st_mtime))
else:
l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
l += name
return l
def lsLine(name, s):
mode = s.st_mode
perms = array.array('c', '-'*10)
ft = stat.S_IFMT(mode)
if stat.S_ISDIR(ft): perms[0] = 'd'
elif stat.S_ISCHR(ft): perms[0] = 'c'
elif stat.S_ISBLK(ft): perms[0] = 'b'
elif stat.S_ISREG(ft): perms[0] = '-'
elif stat.S_ISFIFO(ft): perms[0] = 'f'
elif stat.S_ISLNK(ft): perms[0] = 'l'
elif stat.S_ISSOCK(ft): perms[0] = 's'
else: perms[0] = '!'
# user
if mode&stat.S_IRUSR:perms[1] = 'r'
if mode&stat.S_IWUSR:perms[2] = 'w'
if mode&stat.S_IXUSR:perms[3] = 'x'
# group
if mode&stat.S_IRGRP:perms[4] = 'r'
if mode&stat.S_IWGRP:perms[5] = 'w'
if mode&stat.S_IXGRP:perms[6] = 'x'
# other
if mode&stat.S_IROTH:perms[7] = 'r'
if mode&stat.S_IWOTH:perms[8] = 'w'
if mode&stat.S_IXOTH:perms[9] = 'x'
# suid/sgid
if mode&stat.S_ISUID:
if perms[3] == 'x': perms[3] = 's'
else: perms[3] = 'S'
if mode&stat.S_ISGID:
if perms[6] == 'x': perms[6] = 's'
else: perms[6] = 'S'
l = perms.tostring()
l += str(s.st_nlink).rjust(5) + ' '
un = str(s.st_uid)
l += un.ljust(9)
gr = str(s.st_gid)
l += gr.ljust(9)
sz = str(s.st_size)
l += sz.rjust(8)
l += ' '
sixmo = 60 * 60 * 24 * 7 * 26
if s.st_mtime + sixmo < time.time(): # last edited more than 6mo ago
l += time.strftime("%b %2d %Y ", time.localtime(s.st_mtime))
else:
l += time.strftime("%b %2d %H:%S ", time.localtime(s.st_mtime))
l += name
return l
def find(path=".", name=None, iname=None, ftype="*"):
"""Search for files in a directory heirarchy.
This is dramatically different from the GNU version of
find. There is no Domain Specific language.
:param path: The directory to start in
:param name: The name spec (glob pattern) to search for
:param iname: The case-insensitive name spec (glob pattern) to search for
:param ftype: The type of file to search for must be one of b, c, d, p, f, k, s or *
:type ftype: str
:type iname: str
:type name: str
:type path: str
"""
if ftype not in "bcdpfls*" or len(ftype) != 1:
raise NotImplementedError(
"Introspection for {} not implemented".format(ftype)
)
ftype_mapping = {
"b": stat.S_ISBLK, "c": stat.S_ISCHR,
"d": stat.S_ISDIR, "p": stat.S_ISFIFO,
"f": stat.S_ISREG, "l": stat.S_ISLNK,
"s": stat.S_ISSOCK,
"*": lambda *args, **kwargs: True,
}
type_test = ftype_mapping[ftype]
if name is not None:
regex = re.compile(fnmatch.translate(name))
elif iname is not None:
regex = re.compile(fnmatch.translate(iname), flags=re.IGNORECASE)
else:
regex = re.compile(fnmatch.translate("*"))
if regex.match(path) and type_test(os.stat(path).st_mode):
yield os.path.relpath(path)
for root, dirs, files in os.walk(path):
for n in files + dirs:
filename = os.path.join(root, n)
_stat = os.stat(filename)
if regex.match(n) and type_test(_stat.st_mode):
yield filename