def _clean_check(cmd, target):
    """
    Execute the download command ``cmd``, which is expected to produce
    ``target``.

    If the command exits with a non-zero status, whatever is present at
    ``target`` is removed and the ``subprocess.CalledProcessError`` is
    propagated unchanged to the caller.
    """
    try:
        subprocess.check_call(cmd)
    except subprocess.CalledProcessError:
        leftover_present = os.access(target, os.F_OK)
        if leftover_present:
            # Do not leave a partial download behind.
            os.unlink(target)
        raise
# Python os.access() example source code
def copy_from_host(module):
    """Read the file named by the ``src`` parameter and return it via the module.

    The file content is optionally zlib-compressed (``compress`` parameter),
    base64-encoded and passed to ``module.exit_json`` together with the SHA-1
    hex digest of the uncompressed data, the permission bits rendered with
    ``oct()`` and the source path. A missing or unreadable file is reported
    through ``module.fail_json``.

    :param module: object exposing ``params``, ``fail_json`` and ``exit_json``
    """
    params = module.params
    compress = params.get('compress')
    src = params.get('src')

    if not os.path.exists(src):
        module.fail_json(msg="file not found: {}".format(src))
    if not os.access(src, os.R_OK):
        module.fail_json(msg="file is not readable: {}".format(src))

    # Keep only the rwx permission bits of the mode.
    permission_bits = os.stat(src).st_mode & 0o777
    mode = oct(permission_bits)

    with open(src, 'rb') as handle:
        raw_data = handle.read()

    # The digest always describes the uncompressed content.
    digest = hashlib.sha1(raw_data).hexdigest()
    if compress:
        payload = zlib.compress(raw_data)
    else:
        payload = raw_data

    module.exit_json(content=base64.b64encode(payload), sha1=digest,
                     mode=mode, source=src)
def which(program):
    """Locate an executable, raising when it cannot be found.

    If ``program`` contains a directory component it is checked as given;
    otherwise every directory listed in the ``PATH`` environment variable is
    tried in order.

    :param program: program name or path to look for
    :return: ``program`` itself (when it has a directory component) or the
        first ``PATH`` entry joined with ``program`` that is an executable file
    :raises RuntimeError: when no executable file is found
    """
    def is_exe(fpath):
        return os.path.isfile(fpath) and os.access(fpath, os.X_OK)

    fpath, _fname = os.path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for path in os.environ["PATH"].split(os.pathsep):
            # Drop double quotes wrapped around the PATH entry.
            path = path.strip('"')
            exe_file = os.path.join(path, program)
            if is_exe(exe_file):
                return exe_file
    # A bare ``raise`` here has no active exception to re-raise and only
    # produces "No active exception to reraise"; raise the same exception
    # type explicitly, with a message that names the missing program.
    raise RuntimeError("program not found: %s" % program)
def populate_memory(self, areas):
    """Map each memory area into the VM and optionally preload it from a file.

    For every area the permission is translated with
    ``self.unicorn_permissions``, the range is mapped with
    ``self.vm.mem_map`` and the area is recorded in ``self.areas``. When an
    input file is given and readable, its first ``size`` bytes are written at
    the start of the area. One "Setup" log message is emitted per area.

    :param areas: iterable of ``(name, address, size, permission,
        input_file)`` tuples; ``input_file`` may be ``None``
    :return: always ``True``
    """
    for name, address, size, permission, input_file in areas:
        perm = self.unicorn_permissions(permission)
        self.vm.mem_map(address, size, perm)
        self.areas[name] = [address, size, permission]

        msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
        if input_file is not None and os.access(input_file, os.R_OK):
            # Context manager so the file handle is closed right away
            # instead of being left to the garbage collector.
            with open(input_file, 'rb') as stream:
                code = stream.read()
            # Never write more than the mapped size.
            self.vm.mem_write(address, bytes(code[:size]))
            msg += " and content from '%s'" % input_file

        self.log(msg, "Setup")

    # The start address is the base address recorded for the ".text" area,
    # so an area with that name must have been registered.
    self.start_addr = self.areas[".text"][0]
    self.end_addr = -1
    return True
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def __addTooltip(self, item, text, hideWarn=False):
    """Attach a tooltip showing ``text`` to ``item``.

    :param item: widget that receives the tooltip (stored on ``item.tooltip``)
    :param text: initial tooltip text
    :param hideWarn: when true, no "unavailable" warning is issued and
        warnings are paused while the tooltip is created
    :return: the ``StringVar`` holding the tooltip text, or ``None`` when
        ``ToolTip`` is not available
    """
    self.__loadTooltip()

    if not ToolTip:
        if not hideWarn:
            self.warn("ToolTips unavailable - check tooltip.py is in the lib folder")
        return None

    # turn off warnings about tooltips
    if hideWarn:
        paused_state = self.__pauseWarn()

    tip_text = StringVar(self.topLevel)
    tip_text.set(text)
    item.tooltip = ToolTip(item, delay=500, follow_mouse=1, textvariable=tip_text)

    if hideWarn:
        self.__resumeWarn(paused_state)

    return tip_text
