def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
# Example source code using Python's os.W_OK
def is_admin(user=lib.global_parameters['USER'], activeroot='/'):
    """Check if the specified user has administrative privileges on given system.

    The platform and address are looked up from ``activeroot``. Only roots
    starting with '/' are analysed:

    * platform starting with 'linux': True when the user's passwd entry has
      uid 0;
    * platform starting with 'win': True when 'Administrators' is among the
      groups returned by ``NetUserGetLocalGroups`` for the user;
    * anything else: a warning is logged and False is returned.

    For any other ``activeroot`` a warning is logged and False is returned.
    """
    platform = get_system_type_from_active_root(activeroot)
    ip = get_address_from_active_root(activeroot)

    if not activeroot.startswith('/'):
        log.warn('Cannot check root privileges, remote system analysis (%s) not supported yet.' % activeroot)
        return False

    if platform.startswith('linux'):
        # The passwd entry of `user` decides: uid 0 means root.
        from pwd import getpwnam
        return getpwnam(user).pw_uid == 0

    if platform.startswith('win'):
        # Group membership lookup; should work on remote systems too.
        from win32net import NetUserGetLocalGroups
        return 'Administrators' in NetUserGetLocalGroups(ip, user)

    log.warn('Cannot check root privileges, platform is not fully supported (%s).' % platform)
    return False
def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
def CheckOutputFile(filename):
    """Verify that an output file can be created at the given location.

    Args:
      filename: The name of the file.

    Raises:
      FileExistsError: if something already exists at the given filename.
      FileNotWritableError: if the directory that would contain the file is
        not writable.
    """
    absolute = os.path.abspath(filename)
    if os.path.exists(absolute):
        raise FileExistsError('%s: output file exists' % filename)

    # The file itself does not exist yet, so it is the containing directory
    # that must accept writes.
    parent_dir = os.path.dirname(absolute)
    if not os.access(parent_dir, os.W_OK):
        raise FileNotWritableError('%s: not writable' % parent_dir)
def CheckOutputFile(filename):
    """Verify that an output file can be created at the given location.

    Args:
      filename: The name of the file.

    Raises:
      FileExistsError: if something already exists at the given filename.
      FileNotWritableError: if the directory that would contain the file is
        not writable.
    """
    absolute = os.path.abspath(filename)
    if os.path.exists(absolute):
        raise FileExistsError('%s: output file exists' % filename)

    # The file itself does not exist yet, so it is the containing directory
    # that must accept writes.
    parent_dir = os.path.dirname(absolute)
    if not os.access(parent_dir, os.W_OK):
        raise FileNotWritableError('%s: not writable' % parent_dir)
def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
def ExecRecursiveMirror(self, source, dest):
    """Emulation of rm -rf out && cp -af in out.

    Whatever currently exists at ``dest`` (directory tree or single file) is
    removed, then ``source`` is copied to ``dest``: with ``shutil.copytree``
    when ``source`` is a directory, with ``shutil.copy2`` otherwise.
    """
    def _retry_writable(func, target, _unused_excinfo):
        # A removal step failed, possibly because the entry is read-only.
        # If so, make it writable first; either way, retry the operation.
        if not os.access(target, os.W_OK):
            os.chmod(target, stat.S_IWRITE)
        func(target)

    if os.path.exists(dest):
        if not os.path.isdir(dest):
            # Attempt to make the file writable before deleting it.
            if not os.access(dest, os.W_OK):
                os.chmod(dest, stat.S_IWRITE)
            os.unlink(dest)
        else:
            shutil.rmtree(dest, onerror=_retry_writable)

    copier = shutil.copytree if os.path.isdir(source) else shutil.copy2
    copier(source, dest)
