def t05(w,h):
pretty = '%s t5' % __file__
print(pretty)
result_path = w.make_tempdir()
os.chmod(result_path, stat.S_IRUSR) # user read permission only
try:
h.run_gtest('some_target', result_path)
print(
'FAIL %s: expected error (not writable result directory): %s'
% (pretty, temp_dir)
)
return False
except Exception as e:
if e.message != 'result_path not a writable directory: %s' % result_path:
print('FAIL %s: wrong error message: %s' % (pretty, str(e)))
return False
finally:
os.removedirs(result_path)
return True
# successful execution of run_gtest(): check files on host, verify their
# content and that traces from the run were removed from the handset
python类S_IRUSR的实例源码
def t05(w,h):
pretty = '%s t5' % __file__
print(pretty)
result_path = w.make_tempdir()
os.chmod(result_path, stat.S_IRUSR) # user read permission only
try:
h.run_gtest('some_target', result_path)
print(
'FAIL %s: expected error (not writable result directory): %s'
% (pretty, temp_dir)
)
return False
except Exception as e:
if e.message != 'result_path not a writable directory: %s' % result_path:
print('FAIL %s: wrong error message: %s' % (pretty, str(e)))
return False
finally:
os.removedirs(result_path)
return True
# successful execution of run_gtest(): check files on host, verify their
# content and that traces from the run were removed from the handset
def commit(self):
    """
    Persist the configuration to disk (into '$RootDir/EXAConf') and restrict
    the file at ``self.conf_path`` to owner read/write.

    Raises EXAConfError when changing the permissions fails with an OSError.
    """
    self.config.write()
    # Reloading right after writing forces type conversion: parameters that
    # were added as lists at runtime come back as strings, exactly as if they
    # had been added to the file manually.
    self.config.reload()
    owner_read_write = stat.S_IRUSR | stat.S_IWUSR
    try:
        os.chmod(self.conf_path, owner_read_write)
    except OSError as e:
        raise EXAConfError("Failed to change permissions for '%s': %s" % (self.conf_path, e))
#}}}
#{{{ Platform supported
def main(args):
    """Take a memory-only raw core dump of a libvirt domain.

    ``args['<vm_name>']`` names the domain, which is looked up on the
    'qemu:///system' connection. The dump is written to '<vm_name>.raw' in
    the current working directory; that file is created first and made
    readable and writable by user, group and others.
    """
    name = args['<vm_name>']
    # get domain from libvirt
    connection = libvirt.open('qemu:///system')
    vm = connection.lookupByName(name)
    dump_path = os.path.join(os.getcwd(), '{}.raw'.format(name))
    # r/w for everyone
    world_read_write = (stat.S_IRUSR | stat.S_IWUSR |
                        stat.S_IRGRP | stat.S_IWGRP |
                        stat.S_IROTH | stat.S_IWOTH)
    # opening in 'w' mode creates (or truncates) the target before the dump
    with open(dump_path, 'w'):
        os.chmod(dump_path, world_read_write)
        # take a ram dump
        vm.coreDumpWithFormat(
            dump_path,
            libvirt.VIR_DOMAIN_CORE_DUMP_FORMAT_RAW,
            libvirt.VIR_DUMP_MEMORY_ONLY,
        )
def cleanupdir(temporarydir):
    """Best-effort removal of the directory tree rooted at ``temporarydir``.

    Before deleting, every non-symlink directory and file found while walking
    the tree gets owner read/write/execute permission so that the removal is
    not blocked by restrictive modes. Failures to chmod a file and failures
    of the final removal are ignored; a failure to chmod a directory
    propagates to the caller.
    """
    owner_rwx = stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR
    # A plain for-loop over os.walk() works on Python 2 and 3 alike and ends
    # on generator exhaustion without manual StopIteration handling.
    for root, dirnames, filenames in os.walk(temporarydir):
        ## make sure all directories can be accessed
        for d in dirnames:
            dirpath = os.path.join(root, d)
            if not os.path.islink(dirpath):
                os.chmod(dirpath, owner_rwx)
        for p in filenames:
            filepath = os.path.join(root, p)
            try:
                if not os.path.islink(filepath):
                    os.chmod(filepath, owner_rwx)
            except Exception:
                # a file that cannot be chmod'ed is left for rmtree to try
                pass
    try:
        shutil.rmtree(temporarydir)
    except Exception:
        ## nothing that can be done right now, so just give up
        ## (Exception rather than a bare except, so KeyboardInterrupt and
        ## SystemExit are no longer swallowed)
        pass
