def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
# Example source code for Python's chmod()
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone, keeping only
    # the permission bits of the current mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def test_zipfile_attributes():
    """Check that archive members keep their file mode and are deflated."""
    # With the change from ZipFile.write() to .writestr(), member attributes
    # have to be set manually by the archiver; this verifies that they are.
    expected_modes = (('foo', 0o644), ('bar', 0o755))
    with temporary_directory() as tempdir:
        for name, mode in expected_modes:
            member_path = os.path.join(tempdir, name)
            with codecs.open(member_path, 'w', encoding='utf-8') as handle:
                handle.write(name + '\n')
            os.chmod(member_path, mode)

        archive_path = wheel.archive.make_wheelfile_inner(
            os.path.join(tempdir, 'dummy'), tempdir)

        with readable_zipfile(archive_path) as zf:
            for name, mode in expected_modes:
                info = zf.getinfo(os.path.join(tempdir, name))
                # The mode (with the 0o100000 bit added) is expected in the
                # upper 16 bits of external_attr.
                assert info.external_attr == (mode | 0o100000) << 16
                assert info.compress_type == zipfile.ZIP_DEFLATED
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone, keeping only
    # the permission bits of the current mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def copymode(src, dst, *, follow_symlinks=True):
    """Copy the permission bits of `src` onto `dst`.

    Symlinks are left unfollowed only when `follow_symlinks` is false AND
    both `src` and `dst` are symlinks; that case needs `os.lchmod`, and the
    function silently does nothing where it is unavailable (e.g. Linux).
    Otherwise the regular `os.stat`/`os.chmod` pair is used, again doing
    nothing if `os.chmod` does not exist.
    """
    operate_on_links = (not follow_symlinks
                        and os.path.islink(src)
                        and os.path.islink(dst))
    if operate_on_links:
        if not hasattr(os, 'lchmod'):
            return
        read_stat, apply_mode = os.lstat, os.lchmod
    else:
        if not hasattr(os, 'chmod'):
            return
        read_stat, apply_mode = os.stat, os.chmod

    source_stat = read_stat(src)
    apply_mode(dst, stat.S_IMODE(source_stat.st_mode))
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone, keeping only
    # the permission bits of the current mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone, keeping only
    # the permission bits of the current mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def _test_NoAccessDir(self, nodeName):
    """Check that loading from a directory with mode 0 raises LoadFail.

    Launches the device manager for `nodeName`, forces the SDR
    'dom/noaccess' directory to mode 0 (creating it if needed) and asserts
    that device.load() on it raises CF.LoadableDevice.LoadFail. The
    directory is removed afterwards.
    """
    # Only the device manager is needed; the booter handle is unused here.
    _booter, dev_mgr = self.launchDeviceManager("/nodes/%s/DeviceManager.dcd.xml" % nodeName)
    device = dev_mgr._get_registeredDevices()[0]
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    if os.path.exists(testdir):
        os.chmod(testdir, 0)
    else:
        os.mkdir(testdir, 0)

    try:
        can_access = os.access(testdir, os.R_OK|os.X_OK)
        self.assertFalse(can_access, 'Current user can still access directory')
        self.assertRaises(CF.LoadableDevice.LoadFail, device.load,
                          fileMgr, dirname, CF.LoadableDevice.SHARED_LIBRARY)
    finally:
        os.rmdir(testdir)
