def __hashEntry(self, prefix, entry, s):
if stat.S_ISREG(s.st_mode):
digest = self.__index.check(prefix, entry, s, hashFile)
elif stat.S_ISDIR(s.st_mode):
digest = self.__hashDir(prefix, entry)
elif stat.S_ISLNK(s.st_mode):
digest = self.__index.check(prefix, entry, s, DirHasher.__hashLink)
elif stat.S_ISBLK(s.st_mode) or stat.S_ISCHR(s.st_mode):
digest = struct.pack("<L", s.st_rdev)
elif stat.S_ISFIFO(s.st_mode):
digest = b''
else:
digest = b''
logging.getLogger(__name__).warning("Unknown file: %s", entry)
return digest
python类S_ISDIR的实例源码
def h_file(filename):
st = os.stat(filename)
if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
if filename in Build.hashes_md5_tstamp:
if Build.hashes_md5_tstamp[filename][0] == str(st.st_mtime):
return Build.hashes_md5_tstamp[filename][1]
if STRONGEST:
ret = Utils.h_file_no_md5(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), ret)
return ret
else:
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), m.digest())
return m.digest()
def h_file(filename):
st = os.stat(filename)
if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
if filename in Build.hashes_md5_tstamp:
if Build.hashes_md5_tstamp[filename][0] == str(st.st_mtime):
return Build.hashes_md5_tstamp[filename][1]
if STRONGEST:
ret = Utils.h_file_no_md5(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), ret)
return ret
else:
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), m.digest())
return m.digest()
def h_file(filename):
st = os.stat(filename)
if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
if filename in Build.hashes_md5_tstamp:
if Build.hashes_md5_tstamp[filename][0] == str(st.st_mtime):
return Build.hashes_md5_tstamp[filename][1]
if STRONGEST:
ret = Utils.h_file_no_md5(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), ret)
return ret
else:
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(filename)
Build.hashes_md5_tstamp[filename] = (str(st.st_mtime), m.digest())
return m.digest()
def chdir(self, path):
"""
Change the "current directory" of this SFTP session. Since SFTP
doesn't really have the concept of a current working directory, this
is emulated by paramiko. Once you use this method to set a working
directory, all operations on this SFTPClient object will be relative
to that path. You can pass in C{None} to stop using a current working
directory.
@param path: new current working directory
@type path: str
@raise IOError: if the requested path doesn't exist on the server
@since: 1.4
"""
if path is None:
self._cwd = None
return
if not stat.S_ISDIR(self.stat(path).st_mode):
raise SFTPError(errno.ENOTDIR, "%s: %s" % (os.strerror(errno.ENOTDIR), path))
self._cwd = self.normalize(path).encode('utf-8')
def _find_by_name(tree_data, name, is_dir, start_at):
"""return data entry matching the given name and tree mode
or None.
Before the item is returned, the respective data item is set
None in the tree_data list to mark it done"""
try:
item = tree_data[start_at]
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[start_at] = None
return item
except IndexError:
pass
# END exception handling
for index, item in enumerate(tree_data):
if item and item[2] == name and S_ISDIR(item[1]) == is_dir:
tree_data[index] = None
return item
# END if item matches
# END for each item
return None
def traverse_tree_recursive(odb, tree_sha, path_prefix):
"""
:return: list of entries of the tree pointed to by the binary tree_sha. An entry
has the following format:
* [0] 20 byte sha
* [1] mode as int
* [2] path relative to the repository
:param path_prefix: prefix to prepend to the front of all returned paths"""
entries = list()
data = tree_entries_from_data(odb.stream(tree_sha).read())
# unpacking/packing is faster than accessing individual items
for sha, mode, name in data:
if S_ISDIR(mode):
entries.extend(traverse_tree_recursive(odb, sha, path_prefix + name + '/'))
else:
entries.append((sha, mode, path_prefix + name))
# END for each item
return entries
def check_module_timestamps(self, module_path=False, mtime=0):
extensions = ('.py', '.xml', '.csv')
"Walk the tree of a module to find the last updated file"
for file in os.listdir(module_path):
file_path = os.path.join(module_path, file)
statinfo = os.stat(file_path)
file_mtime = 0
if stat.S_ISDIR(statinfo.st_mode):
file_mtime = self.check_module_timestamps(file_path)
elif stat.S_ISREG(statinfo.st_mode):
if any(file.endswith(ex) for ex in extensions):
file_mtime = statinfo.st_mtime
else:
raise Exception(
'Unknown file mode in module path %s' % file_path)
if file_mtime > mtime:
mtime = file_mtime
return mtime
def walk(self, remotepath):
"""Taken from https://gist.github.com/johnfink8/2190472
A stripped down version of os.walk implemented for sftp which
yields tuples: (path, folders, files)
"""
path = remotepath
files = []
folders = []
for f in self.listdir_attr(remotepath):
if S_ISDIR(f.st_mode):
folders.append(f.filename)
else: # non-dir
files.append(f.filename)
yield path, folders, files
for folder in folders:
new_path = os.path.join(remotepath, folder)
for x in self.walk(new_path):
yield x
def findall(self):
"""Find all files under the base and set ``allfiles`` to the absolute
pathnames of files found.
"""
from stat import S_ISREG, S_ISDIR, S_ISLNK
self.allfiles = allfiles = []
root = self.base
stack = [root]
pop = stack.pop
push = stack.append
while stack:
root = pop()
names = os.listdir(root)
for name in names:
fullname = os.path.join(root, name)
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat.st_mode
if S_ISREG(mode):
allfiles.append(fsdecode(fullname))
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
def isdir(s):
"""Return true if the pathname refers to an existing directory."""
try:
st = os.stat(s)
except OSError:
return False
return stat.S_ISDIR(st.st_mode)
def findall(self):
"""Find all files under the base and set ``allfiles`` to the absolute
pathnames of files found.
"""
from stat import S_ISREG, S_ISDIR, S_ISLNK
self.allfiles = allfiles = []
root = self.base
stack = [root]
pop = stack.pop
push = stack.append
while stack:
root = pop()
names = os.listdir(root)
for name in names:
fullname = os.path.join(root, name)
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat.st_mode
if S_ISREG(mode):
allfiles.append(fsdecode(fullname))
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
def _check_type(self, path):
"""Check the path
"""
s = os.stat(path)
if stat.S_ISDIR(s.st_mode) or stat.S_ISREG(s.st_mode):
return path
raise ValueError("Invalid Charm at % %s" % (
path, "Invalid file type for a charm"))
def isdir(self):
try:
with self('r'):
return stat.S_ISDIR(self.stat().st_mode)
except IOError:
return False
def findall(dir = os.curdir):
"""Find all files under 'dir' and return the list of full filenames
(relative to 'dir').
"""
from stat import ST_MODE, S_ISREG, S_ISDIR, S_ISLNK
list = []
stack = [dir]
pop = stack.pop
push = stack.append
while stack:
dir = pop()
names = os.listdir(dir)
for name in names:
if dir != os.curdir: # avoid the dreaded "./" syndrome
fullname = os.path.join(dir, name)
else:
fullname = name
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat[ST_MODE]
if S_ISREG(mode):
list.append(fullname)
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
return list
def walk(top, func, arg):
"""Directory tree walk with callback function.
For each directory in the directory tree rooted at top (including top
itself, but excluding '.' and '..'), call func(arg, dirname, fnames).
dirname is the name of the directory, and fnames a list of the names of
the files and subdirectories in dirname (excluding '.' and '..'). func
may modify the fnames list in-place (e.g. via del or slice assignment),
and walk will only recurse into the subdirectories whose names remain in
fnames; this can be used to implement a filter, or to impose a specific
order of visiting. No semantics are defined for, or required of, arg,
beyond that arg is always passed to func. It can be used, e.g., to pass
a filename pattern, or a mutable object designed to accumulate
statistics. Passing None for arg is common."""
warnings.warnpy3k("In 3.x, os.path.walk is removed in favor of os.walk.",
stacklevel=2)
try:
names = os.listdir(top)
except os.error:
return
func(arg, top, names)
for name in names:
name = join(top, name)
try:
st = os.lstat(name)
except os.error:
continue
if stat.S_ISDIR(st.st_mode):
walk(name, func, arg)
# Expand paths beginning with '~' or '~user'.
# '~' means $HOME; '~user' means that user's home directory.
# If the path doesn't begin with '~', or if the user or $HOME is unknown,
# the path is returned unchanged (leaving error reporting to whatever
# function is called with the expanded path as argument).
# See also module 'glob' for expansion of *, ? and [...] in pathnames.
# (A function should also be defined to do full *sh-style environment
# variable expansion.)
def isdir(s):
"""Return true if the pathname refers to an existing directory."""
try:
st = os.stat(s)
except os.error:
return False
return stat.S_ISDIR(st.st_mode)
def phase2(self): # Distinguish files, directories, funnies
self.common_dirs = []
self.common_files = []
self.common_funny = []
for x in self.common:
a_path = os.path.join(self.left, x)
b_path = os.path.join(self.right, x)
ok = 1
try:
a_stat = os.stat(a_path)
except os.error, why:
# print 'Can\'t stat', a_path, ':', why[1]
ok = 0
try:
b_stat = os.stat(b_path)
except os.error, why:
# print 'Can\'t stat', b_path, ':', why[1]
ok = 0
if ok:
a_type = stat.S_IFMT(a_stat.st_mode)
b_type = stat.S_IFMT(b_stat.st_mode)
if a_type != b_type:
self.common_funny.append(x)
elif stat.S_ISDIR(a_type):
self.common_dirs.append(x)
elif stat.S_ISREG(a_type):
self.common_files.append(x)
else:
self.common_funny.append(x)
else:
self.common_funny.append(x)
def __hashDir(self, prefix, path=b''):
entries = []
try:
dirEntries = os.listdir(os.path.join(prefix, path if path else b'.'))
except OSError as e:
logging.getLogger(__name__).warning("Cannot list directory: %s", str(e))
dirEntries = []
for f in dirEntries:
e = os.path.join(path, f)
try:
s = os.lstat(os.path.join(prefix, e))
if stat.S_ISDIR(s.st_mode):
# skip useless directories
if f in self.__ignoreDirs: continue
# add training '/' for directores for correct sorting
f = f + os.fsencode(os.path.sep)
else:
# skip useless files
if f in DirHasher.IGNORE_FILES: continue
entries.append((e, f, s))
except OSError as err:
logging.getLogger(__name__).warning("Cannot stat '%s': %s", e, str(err))
entries = sorted(entries, key=lambda x: x[1])
dirList = [
(struct.pack("=L", s.st_mode) + self.__hashEntry(prefix, e, s) + f)
for (e, f, s) in entries
]
dirBlob = b"".join(dirList)
m = hashlib.sha1()
m.update(dirBlob)
return m.digest()
def findall(self):
"""Find all files under the base and set ``allfiles`` to the absolute
pathnames of files found.
"""
from stat import S_ISREG, S_ISDIR, S_ISLNK
self.allfiles = allfiles = []
root = self.base
stack = [root]
pop = stack.pop
push = stack.append
while stack:
root = pop()
names = os.listdir(root)
for name in names:
fullname = os.path.join(root, name)
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat.st_mode
if S_ISREG(mode):
allfiles.append(fsdecode(fullname))
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def rmtree(path, ignore_errors=False, onerror=auto_chmod):
"""Recursively delete a directory tree.
This code is taken from the Python 2.4 version of 'shutil', because
the 2.3 version doesn't really work right.
"""
if ignore_errors:
def onerror(*args):
pass
elif onerror is None:
def onerror(*args):
raise
names = []
try:
names = os.listdir(path)
except os.error:
onerror(os.listdir, path, sys.exc_info())
for name in names:
fullname = os.path.join(path, name)
try:
mode = os.lstat(fullname).st_mode
except os.error:
mode = 0
if stat.S_ISDIR(mode):
rmtree(fullname, ignore_errors, onerror)
else:
try:
os.remove(fullname)
except os.error:
onerror(os.remove, fullname, sys.exc_info())
try:
os.rmdir(path)
except os.error:
onerror(os.rmdir, path, sys.exc_info())
def findall(self):
"""Find all files under the base and set ``allfiles`` to the absolute
pathnames of files found.
"""
from stat import S_ISREG, S_ISDIR, S_ISLNK
self.allfiles = allfiles = []
root = self.base
stack = [root]
pop = stack.pop
push = stack.append
while stack:
root = pop()
names = os.listdir(root)
for name in names:
fullname = os.path.join(root, name)
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat.st_mode
if S_ISREG(mode):
allfiles.append(fsdecode(fullname))
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
def h_file(fname):
"""
Compute a hash value for a file by using md5. This method may be replaced by
a faster version if necessary. The following uses the file size and the timestamp value::
import stat
from waflib import Utils
def h_file(fname):
st = os.stat(fname)
if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(fname)
return m.digest()
Utils.h_file = h_file
:type fname: string
:param fname: path to the file to hash
:return: hash of the file contents
"""
f = open(fname, 'rb')
m = md5()
try:
while fname:
fname = f.read(200000)
m.update(fname)
finally:
f.close()
return m.digest()
def h_file(filename):
"""now folders can have a signature too"""
st = os.stat(filename)
if stat.S_ISDIR(st[stat.ST_MODE]):
return Utils.md5(filename).digest()
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(filename)
return m.digest()
def h_file(fname):
"""
Compute a hash value for a file by using md5. This method may be replaced by
a faster version if necessary. The following uses the file size and the timestamp value::
import stat
from waflib import Utils
def h_file(fname):
st = os.stat(fname)
if stat.S_ISDIR(st[stat.ST_MODE]): raise IOError('not a file')
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(fname)
return m.digest()
Utils.h_file = h_file
:type fname: string
:param fname: path to the file to hash
:return: hash of the file contents
"""
f = open(fname, 'rb')
m = md5()
try:
while fname:
fname = f.read(200000)
m.update(fname)
finally:
f.close()
return m.digest()
def h_file(filename):
"""now folders can have a signature too"""
st = os.stat(filename)
if stat.S_ISDIR(st[stat.ST_MODE]):
return Utils.md5(filename).digest()
m = Utils.md5()
m.update(str(st.st_mtime))
m.update(str(st.st_size))
m.update(filename)
return m.digest()
def _remote_path_isdir(sftp, remote_path):
if _remote_path_exists(sftp, remote_path):
mode = sftp.stat(remote_path).st_mode
if stat.S_ISDIR(mode) and not stat.S_ISLNK(mode):
return True
return False
def findall(self):
"""Find all files under the base and set ``allfiles`` to the absolute
pathnames of files found.
"""
from stat import S_ISREG, S_ISDIR, S_ISLNK
self.allfiles = allfiles = []
root = self.base
stack = [root]
pop = stack.pop
push = stack.append
while stack:
root = pop()
names = os.listdir(root)
for name in names:
fullname = os.path.join(root, name)
# Avoid excess stat calls -- just one will do, thank you!
stat = os.stat(fullname)
mode = stat.st_mode
if S_ISREG(mode):
allfiles.append(fsdecode(fullname))
elif S_ISDIR(mode) and not S_ISLNK(mode):
push(fullname)
def convert(self, value, param, ctx):
rv = value
is_dash = self.file_okay and self.allow_dash and rv in (b'-', '-')
if not is_dash:
if self.resolve_path:
rv = os.path.realpath(rv)
try:
st = os.stat(rv)
except OSError:
if not self.exists:
return self.coerce_path_result(rv)
self.fail('%s "%s" does not exist.' % (
self.path_type,
filename_to_ui(value)
), param, ctx)
if not self.file_okay and stat.S_ISREG(st.st_mode):
self.fail('%s "%s" is a file.' % (
self.path_type,
filename_to_ui(value)
), param, ctx)
if not self.dir_okay and stat.S_ISDIR(st.st_mode):
self.fail('%s "%s" is a directory.' % (
self.path_type,
filename_to_ui(value)
), param, ctx)
if self.writable and not os.access(value, os.W_OK):
self.fail('%s "%s" is not writable.' % (
self.path_type,
filename_to_ui(value)
), param, ctx)
if self.readable and not os.access(value, os.R_OK):
self.fail('%s "%s" is not readable.' % (
self.path_type,
filename_to_ui(value)
), param, ctx)
return self.coerce_path_result(rv)