def test_read_only_bytecode(self):
    """Importing must fail silently when stale bytecode is read-only."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile, then zero the first four bytes so the bytecode will need
        # to be re-created.
        py_compile.compile(source_path)
        bytecode_path = imp.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        # Make the bytecode read-only.
        os.chmod(bytecode_path, read_only)
        try:
            # Should not raise IOError!
            self.import_(source_path, '_temp')
        finally:
            # Make writable for eventual clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bit of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    read_bits = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_bits = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH
    with temp_umask(0o022):
        sys.path.insert(0, os.curdir)
        try:
            source_name = TESTFN + os.extsep + "py"
            open(source_name, 'w').close()
            os.chmod(source_name, (read_bits | exec_bits))
            cached_name = imp.cache_from_source(source_name)
            # drop any stale cache file so the import has to write a new one
            unlink(cached_name)
            __import__(TESTFN)
            if not os.path.exists(cached_name):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
            cached_stat = os.stat(cached_name)
            self.assertEqual(stat.S_IMODE(cached_stat.st_mode), read_bits)
        finally:
            del sys.path[0]
            remove_files(TESTFN)
            unload(TESTFN)
def set_own_perm(usr, dir_list):
    """Recursively assign ownership and permissions below each directory.

    Every directory in ``dir_list`` and everything found beneath it is
    chown'ed to the uid of user ``usr`` and to the gid of the group that has
    the same name as ``usr``. Directories get read/write/execute for user and
    group; files get read/write for user and group.
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    dir_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP

    def _claim(path, mode):
        # ownership first, then mode
        os.chown(path, uid, gid)
        os.chmod(path, mode)

    for top in dir_list:
        _claim(top, dir_mode)
        for parent, dirnames, filenames in os.walk(top):
            for name in dirnames:
                _claim(os.path.join(parent, name), dir_mode)
            for name in filenames:
                _claim(os.path.join(parent, name), file_mode)
def set_own_perm(usr, dir_list):
    """Recursively assign ownership and permissions below each directory.

    Every directory in ``dir_list`` and everything found beneath it is
    chown'ed to the uid of user ``usr`` and to the gid of the group that has
    the same name as ``usr``. Directories get read/write/execute for user and
    group; files get read/write for user and group.
    """
    uid = pwd.getpwnam(usr).pw_uid
    gid = grp.getgrnam(usr).gr_gid
    file_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
    dir_mode = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP

    def _claim(path, mode):
        # ownership first, then mode
        os.chown(path, uid, gid)
        os.chmod(path, mode)

    for top in dir_list:
        _claim(top, dir_mode)
        for parent, dirnames, filenames in os.walk(top):
            for name in dirnames:
                _claim(os.path.join(parent, name), dir_mode)
            for name in filenames:
                _claim(os.path.join(parent, name), file_mode)