def test_ExistsException(self):
    """Check that exists() inside an inaccessible directory raises cleanly.

    Makes sure that FileSystem::exists() throws the correct exception
    (CF.InvalidFileName) and doesn't kill the domain for files in
    directories it cannot access. The SDR 'dom/noaccess' directory is forced
    to mode 0o644 (no execute bit) for the check and removed afterwards.
    """
    self.assertNotEqual(self._domMgr, None)
    fileMgr = self._domMgr._get_fileMgr()

    dirname = '/noaccess'
    testdir = os.path.join(scatest.getSdrPath(), 'dom' + dirname)
    # Use the 0o prefix: a bare leading-zero literal such as 0644 is a
    # SyntaxError on Python 3, while 0o644 is accepted by Python 2.6+ too.
    if not os.path.exists(testdir):
        os.mkdir(testdir, 0o644)
    else:
        os.chmod(testdir, 0o644)

    try:
        self.assertFalse(os.access(testdir, os.R_OK|os.X_OK), 'Current user can still access directory')
        self.assertRaises(CF.InvalidFileName, fileMgr.exists, os.path.join(dirname, 'testfile'))
    finally:
        os.rmdir(testdir)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def handle(self, *args, **options):
    """Install the commit-msg hook snippet into `self.HOOK_PATH`.

    Reads the existing 'commit-msg' hook (or starts from a bash shebang when
    there is none). If the hook does not already contain the
    ZERODOWNTIME_COMMIT_MSG_HOOK marker, COMMIT_MSG_HOOK is appended, the
    file is rewritten and the owner-execute bit is added to its mode.
    """
    hook_file = os.path.join(self.HOOK_PATH, 'commit-msg')

    if os.path.exists(hook_file):
        with open(hook_file, 'r') as fp:
            script = fp.read()
    else:
        script = '#!/usr/bin/env bash\n\n'

    # Nothing to do when the snippet is already installed.
    if 'ZERODOWNTIME_COMMIT_MSG_HOOK' in script:
        return

    script += COMMIT_MSG_HOOK
    with open(hook_file, 'w') as fp:
        fp.write(script)

    current_mode = os.stat(hook_file).st_mode
    os.chmod(hook_file, current_mode | stat.S_IEXEC)
def _clean_upgrade(binary_ok, binary_path, path, temp_path):
if binary_ok:
import stat
# save the permissions from the current binary
old_stat = os.stat(binary_path)
# rename the current binary in order to replace it with the latest
os.rename(binary_path, path + "/old")
os.rename(temp_path, binary_path)
# set the same permissions that had the previous binary
os.chmod(binary_path, old_stat.st_mode | stat.S_IEXEC)
# delete the old binary
os.remove(path + "/old")
print("mongoaudit updated, restarting...")
os.execl(binary_path, binary_path, *sys.argv)
else:
os.remove(temp_path)
print("couldn't download the latest binary")
def download_driver_file(whichbin, url, base_path):
    """Download a driver archive, unpack it into `base_path` and chmod it.

    The archive type is taken from the URL suffix ('.tar.gz', otherwise it
    is treated as '.zip'). The archive is fetched with download_file() to
    '/tmp/pwr_temp<ext>' and extracted into `base_path`. Afterwards the
    extracted binary is given mode 0o775: 'geckodriver' when `whichbin` is
    'wires', 'chromedriver' otherwise.

    :param whichbin: which driver is being installed ('wires' selects
        geckodriver; any other value selects chromedriver)
    :param url: download URL of the archive
    :param base_path: directory to extract into
    """
    ext = '.tar.gz' if url.endswith('.tar.gz') else '.zip'
    archive_path = '/tmp/pwr_temp{}'.format(ext)
    target_dir = '{}/'.format(base_path)

    print("Downloading from: {}".format(url))
    download_file(url, archive_path)

    # NOTE(review): extractall() trusts the member paths inside the
    # downloaded archive; confirm the download source is trusted.
    if ext == '.tar.gz':
        import tarfile
        # Context manager so the archive is closed even if extraction fails.
        with tarfile.open(archive_path, "r:gz") as tar:
            tar.extractall(target_dir)
    else:
        import zipfile
        with zipfile.ZipFile(archive_path, "r") as z:
            z.extractall(target_dir)

    # if whichbin == 'wires' and '/v{}/'.format(latest_gecko_driver) in url:
    #     os.rename('{}/geckodriver'.format(base_path),
    #               '{}/wires'.format(base_path))
    #     os.chmod('{}/wires'.format(base_path), 0o775)
    if whichbin == 'wires':
        os.chmod('{}/geckodriver'.format(base_path), 0o775)
    else:
        os.chmod('{}/chromedriver'.format(base_path), 0o775)
