def check_is_readonly(self, fname):
"""
Return `True` if @fname is read-only on the filesystem.
@fname
Path to a file.
"""
if not fname:
return
try:
mode = os.stat(fname)
read_only = (stat.S_IMODE(mode.st_mode) & stat.S_IWUSR != stat.S_IWUSR)
except FileNotFoundError:
return
return read_only
python类S_IWUSR的实例源码
def commit(self):
"""
Writes the configuration to disk (into '$RootDir/EXAConf')
"""
self.config.write()
# reload in order to force type conversion
# --> parameters added as lists during runtime are converted back to strings (as if they have been added manually)
self.config.reload()
# modify permissions
try:
os.chmod(self.conf_path, stat.S_IRUSR | stat.S_IWUSR)
except OSError as e:
raise EXAConfError("Failed to change permissions for '%s': %s" % (self.conf_path, e))
#}}}
#{{{ Platform supported
def rmtree(path):
"""Remove the given recursively.
:note: we use shutil rmtree but adjust its behaviour to see whether files that
couldn't be deleted are read-only. Windows will not remove them in that case"""
def onerror(func, path, exc_info):
# Is the error an access error ?
os.chmod(path, stat.S_IWUSR)
try:
func(path) # Will scream if still not possible to delete.
except Exception as ex:
if HIDE_WINDOWS_KNOWN_ERRORS:
raise SkipTest("FIXME: fails with: PermissionError\n %s", ex)
else:
raise
return shutil.rmtree(path, False, onerror)
def main(args):
vm_name = args['<vm_name>']
# get domain from libvirt
con = libvirt.open('qemu:///system')
domain = con.lookupByName(vm_name)
path = os.path.join(os.getcwd(), '{}.raw'.format(vm_name))
with open(path, 'w') as f:
# chmod to be r/w by everyone
os.chmod(path, stat.S_IRUSR | stat.S_IWUSR |
stat.S_IRGRP | stat.S_IWGRP |
stat.S_IROTH | stat.S_IWOTH)
# take a ram dump
flags = libvirt.VIR_DUMP_MEMORY_ONLY
dumpformat = libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW
domain.coreDumpWithFormat(path, dumpformat, flags)
def test_read_only_bytecode(self):
# When bytecode is read-only but should be rewritten, fail silently.
with source_util.create_modules('_temp') as mapping:
# Create bytecode that will need to be re-created.
py_compile.compile(mapping['_temp'])
bytecode_path = imp.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(0)
bytecode_file.write(b'\x00\x00\x00\x00')
# Make the bytecode read-only.
os.chmod(bytecode_path,
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
try:
# Should not raise IOError!
self.import_(mapping['_temp'], '_temp')
finally:
# Make writable for eventual clean-up.
os.chmod(bytecode_path, stat.S_IWUSR)
def set_own_perm(usr, dir_list):
uid = pwd.getpwnam(usr).pw_uid
gid = grp.getgrnam(usr).gr_gid
perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
for cdir in dir_list:
os.chown(cdir, uid, gid)
os.chmod(cdir, perm_mask_rwx)
for croot, sub_dirs, cfiles in os.walk(cdir):
for fs_name in sub_dirs:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rwx)
for fs_name in cfiles:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def set_own_perm(usr, dir_list):
uid = pwd.getpwnam(usr).pw_uid
gid = grp.getgrnam(usr).gr_gid
perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
for cdir in dir_list:
os.chown(cdir, uid, gid)
os.chmod(cdir, perm_mask_rwx)
for croot, sub_dirs, cfiles in os.walk(cdir):
for fs_name in sub_dirs:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rwx)
for fs_name in cfiles:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def write(self):
""" Writes configuration file """
# wait for file to unlock. Timeout after 5 seconds
for _ in range(5):
if not self._writing:
self._writing = True
break
time.sleep(1)
else:
_LOGGER.error('Could not write to configuration file. It is busy.')
return
# dump JSON to config file and unlock
encoded = self.encode()
json.dump(encoded, open(self._filetmp, 'w'), sort_keys=True, indent=4,
separators=(',', ': '))
os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
try:
shutil.move(self._filetmp, self._file)
_LOGGER.debug('Config file succesfully updated.')
except shutil.Error as e:
_LOGGER.error('Failed to move temp config file to original error: ' + str(e))
self._writing = False
_LOGGER.debug('Wrote configuration file')
def merge_blanksky(infiles, outfile, clobber=False):
"""
Merge multiple blanksky files (e.g., 4 blanksky files for ACIS-I)
into a single file.
"""
if len(infiles) == 1:
logger.info("Only one blanksky file, no need to merge.")
if os.path.exists(outfile) and (not clobber):
raise OSError("File already exists: %s" % outfile)
shutil.move(infiles[0], outfile)
else:
logger.info("Merge multiple blanksky files ...")
clobber = "yes" if clobber else "no"
subprocess.check_call(["punlearn", "dmmerge"])
subprocess.check_call([
"dmmerge", "infile=%s" % ",".join(infiles),
"outfile=%s" % outfile, "clobber=%s" % clobber
])
write_keyword(outfile, keyword="DETNAM", value="ACIS-0123")
for f in infiles:
os.remove(f)
# Add write permission to the background file
st = os.stat(outfile)
os.chmod(outfile, st.st_mode | stat.S_IWUSR)
def _init_client():
global _configuration_directory
def prompt(msg):
sys.stdout.write('%s: ' % msg)
response = sys.stdin.readline()
return response.rstrip('\r\n')
client_id = prompt('Client Id')
client_secret = prompt('Client Secret')
redirect_uri = prompt('Redirect Uri')
if not os.path.exists(_configuration_directory):
os.mkdir(_configuration_directory, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
client = _CommandClient(client_id, client_secret, redirect_uri)
_init_oauth_process(client)
# save first time, meaning configuration works
client.save_configuration()
return client
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
self.assertEqual((None, []), finder.find_loader('doesnotexist'))
def test_read_only_bytecode(self):
# When bytecode is read-only but should be rewritten, fail silently.
with source_util.create_modules('_temp') as mapping:
# Create bytecode that will need to be re-created.
py_compile.compile(mapping['_temp'])
bytecode_path = imp.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(0)
bytecode_file.write(b'\x00\x00\x00\x00')
# Make the bytecode read-only.
os.chmod(bytecode_path,
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
try:
# Should not raise IOError!
self.import_(mapping['_temp'], '_temp')
finally:
# Make writable for eventual clean-up.
os.chmod(bytecode_path, stat.S_IWUSR)
def secure_file(fname, remove_existing=False):
flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL # Refer to "man 2 open".
mode = stat.S_IRUSR | stat.S_IWUSR # This is 0o600 in octal and 384 in decimal.
if remove_existing:
# For security, remove file with potentially elevated mode
try:
os.remove(fname)
except OSError:
pass
# Open file descriptor
umask_original = os.umask(0)
try:
fdesc = os.open(fname, flags, mode)
finally:
os.umask(umask_original)
return fdesc
def remove_safe_file(self, path):
if not SideBarSelection().isNone(path):
try:
os.remove(path)
except:
try:
if not os.access(path, os.W_OK):
import stat
os.chmod(path, stat.S_IWUSR)
os.remove(path)
except:
# raise error in case we were unable to delete.
if os.path.exists(path):
print("Unable to remove file:\n"+path)
os.remove(path)
else:
print('path is none')
print(path)
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None):
to_zip = []
if data_files is None:
data_files = []
# Handle main script
to_zip.append(main_file_path)
# Prepare the zip file
zip_path = "/tmp/" + zip_file_name
zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True)
for f in to_zip:
zipf.write(f)
zipf.close()
# Handle other files & dirs
for f in data_files:
zipCommand = "zip -r -u -0 " + zip_path + " " + f
call([zipCommand], shell=True)
# Chmod 666 the zip file so it can be accessed
os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)
return zip_path
def save_app(manifest, container_dir, app_json=STATE_JSON):
"""Saves app manifest and freezes to object."""
# Save the manifest with allocated vip and ports in the state
state_file = os.path.join(container_dir, app_json)
fs.write_safe(
state_file,
lambda f: json.dump(manifest, f)
)
# chmod for the file to be world readable.
if os.name == 'posix':
os.chmod(
state_file,
stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
)
# Freeze the app data into a namedtuple object
return utils.to_obj(manifest)
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
found = self._find(finder, 'doesnotexist')
self.assertEqual(found, self.NOT_FOUND)
def test_read_only_bytecode(self):
# When bytecode is read-only but should be rewritten, fail silently.
with source_util.create_modules('_temp') as mapping:
# Create bytecode that will need to be re-created.
py_compile.compile(mapping['_temp'])
bytecode_path = self.util.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(0)
bytecode_file.write(b'\x00\x00\x00\x00')
# Make the bytecode read-only.
os.chmod(bytecode_path,
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
try:
# Should not raise OSError!
self.import_(mapping['_temp'], '_temp')
finally:
# Make writable for eventual clean-up.
os.chmod(bytecode_path, stat.S_IWUSR)
def test_mknod(self):
# Test using mknod() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
try:
posix.mknod(support.TESTFN, mode, 0)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
# Keyword arguments are also supported
support.unlink(support.TESTFN)
try:
posix.mknod(path=support.TESTFN, mode=mode, device=0,
dir_fd=None)
except OSError as e:
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
def test_mknod_dir_fd(self):
# Test using mknodat() to create a FIFO (the only use specified
# by POSIX).
support.unlink(support.TESTFN)
mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
f = posix.open(posix.getcwd(), posix.O_RDONLY)
try:
posix.mknod(support.TESTFN, mode, 0, dir_fd=f)
except OSError as e:
# Some old systems don't allow unprivileged users to use
# mknod(), or only support creating device nodes.
self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
else:
self.assertTrue(stat.S_ISFIFO(posix.stat(support.TESTFN).st_mode))
finally:
posix.close(f)
def _get_netrc(cls):
# Buffer the '.netrc' path. Use an empty file if it doesn't exist.
path = cls.get_netrc_path()
content = ''
if os.path.exists(path):
st = os.stat(path)
if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
print >> sys.stderr, (
'WARNING: netrc file %s cannot be used because its file '
'permissions are insecure. netrc file permissions should be '
'600.' % path)
with open(path) as fd:
content = fd.read()
# Load the '.netrc' file. We strip comments from it because processing them
# can trigger a bug in Windows. See crbug.com/664664.
content = '\n'.join(l for l in content.splitlines()
if l.strip() and not l.strip().startswith('#'))
with tempdir() as tdir:
netrc_path = os.path.join(tdir, 'netrc')
with open(netrc_path, 'w') as fd:
fd.write(content)
os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
return cls._get_netrc_from_path(netrc_path)
def create_boilerplate():
"""
Create kibitzr.yml and kibitzr-creds.yml in current directory
if they do not exist.
"""
if not os.path.exists('kibitzr.yml'):
with open('kibitzr.yml', 'wt') as fp:
logger.info("Saving sample check in kibitzr.yml")
fp.write(KIBITZR_YML)
else:
logger.info("kibitzr.yml already exists. Skipping")
if not os.path.exists('kibitzr-creds.yml'):
with open('kibitzr-creds.yml', 'wt') as fp:
logger.info("Creating kibitzr-creds.yml")
fp.write(KIBITZR_CREDS_YML)
os.chmod('kibitzr-creds.yml', stat.S_IRUSR | stat.S_IWUSR)
else:
logger.info("kibitzr-creds.yml already exists. Skipping")
def _octal_to_perm(octal):
perms = list("-" * 9)
if octal & stat.S_IRUSR:
perms[0] = "r"
if octal & stat.S_IWUSR:
perms[1] = "w"
if octal & stat.S_IXUSR:
perms[2] = "x"
if octal & stat.S_IRGRP:
perms[3] = "r"
if octal & stat.S_IWGRP:
perms[4] = "w"
if octal & stat.S_IXGRP:
perms[5] = "x"
if octal & stat.S_IROTH:
perms[6] = "r"
if octal & stat.S_IWOTH:
perms[7] = "w"
if octal & stat.S_IXOTH:
perms[8] = "x"
return "".join(perms)
def _get_netrc(cls):
# Buffer the '.netrc' path. Use an empty file if it doesn't exist.
path = cls.get_netrc_path()
content = ''
if os.path.exists(path):
st = os.stat(path)
if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
print >> sys.stderr, (
'WARNING: netrc file %s cannot be used because its file '
'permissions are insecure. netrc file permissions should be '
'600.' % path)
with open(path) as fd:
content = fd.read()
# Load the '.netrc' file. We strip comments from it because processing them
# can trigger a bug in Windows. See crbug.com/664664.
content = '\n'.join(l for l in content.splitlines()
if l.strip() and not l.strip().startswith('#'))
with tempdir() as tdir:
netrc_path = os.path.join(tdir, 'netrc')
with open(netrc_path, 'w') as fd:
fd.write(content)
os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
return cls._get_netrc_from_path(netrc_path)
def rmtree_remove_readonly_files(func, path, exc_info):
"""
Error handler for ``shutil.rmtree``.
If the error is due to an access error (read only file)
it attempts to add write permission and then retries.
If the error is for another reason it re-raises the error.
Usage : ``shutil.rmtree(path, onerror=rmtree_remove_readonly_files)``
This code was copied from
http://stackoverflow.com/questions/2656322/shutil-rmtree-fails-on-windows-with-access-is-denied
"""
import stat
if not os.access(path, os.W_OK):
# Is the error an access error ?
os.chmod(path, stat.S_IWUSR)
func(path)
else:
raise
def check_is_readonly(self, fname):
'''
Returns `True` if @fname is read-only on the filesystem.
@fname
Path to a file.
'''
if not fname:
return
try:
mode = os.stat(fname)
read_only = (stat.S_IMODE(mode.st_mode) & stat.S_IWUSR != stat.S_IWUSR)
except FileNotFoundError:
return
return read_only
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
found = self._find(finder, 'doesnotexist')
self.assertEqual(found, self.NOT_FOUND)
def test_read_only_bytecode(self):
# When bytecode is read-only but should be rewritten, fail silently.
with source_util.create_modules('_temp') as mapping:
# Create bytecode that will need to be re-created.
py_compile.compile(mapping['_temp'])
bytecode_path = self.util.cache_from_source(mapping['_temp'])
with open(bytecode_path, 'r+b') as bytecode_file:
bytecode_file.seek(0)
bytecode_file.write(b'\x00\x00\x00\x00')
# Make the bytecode read-only.
os.chmod(bytecode_path,
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
try:
# Should not raise OSError!
self.import_(mapping['_temp'], '_temp')
finally:
# Make writable for eventual clean-up.
os.chmod(bytecode_path, stat.S_IWUSR)
def write(filename, data, undisclosed=False):
"""
Write JSON data to (owner only readable) file.
This will also create missing directories.
Args:
filename (str): file path
data (str): data to write
undisclosed (bool): whether data should be owner only readable
"""
makedirs(os.path.dirname(filename))
with open(filename, 'w') as fp:
if undisclosed:
os.chmod(filename, stat.S_IRUSR | stat.S_IWUSR)
json.dump(data, fp)
def open_for_writing(filename, permissions=None, mode="w", force=False):
if permissions is None:
permissions = stat.S_IRUSR | stat.S_IWUSR
if force and os.path.exists(filename):
os.unlink(filename)
umask_original = os.umask(0)
try:
fd = os.open(filename, os.O_WRONLY | os.O_CREAT | os.O_EXCL, permissions)
finally:
os.umask(umask_original)
# Open file handle and write to file
with os.fdopen(fd, mode) as f:
yield f