def write(self):
    """ Writes configuration file

    Waits up to five seconds (polling ``self._writing`` once per second) for
    a write already in progress to finish; logs an error and returns without
    writing if the flag never clears. Otherwise the encoded configuration is
    dumped as JSON to ``self._filetmp``, that file is restricted to owner
    read/write, and it is then moved over ``self._file``.

    A ``shutil.Error`` from the move is logged, not raised. Any other
    exception propagates, but ``self._writing`` is always reset first.
    """
    # wait for file to unlock. Timeout after 5 seconds
    for _ in range(5):
        if not self._writing:
            self._writing = True
            break
        time.sleep(1)
    else:
        _LOGGER.error('Could not write to configuration file. It is busy.')
        return
    try:
        # dump JSON to config file and unlock
        encoded = self.encode()
        # the context manager flushes and closes the temp file before it is
        # chmod'ed and moved
        with open(self._filetmp, 'w') as tmp_file:
            json.dump(encoded, tmp_file, sort_keys=True, indent=4,
                      separators=(',', ': '))
        os.chmod(self._filetmp, stat.S_IRUSR | stat.S_IWUSR)
        try:
            shutil.move(self._filetmp, self._file)
            _LOGGER.debug('Config file succesfully updated.')
        except shutil.Error as e:
            _LOGGER.error('Failed to move temp config file to original error: ' + str(e))
    finally:
        # without this, an exception above would leave the flag set and make
        # every later write() time out
        self._writing = False
    _LOGGER.debug('Wrote configuration file')
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bit of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    # 0o22 is accepted by Python 2.6+ and Python 3; the old 022 spelling is a
    # syntax error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # create an empty source file
        open(fname, 'w').close()
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # the import may have produced either a .pyc or a .pyo file
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode),
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
def test_readonly_files(self):
    """A dumbdbm database opened with flag 'r' must be readable when its
    files are read-only and its directory is not writable."""
    db_dir = _fname
    os.mkdir(db_dir)
    try:
        db_path = os.path.join(db_dir, 'db')
        # populate a fresh database
        db = dumbdbm.open(db_path, 'n')
        self.assertEqual(list(db.keys()), [])
        for key in self._dict:
            db[key] = self._dict[key]
        db.close()
        # owner read-only on both database files; the directory keeps only
        # read and search permission
        for suffix in (".dir", ".dat"):
            os.chmod(db_path + suffix, stat.S_IRUSR)
        os.chmod(db_dir, stat.S_IRUSR|stat.S_IXUSR)
        db = dumbdbm.open(db_path, 'r')
        self.assertEqual(sorted(db.keys()), sorted(self._dict))
        db.close()  # don't write
    finally:
        test_support.rmtree(db_dir)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bit of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    # 0o22 is accepted by Python 2.6+ and Python 3; the old 022 spelling is a
    # syntax error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # create an empty source file
        open(fname, 'w').close()
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # the import may have produced either a .pyc or a .pyo file
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode),
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
def test_readonly_files(self):
    """A dumbdbm database opened with flag 'r' must be readable when its
    files are read-only and its directory is not writable."""
    db_dir = _fname
    os.mkdir(db_dir)
    try:
        db_path = os.path.join(db_dir, 'db')
        # populate a fresh database
        db = dumbdbm.open(db_path, 'n')
        self.assertEqual(list(db.keys()), [])
        for key in self._dict:
            db[key] = self._dict[key]
        db.close()
        # owner read-only on both database files; the directory keeps only
        # read and search permission
        for suffix in (".dir", ".dat"):
            os.chmod(db_path + suffix, stat.S_IRUSR)
        os.chmod(db_dir, stat.S_IRUSR|stat.S_IXUSR)
        db = dumbdbm.open(db_path, 'r')
        self.assertEqual(sorted(db.keys()), sorted(self._dict))
        db.close()  # don't write
    finally:
        test_support.rmtree(db_dir)
def _init_client():
    """Interactively create a client and save its configuration.

    Prompts on stdout/stdin for the client id, client secret and redirect
    URI, creates the configuration directory (owner-only access) when it
    does not exist, runs the OAuth process for the new client and saves the
    configuration. Returns the client.
    """
    def prompt(msg):
        # show the label, then read one line without its line terminator
        sys.stdout.write('%s: ' % msg)
        return sys.stdin.readline().rstrip('\r\n')

    labels = ('Client Id', 'Client Secret', 'Redirect Uri')
    client_id, client_secret, redirect_uri = [prompt(label) for label in labels]

    if not os.path.exists(_configuration_directory):
        owner_only = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
        os.mkdir(_configuration_directory, owner_only)

    client = _CommandClient(client_id, client_secret, redirect_uri)
    _init_oauth_process(client)
    # save first time, meaning configuration works
    client.save_configuration()
    return client