def execute(self):
    """Apply an octal mode to every file in the current selection.

    The mode text comes from `self.rest(1)`, falling back to
    `str(self.quantifier)` when that is empty. It must parse as an octal
    number between 0 and 777, otherwise a notification is shown and nothing
    is changed. Per-file chmod errors are reported through `self.fm.notify`;
    finally the current directory is reloaded, ignoring any failure.
    """
    mode_text = self.rest(1) or str(self.quantifier)

    try:
        mode = int(mode_text, 8)
    except ValueError:
        mode = -1  # falls outside the accepted range below
    if not 0 <= mode <= 0o777:
        self.fm.notify("Need an octal number between 0 and 777!", bad=True)
        return

    for entry in self.fm.thistab.get_selection():
        try:
            os.chmod(entry.path, mode)
        except Exception as ex:
            self.fm.notify(ex)

    # reloading directory. maybe its better to reload the selected
    # files only.
    try:
        self.fm.thisdir.load_content()
    except Exception:
        pass
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource executable. The constants are spelled in hex:
    # 0x16D is octal 0555 and 0xFFF is octal 07777.
    add_bits = 0x16D
    keep_mask = 0xFFF
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | add_bits) & keep_mask)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource executable. The constants are spelled in hex:
    # 0x16D is octal 0555 and 0xFFF is octal 07777.
    add_bits = 0x16D
    keep_mask = 0xFFF
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | add_bits) & keep_mask)
def rmtree_errorhandler(func, path, exc_info):
    """Retry a failed removal after making the path writable.

    On Windows, the files in .svn are read-only, so when rmtree() tries to
    remove them, an exception is thrown. We catch that here, remove the
    read-only attribute, and hopefully continue without problems.

    Args:
        func: the operation that failed; it is called again with `path`.
        path: the path the operation failed on.
        exc_info: sequence whose first two items are the exception type and
            the exception value.

    NOTE(review): the bare `raise` statements re-raise the exception that is
    currently being handled, so this presumably has to be invoked from
    inside an `except` block (e.g. as an rmtree() error callback) - confirm.
    NOTE(review): `WindowsError` and `PermissionError` are referenced by
    name; presumably they are not defined on every platform / Python
    version - confirm before relying on this outside Windows.
    """
    exctype, value = exc_info[:2]
    # Only the three error shapes below are handled; anything else is
    # re-raised. Presumably 5 is the Windows "access denied" code and 13 is
    # errno EACCES - TODO confirm.
    if not ((exctype is WindowsError and value.args[0] == 5) or #others
            (exctype is OSError and value.args[0] == 13) or #python2.4
            (exctype is PermissionError and value.args[3] == 5) #python3.3
            ):
        raise
    # file type should currently be read only
    # (the error is re-raised unless the S_IREAD bit is set on `path`)
    if ((os.stat(path).st_mode & stat.S_IREAD) != stat.S_IREAD):
        raise
    # convert to read/write
    os.chmod(path, stat.S_IWRITE)
    # use the original function to repeat the operation
    func(path)
def postprocess(self, tempname, filename):
    """Apply platform-specific fix-ups to a freshly extracted resource.

    This is the hook where Mac header rewrites belong; other platforms need
    nothing special. Resource providers should call it ONLY after a
    compressed resource was extracted successfully, and must NOT call it on
    resources that already live in the filesystem.

    `tempname` is the file's current (temporary) name; `filename` is the
    name the caller will rename it to once this routine returns (it is not
    used here).
    """
    if os.name != 'posix':
        return
    # Make the resource readable and executable for everyone, keeping only
    # the permission bits of the current mode.
    current_mode = os.stat(tempname).st_mode
    os.chmod(tempname, (current_mode | 0o555) & 0o7777)
def decrypt_file(file, key):
    """
    Decrypts the file ``file``.

    The encrypted file is assumed to end with the ``.enc`` extension. The
    decrypted file is saved to the same location without the ``.enc``
    extension.

    The permissions on the decrypted file are automatically set to 0o600.
    The output is created with that mode from the start, so the plaintext
    is never written to a file that has wider, umask-derived permissions.

    Raises ``ValueError`` if ``file`` does not end with ``.enc``.

    See also :func:`doctr.local.encrypt_file`.
    """
    if not file.endswith('.enc'):
        raise ValueError("%s does not end with .enc" % file)

    fer = Fernet(key)

    with open(file, 'rb') as f:
        decrypted_file = fer.decrypt(f.read())

    out_path = file[:-4]
    # O_BINARY only exists on Windows; it keeps the descriptor in binary mode.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, 'O_BINARY', 0)
    fd = os.open(out_path, flags, 0o600)
    with os.fdopen(fd, 'wb') as f:
        # The mode given to os.open() only applies to newly created files, so
        # tighten a pre-existing file before any plaintext is written to it.
        os.chmod(out_path, 0o600)
        f.write(decrypted_file)
