def copy_from_host(module):
compress = module.params.get('compress')
src = module.params.get('src')
if not os.path.exists(src):
module.fail_json(msg="file not found: {}".format(src))
if not os.access(src, os.R_OK):
module.fail_json(msg="file is not readable: {}".format(src))
mode = oct(os.stat(src).st_mode & 0o777)
with open(src, 'rb') as f:
raw_data = f.read()
sha1 = hashlib.sha1(raw_data).hexdigest()
data = zlib.compress(raw_data) if compress else raw_data
module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
source=src)
python类R_OK的实例源码
def populate_memory(self, areas):
for name, address, size, permission, input_file in areas:
perm = self.unicorn_permissions(permission)
self.vm.mem_map(address, size, perm)
self.areas[name] = [address, size, permission,]
msg = "Map %s @%x (size=%d,perm=%s)" % (name, address, size, permission)
if input_file is not None and os.access(input_file, os.R_OK):
code = open(input_file, 'rb').read()
self.vm.mem_write(address, bytes(code[:size]))
msg += " and content from '%s'" % input_file
self.log(msg, "Setup")
self.start_addr = self.areas[".text"][0]
self.end_addr = -1
return True
def _test_NoAccessDir(self, nodeName):
devBooter, devMgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
device = devMgr._get_registeredDevices()[0]
fileMgr = self._domMgr._get_fileMgr()
dirname = '/noaccess'
testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
if not os.path.exists(testdir):
os.mkdir(testdir, 0000)
else:
os.chmod(testdir, 0000)
try:
self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
self.assertRaises(CF.LoadableDevice.LoadFail, device.load, fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
finally:
os.rmdir(testdir)
def test_ExistsException(self):
self.assertNotEqual(self._domMgr, None)
fileMgr = self._domMgr._get_fileMgr()
# Makes sure that FileSystem::exists() throws correct exception and
# doesn't kill domain for files in directories it cannot access
dirname = '/noaccess'
testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
if not os.path.exists(testdir):
os.mkdir(testdir, 0644)
else:
os.chmod(testdir, 0644)
try:
self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
finally:
os.rmdir(testdir)
def check_fastq(fastq):
# Check if fastq is readable
if not os.access(fastq, os.R_OK):
martian.exit("Do not have file read permission for FASTQ file: %s" % fastq)
# Check if fastq is gzipped
is_gzip_fastq = True
try:
with gzip.open(fastq) as f:
f.read(1)
except:
is_gzip_fastq = False
if is_gzip_fastq and not fastq.endswith(cr_constants.GZIP_SUFFIX):
martian.exit("Input FASTQ file is gzipped but filename does not have %s suffix: %s" % (fastq, cr_constants.GZIP_SUFFIX))
if not is_gzip_fastq and fastq.endswith(cr_constants.GZIP_SUFFIX):
martian.exit("Input FASTQ file is not gzipped but filename has %s suffix: %s" % (fastq, cr_constants.GZIP_SUFFIX))
def find_boot_files(name, shortname, basedir):
# find vmlinuz or initrd
if name:
fullpath = name if name[0]=='/' else basedir + '/boot/' + name
else:
# try the (only) symlink at the root directory
try1 = basedir + '/' + shortname + '*'
found = sorted(glob.glob(try1))
if len(found) >= 1 and os.access(found[0], os.R_OK):
fullpath = os.path.realpath(found[0])
else:
# try the highest numbered version at /boot
try2 = basedir + '/boot/' + shortname + '*'
found = sorted(glob.glob(try2))
if len(found) < 1:
sys.exit('cannot read ' + try1 + ' and cannot find ' + try2)
fullpath = found[-1]
if (len(found) > 1):
warnings.warn('found more than one ' + try2 + ' , using ' + fullpath)
if not os.access(fullpath, os.R_OK):
sys.exit('failed to read ' + fullpath)
return fullpath
def copy_from_host(module):
compress = module.params.get('compress')
src = module.params.get('src')
if not os.path.exists(src):
module.fail_json(msg="file not found: {}".format(src))
if not os.access(src, os.R_OK):
module.fail_json(msg="file is not readable: {}".format(src))
mode = oct(os.stat(src).st_mode & 0o777)
with open(src, 'rb') as f:
raw_data = f.read()
sha1 = hashlib.sha1(raw_data).hexdigest()
data = zlib.compress(raw_data) if compress else raw_data
module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
source=src)
def check_access(filename, write_required=True):
"""
Checks if user has read and optionaly write access to specified file.
Uses acl first and possix file permisions if acl cannot be used.
Returns true only if user has both required access rights.
"""
if HAVE_POSIX1E:
for pset in posix1e.ACL(file=filename):
if pset.tag_type == posix1e.ACL_USER and pset.qualifier == os.geteuid():
if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
return True
if pset.tag_type == posix1e.ACL_GROUP and pset.qualifier in os.getgroups():
if pset.permset.test(posix1e.ACL_READ) and (not write_required or pset.permset.test(posix1e.ACL_WRITE)):
return True
if write_required:
return os.access(filename, os.R_OK | os.W_OK)
return os.access(filename, os.R_OK)
def checkUSBStick(self):
self.target_dir = None
allpartitions = [ (r.description, r.mountpoint) for r in harddiskmanager.getMountedPartitions(onlyhotplug = True)]
print "[checkUSBStick] found partitions:", allpartitions
usbpartition = []
for x in allpartitions:
print x, x[1] == '/', x[0].find("USB"), access(x[1], R_OK)
if x[1] != '/' and x[0].find("USB") > -1: # and access(x[1], R_OK) is True:
usbpartition.append(x)
print usbpartition
if len(usbpartition) == 1:
self.target_dir = usbpartition[0][1]
self.md5_passback = self.getFeed
self.md5_failback = self.askStartWizard
self.md5verify(self.stickimage_md5, self.target_dir)
elif not usbpartition:
print "[NFIFlash] needs to create usb flasher stick first!"
self.askStartWizard()
else:
self.askStartWizard()
def __new__(cls, filename):
files = []
timestamp = 0
for path in process.get_config_directories():
config_file = os.path.realpath(os.path.join(path, filename))
if config_file not in files and os.access(config_file, os.R_OK):
try:
timestamp = max(timestamp, os.stat(config_file).st_mtime)
except (OSError, IOError):
continue
files.append(config_file)
instance = cls.instances.get(filename, None)
if instance is None or instance.files != files or instance.timestamp < timestamp:
instance = object.__new__(cls)
instance.parser = SafeConfigParser()
instance.parser.optionxform = lambda x: x.replace('-', '_')
instance.files = instance.parser.read(files)
instance.filename = filename
instance.timestamp = timestamp
cls.instances[filename] = instance
return instance
def effectivelyReadable(self):
uid = os.getuid()
euid = os.geteuid()
gid = os.getgid()
egid = os.getegid()
# This is probably true most of the time, so just let os.access()
# handle it. Avoids potential bugs in the rest of this function.
if uid == euid and gid == egid:
return os.access(self.name, os.R_OK)
st = os.stat(self.name)
# This may be wrong depending on the semantics of your OS.
# i.e. if the file is -------r--, does the owner have access or not?
if st.st_uid == euid:
return st.st_mode & stat.S_IRUSR != 0
# See comment for UID check above.
groups = os.getgroups()
if st.st_gid == egid or st.st_gid in groups:
return st.st_mode & stat.S_IRGRP != 0
return st.st_mode & stat.S_IROTH != 0
def effectivelyReadable(self):
uid = os.getuid()
euid = os.geteuid()
gid = os.getgid()
egid = os.getegid()
# This is probably true most of the time, so just let os.access()
# handle it. Avoids potential bugs in the rest of this function.
if uid == euid and gid == egid:
return os.access(self.name, os.R_OK)
st = os.stat(self.name)
# This may be wrong depending on the semantics of your OS.
# i.e. if the file is -------r--, does the owner have access or not?
if st.st_uid == euid:
return st.st_mode & stat.S_IRUSR != 0
# See comment for UID check above.
groups = os.getgroups()
if st.st_gid == egid or st.st_gid in groups:
return st.st_mode & stat.S_IRGRP != 0
return st.st_mode & stat.S_IROTH != 0
def cfndiff_module_validation(module):
'''
Validate for correct module call/usage in ansible.
'''
# Boto3 is required!
if not HAS_BOTO3:
module.fail_json(msg='boto3 is required. Try pip install boto3')
# cfn_flip is required!
if not HAS_CFN_FLIP:
module.fail_json(msg='cfn_flip is required. Try pip install cfn_flip')
template = module.params['template']
b_template = to_bytes(template, errors='surrogate_or_strict')
# Validate path of template
if not os.path.exists(b_template):
module.fail_json(msg="template %s not found" % (template))
if not os.access(b_template, os.R_OK):
module.fail_json(msg="template %s not readable" % (template))
if os.path.isdir(b_template):
module.fail_json(msg="diff does not support recursive diff of directory: %s" % (template))
return module
def get_pip_requirements(fname=os.path.join(dingdangpath.LIB_PATH,
'requirements.txt')):
"""
Gets the PIP requirements from a text file. If the files does not exists
or is not readable, it returns None
Arguments:
fname -- (optional) the requirement text file (Default:
"client/requirements.txt")
Returns:
A list of pip requirement objects or None
"""
logger = logging.getLogger(__name__)
if os.access(fname, os.R_OK):
reqs = list(pip.req.parse_requirements(fname))
logger.debug("Found %d PIP requirements in file '%s'", len(reqs),
fname)
return reqs
else:
logger.debug("PIP requirements file '%s' not found or not readable",
fname)
def _setup_image(self, image_path):
"""
Load the image located at the specified path
@type image_path: str
@param image_path: the relative or absolute file path to the image file
@rtype: sensor_msgs/Image or None
@param: Returns sensor_msgs/Image if image convertable and None otherwise
"""
if not os.access(image_path, os.R_OK):
rospy.logerr("Cannot read file at '{0}'".format(image_path))
return None
img = cv2.imread(image_path)
# Return msg
return cv_bridge.CvBridge().cv2_to_imgmsg(img, encoding="bgr8")
def copydir(self, path):
"""
Copy the contents of the local directory given by path to google cloud.
Maintain the same directory structure on remote.
This is (intentionally) a blocking call, so clients can report errors if
the transfer fails.
:type path: string
:param path: relative or absolute path to the directory that needs to be copied
:return: True when transfer is complete
:raises OSError: path doesn't exist or permission denied
:raises ValueError: if the library cannot determine the file size
:raises gcloud.exceptions.GCloudError: if upload status gives error response
"""
if not os.access(path, os.R_OK):
raise OSError('Permission denied')
for filename in find_files(path):
blob = Blob(filename, self)
blob.upload_from_filename(filename)
return True
def register(self,name,URI):
origname,name=name,self.validateName(name)
URI=self.validateURI(URI)
fn=self.translate(name)
self.lock.acquire()
try:
if os.access(fn,os.R_OK):
Log.msg('NameServer','name already exists:',name)
raise Pyro.errors.NamingError('name already exists',name)
try:
open(fn,'w').write(str(URI)+'\n')
self._dosynccall("register",origname,URI)
Log.msg('NameServer','registered',name,'with URI',str(URI))
except IOError,x:
if x.errno==errno.ENOENT:
raise Pyro.errors.NamingError('(parent)group not found')
elif x.errno==errno.ENOTDIR:
raise Pyro.errors.NamingError('parent is no group')
else:
raise Pyro.errors.NamingError(str(x))
finally:
self.lock.release()
def deleteGroup(self,groupname):
groupname=self.validateName(groupname)
if groupname==':':
Log.msg('NameServer','attempt to deleteGroup root group')
raise Pyro.errors.NamingError('not allowed to delete root group')
dirnam = self.translate(groupname)
self.lock.acquire()
try:
if not os.access(dirnam,os.R_OK):
raise Pyro.errors.NamingError('group not found',groupname)
try:
shutil.rmtree(dirnam)
self._dosynccall("deleteGroup",groupname)
Log.msg('NameServer','deleted group',groupname)
except OSError,x:
if x.errno==errno.ENOENT:
raise Pyro.errors.NamingError('group not found',groupname)
elif x.errno==errno.ENOTDIR:
raise Pyro.errors.NamingError('is no group',groupname)
else:
raise Pyro.errors.NamingError(str(x))
finally:
self.lock.release()
def set_imageinfo( memoryFilePath ):
path = r'{}'.format( memoryFilePath )
path = os.path.abspath( path )
try:
if os.access( path, os.F_OK):
if os.access( path, os.R_OK ):
cwd = os.getcwd()
imageinfo = [ 'vol.py', '-f', '{}'.format( path ), 'imageinfo', \
'--output=text', \
'--output-file={}'.format( os.path.join( cwd, 'imageinfo.text' ))]
return imageinfo
else:
print '\n[!] Error File Permissions: No Read Access for {}\n'.format( path )
else:
print '\n[!] Error FilePath: Does not exist {}\n'.format( path )
except Exception as set_imageinfo_error:
print '[!] EXCEPTION ERROR: < set_imageinfo > function'
print set_imageinfo_error
def copy_from_host(module):
compress = module.params.get('compress')
src = module.params.get('src')
if not os.path.exists(src):
module.fail_json(msg="file not found: {}".format(src))
if not os.access(src, os.R_OK):
module.fail_json(msg="file is not readable: {}".format(src))
mode = oct(os.stat(src).st_mode & 0o777)
with open(src, 'rb') as f:
raw_data = f.read()
sha1 = hashlib.sha1(raw_data).hexdigest()
data = zlib.compress(raw_data) if compress else raw_data
module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
source=src)
def check_local_file_valid(self, local_path):
"""????????(??????)
:param local_path:
:return:
"""
if not os.path.exists(local_path):
self._err_tips = 'local_file %s not exist!' % local_path
return False
if not os.path.isfile(local_path):
self._err_tips = 'local_file %s is not regular file!' % local_path
return False
if not os.access(local_path, os.R_OK):
self._err_tips = 'local_file %s is not readable!' % local_path
return False
return True
def copy_from_host(module):
compress = module.params.get('compress')
src = module.params.get('src')
if not os.path.exists(src):
module.fail_json(msg="file not found: {}".format(src))
if not os.access(src, os.R_OK):
module.fail_json(msg="file is not readable: {}".format(src))
mode = oct(os.stat(src).st_mode & 0o777)
with open(src, 'rb') as f:
raw_data = f.read()
sha1 = hashlib.sha1(raw_data).hexdigest()
data = zlib.compress(raw_data) if compress else raw_data
module.exit_json(content=base64.b64encode(data), sha1=sha1, mode=mode,
source=src)
def check_file_freshness(filename, newer_than=3600):
"""
Check a file exists, is readable and is newer than <n> seconds (where
<n> defaults to 3600).
"""
# First check the file exists and is readable
if not os.path.exists(filename):
raise CriticalError("%s: does not exist." % (filename))
if os.access(filename, os.R_OK) == 0:
raise CriticalError("%s: is not readable." % (filename))
# Then ensure the file is up-to-date enough
mtime = os.stat(filename).st_mtime
last_modified = time.time() - mtime
if last_modified > newer_than:
raise CriticalError("%s: was last modified on %s and is too old "
"(> %s seconds)."
% (filename, time.ctime(mtime), newer_than))
if last_modified < 0:
raise CriticalError("%s: was last modified on %s which is in the "
"future."
% (filename, time.ctime(mtime)))
def validate_args(self, args: configargparse.Namespace) -> None:
_docker_args.validate_shared_args(args)
if not args.docker_buildfile:
raise ErrorMessage("Using the docker builder requires you to specify a Dockerfile template via "
"--docker-buildfile.")
if not os.path.exists(args.docker_buildfile) or not os.access(args.docker_buildfile, os.R_OK):
raise ErrorMessage("It seems that GoPythonGo can't find or isn't allowed to read %s" %
highlight(args.docker_buildfile))
for arg in args.docker_buildargs:
if "=" not in arg:
raise ErrorMessage("A Docker build arg must be in the form 'key=value'. Consult the %s "
"documentation for more information. '%s' does not contain a '='." %
(highlight("docker build"), arg))
for var in args.dockerfile_vars:
if "=" not in var:
raise ErrorMessage("A Dockerfile Jinja template context variable must be in the form 'key=value'. "
"'%s' does not contain a '='" % var)
def user_password(self):
passwd = ''
if HAVE_SPWD:
try:
passwd = spwd.getspnam(self.name)[1]
except KeyError:
return passwd
if not self.user_exists():
return passwd
elif self.SHADOWFILE:
# Read shadow file for user's encrypted password string
if os.path.exists(self.SHADOWFILE) and os.access(self.SHADOWFILE, os.R_OK):
for line in open(self.SHADOWFILE).readlines():
if line.startswith('%s:' % self.name):
passwd = line.split(':')[1]
return passwd
def get_pid_location(module):
"""
Try to find a pid directory in the common locations, falling
back to the user's home directory if no others exist
"""
for dir in ['/var/run', '/var/lib/run', '/run', os.path.expanduser("~/")]:
try:
if os.path.isdir(dir) and os.access(dir, os.R_OK|os.W_OK):
return os.path.join(dir, '.accelerate.pid')
except:
pass
module.fail_json(msg="couldn't find any valid directory to use for the accelerate pid file")
# NOTE: this shares a fair amount of code in common with async_wrapper, if async_wrapper were a new module we could move
# this into utils.module_common and probably should anyway
def _configure_base(module, base, conf_file, disable_gpg_check):
"""Configure the dnf Base object."""
conf = base.conf
# Turn off debug messages in the output
conf.debuglevel = 0
# Set whether to check gpg signatures
conf.gpgcheck = not disable_gpg_check
# Don't prompt for user confirmations
conf.assumeyes = True
# Change the configuration file path if provided
if conf_file:
# Fail if we can't read the configuration file.
if not os.access(conf_file, os.R_OK):
module.fail_json(
msg="cannot read configuration file", conf_file=conf_file)
else:
conf.config_file_path = conf_file
# Read the configuration file
conf.read()
def get_file_content(path, default=None, strip=True):
data = default
if os.path.exists(path) and os.access(path, os.R_OK):
try:
try:
datafile = open(path)
data = datafile.read()
if strip:
data = data.strip()
if len(data) == 0:
data = default
finally:
datafile.close()
except:
# ignore errors as some jails/containers might have readable permissions but not allow reads to proc
# done in 2 blocks for 2.4 compat
pass
return data
def getMappingsFromTable(self):
self._maps = []
sz = self.memory_mapping.rowCount()
for i in range(sz):
name = self.memory_mapping.item(i, 0)
if not name:
continue
name = name.text()
address = self.memory_mapping.item(i, 1)
if address:
if ishex(address.text()):
address = int(address.text(), 0x10)
else:
address = int(address.text())
size = self.memory_mapping.item(i, 2)
if size:
size = int(size.text(), 0x10) if ishex(size.text()) else int(size.text())
permission = self.memory_mapping.item(i, 3)
if permission:
permission = permission.text()
read_from_file = self.memory_mapping.item(i, 4)
if read_from_file and not os.access(read_from_file.text(), os.R_OK):
read_from_file = None
self._maps.append([name, address, size, permission, read_from_file])
return
def loadCode(self, title, filter, run_disassembler):
qFile, qFilter = QFileDialog.getOpenFileName(self, title, EXAMPLES_PATH, filter)
if not os.access(qFile, os.R_OK):
return
if run_disassembler or qFile.endswith(".raw"):
body = disassemble_file(qFile, self.arch)
self.loadFile(qFile, data=body)
else:
self.loadFile(qFile)
return