def create_ca(self):
    """ Create a new CA

    Generates a passphrase-less key pair with ``ssh-keygen`` in a private
    scratch directory. If both the key and its ``.pub`` file were produced,
    they are moved to ``self.full_path(self.ca_path, self.ca_name)`` (plus
    ``.pub``) and restricted to owner-read-only. ``self.ca_path`` is created
    first when it does not exist. When ssh-keygen produces no files, nothing
    is installed and no error is raised.
    """
    import shutil
    import tempfile

    ca_key_file = self.full_path(self.ca_path, self.ca_name)
    ca_cert_file = '{}.pub'.format(ca_key_file)

    # mkdtemp() creates a directory only the current user can access, so the
    # private key never sits at a predictable, shared path in /tmp.
    scratch_dir = tempfile.mkdtemp()
    try:
        temp_ca_key_file = os.path.join(scratch_dir, self.ca_name)
        temp_ca_cert_file = '{}.pub'.format(temp_ca_key_file)

        subprocess.call(['ssh-keygen',
                         '-f', temp_ca_key_file,
                         '-q', '-P', ''])

        if not os.path.exists(self.ca_path):
            self.mkdir_recursive(self.ca_path)

        # If CA was successfully created move it to the correct location and
        # set appropriate permissions
        if os.path.isfile(temp_ca_key_file) and os.path.isfile(temp_ca_cert_file):
            # shutil.move also works when the scratch directory and the CA
            # path are on different filesystems, where os.rename fails
            shutil.move(temp_ca_cert_file, ca_cert_file)
            os.chmod(ca_cert_file, stat.S_IRUSR)
            shutil.move(temp_ca_key_file, ca_key_file)
            os.chmod(ca_key_file, stat.S_IRUSR)
    finally:
        # never leave key material behind in the scratch directory
        shutil.rmtree(scratch_dir, ignore_errors=True)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bit of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # create an empty source file (close() returns None, so there is
        # nothing worth binding to a name)
        open(fname, 'w').close()
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # the import may have produced either a .pyc or a .pyo file
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        # at least one read bit set, no execute bit set
        assert(s.st_mode & (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH))
        assert(not (s.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)))
    finally:
        os.umask(oldmask)
        clean_tmpfiles(fname)
        unload(TESTFN)
        del sys.path[0]
def test_read_only_bytecode(self):
    """Importing must fail silently when stale bytecode is read-only."""
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile, then zero the first four bytes so the bytecode will need
        # to be re-created.
        py_compile.compile(source_path)
        bytecode_path = imp.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as handle:
            handle.seek(0)
            handle.write(b'\x00\x00\x00\x00')
        # Make the bytecode read-only.
        os.chmod(bytecode_path, read_only)
        try:
            # Should not raise IOError!
            self.import_(source_path, '_temp')
        finally:
            # Make writable for eventual clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
def test_mknod_dir_fd(self):
    """mknod() with ``dir_fd`` creates a FIFO relative to that directory.

    Creating a FIFO is the only use of mknodat() specified by POSIX.
    """
    target = support.TESTFN
    support.unlink(target)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknod(target, fifo_mode, 0, dir_fd=cwd_fd)
    except OSError as e:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(e.errno, (errno.EPERM, errno.EINVAL))
    else:
        created_mode = posix.stat(target).st_mode
        self.assertTrue(stat.S_ISFIFO(created_mode))
    finally:
        posix.close(cwd_fd)
def secure_file(fname, remove_existing=False):
    """Create ``fname`` with owner-only read/write access and return its
    file descriptor.

    The file is opened write-only with O_CREAT | O_EXCL (refer to
    "man 2 open"), so an OSError is raised if it already exists. With
    ``remove_existing`` set, any existing file is removed first; a failure
    of that removal is ignored.
    """
    if remove_existing:
        # For security, remove file with potentially elevated mode
        try:
            os.remove(fname)
        except OSError:
            pass

    # Clear the umask while opening so the mode below is applied as given,
    # and restore it whether or not the open succeeds.
    saved_umask = os.umask(0)
    try:
        return os.open(
            fname,
            os.O_WRONLY | os.O_CREAT | os.O_EXCL,
            stat.S_IRUSR | stat.S_IWUSR,  # 0o600 in octal, 384 in decimal
        )
    finally:
        os.umask(saved_umask)
def testXid(self):
    """An Xid written as a compound must encode to the expected bytes and
    decode back to an object with the same attributes."""
    codec = StringCodec()
    original = Xid(format=0, global_id="gid", branch_id="bid")
    codec.write_compound(original)
    expected = '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
    assert codec.encoded == expected
    decoded = codec.read_compound(Xid)
    assert original.__dict__ == decoded.__dict__
