def copyfile(src, dst):
    """Copy the contents of the file at ``src`` to the file at ``dst``.

    Both files are opened in binary mode; ``dst`` is created or truncated.

    :param src: path of the file to read from.
    :param dst: path of the file to write to.
    :raises Error: if ``_samefile`` reports that both paths are the same file.
    :raises SpecialFileError: if either path exists and is a named pipe.
    """
    if _samefile(src, dst):
        raise Error("`%s` and `%s` are the same file" % (src, dst))

    for path in (src, dst):
        try:
            mode = os.stat(path).st_mode
        except OSError:
            # The path most likely does not exist, so there is nothing to check.
            continue
        # XXX What about other special files? (sockets, devices...)
        if stat.S_ISFIFO(mode):
            raise SpecialFileError("`%s` is a named pipe" % path)

    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        copyfileobj(fsrc, fdst)
# Example source code using Python's stat()
def get_latest_data_subdir(pattern=None, take=-1):
    """Pick a sub-directory of ``BASE_DATA_DIR`` by modification time.

    The immediate sub-directories are sorted by ascending ``st_mtime`` and the
    entry at index ``take`` is returned, so the default ``-1`` selects the
    most recently modified one.

    :param pattern: optional substring; only names containing it are kept.
    :param take: index into the mtime-sorted list of names.
    :returns: the selected directory name (not a full path), or ``None`` when
        ``os.walk`` yields nothing or no sub-directory is left after filtering.
    """
    try:
        _, subdirs, _ = next(os.walk(BASE_DATA_DIR))
    except StopIteration:
        return None

    if pattern is not None:
        subdirs = [name for name in subdirs if pattern in name]

    ordered = sorted(
        subdirs,
        key=lambda name: os.stat(os.path.join(BASE_DATA_DIR, name)).st_mtime,
    )
    if not ordered:
        return None
    return ordered[take]
def filemetadata(filename):
    # type: (str) -> Optional[FileMeta]
    """Collect size, mtime, hash and version information for an executable.

    ``filename`` is first resolved with ``which``; when that returns ``None``
    the function returns ``None`` as well. The version string is the stdout of
    running the resolved file with ``--version``, except for the running
    Python interpreter itself, whose ``sys.version`` is used instead.
    """
    resolved = which(filename)
    if resolved is None:
        return None

    info = os.stat(resolved)
    if resolved == sys.executable:
        # The file is the Python interpreter itself; do not spawn it.
        version = bytestr(sys.version)
    else:
        version = run_executable(resolved, ['--version']).stdout
    return FileMeta(resolved, info.st_size, info.st_mtime, filesha(resolved), version)