#####################################
# FUNCTIONS to show pop-up dialogs
#####################################
# function to access the last made pop_up
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def get_config_file_path(cls):
"""Return the path of the configuration file named ``cls.CONFIG_FILE_NAME``.

The base directory is '.' when ``cls.IS_GLOBAL`` is false, otherwise the
user's home directory.

NOTE(review): indentation was lost in this copy, so it cannot be told from
here whether the '/tmp' fallback, the '.polyaxon' sub-directory and the
directory creation below apply to both cases or only to the global one.
Confirm against the original file before re-indenting.
"""
if not cls.IS_GLOBAL:
# local to this directory
base_path = os.path.join('.')
else:
base_path = os.path.expanduser('~')
# Fall back to '/tmp' when the base directory is not writable.
if not os.access(base_path, os.W_OK):
base_path = '/tmp'
base_path = os.path.join(base_path, '.polyaxon')
if not os.path.exists(base_path):
try:
os.makedirs(base_path)
except OSError:
# Except permission denied and potential race conditions
# in multi-threaded environments.
# The failure is only logged; the path is still returned below.
logger.error('Could not create config directory `{}`'.format(base_path))
return os.path.join(base_path, cls.CONFIG_FILE_NAME)
def _test_NoAccessDir(self, nodeName):
    """Check that loading from a directory without permissions fails cleanly.

    Launches the device manager for ``nodeName``, makes sure a ``/noaccess``
    directory with mode 0 exists under the SDR ``dom`` tree, and asserts that
    ``device.load`` on it raises ``CF.LoadableDevice.LoadFail``. The
    directory is removed afterwards.

    :param nodeName: name of the node whose DeviceManager profile is launched
    """
    profile = "/nodes/%s/DeviceManager.dcd.xml" % nodeName
    _booter, devMgr = self.launchDeviceManager(profile)
    device = devMgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    no_permissions = 0
    if os.path.exists(testdir):
        os.chmod(testdir, no_permissions)
    else:
        os.mkdir(testdir, no_permissions)

    try:
        still_accessible = os.access(testdir, os.R_OK | os.X_OK)
        self.assertFalse(still_accessible, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load,
                          fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that ``fileMgr.exists`` raises ``CF.InvalidFileName`` for a file
    inside a directory the current user cannot access.

    A ``/noaccess`` directory with mode 0o644 is created (or re-chmodded)
    under the SDR ``dom`` tree and removed again when the test finishes.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    # Makes sure that FileSystem::exists() throws correct exception and
    # doesn't kill domain for files in directories it cannot access
    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # ``0o644`` is accepted by Python 2.6+ and Python 3 alike; the bare
    # ``0644`` spelling is a SyntaxError on Python 3.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK | os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def check_fastq(fastq):
    """Validate a FASTQ input file.

    Reports through ``martian.exit`` when the file is not readable, or when
    whether it can be read as gzip disagrees with whether its name ends in
    ``cr_constants.GZIP_SUFFIX``.

    :param fastq: path of the FASTQ file to check
    """
    # Check if fastq is readable
    if not os.access(fastq, os.R_OK):
        martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)

    # Check if fastq is gzipped: try to decode one byte through gzip.
    is_gzip_fastq = True
    try:
        with gzip.open(fastq) as f:
            f.read(1)
    except Exception:
        # ``Exception`` rather than a bare ``except`` so KeyboardInterrupt
        # and SystemExit are not mistaken for "not gzipped".
        is_gzip_fastq = False

    has_gzip_suffix = fastq.endswith(cr_constants.GZIP_SUFFIX)
    # The messages read "... %s suffix: %s", i.e. suffix first, then file.
    if is_gzip_fastq and not has_gzip_suffix:
        martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
    if not is_gzip_fastq and has_gzip_suffix:
        martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (cr_constants.GZIP_SUFFIX, fastq))