# def testLoadReadOnly(self):
# spec = "amqp.0-10-qpid-errata.xml"
# f = testrunner.get_spec_file(spec)
# dest = tempfile.mkdtemp()
# shutil.copy(f, dest)
# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
# fname = os.path.join(dest, spec)
# load(fname)
# assert not os.path.exists("%s.pcl" % fname)
def store_index(self,ipath):
    """Write the chromosome index to ``ipath`` atomically.

    One tab-separated line per chromosome of ``self.chrom_stats`` (sorted by
    name) is written as: name, offset, ldata, skip, repr(skipchar), size.
    The data goes to a temporary file in the directory of ``ipath``, which is
    made read-only for user, group and others and then renamed onto
    ``ipath``. If anything fails before the rename, the temporary file is
    removed and the exception is re-raised.
    """
    self.logger.info("# indexed_fasta.store_index('%s')" % ipath)

    # write to tmp-file first and in the end rename in order to have this atomic
    # otherwise parallel building of the same index may screw it up.
    import stat
    import tempfile
    tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
    try:
        try:
            for chrom in sorted(self.chrom_stats):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # make sure everything is on disk: the Python-level buffer has to
            # be flushed first, otherwise fsync has nothing to sync
            tmp.flush()
            os.fsync(tmp.fileno())
        finally:
            tmp.close()

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same directory,
        # therefore same filesystem
        os.rename(tmp.name,ipath)
    except Exception:
        # do not leave a partial temp file next to the index
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
def store_index(self,ipath):
    """Write the chromosome index to ``ipath`` atomically.

    One tab-separated line per chromosome of ``self.chrom_stats`` (sorted by
    name) is written as: name, offset, ldata, skip, repr(skipchar), size.
    The data goes to a temporary file in the directory of ``ipath``, which is
    made read-only for user, group and others and then renamed onto
    ``ipath``. If anything fails before the rename, the temporary file is
    removed and the exception is re-raised.
    """
    debug("# indexed_fasta.store_index('%s')" % ipath)

    # write to tmp-file first and in the end rename in order to have this atomic
    # otherwise parallel building of the same index may screw it up.
    import stat
    import tempfile
    tmp = tempfile.NamedTemporaryFile(mode="w",dir = os.path.dirname(ipath),delete=False)
    try:
        try:
            for chrom in sorted(self.chrom_stats):
                ofs,ldata,skip,skipchar,size = self.chrom_stats[chrom]
                tmp.write("%s\t%d\t%d\t%d\t%r\t%d\n" % (chrom,ofs,ldata,skip,skipchar,size))

            # make sure everything is on disk: the Python-level buffer has to
            # be flushed first, otherwise fsync has nothing to sync
            tmp.flush()
            os.fsync(tmp.fileno())
        finally:
            tmp.close()

        # make it accessible to everyone
        os.chmod(tmp.name, stat.S_IROTH | stat.S_IRGRP | stat.S_IRUSR)

        # this is atomic on POSIX as we have created tmp in the same directory,
        # therefore same filesystem
        os.rename(tmp.name,ipath)
    except Exception:
        # do not leave a partial temp file next to the index
        try:
            os.unlink(tmp.name)
        except OSError:
            pass
        raise
def testXid(self):
    """An Xid written as a compound must encode to the expected bytes and
    decode back to an object with the same attributes."""
    codec = StringCodec()
    original = Xid(format=0, global_id="gid", branch_id="bid")
    codec.write_compound(original)
    expected = '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
    assert codec.encoded == expected
    decoded = codec.read_compound(Xid)
    assert original.__dict__ == decoded.__dict__
# def testLoadReadOnly(self):
# spec = "amqp.0-10-qpid-errata.xml"
# f = testrunner.get_spec_file(spec)
# dest = tempfile.mkdtemp()
# shutil.copy(f, dest)
# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
# fname = os.path.join(dest, spec)
# load(fname)
# assert not os.path.exists("%s.pcl" % fname)
def login(func):
    """Install Python function as a LoginHook.

    Returns a wrapper that, when called, writes the source of ``func`` to
    '/var/root/Library/mh_<hook>hook.py', hands the file to uid 0 / gid 0
    with owner read+execute only, registers it through ``hooks.login`` (for
    hook == 'login') or ``hooks.logout`` (any other value), and returns the
    path.
    """
    def func_wrapper(hook='login'):
        script_path = '/var/root/Library/mh_%shook.py' % hook
        writesource(func, script_path)

        # only root should read and execute
        os.chown(script_path, 0, 0)
        os.chmod(script_path, stat.S_IXUSR | stat.S_IRUSR)

        register = hooks.login if hook == 'login' else hooks.logout
        register(script_path)
        return script_path

    return func_wrapper