# ----------------------------------------------------------------------
def handle(self, *args, **options):
    """Make sure the ``commit-msg`` hook under ``self.HOOK_PATH`` is installed.

    The existing hook file is read if there is one; otherwise the script
    starts from a bash shebang line. ``COMMIT_MSG_HOOK`` is appended unless the
    ``ZERODOWNTIME_COMMIT_MSG_HOOK`` marker is already present. The script is
    then written back and the file's ``S_IEXEC`` permission bit is set.
    """
    hook_path = os.path.join(self.HOOK_PATH, 'commit-msg')

    if os.path.exists(hook_path):
        with open(hook_path, 'r') as existing:
            script = existing.read()
    else:
        script = '#!/usr/bin/env bash\n\n'

    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' not in script:
        script += COMMIT_MSG_HOOK

    with open(hook_path, 'w') as out:
        out.write(script)

    # Keep the current permission bits and add the execute bit on top.
    current_mode = os.stat(hook_path).st_mode
    os.chmod(hook_path, current_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
    """Swap in a freshly downloaded binary, or discard a failed download.

    :param binary_ok: truthy when the file at ``temp_path`` should be installed.
    :param binary_path: location of the currently installed binary.
    :param path: directory string used to park the old binary as ``path + "/old"``.
    :param temp_path: location of the downloaded file.

    On success the process image is replaced via ``os.execl`` with the new
    binary and the current ``sys.argv``. On failure the temporary file is
    removed and a message is printed.
    """
    if not binary_ok:
        os.remove(temp_path)
        print("couldn't download the latest binary")
        return

    import stat

    backup_path = path + "/old"
    # Remember the permissions of the current binary before moving it away.
    previous_mode = os.stat(binary_path).st_mode
    # Move the current binary aside and put the new one in its place.
    os.rename(binary_path, backup_path)
    os.rename(temp_path, binary_path)
    # Apply the previous permissions, making sure the execute bit is set.
    os.chmod(binary_path, previous_mode | stat.S_IEXEC)
    os.remove(backup_path)
    print("mongoaudit updated, restarting...")
    os.execl(binary_path, binary_path, *sys.argv)
def handle_ref_error(self):
    """Check the reference file at ``self.ref`` and report problems in a dialog.

    The first two lines of the file are inspected: the first must start with
    ``>`` and the second may only contain the letters ``A``, ``T``, ``G``,
    ``C`` and ``N`` (case-insensitive).

    :returns: the result of ``QtGui.QMessageBox.question`` when a problem is
        reported, otherwise ``None``.

    Any exception while inspecting the file (missing path, fewer than two
    lines, ...) is reported as a missing reference file.
    """
    try:
        if os.stat(self.ref).st_size > 0:
            with open(self.ref) as f:
                for i in range(2):
                    # next(f) works on Python 2.6+ and 3; f.next() exists only
                    # on Python 2 and would always land in the except branch.
                    line = next(f).strip()
                    # startswith() also covers a blank header line, which
                    # line[0] would turn into an IndexError.
                    if i == 0 and not line.startswith('>'):
                        return QtGui.QMessageBox.question(self, 'Error !', 'Please check your input reference !',
                                                          QtGui.QMessageBox.Ok)
                    if i == 1 and re.search("[^ATGCN]", line.upper()):
                        return QtGui.QMessageBox.question(self, 'Error !', 'Please check your input reference !',
                                                          QtGui.QMessageBox.Ok)
        else:
            return QtGui.QMessageBox.question(self, 'Warning !', 'The selected reference file is empty, please check !',
                                              QtGui.QMessageBox.Ok)
    except Exception:
        # Deliberately broad best-effort handler, but no longer swallows
        # KeyboardInterrupt / SystemExit as the bare ``except:`` did.
        return QtGui.QMessageBox.question(self, 'Error !', 'Please input a reference file !',
                                          QtGui.QMessageBox.Ok)
def find_INSTALL(Makefile, flags):
    '''
    See the doc-string for find_prefix as well.
    Sets Makefile['INSTALL'] if needed.
    $(INSTALL) is normally "install", but on Solaris it needs to be
    "/usr/ucb/install".

    Makefile is a dict-like object that is updated in place; an existing
    'INSTALL' entry is left untouched. flags['v'] enables a progress
    message on stdout. Always returns False.
    '''
    if 'INSTALL' not in Makefile:
        candidates = [
            '/usr/ucb/install',
        ]
        for install in candidates:
            try:
                os.stat(install)
            except OSError:
                # The candidate does not exist or cannot be inspected; try the next.
                continue
            if flags['v']:
                sys.stdout.write('Using "' + install + '" as `install`.\n')
            Makefile['INSTALL'] = install
            return False
        # None of the special locations exist: rely on `install` from PATH.
        Makefile['INSTALL'] = 'install'
    return False
def __hashEntry(self, prefix, entry, s):
    """Return the digest for one directory entry, chosen by its file type.

    :param prefix: passed through to the index check / directory hasher.
    :param entry: name of the entry; also used in the warning message.
    :param s: stat result of the entry (``st_mode`` and ``st_rdev`` are read).
    :returns: regular files and symlinks are delegated to ``self.__index.check``,
        directories to ``self.__hashDir``; block and character devices yield
        their ``st_rdev`` packed as a little-endian unsigned 32-bit value;
        FIFOs and anything else yield ``b''``.
    """
    mode = s.st_mode
    if stat.S_ISREG(mode):
        return self.__index.check(prefix, entry, s, hashFile)
    if stat.S_ISDIR(mode):
        return self.__hashDir(prefix, entry)
    if stat.S_ISLNK(mode):
        return self.__index.check(prefix, entry, s, DirHasher.__hashLink)
    if stat.S_ISBLK(mode) or stat.S_ISCHR(mode):
        return struct.pack("<L", s.st_rdev)
    if not stat.S_ISFIFO(mode):
        # Neither a FIFO nor any type handled above: note it, hash as empty.
        logging.getLogger(__name__).warning("Unknown file: %s", entry)
    return b''
def download(url, filename):
    """Download the ``i2pseeds.su3`` file found under ``url`` into ``filename``.

    :param url: base URL; ``i2pseeds.su3`` is appended to it verbatim.
    :param filename: local path the response body is written to.
    :returns: ``True`` if the written file is non-empty, ``False`` if it is
        empty or the request raised ``URLError``.
    """
    user_agent = "Wget/1.11.4"
    target = "{}i2pseeds.su3".format(url)
    request = urllib.request.Request(target, headers={"User-Agent": user_agent})
    try:
        with urllib.request.urlopen(request) as response, open(filename, 'wb') as out:
            out.write(response.read())
        # Checked after the file is closed so the size reflects all data.
        return os.stat(filename).st_size > 0
    except URLError:
        return False
def assert_file_contents_equal(a, b):
    '''
    Assert that the files at paths a and b have the same digest.

    The file sizes are only consulted to build the failure message.
    '''
    def describe_mismatch(first, second):
        '''
        Build the assertion message; evaluated only when the digests differ.
        '''
        first_size = os.stat(first).st_size
        second_size = os.stat(second).st_size
        if first_size != second_size:
            return "Files have different sizes: a:%d b: %d" % (first_size, second_size)
        return "Files have the same size but different contents"

    assert file_digest(a) == file_digest(b), describe_mismatch(a, b)
def assert_file_contents_equal(a, b):
    '''
    Assert that the files at paths a and b have the same digest.

    The file sizes are only consulted to build the failure message.
    '''
    def describe_mismatch(first, second):
        '''
        Build the assertion message; evaluated only when the digests differ.
        '''
        first_size = os.stat(first).st_size
        second_size = os.stat(second).st_size
        if first_size != second_size:
            return "Files have different sizes: a:%d b: %d" % (first_size, second_size)
        return "Files have the same size but different contents"

    assert file_digest(a) == file_digest(b), describe_mismatch(a, b)
def cached_data_age(self, name):
    """Return age of data cached at `name` in seconds or 0 if
    cache doesn't exist

    :param name: name of datastore
    :type name: ``unicode``
    :returns: age of datastore in seconds (``0`` if there is no cache file)
    :rtype: ``float``
    """
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))
    try:
        # A single stat() call instead of exists() followed by stat(): the
        # file can no longer vanish between the check and the read.
        modified = os.stat(cache_path).st_mtime
    except (OSError, ValueError):
        # Same outcomes for which os.path.exists() reports "not there".
        return 0
    return time.time() - modified