def check_path_owner(path):
    """Return whether the current user can be treated as owning ``path``.

    Starting at ``path`` and walking up through its parent directories, the
    first entry that exists (per ``os.path.lexists``) decides the answer:

    * when running as root (effective uid 0), that entry must be owned by
      uid 0 as reported by ``get_path_uid``;
    * otherwise that entry must be writable by the current user.

    Returns:
        bool: True if ownership is assumed, False otherwise. True is also
        returned when the platform has no ``os.geteuid``. False is returned
        when ``get_path_uid`` raises ``OSError`` or when no component of the
        path exists at all.
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # dirname() stopped changing the path without ever finding an existing
    # entry (e.g. a relative path collapsing to ""); assume we don't own it
    # rather than falling through and returning None.
    return False
def export(self):
    ''' Export visible layers and layer groups to CoaSprite

    Aborts with an error message when the destination is an existing file or
    when ``self.path`` is not writable. Otherwise works on a duplicate of
    ``self.original_img``: every visible layer is autocropped and exported
    (non-empty group layers through ``export_sprite_sheet``, plain layers
    through ``export_sprite``), the results are collected in
    ``self.sprites``, the JSON is written and the duplicate is deleted.
    '''
    destination = os.path.join(self.path, self.name)
    if os.path.isfile(destination):
        show_error_msg('ABORTING!\nDestination is not a folder.\n {path}/{name}'.format(path=self.path, name=self.name))
        return
    if not os.access(self.path, os.W_OK):
        show_error_msg('ABORTING!\nDestination is not a writable.\n {path}'.format(path=self.path))
        return
    if os.path.isdir(destination):
        # Not fatal: the user is only told that files may be overwritten.
        show_error_msg('Destination exists, I may have overwritten something in {path}/{name}'.format(path=self.path, name=self.name))
    self.mkdir()

    # Work on a duplicate of the original image.
    self.img = self.original_img.duplicate()
    self.img.undo_group_start()
    for layer in self.img.layers:
        if not layer.visible:
            continue
        name = '{name}.png'.format(name=layer.name)
        pdb.gimp_image_set_active_layer(self.img, layer)
        # Crop the layer, then derive z from its position in the image.
        pdb.plug_in_autocrop_layer(self.img, layer)
        z = 0 - pdb.gimp_image_get_item_position(self.img, layer)
        if not isinstance(layer, gimp.GroupLayer):
            self.sprites.append(self.export_sprite(layer, name, layer.offsets, z))
        elif len(layer.children) > 0:
            # Group layers without children are skipped.
            self.sprites.append(self.export_sprite_sheet(layer, name, layer.offsets, z))
    self.write_json()
    self.img.undo_group_end()
    pdb.gimp_image_delete(self.img)
def is_writable(self, path):
    """Return whether the nearest existing ancestor of ``path`` is writable.

    ``path`` itself is checked first; while the candidate does not exist the
    search moves to its parent directory. False is returned when the search
    reaches a point where the parent equals the candidate without finding
    anything that exists.
    """
    candidate = path
    while not os.path.exists(candidate):
        ancestor = os.path.dirname(candidate)
        if ancestor == candidate:
            # dirname() no longer shortens the path: nothing exists.
            return False
        candidate = ancestor
    return os.access(candidate, os.W_OK)
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The returned path is ``suffix`` (``'.distlib'`` when not provided) joined
    onto a parent directory chosen as follows:

    * on Windows with LOCALAPPDATA defined in the environment, the expansion
      of ``$localappdata``;
    * otherwise (POSIX, or Windows without LOCALAPPDATA), the user's home
      directory as given by ``os.path.expanduser('~')``.

    If that parent is not a directory, an attempt is made to create it. If it
    cannot be created, or exists but is not writable, a warning is logged and
    a fresh ``tempfile.mkdtemp()`` directory is used as the parent instead.
    """
    suffix = '.distlib' if suffix is None else suffix

    if os.name == 'nt' and 'LOCALAPPDATA' in os.environ:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')

    # 'isdir' rather than 'exists': a plain file with that name must count
    # as a failure (makedirs below will raise for it).
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)

    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable, using %s', parent)
    return os.path.join(parent, suffix)
def _mkstemp_inner(dir, pre, suf, flags, output_type):
    """Code common to mkstemp, TemporaryFile, and NamedTemporaryFile.

    Tries up to TMP_MAX candidate names, opening ``dir/pre<name>suf`` with
    ``flags`` and mode 0o600. Returns ``(fd, absolute_path)`` for the first
    name that opens successfully; raises FileExistsError when every attempt
    was skipped.
    """
    candidates = _get_candidate_names()
    if output_type is bytes:
        candidates = map(_os.fsencode, candidates)

    for _attempt in range(TMP_MAX):
        name = next(candidates)
        target = _os.path.join(dir, pre + name + suf)
        try:
            fd = _os.open(target, flags, 0o600)
        except FileExistsError:
            continue    # name taken, try the next one
        except PermissionError:
            # This exception is thrown when a directory with the chosen name
            # already exists on windows.
            name_clash_on_windows = (_os.name == 'nt' and
                                     _os.path.isdir(dir) and
                                     _os.access(dir, _os.W_OK))
            if not name_clash_on_windows:
                raise
            continue
        return (fd, _os.path.abspath(target))

    raise FileExistsError(_errno.EEXIST,
                          "No usable temporary file name found")
# User visible interfaces.
def test_get_default_keystore_path(self):
    """
    Check the default keystore directory: create it when it does not exist
    yet, verify that the path ends with the expected suffix, and verify
    that it is both readable and writable.
    """
    keystore_dir = PyWalib.get_default_keystore_path()
    keystore_missing = not os.path.exists(keystore_dir)
    if keystore_missing:
        os.makedirs(keystore_dir)
    # path correctness
    expected_suffix = ".config/pyethapp/keystore/"
    self.assertTrue(keystore_dir.endswith(expected_suffix))
    # read access first, then write access
    for access_mode in (os.R_OK, os.W_OK):
        self.assertEqual(os.access(keystore_dir, access_mode), True)