def tearDown(self):
    '''
    Disconnect ramdisk, unloading data to pre-build location.
    '''
    self.mbl.chownR(self.keyuser, self.tmphome + "/src")

    # chmod so it's readable by everyone, writable by the group
    shared_mode = (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                   stat.S_IWGRP)
    self.mbl.chmodR(shared_mode, self.tmphome + "/src", "append")
    for _ in range(2):
        self.libc.sync()

    # Copy back to pseudo-build directory
    dmgs_copy = [self.RSYNC, "-aqp", self.tmphome + "/src/MacBuild/dmgs", self.buildHome]
    call(dmgs_copy)
    source_copy = [self.RSYNC, "-aqp", self.tmphome + "/src/", self.buildHome + "/builtSrc"]
    call(source_copy)
    for _ in range(2):
        self.libc.sync()

    os.chdir(self.buildHome)
    self._exit(self.ramdisk, self.luggage, 0)
def _create_eae_zipfile(self, zip_file_name, main_file_path, data_files=None):
to_zip = []
if data_files is None:
data_files = []
# Handle main script
to_zip.append(main_file_path)
# Prepare the zip file
zip_path = "/tmp/" + zip_file_name
zipf = zipfile.ZipFile(zip_path, mode='w', compression=zipfile.ZIP_DEFLATED, allowZip64=True)
for f in to_zip:
zipf.write(f)
zipf.close()
# Handle other files & dirs
for f in data_files:
zipCommand = "zip -r -u -0 " + zip_path + " " + f
call([zipCommand], shell=True)
# Chmod 666 the zip file so it can be accessed
os.chmod(zip_path, stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IROTH | stat.S_IWOTH)
return zip_path
def test_create_script_perms(self):
    """this tests the create_script function (permissions).
    """
    script_file = os.path.join(self.root, 'script')
    # Test a non-default mode: read-only for user, group and others
    read_only = (stat.S_IRUSR |
                 stat.S_IRGRP |
                 stat.S_IROTH)
    aliases = {
        's6_setuidgid': '/test/s6-setuidgid',
    }

    utils.create_script(
        script_file,
        's6.run',
        mode=read_only,
        user='testproid',
        home='home',
        shell='shell',
        _alias=aliases
    )

    # 33060 == 0o100444: a regular file carrying exactly the mode above
    actual_mode = utils.os.stat(script_file).st_mode
    self.assertEqual(actual_mode, 33060)
def save_app(manifest, container_dir, app_json=STATE_JSON):
    """Saves app manifest and freezes to object.

    The manifest is dumped as JSON to ``container_dir/app_json`` through
    ``fs.write_safe``. On posix the file is then made owner read/write and
    readable by group and others. Returns ``utils.to_obj(manifest)``.
    """
    # Save the manifest with allocated vip and ports in the state
    state_file = os.path.join(container_dir, app_json)

    def _dump_manifest(f):
        return json.dump(manifest, f)

    fs.write_safe(state_file, _dump_manifest)

    # chmod for the file to be world readable.
    if os.name == 'posix':
        world_readable = stat.S_IWUSR | stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        os.chmod(state_file, world_readable)

    # Freeze the app data into a namedtuple object
    return utils.to_obj(manifest)
def test_execute_bit_not_copied(self):
    """Bytecode files must not inherit the execute bit of their source.

    Issue 6070: under posix .pyc files got their execute bit set if the .py
    file had the execute bit set, but they aren't executable.
    """
    # 0o22 is accepted by Python 2.6+ and Python 3; the old 022 spelling is a
    # syntax error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # create an empty source file
        open(fname, 'w').close()
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # the import may have produced either a .pyc or a .pyo file
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        self.assertEqual(stat.S_IMODE(s.st_mode),
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    finally:
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]