def cleanupdir(temporarydir):
osgen = os.walk(temporarydir)
try:
while True:
i = osgen.next()
## make sure all directories can be accessed
for d in i[1]:
if not os.path.islink(os.path.join(i[0], d)):
os.chmod(os.path.join(i[0], d), stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR)
for p in i[2]:
try:
if not os.path.islink(os.path.join(i[0], p)):
os.chmod(os.path.join(i[0], p), stat.S_IRUSR|stat.S_IWUSR|stat.S_IXUSR)
except Exception, e:
#print e
pass
except StopIteration:
pass
try:
shutil.rmtree(temporarydir)
except:
## nothing that can be done right now, so just give up
pass
python类S_IXUSR的实例源码
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
with temp_umask(0o022):
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
fn = imp.cache_from_source(fname)
unlink(fn)
__import__(TESTFN)
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
self.assertEqual(stat.S_IMODE(s.st_mode),
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
finally:
del sys.path[0]
remove_files(TESTFN)
unload(TESTFN)
def set_own_perm(usr, dir_list):
uid = pwd.getpwnam(usr).pw_uid
gid = grp.getgrnam(usr).gr_gid
perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
for cdir in dir_list:
os.chown(cdir, uid, gid)
os.chmod(cdir, perm_mask_rwx)
for croot, sub_dirs, cfiles in os.walk(cdir):
for fs_name in sub_dirs:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rwx)
for fs_name in cfiles:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def set_own_perm(usr, dir_list):
uid = pwd.getpwnam(usr).pw_uid
gid = grp.getgrnam(usr).gr_gid
perm_mask_rw = stat.S_IRUSR | stat.S_IWUSR | stat.S_IRGRP | stat.S_IWGRP
perm_mask_rwx = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR | stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP
for cdir in dir_list:
os.chown(cdir, uid, gid)
os.chmod(cdir, perm_mask_rwx)
for croot, sub_dirs, cfiles in os.walk(cdir):
for fs_name in sub_dirs:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rwx)
for fs_name in cfiles:
os.chown(os.path.join(croot, fs_name), uid, gid)
os.chmod(os.path.join(croot, fs_name), perm_mask_rw)
def make_job(subject_id_list, freesurfer_dir,
base_feature, weight_method, num_bins, edge_range,
atlas, fwhm, out_proc_dir, job_dir, job_name, num_procs):
"Creates graynet job for running on HPC"
str_list_weight_method = ' '.join(weight_method)
job_file = pjoin(job_dir, '{}.graynet.job'.format(job_name))
job_log = pjoin(job_dir, '{}.graynet.log'.format(job_name))
if pexists(job_file):
os.remove(job_file)
with open(job_file, 'w') as jf:
jf.write('#!/bin/bash\n')
jf.write(specify_hpc_resources(mem, queue, num_procs, job_dir, job_log))
jf.write(make_cli_call(cli_name,realpath(subject_id_list), base_feature, realpath(freesurfer_dir),
str_list_weight_method, num_bins, edge_range, atlas, fwhm, realpath(out_proc_dir), num_procs))
st = os.stat(job_file)
os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
return job_file
def make_job(subject_id_list, freesurfer_dir,
base_feature, weight_method, num_bins, edge_range, summary_stat,
atlas, fwhm, out_proc_dir, job_dir, job_name, num_procs):
"Creates graynet job for running on HPC"
str_list_weight_method = ' '.join(weight_method)
job_file = pjoin(job_dir, '{}.{}.job'.format(job_name, job_type))
job_log = pjoin(job_dir, '{}.{}.log'.format(job_name, job_type))
if pexists(job_file):
os.remove(job_file)
with open(job_file, 'w') as jf:
jf.write('#!/bin/bash\n')
jf.write(specify_hpc_resources(mem, queue, num_procs, job_dir, job_log))
jf.write(make_cli_call(cli_name, realpath(subject_id_list), base_feature, realpath(freesurfer_dir),
str_list_weight_method, num_bins, edge_range, summary_stat, atlas, fwhm, realpath(out_proc_dir),
num_procs))
st = os.stat(job_file)
os.chmod(job_file, st.st_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
return job_file
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
oldmask = os.umask(022)
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
f = open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
__import__(TESTFN)
fn = fname + 'c'
if not os.path.exists(fn):
fn = fname + 'o'
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
self.assertEqual(stat.S_IMODE(s.st_mode),
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
finally:
os.umask(oldmask)
remove_files(TESTFN)
unload(TESTFN)
del sys.path[0]
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
oldmask = os.umask(022)
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
f = open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
__import__(TESTFN)
fn = fname + 'c'
if not os.path.exists(fn):
fn = fname + 'o'
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
self.assertEqual(stat.S_IMODE(s.st_mode),
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
finally:
os.umask(oldmask)
remove_files(TESTFN)
unload(TESTFN)
del sys.path[0]
def _init_client():
global _configuration_directory
def prompt(msg):
sys.stdout.write('%s: ' % msg)
response = sys.stdin.readline()
return response.rstrip('\r\n')
client_id = prompt('Client Id')
client_secret = prompt('Client Secret')
redirect_uri = prompt('Redirect Uri')
if not os.path.exists(_configuration_directory):
os.mkdir(_configuration_directory, stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR)
client = _CommandClient(client_id, client_secret, redirect_uri)
_init_oauth_process(client)
# save first time, meaning configuration works
client.save_configuration()
return client
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
oldmask = os.umask(0o22)
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
f = open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
__import__(TESTFN)
fn = fname + 'c'
if not os.path.exists(fn):
fn = fname + 'o'
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
assert(s.st_mode & (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH))
assert(not (s.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)))
finally:
os.umask(oldmask)
clean_tmpfiles(fname)
unload(TESTFN)
del sys.path[0]
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
self.assertEqual((None, []), finder.find_loader('doesnotexist'))
def testXid(self):
sc = StringCodec()
xid = Xid(format=0, global_id="gid", branch_id="bid")
sc.write_compound(xid)
assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
dec = sc.read_compound(Xid)
assert xid.__dict__ == dec.__dict__
# def testLoadReadOnly(self):
# spec = "amqp.0-10-qpid-errata.xml"
# f = testrunner.get_spec_file(spec)
# dest = tempfile.mkdtemp()
# shutil.copy(f, dest)
# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
# fname = os.path.join(dest, spec)
# load(fname)
# assert not os.path.exists("%s.pcl" % fname)
def testXid(self):
sc = StringCodec()
xid = Xid(format=0, global_id="gid", branch_id="bid")
sc.write_compound(xid)
assert sc.encoded == '\x00\x00\x00\x10\x06\x04\x07\x00\x00\x00\x00\x00\x03gid\x03bid'
dec = sc.read_compound(Xid)
assert xid.__dict__ == dec.__dict__
# def testLoadReadOnly(self):
# spec = "amqp.0-10-qpid-errata.xml"
# f = testrunner.get_spec_file(spec)
# dest = tempfile.mkdtemp()
# shutil.copy(f, dest)
# shutil.copy(os.path.join(os.path.dirname(f), "amqp.0-10.dtd"), dest)
# os.chmod(dest, stat.S_IRUSR | stat.S_IXUSR)
# fname = os.path.join(dest, spec)
# load(fname)
# assert not os.path.exists("%s.pcl" % fname)
def login(func):
"""Install Python function as a LoginHook."""
def func_wrapper(hook='login'):
path = '/var/root/Library/mh_%shook.py' % hook
writesource(func, path)
# only root should read and execute
os.chown(path, 0, 0)
os.chmod(path, stat.S_IXUSR | stat.S_IRUSR)
if hook == 'login':
hooks.login(path)
else:
hooks.logout(path)
return path
return func_wrapper
def _write_local_script(job_id, spec, kubeque_command, kubequeconsume_exe_path, kubeque_exe_in_container):
from kubeque.gcp import _gcloud_cmd
import stat
image = spec['image']
cmd = _gcloud_cmd(
["docker", "--", "run",
"-v", os.path.expanduser("~/.config/gcloud") + ":/google-creds",
"-e", "GOOGLE_APPLICATION_CREDENTIALS=/google-creds/application_default_credentials.json",
"-v", kubequeconsume_exe_path + ":" + kubeque_exe_in_container,
image, 'bash -c "' + kubeque_command + ' --owner localhost"', ])
script_name = "run-{}-locally.sh".format(job_id)
with open(script_name, "wt") as fd:
fd.write("#!/usr/bin/env bash\n")
fd.write(" ".join(cmd) + "\n")
# make script executable
os.chmod(script_name, os.stat(script_name).st_mode | stat.S_IXUSR)
return script_name
def p4_edit_file(self, file_path):
file_abspath, file_p4fixed = file_path
p4cfg = self.target
p4 = p4cfg.p4
p4.run_sync('-k', file_p4fixed)
f_st = os.lstat(file_abspath)
is_executable = f_st.st_mode & (stat.S_IXGRP | stat.S_IXUSR)
is_symlink = os.path.islink(file_abspath)
if is_symlink:
output = p4.run_edit('-t', 'symlink', file_p4fixed)
elif is_executable:
output = p4.run_edit('-t', '+x', file_p4fixed)
else:
output = p4.run_edit('-t', 'auto', file_p4fixed)
# If the initial 'add' change was lost
# an edit on a missing file will report
# an error 'not on client'
if p4cfg.inWarnings('not on client'):
msg = '%s not on client. Changing P4 Edit to P4 Add' % file_abspath
self.logger.warning(msg)
output = p4.run_add('-f', file_abspath)
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
oldmask = os.umask(022)
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
f = open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
__import__(TESTFN)
fn = fname + 'c'
if not os.path.exists(fn):
fn = fname + 'o'
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
self.assertEqual(stat.S_IMODE(s.st_mode),
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
finally:
os.umask(oldmask)
remove_files(TESTFN)
unload(TESTFN)
del sys.path[0]
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
found = self._find(finder, 'doesnotexist')
self.assertEqual(found, self.NOT_FOUND)
def test_execute_bit_not_copied(self):
# Issue 6070: under posix .pyc files got their execute bit set if
# the .py file had the execute bit set, but they aren't executable.
oldmask = os.umask(022)
sys.path.insert(0, os.curdir)
try:
fname = TESTFN + os.extsep + "py"
f = open(fname, 'w').close()
os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
__import__(TESTFN)
fn = fname + 'c'
if not os.path.exists(fn):
fn = fname + 'o'
if not os.path.exists(fn):
self.fail("__import__ did not result in creation of "
"either a .pyc or .pyo file")
s = os.stat(fn)
self.assertEqual(stat.S_IMODE(s.st_mode),
stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
finally:
os.umask(oldmask)
remove_files(TESTFN)
unload(TESTFN)
del sys.path[0]
def _octal_to_perm(octal):
perms = list("-" * 9)
if octal & stat.S_IRUSR:
perms[0] = "r"
if octal & stat.S_IWUSR:
perms[1] = "w"
if octal & stat.S_IXUSR:
perms[2] = "x"
if octal & stat.S_IRGRP:
perms[3] = "r"
if octal & stat.S_IWGRP:
perms[4] = "w"
if octal & stat.S_IXGRP:
perms[5] = "x"
if octal & stat.S_IROTH:
perms[6] = "r"
if octal & stat.S_IWOTH:
perms[7] = "w"
if octal & stat.S_IXOTH:
perms[8] = "x"
return "".join(perms)
def receive(app):
"""INTERNAL: Handle git pushes for an app"""
app = sanitize_app_name(app)
hook_path = join(GIT_ROOT, app, 'hooks', 'post-receive')
if not exists(hook_path):
makedirs(dirname(hook_path))
# Initialize the repository with a hook to this script
call("git init --quiet --bare " + app, cwd=GIT_ROOT, shell=True)
with open(hook_path, 'w') as h:
h.write("""#!/usr/bin/env bash
set -e; set -o pipefail;
cat | PIKU_ROOT="%s" %s git-hook %s""" % (PIKU_ROOT, realpath(__file__), app))
# Make the hook executable by our user
chmod(hook_path, stat(hook_path).st_mode | S_IXUSR)
# Handle the actual receive. We'll be called with 'git-hook' after it happens
call('git-shell -c "%s" ' % (argv[1] + " '%s'" % app), cwd=GIT_ROOT, shell=True)
def test_no_read_directory(self):
# Issue #16730
tempdir = tempfile.TemporaryDirectory()
original_mode = os.stat(tempdir.name).st_mode
def cleanup(tempdir):
"""Cleanup function for the temporary directory.
Since we muck with the permissions, we want to set them back to
their original values to make sure the directory can be properly
cleaned up.
"""
os.chmod(tempdir.name, original_mode)
# If this is not explicitly called then the __del__ method is used,
# but since already mucking around might as well explicitly clean
# up.
tempdir.__exit__(None, None, None)
self.addCleanup(cleanup, tempdir)
os.chmod(tempdir.name, stat.S_IWUSR | stat.S_IXUSR)
finder = self.get_finder(tempdir.name)
found = self._find(finder, 'doesnotexist')
self.assertEqual(found, self.NOT_FOUND)
def write_script(self, ws_path, step):
"""
Generate the script for the specified StudyStep.
:param ws_path: Workspace path for the step.
:param step: An instance of a StudyStep class.
:returns: A tuple containing a boolean set to True if step should be
scheduled (False otherwise), path to the generate script, and path
to the generated restart script (None if step cannot be restarted).
"""
to_be_scheduled, script_path, restart_path = \
self._write_script(ws_path, step)
st = os.stat(script_path)
os.chmod(script_path, st.st_mode | stat.S_IXUSR)
if restart_path:
st = os.stat(restart_path)
os.chmod(restart_path, st.st_mode | stat.S_IXUSR)
return to_be_scheduled, script_path, restart_path
def _isexecutable(cmd):
if os.path.isfile(cmd):
mode = os.stat(cmd)[stat.ST_MODE]
if mode & stat.S_IXUSR or mode & stat.S_IXGRP or mode & stat.S_IXOTH:
return True
return False
def extract_zip(data: bytes, path: Path) -> None:
"""Extract zipped data to path."""
# On mac zipfile module cannot extract correctly, so use unzip instead.
if curret_platform() == 'mac':
import subprocess
import shutil
zip_path = path / 'chrome.zip'
if not path.exists():
path.mkdir(parents=True)
with zip_path.open('wb') as f:
f.write(data)
if not shutil.which('unzip'):
raise OSError('Failed to automatically extract chrome.zip.'
f'Please unzip {zip_path} manually.')
subprocess.run(['unzip', str(zip_path)], cwd=str(path))
if chromium_excutable().exists() and zip_path.exists():
zip_path.unlink()
else:
with ZipFile(BytesIO(data)) as zf:
zf.extractall(str(path))
exec_path = chromium_excutable()
if not exec_path.exists():
raise IOError('Failed to extract chromium.')
exec_path.chmod(exec_path.stat().st_mode | stat.S_IXOTH | stat.S_IXGRP |
stat.S_IXUSR)
logger.warning(f'chromium extracted to: {path}')
def testPermission(self):
with TemporaryDirectory() as tmp:
d = os.path.join(tmp, "dir")
os.mkdir(d)
with open(os.path.join(d, "file"), "w") as f:
f.write("data")
os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
self.assertRaises(BuildError, removePath, tmp)
os.chmod(d, stat.S_IRWXU)
def testPermission(self):
with TemporaryDirectory() as tmp:
d = os.path.join(tmp, "dir")
os.mkdir(d)
with open(os.path.join(d, "file"), "w") as f:
f.write("data")
os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
self.assertRaises(BuildError, emptyDirectory, tmp)
os.chmod(d, stat.S_IRWXU)
def set_executable(top):
error_count=0
def set_exec(name):
mode = os.stat(name).st_mode
new_mode = mode
if new_mode & stat.S_IRUSR:
new_mode = new_mode | stat.S_IXUSR
if new_mode & stat.S_IRGRP:
new_mode = new_mode | stat.S_IXGRP
if new_mode & stat.S_IROTH:
new_mode = new_mode | stat.S_IXOTH
if (mode != new_mode):
print "Setting exec for '%s' (mode %o => %o)" % (name, mode, new_mode)
os.chmod(name, new_mode)
def unset_exec(name):
mode = os.stat(name).st_mode
new_mode = mode & ~(stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH)
if (mode != new_mode):
print "Unsetting exec for '%s' (mode %o => %o)" % (name, mode, new_mode)
os.chmod(name, new_mode)
for root, dirs, files in os.walk(top):
for name in files:
complete_name = os.path.join(root, name)
if os.path.islink(complete_name): continue
try:
f = open(complete_name, 'r')
header = f.read(4)
f.close()
if header[0:2] == '#!' or header[1:4] == 'ELF':
set_exec(complete_name)
else:
unset_exec(complete_name)
except Exception as e:
print "%s: %s" % (complete_name, e.__str__())
error_count += 1
for name in dirs:
complete_name = os.path.join(root, name)
set_exec(complete_name)
return error_count
def is_executable(file):
if not os.path.exists(file):
return False
st = os.stat(file)
return bool(st.st_mode & (stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
def testRepoGarbageCollectionTrigger(self):
self.getrepowithcommit()
import os
import stat
import time
with TemporaryDirectory() as execDir:
execFile = os.path.join(execDir, "git")
checkFile = os.path.join(execDir, "check")
with open(execFile, 'w') as execFilePointer:
execFilePointer.write("""#!/bin/sh
if [ "$1" = "gc" ] ; then
touch """ + checkFile + """
fi
""")
os.chmod(execFile, stat.S_IXUSR | stat.S_IRUSR)
# configure PATH for Popen to contain dummy git gc, which should be triggered
os.environ['PATH'] = ':'.join([execDir, os.getenv('PATH')])
repo = GitRepo(self.dir.name)
repo.garbagecollection()
start = time.time()
# check if mocked git was executed
while not os.path.isfile(checkFile):
if (time.time() - start) > 1:
self.fail("Git garbage collection was not triggered")