def cached_data_age(self, name):
    """Return age of data cached at `name` in seconds or 0 if
    cache doesn't exist

    :param name: name of datastore
    :type name: ``unicode``
    :returns: age of datastore in seconds (``0`` if there is no cache file)
    :rtype: ``float``
    """
    cache_path = self.cachefile('%s.%s' % (name, self.cache_serializer))
    try:
        # A single stat() call instead of exists() followed by stat(): the
        # file can no longer vanish between the check and the read.
        modified = os.stat(cache_path).st_mtime
    except (OSError, ValueError):
        # Same outcomes for which os.path.exists() reports "not there".
        return 0
    return time.time() - modified
def _warn_unsafe_extraction_path(path):
    """
    Emit a ``UserWarning`` when ``path`` is writable by group or others.

    If the default extraction path is overridden and set to an insecure
    location, such as /tmp, it opens up an opportunity for an attacker to
    replace an extracted file with an unauthorized payload. Warn the user
    if a known insecure location is used.
    See Distribute #375 for more details.
    """
    if os.name == 'nt' and not path.startswith(os.environ['windir']):
        # On Windows, permissions are generally restrictive by default
        # and temp directories are not writable by other users, so
        # bypass the warning.
        return

    permissions = os.stat(path).st_mode
    if not permissions & (stat.S_IWOTH | stat.S_IWGRP):
        return

    msg = ("%s is writable by group/others and vulnerable to attack when "
           "used with get_resource_filename. Consider a more secure "
           "location (set with .set_extraction_path or the "
           "PYTHON_EGG_CACHE environment variable)." % path)
    warnings.warn(msg, UserWarning)