def write_torque_script(command, outfile, walltime, queue, name, out, err, print_exec=True):
    """Write an executable Torque/PBS submission script and return its path.

    The script consists of '#PBS' directives for the walltime, queue, job
    name, stdout file ('<out>.o') and stderr file ('<err>.e'), a
    'cd ${PBS_O_WORKDIR}' line and then `command` verbatim. The file is
    given mode 0o755.

    :param command: shell text appended after the header (written as-is)
    :param outfile: path of the script to write
    :param walltime: value for '-l walltime='
    :param queue: queue name for '-q'; a falsy value falls back to
        'regular', the value that used to be hard-coded
    :param name: job name for '-N'
    :param out: stem of the stdout file
    :param err: stem of the stderr file
    :param print_exec: when true, print the matching 'qsub' command line
    :return: `outfile`
    """
    # The queue argument used to be ignored in favour of a hard-coded queue.
    queue_name = queue if queue else 'regular'
    header = [
        '#PBS -l walltime={}\n'.format(walltime),
        '#PBS -q {}\n'.format(queue_name),
        '#PBS -N {}\n'.format(name),
        '#PBS -o {}.o\n'.format(out),
        '#PBS -e {}.e\n'.format(err),
        'cd ${PBS_O_WORKDIR}\n',
    ]
    with open(outfile, 'w') as script:
        script.writelines(header)
        script.write(command)

    os.chmod(outfile, 0o755)

    if print_exec:
        print('qsub {};'.format(outfile))

    return outfile
def make_run_all_script(wd):
    """Write an executable 'run_all.sh' into `wd` and return its path.

    The generated bash script loops over every '*.pdb' file in its working
    directory, runs 'tleap -f leaprc' on a temporary copy and moves the
    resulting files into the 'xleap_modified' and 'amber_minimized'
    folders. The script is given mode 0o755.
    """
    script_path = op.join(wd, 'run_all.sh')
    script_text = """#!/bin/bash
for i in *.pdb; do
echo "Running ${i}..."
seq=`echo ${i} | cut -d. -f1`
curr_dir=`pwd`
cp ${i} temp.pdb
tleap -f leaprc
rm temp.pdb
mv temp_mod.pdb xleap_modified/${seq}_xleap.pdb
mv temp_mod.prmtop amber_minimized/${seq}.prmtop
mv temp_mod.inpcrd amber_minimized/${seq}.inpcrd
done
"""
    with open(script_path, 'w') as handle:
        handle.write(script_text)
    os.chmod(script_path, 0o755)
    return script_path
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def mkdir(path, owner='root', group='root', perms=0o555, force=False):
    """Create directory `path` and apply ownership and permissions to it.

    `path` is made absolute first. When nothing exists there, the directory
    (and any missing parents) is created with `perms`. When something exists
    and `force` is true, a non-directory entry is unlinked and replaced by a
    directory. In every case the path is finally chown'ed and chmod'ed.

    :param path: directory to create
    :param owner: user name, resolved with pwd.getpwnam()
    :param group: group name, resolved with grp.getgrnam()
    :param perms: mode passed to os.makedirs() and os.chmod()
    :param force: replace an existing non-directory entry at `path`
    """
    log("Making dir {} {}:{} {:o}".format(path, owner, group, perms))
    uid = pwd.getpwnam(owner).pw_uid
    gid = grp.getgrnam(group).gr_gid
    target = os.path.abspath(path)

    if not os.path.exists(target):
        os.makedirs(target, perms)
    elif force and not os.path.isdir(target):
        # Something that is not a directory is in the way: replace it.
        log("Removing non-directory file {} prior to mkdir()".format(path))
        os.unlink(target)
        os.makedirs(target, perms)

    # Ownership and mode are (re)applied even when the path already existed.
    os.chown(target, uid, gid)
    os.chmod(target, perms)