def safeInstall():
"""
Prepare the directory returned by getFactorioPath(), register shell
completion in ~/.bashrc and call updateFactorio().

NOTE(review): indentation was lost in this copy, so which of the
statements below are guarded by the "directory is missing" check must be
confirmed against the original file before re-indenting.
"""
FACTORIOPATH = getFactorioPath()
try:
# Is the target directory missing?
if not os.path.isdir("%s" % (FACTORIOPATH) ):
# Parent directory writable: create the directory directly.
if os.access("%s/.." % (FACTORIOPATH), os.W_OK):
os.mkdir(FACTORIOPATH, 0o777)
else:
# Otherwise create it through sudo and hand ownership to the current user.
subprocess.call(['sudo', 'mkdir', '-p', FACTORIOPATH])
subprocess.call(['sudo', 'chown', getpass.getuser(), FACTORIOPATH])
os.mkdir(os.path.join(FACTORIOPATH, "saves"))
os.mkdir(os.path.join(FACTORIOPATH, "config"))
# "r+" opens the existing file for reading and writing without truncating it.
with open("%s/.bashrc" % (os.path.expanduser("~")), "r+") as bashrc:
lines = bashrc.read()
# Add the completion line only if it is not already present; after
# read() the file position is at the end, so write() appends.
if lines.find("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n") == -1:
bashrc.write("eval \"$(_FACTOTUM_COMPLETE=source factotum)\"\n")
print("You'll want to restart your shell for command autocompletion. Tab is your friend.")
updateFactorio()
except IOError as e:
# Report the failure and exit with status 1.
print("Cannot make %s. Please check permissions. Error %s" % (FACTORIOPATH, e))
sys.exit(1)
def __init__(self, dir, file_template=_default_file_template,
             truncate_slug_length=40,
             version_locations=None,
             sourceless=False, output_encoding="utf-8"):
    """Store the script-directory settings and verify the directory exists.

    :param dir: path of the scripts directory
    :param file_template: stored as ``self.file_template``
    :param truncate_slug_length: stored as given; a falsy value becomes 40
    :param version_locations: stored as ``self.version_locations``
    :param sourceless: stored as ``self.sourceless``
    :param output_encoding: stored as ``self.output_encoding``
    :raises util.CommandError: if ``dir`` does not exist
    """
    self.dir = dir
    self.file_template = file_template
    self.version_locations = version_locations
    if truncate_slug_length:
        self.truncate_slug_length = truncate_slug_length
    else:
        self.truncate_slug_length = 40
    self.sourceless = sourceless
    self.output_encoding = output_encoding
    self.revision_map = revision.RevisionMap(self._load_revisions)

    if os.access(dir, os.F_OK):
        return
    raise util.CommandError("Path doesn't exist: %r. Please use "
                            "the 'init' command to create a new "
                            "scripts folder." % dir)
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def __init__(self, certstore):
    """
    Remember the certificate store path; nothing is opened here.

    certstore - path to the file used to store
                CA certificates
                eg /etc/apache/ssl.crt/ca-bundle.crt
    >>> v = Verifier('/etc/dummy.crt')
    >>> v.verify('pippo')
    Traceback (most recent call last):
      File "/usr/lib/python2.3/doctest.py", line 442, in _run_examples_inner
        compileflags, 1) in globs
      File "<string>", line 1, in ?
      File "verifier.py", line 46, in verify
        self._setup()
      File "verifier.py", line 36, in _setup
        raise VerifierError, "cannot access %s" % self._certstore
    VerifierError: cannot access /etc/dummy.crt
    >>>
    """
    # No S/MIME object exists yet at construction time.
    self._smime = None
    self._certstore = certstore