def seek(self, offset, whence=os.SEEK_SET):
    """Similar to 'seek' of file descriptor; works only for
    regular files.

    Updates ``self._overlap.Offset``: set to ``offset`` for ``os.SEEK_SET``,
    advanced by ``offset`` for ``os.SEEK_CUR``. For ``os.SEEK_END`` the size of
    the file at ``self._path`` is added to ``offset`` when ``self._path`` is a
    ``str``; otherwise ``offset`` is stored unchanged.
    """
    if whence == os.SEEK_SET:
        self._overlap.Offset = offset
        return
    if whence == os.SEEK_CUR:
        self._overlap.Offset += offset
        return

    assert whence == os.SEEK_END
    if isinstance(self._path, str):
        size = os.stat(self._path).st_size
        self._overlap.Offset = size + offset
    else:
        self._overlap.Offset = offset
def seek(self, offset, whence=os.SEEK_SET):
    """Similar to 'seek' of file descriptor; works only for
    regular files.

    Updates ``self._overlap.Offset``: set to ``offset`` for ``os.SEEK_SET``,
    advanced by ``offset`` for ``os.SEEK_CUR``. For ``os.SEEK_END`` the size of
    the file at ``self._path`` is added to ``offset`` when ``self._path`` is a
    ``str``; otherwise ``offset`` is stored unchanged.
    """
    if whence == os.SEEK_SET:
        self._overlap.Offset = offset
        return
    if whence == os.SEEK_CUR:
        self._overlap.Offset += offset
        return

    assert whence == os.SEEK_END
    if isinstance(self._path, str):
        size = os.stat(self._path).st_size
        self._overlap.Offset = size + offset
    else:
        self._overlap.Offset = offset
def is_block_device(path):
    '''
    Confirm device at path is a valid block device node.

    :param path: filesystem path to inspect.
    :returns: boolean: True if path is a block device, False if not
        (including when the path does not exist or cannot be inspected).
    '''
    try:
        # One stat() call instead of exists() + stat(): the node cannot
        # disappear between the check and the inspection.
        mode = os.stat(path).st_mode
    except (OSError, ValueError):
        # The same errors os.path.exists() maps to "does not exist".
        return False
    return S_ISBLK(mode)
def maybe_download(filename, work_directory):
    """Download the data from Yann's website, unless it's already here.

    :param filename: name of the file; appended to ``SOURCE_URL`` for the
        download and to ``work_directory`` for the local path.
    :param work_directory: directory the file is stored in. It is created,
        including missing parent directories, if it does not exist.
    :returns: path of the local file.
    """
    # makedirs(exist_ok=True) handles nested paths and does not fail when
    # the directory appears between a check and the creation.
    os.makedirs(work_directory, exist_ok=True)
    filepath = os.path.join(work_directory, filename)
    if not os.path.exists(filepath):
        filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
        statinfo = os.stat(filepath)
        print('Succesfully downloaded', filename, statinfo.st_size, 'bytes.')
    return filepath
def maybe_download(filename, work_directory):
    """Download the data from Yann's website, unless it's already here.

    :param filename: name of the file; appended to ``SOURCE_URL`` for the
        download and to ``work_directory`` for the local path.
    :param work_directory: directory the file is stored in. It is created,
        including missing parent directories, if it does not exist.
    :returns: path of the local file.
    """
    # makedirs(exist_ok=True) handles nested paths and does not fail when
    # the directory appears between a check and the creation.
    os.makedirs(work_directory, exist_ok=True)
    filepath = os.path.join(work_directory, filename)
    if not os.path.exists(filepath):
        filepath, _ = urllib.request.urlretrieve(SOURCE_URL + filename, filepath)
        statinfo = os.stat(filepath)
        print('Succesfully downloaded', filename, statinfo.st_size, 'bytes.')
    return filepath