def get_stream(playlist, directory, own_name):
    """Record the HLS stream `playlist` for `own_name` into `directory`.

    Registers `own_name` in the module-level `modellist`, appends the ffmpeg
    conversion commands for this recording to the 'ts_to_mp4.sh' script in
    `directory` and to `oneclick_file`, logs the name to 'baitlist.txt'
    under `cwd`, and - unless the target '.ts' file already exists - runs
    livestreamer to capture the stream and makes the result world
    read/writable. `own_name` is removed from `modellist` at the end.
    """
    def append_convert_commands(script_path):
        # Append the ffmpeg + chmod lines, then make the script executable.
        with open(script_path, "a+", encoding="utf8") as script:
            script.write("\nffmpeg -i " + stream_file + " -strict -2 -c:v copy " + merged_file + "\n")
            script.write("chmod 666 " + merged_file + "\n")
        os.chmod(script_path, 0o777)

    modellist.append(str(own_name))

    # NOTE(review): the clock is sampled twice, so hour and minute can come
    # from different instants around an hour rollover.
    hour = datetime.datetime.now().hour
    minute = datetime.datetime.now().minute
    start_time = "_" + str(hour) + "-" + str(minute)

    ffs_script = directory + "ts_to_mp4.sh"
    merged_file = encoded + own_name + start_time + ".mp4"
    stream_file = directory + own_name + start_time + ".ts"

    append_convert_commands(ffs_script)
    append_convert_commands(oneclick_file)

    hlsvar = "hlsvariant://" + playlist
    print("Retrieving the Streamfile for " + str(own_name))

    baitlist_file = cwd + "baitlist.txt"
    with open(baitlist_file, "a+", encoding="utf8") as bl:
        bl.write(own_name + "\n")

    if not os.path.isfile(stream_file):
        command = [
            "livestreamer",
            "--hls-segment-threads", str(numcpucores),
            "--retry-streams", "5",
            "--retry-open", "5",
            "--hls-segment-attempts", "5",
            "--hls-segment-timeout", "20",
            "--http-header", "User-Agent=Mozilla/5.0 (X11; Fedora; Linux x86_64; rv:50.0) Gecko/20100101 Firefox/50.0",
            "-o", stream_file,
            hlsvar,
            "best",
        ]
        subprocess.check_call(command)
        os.chmod(stream_file, 0o666)

    modellist.remove(str(own_name))
    return
def apply_copy(self):
    """Create one 'copy' task per source file of this task generator.

    Defaults `self.fun` to `copy_func`, disables the default install path
    and removes 'process_source' from `self.meths`. Each source is resolved
    with find_resource(); the output node is named after `self.target`, or
    after the input node when no target is set or there are several
    sources. Each task receives `self.fun` and a chmod value (`self.chmod`
    if present, else Utils.O644).

    Raises Errors.WafError when an input file cannot be found or a created
    task has no environment.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0

    sources = self.to_list(self.source)
    self.meths.remove('process_source')
    several_sources = len(sources) > 1

    for filename in sources:
        node = self.path.find_resource(filename)
        if not node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)

        # An explicit target name is only usable for a single input.
        target = self.target
        if not target or several_sources:
            target = node.name

        # TODO the file path may be incorrect
        newnode = self.path.find_or_declare(target)

        tsk = self.create_task('copy', node, newnode)
        tsk.fun = self.fun
        tsk.chmod = getattr(self, 'chmod', Utils.O644)

        if tsk.env:
            continue
        tsk.debug()
        raise Errors.WafError('task without an environment')
def apply_copy(self):
    """Create one 'copy' task per source file of this task generator.

    Defaults `self.fun` to `copy_func`, disables the default install path
    and removes 'process_source' from `self.meths`. Each source is resolved
    with find_resource(); the output node is named after `self.target`, or
    after the input node when no target is set or there are several
    sources. Each task receives `self.fun` and a chmod value (`self.chmod`
    if present, else Utils.O644).

    Raises Errors.WafError when an input file cannot be found or a created
    task has no environment.
    """
    Utils.def_attrs(self, fun=copy_func)
    self.default_install_path = 0

    sources = self.to_list(self.source)
    self.meths.remove('process_source')
    several_sources = len(sources) > 1

    for filename in sources:
        node = self.path.find_resource(filename)
        if not node:
            raise Errors.WafError('cannot find input file %s for processing' % filename)

        # An explicit target name is only usable for a single input.
        target = self.target
        if not target or several_sources:
            target = node.name

        # TODO the file path may be incorrect
        newnode = self.path.find_or_declare(target)

        tsk = self.create_task('copy', node, newnode)
        tsk.fun = self.fun
        tsk.chmod = getattr(self, 'chmod', Utils.O644)

        if tsk.env:
            continue
        tsk.debug()
        raise Errors.WafError('task without an environment')