def is_writable(self, path):
    """Return whether the nearest existing ancestor of ``path`` is writable.

    ``path`` itself is checked first; while the candidate does not exist the
    search moves to its parent directory. False is returned when the search
    reaches a point where the parent equals the candidate without finding
    anything that exists.
    """
    candidate = path
    while not os.path.exists(candidate):
        ancestor = os.path.dirname(candidate)
        if ancestor == candidate:
            # dirname() no longer shortens the path: nothing exists.
            return False
        candidate = ancestor
    return os.access(candidate, os.W_OK)
def get_cache_base(suffix=None):
    """
    Return the default base location for distlib caches.

    The returned path is ``suffix`` (``'.distlib'`` when not provided) joined
    onto a parent directory chosen as follows:

    * on Windows with LOCALAPPDATA defined in the environment, the expansion
      of ``$localappdata``;
    * otherwise (POSIX, or Windows without LOCALAPPDATA), the user's home
      directory as given by ``os.path.expanduser('~')``.

    If that parent is not a directory, an attempt is made to create it. If it
    cannot be created, or exists but is not writable, a warning is logged and
    a fresh ``tempfile.mkdtemp()`` directory is used as the parent instead.
    """
    suffix = '.distlib' if suffix is None else suffix

    if os.name == 'nt' and 'LOCALAPPDATA' in os.environ:
        parent = os.path.expandvars('$localappdata')
    else:
        # Assume posix, or old Windows
        parent = os.path.expanduser('~')

    # 'isdir' rather than 'exists': a plain file with that name must count
    # as a failure (makedirs below will raise for it).
    if not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError:
            logger.warning('Unable to create %s', parent, exc_info=True)
            usable = False
        else:
            usable = True
    else:
        usable = os.access(parent, os.W_OK)
        if not usable:
            logger.warning('Directory exists but is not writable: %s', parent)

    if not usable:
        parent = tempfile.mkdtemp()
        logger.warning('Default location unusable, using %s', parent)
    return os.path.join(parent, suffix)
def check_folder_writable(checkfolder):
    """Exit the process with status 1 unless ``checkfolder`` is writable.

    When ``os.access`` reports no write permission, an error is logged and
    ``sys.exit(1)`` is called; otherwise the function simply returns.
    """
    if os.access(checkfolder, os.W_OK):
        return
    logging.error("You don't have access to %s" % checkfolder)
    sys.exit(1)
# Function that gets protected packages or returns an empty dictionary
def make_writeable(self, filename):
    """
    Ensure the given file is writeable by adding the owner-write bit to its
    current permissions when ``os.access`` says it is not.
    Useful if our source is read-only.
    """
    # On Jython there is no os.access(), so the platform test must come
    # first and short-circuit the access check.
    if sys.platform.startswith('java') or os.access(filename, os.W_OK):
        return
    current_mode = stat.S_IMODE(os.stat(filename).st_mode)
    os.chmod(filename, current_mode | stat.S_IWUSR)
def main(args, outs):
    """Run the preflight checks for a bcl2fastq run, in order.

    Checks the run folder, RunInfo.xml, the library-path environment and the
    barcode whitelist; optionally (``args.check_executables``) looks up the
    RTA version and checks bcl2fastq; then checks open file handles, the
    output and interop folders when given, and the thread count. Failures
    reported as ``(ok, msg)`` pairs are passed to ``martian.exit``.
    ``outs`` is not used here.
    """
    hostname = socket.gethostname()

    print("Checking run folder...")
    tk_preflight.check_rta_complete(args.run_path)

    print("Checking RunInfo.xml...")
    tk_preflight.check_runinfo_xml(args.run_path)

    print("Checking system environment...")
    env_ok, env_msg = tk_preflight.check_ld_library_path()
    if not env_ok:
        martian.exit(env_msg)

    print("Checking barcode whitelist...")
    tk_preflight.check_barcode_whitelist(args.barcode_whitelist)

    if args.check_executables:
        print("Checking bcl2fastq...")
        rta_version, _rc_i2_read, bcl_params = tk_bcl.get_rta_version(args.run_path)
        martian.log_info("RTA Version: %s" % rta_version)
        martian.log_info("BCL Params: %s" % str(bcl_params))

        major_ver, full_ver = tk_bcl.check_bcl2fastq(hostname, rta_version)
        martian.log_info("Running bcl2fastq mode: %s. Version: %s" % (major_ver, full_ver))

    fh_ok, fh_msg = tk_preflight.check_open_fh()
    if not fh_ok:
        martian.exit(fh_msg)

    # Output folders are checked with write + execute permission.
    if args.output_path is not None:
        tk_preflight.check_folder_or_create(
            "--output-dir", args.output_path, hostname,
            permission=os.W_OK | os.X_OK)

    if args.interop_output_path is not None:
        tk_preflight.check_folder_or_create(
            "--interop-dir", args.interop_output_path, hostname,
            permission=os.W_OK | os.X_OK)

    if args.max_bcl2fastq_threads < 1:
        martian.exit("Cannot run bcl2fastq with zero threads.")