def main(argv):
    """Print a listing of the directory named by ``argv[1]``.

    Entries are printed in sorted order, one per line, with size, a
    "%b %d %H:%M" modification time and the entry name. Directories and
    entries that ``os.access`` reports as executable are wrapped in
    different colour escape sequences.

    :param argv: argument list; ``argv[1]`` is the directory to list
    """
    dir_ = argv[1]
    reset = '\033[0m'
    dir_color = '\033[1;94m'
    exe_color = '\033[1;92m'

    for entry in sorted(os.listdir(dir_)):
        path = os.path.join(dir_, entry)
        size = os.path.getsize(path)
        modified = time.localtime(os.path.getmtime(path))
        mtime_format = time.strftime("%b %d %H:%M", modified)

        if os.path.isdir(path):
            color = dir_color
        elif os.access(path, os.X_OK):
            color = exe_color
        else:
            color = ''

        line = "%6d %13s %s%s%s" % (size, mtime_format, color, entry, reset)
        senf.print_(line)
def check_exe(name, env=None):
    """
    Look up an executable program.

    A ``name`` with a directory component that points at an executable file
    is returned directly; otherwise the entries of ``PATH`` are searched.

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; ``os.environ`` is used when falsy
    :return: absolute path of the program or None
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def _runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(name)[0]
    if directory and _runnable(name):
        return os.path.abspath(name)

    search_env = env or os.environ
    for entry in search_env["PATH"].split(os.pathsep):
        # Entries may be wrapped in double quotes; drop them before joining.
        candidate = os.path.join(entry.strip('"'), name)
        if _runnable(candidate):
            return os.path.abspath(candidate)
    return None
def check_exe(name, env=None):
    """
    Look up an executable program.

    A ``name`` with a directory component that points at an executable file
    is returned directly; otherwise the entries of ``PATH`` are searched.

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; ``os.environ`` is used when falsy
    :return: absolute path of the program or None
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def _runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(name)[0]
    if directory and _runnable(name):
        return os.path.abspath(name)

    search_env = env or os.environ
    for entry in search_env["PATH"].split(os.pathsep):
        # Entries may be wrapped in double quotes; drop them before joining.
        candidate = os.path.join(entry.strip('"'), name)
        if _runnable(candidate):
            return os.path.abspath(candidate)
    return None
def check_exe(name, env=None):
    """
    Look up an executable program.

    A ``name`` with a directory component that points at an executable file
    is returned directly; otherwise the entries of ``PATH`` are searched.

    :type name: string
    :param name: name or path to program
    :param env: mapping providing ``PATH``; ``os.environ`` is used when falsy
    :return: absolute path of the program or None
    :raises ValueError: if ``name`` is empty
    """
    if not name:
        raise ValueError('Cannot execute an empty string!')

    def _runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    directory = os.path.split(name)[0]
    if directory and _runnable(name):
        return os.path.abspath(name)

    search_env = env or os.environ
    for entry in search_env["PATH"].split(os.pathsep):
        # Entries may be wrapped in double quotes; drop them before joining.
        candidate = os.path.join(entry.strip('"'), name)
        if _runnable(candidate):
            return os.path.abspath(candidate)
    return None
def _test_resource_paths(my):
    """Check the resource paths reported by ``my.server``.

    Asserts that the 'admin' resource path contains ``etc/admin.tacticrc``,
    that ``create_resource_paths()`` includes the ``.tacticrc`` file of the
    current system login in the expected directory, and that the server
    login is 'admin'.

    The expected directory is ``<home>/.tactic/etc`` when the server's home
    directory is a writable directory, otherwise ``C:/sthpw/etc`` on
    Windows and ``/tmp/sthpw/etc`` elsewhere.
    """
    path = my.server.get_resource_path('admin')
    # not a very accurate test
    my.assertEquals(True, 'etc/admin.tacticrc' in path)

    paths = my.server.create_resource_paths()
    sys_login = getpass.getuser()

    home_dir = my.server.get_home_dir()
    # Test for a usable value first: os.access() raises TypeError when
    # handed None, so it must not run before the truthiness check.
    is_dir_writeable = (bool(home_dir)
                        and os.access(home_dir, os.W_OK)
                        and os.path.isdir(home_dir))
    if is_dir_writeable:
        etc_dir = "%s/.tactic/etc" % home_dir
    elif os.name == 'nt':
        etc_dir = 'C:/sthpw/etc'
    else:
        etc_dir = '/tmp/sthpw/etc'

    compared = '%s/%s.tacticrc' % (etc_dir, sys_login) in paths
    my.assertEquals(True, compared)

    # since we use admin to get resource path , my.login should also be admin
    my.assertEquals('admin', my.server.get_login())
def find_boot_files(name, shortname, basedir):
    """Resolve the path of a boot file (vmlinuz or initrd) under ``basedir``.

    :param name: explicit file name; used as-is when it starts with '/',
        otherwise looked up under ``<basedir>/boot/``. When empty, the file
        is searched by ``shortname`` instead.
    :param shortname: file-name prefix used for the glob search
    :param basedir: root directory to search in
    :return: path of the selected file
    :raises SystemExit: when nothing matches or the result is not readable
    """
    if name:
        if name[0] == '/':
            fullpath = name
        else:
            fullpath = basedir + '/boot/' + name
    else:
        # try the (only) symlink at the root directory
        root_pattern = basedir + '/' + shortname + '*'
        root_matches = sorted(glob.glob(root_pattern))
        if root_matches and os.access(root_matches[0], os.R_OK):
            fullpath = os.path.realpath(root_matches[0])
        else:
            # try the highest numbered version at /boot
            boot_pattern = basedir + '/boot/' + shortname + '*'
            boot_matches = sorted(glob.glob(boot_pattern))
            if not boot_matches:
                sys.exit('cannot read ' + root_pattern + ' and cannot find ' + boot_pattern)
            # The last entry in sorted order is the one picked.
            fullpath = boot_matches[-1]
            if len(boot_matches) > 1:
                warnings.warn('found more than one ' + boot_pattern + ' , using ' + fullpath)

    if not os.access(fullpath, os.R_OK):
        sys.exit('failed to read ' + fullpath)
    return fullpath
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def which(program):
    """Return the path of an executable, or ``None`` if none is found.

    A ``program`` with a directory component is checked as given; a bare
    name is searched for in each directory listed in ``PATH``.

    :param program: program name or path
    :return: ``program`` or the first matching ``PATH`` candidate, else None
    """
    import os

    def _runnable(candidate):
        return os.path.isfile(candidate) and os.access(candidate, os.X_OK)

    if os.path.dirname(program):
        # Explicit location given: PATH is never consulted.
        return program if _runnable(program) else None

    for entry in os.environ["PATH"].split(os.pathsep):
        candidate = os.path.join(entry.strip('"'), program)
        if _runnable(candidate):
            return candidate
    return None
#source: https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file
def check_path_owner(path):
    """Report whether the current user can be treated as owner of ``path``.

    Walks from ``path`` up through its parent directories until a component
    that exists is found, then decides based on that component:

    * effective uid 0: the uid reported by ``get_path_uid`` must be 0;
    * otherwise: the component must be writable by the current user.

    :param path: file-system path; it does not need to exist
    :return: ``True`` or ``False``; ``False`` when no existing component is
        found before ``os.path.dirname`` stops changing the path
    """
    # If we don't have a way to check the effective uid of this process, then
    # we'll just assume that we own the directory.
    if not hasattr(os, "geteuid"):
        return True

    previous = None
    while path != previous:
        if os.path.lexists(path):
            # Check if path is writable by current user.
            if os.geteuid() == 0:
                # Special handling for root user in order to handle properly
                # cases where users use sudo without -H flag.
                try:
                    path_uid = get_path_uid(path)
                except OSError:
                    return False
                return path_uid == 0
            return os.access(path, os.W_OK)
        previous, path = path, os.path.dirname(path)

    # Nothing along the way exists (e.g. a relative path whose components
    # are all missing): answer with an explicit boolean rather than falling
    # off the end and returning None.
    return False
def check_job_path(jobs_dir, job):
    """Validate the ``path`` entry of a job description.

    The path must be a relative string that stays inside ``jobs_dir`` and
    names an existing, executable file. A missing ``profiles`` key is added
    to ``job`` with the value ``None``.

    :param jobs_dir: directory the job path is resolved against
    :param job: job description mapping; may be modified in place
    :raises Exception: with a descriptive message when any check fails
    """
    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if 'path' not in job:
        raise Exception('job does not have the "path" attribute: %s' % job)
    if not isinstance(job['path'], string_types):
        raise Exception('"path" attribute is not a string: %s' % job)

    # check that 'path' is rooted in the same directory as the .vcsjob file
    if job['path'].startswith('/'):
        raise Exception('job path is not relative: %s' % job['path'])
    path = os.path.normpath(os.path.join(jobs_dir, job['path']))
    # A plain string-prefix test would also accept sibling directories such
    # as "<jobs_dir>-other", so compare by path components instead.
    try:
        relative = os.path.relpath(path, jobs_dir)
    except ValueError:
        # relpath cannot relate the two paths: treat as outside the tree.
        relative = os.pardir
    if relative == os.pardir or relative.startswith(os.pardir + os.sep):
        raise Exception('job path is not inside the job tree: %s' % path)

    # also check that the job exists and is runnable
    if not os.path.isfile(path):
        raise Exception('no such file: %s' % path)
    if not os.access(path, os.X_OK):
        raise Exception('file is not executable: %s' % path)

    if 'profiles' not in job:
        job['profiles'] = None
def check_job_path(jobs_dir, job):
    """Validate the ``path`` entry of a job description.

    The path must be a relative string that stays inside ``jobs_dir`` and
    names an existing, executable file. A missing ``profiles`` key is added
    to ``job`` with the value ``None``.

    :param jobs_dir: directory the job path is resolved against
    :param job: job description mapping; may be modified in place
    :raises Exception: with a descriptive message when any check fails
    """
    # ``basestring`` only exists on Python 2; fall back to ``str`` elsewhere.
    try:
        string_types = basestring
    except NameError:
        string_types = str

    if 'path' not in job:
        raise Exception('job does not have the "path" attribute: %s' % job)
    if not isinstance(job['path'], string_types):
        raise Exception('"path" attribute is not a string: %s' % job)

    # check that 'path' is rooted in the same directory as the .vcsjob file
    if job['path'].startswith('/'):
        raise Exception('job path is not relative: %s' % job['path'])
    path = os.path.normpath(os.path.join(jobs_dir, job['path']))
    # A plain string-prefix test would also accept sibling directories such
    # as "<jobs_dir>-other", so compare by path components instead.
    try:
        relative = os.path.relpath(path, jobs_dir)
    except ValueError:
        # relpath cannot relate the two paths: treat as outside the tree.
        relative = os.pardir
    if relative == os.pardir or relative.startswith(os.pardir + os.sep):
        raise Exception('job path is not inside the job tree: %s' % path)

    # also check that the job exists and is runnable
    if not os.path.isfile(path):
        raise Exception('no such file: %s' % path)
    if not os.access(path, os.X_OK):
        raise Exception('file is not executable: %s' % path)

    if 'profiles' not in job:
        job['profiles'] = None
def libvlc_vlm_show_media(p_instance, psz_name):
    '''Describe the named media as a JSON string.

    Intended mainly for debugging. For programmatic access to the state of
    a vlm_media_instance_t, use the corresponding
    libvlc_vlm_get_media_instance_xxx functions instead; no such functions
    are currently available for vlm_media_t.
    @param p_instance: the instance.
    @param psz_name: the name of the media, if the name is an empty string, all media is described.
    @return: string with information about named media, or None on error.
    '''
    # Use the entry already present in _Cfunctions when there is one;
    # otherwise build the binding with _Cfunction.
    binding = _Cfunctions.get('libvlc_vlm_show_media', None)
    if not binding:
        binding = _Cfunction('libvlc_vlm_show_media', ((1,), (1,),), string_result,
                             ctypes.c_void_p, Instance, ctypes.c_char_p)
    return binding(p_instance, psz_name)