def test_tmp_files():
    """Check which temp files CSVW leaves behind while serializing.

    Steps exercised, all inside a fresh directory under ``/tmp``:

    * constructing the converter creates no files;
    * ``to_rdf(fmt="nt")`` leaves exactly one ``.nt`` file;
    * ``to_rdf(fmt="turtle")`` leaves one ``.nt`` and one ``.ttl`` file,
      each readable but not writable by user, group and others;
    * ``close()`` leaves the directory empty again.
    """
    work_dir = tempfile.mkdtemp(dir="/tmp")
    assert not os.listdir(work_dir)

    converter = CSVW(
        csv_path="./tests/books.csv",
        metadata_path="./tests/books.csv-metadata.json",
        temp_dir=work_dir,
    )
    assert not os.listdir(work_dir)

    # N-Triples output: a single intermediate file is expected.
    converter.to_rdf(fmt="nt")
    produced = os.listdir(work_dir)
    assert len(produced) == 1, "nt serialization should generate only 1 temp file"
    assert produced[0].endswith(".nt")

    # Start the turtle run from an empty directory.
    os.remove(os.path.join(work_dir, produced[0]))
    assert not os.listdir(work_dir)

    converter.to_rdf(fmt="turtle")
    produced = os.listdir(work_dir)
    assert len(produced) == 2, "ttl serialization should generate two temps file"
    assert any(name.endswith(".nt") for name in produced)
    assert any(name.endswith(".ttl") for name in produced)

    # Check permissions: each read bit must be set, each write bit cleared.
    read_write_pairs = (
        (stat.S_IRUSR, stat.S_IWUSR),
        (stat.S_IRGRP, stat.S_IWGRP),
        (stat.S_IROTH, stat.S_IWOTH),
    )
    for name in produced:
        file_mode = os.stat(os.path.join(work_dir, name)).st_mode
        for readable_bit, writable_bit in read_write_pairs:
            assert bool(file_mode & readable_bit)
            assert not bool(file_mode & writable_bit)

    converter.close()
    assert not os.listdir(work_dir)
# Python: example source code using stat.S_IROTH
def load_package_by_remote(self):
    """Clone the score package from its remote repository.

    If a deploy key exists at ``conf.DEFAULT_SCORE_REPOSITORY_KEY``, its
    permissions are tightened to ``0o600`` when group/other have any access,
    and git is configured to use it through ``GIT_SSH_COMMAND``.  After the
    clone, ``conf.DEFAULT_SCORE_BRANCH`` is checked out when it differs from
    ``conf.DEFAULT_SCORE_BRANCH_MASTER``.

    :return: True once the clone (and optional checkout) has completed;
        exceptions raised by the git calls propagate to the caller.
    """
    # score package clone from remote repository
    repository_url = self.__score_base + ':' + self.__score_package + '.git'

    logging.debug("git Path :"+self.__package_path)
    git = Git(os.getcwd())

    # check deploy key
    key_path = conf.DEFAULT_SCORE_REPOSITORY_KEY
    if os.path.exists(key_path):
        key_mode = os.stat(key_path).st_mode
        # owner-only access: OpenSSH ignores private keys that group/other
        # can access, so check every group/other bit, not just the read bits.
        if key_mode & (stat.S_IRWXG | stat.S_IRWXO):
            os.chmod(key_path, 0o600)
        # NOTE(review): StrictHostKeyChecking=no disables host key
        # verification; kept as-is, confirm this is intended.
        ssh_cmd = 'ssh -o StrictHostKeyChecking=no -i '+key_path
        logging.debug("SSH KEY COMMAND : "+ssh_cmd)
        # custom_environment() is a context manager and has no effect unless
        # entered with ``with``; update_environment() applies the variable to
        # all later invocations made through this Git object.
        git.update_environment(GIT_SSH_COMMAND=ssh_cmd)

    logging.debug(f"load_package_by_remote repository_url({repository_url}) package_path({self.__package_path})")
    self.__package_repository = Repo._clone(git, repository_url, self.__package_path, GitCmdObjectDB, None)
    logging.debug(f"load_package_by_remote result({self.__package_repository})")

    if conf.DEFAULT_SCORE_BRANCH != conf.DEFAULT_SCORE_BRANCH_MASTER:
        self.__package_repository.git.checkout(conf.DEFAULT_SCORE_BRANCH)

    return True
def install_step(self):
    """Building was performed in install dir, so just fix permissions.

    Grants "others" read access to everything, and execute (traverse) access
    to directories, under the OpenFOAM directory and, if it exists, the
    ThirdParty directory.
    """

    def _open_up_for_others(tree_root):
        # read bit on every entry, execute bit on directories only
        adjust_permissions(tree_root, stat.S_IROTH, add=True, recursive=True, ignore_errors=True)
        adjust_permissions(tree_root, stat.S_IXOTH, add=True, recursive=True, onlydirs=True, ignore_errors=True)

    # OpenFOAM dir is always processed
    _open_up_for_others(os.path.join(self.installdir, self.openfoamdir))

    # ThirdParty dir and subdirs (also for 2.x) are only processed
    # if the thirdparty tarball is installed
    thirdparty_root = os.path.join(self.installdir, self.thrdpartydir)
    if os.path.exists(thirdparty_root):
        _open_up_for_others(thirdparty_root)
def save_code(path, code):
    """Write *code* to *path* behind a bash shebang line.

    The file is then given mode rwxr--r-- (owner may read, write and
    execute; group and others may only read).
    """
    script_mode = stat.S_IRWXU | stat.S_IRGRP | stat.S_IROTH
    with open(path, 'w') as script:
        script.writelines(("#!/usr/bin/env bash\n", code))
    chmod(path, script_mode)
def close(self):
    """Finish the file: compress, index, and make the results read-only.

    Does nothing when no file handle is open.  Otherwise the handle is
    closed, ``bgzip --force`` is run on the base path, the resulting ``.gz``
    is renamed to ``self.filename``, and the file plus its ``.fai`` and
    ``.gzi`` companions are made read-only for user, group and others.
    """
    if not self._fh:
        return

    self._fh.close()
    self._fh = None
    subprocess.check_call([bgzip_exe, "--force", self._basepath])
    os.rename(self._basepath + ".gz", self.filename)

    # open file with FastaFile to create indexes, then make all read-only
    indexer = FastaFile(self.filename)
    indexer.close()
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    for suffix in ("", ".fai", ".gzi"):
        os.chmod(self.filename + suffix, read_only)

    logger.info("{} written; added {} sequences".format(self.filename, len(self._added)))
def main():
    """Generate an executable shell script that invokes the Dart tester.

    Reads the required command-line options ``--out``, ``--source-dir``,
    ``--dot-packages``, ``--test-runner`` and ``--flutter-shell``, writes the
    invocation script to the ``--out`` path (creating its parent directory
    when needed) and marks it rwxrwxr--.
    """
    parser = argparse.ArgumentParser(
        description='Generate a script that invokes the Dart tester')
    parser.add_argument('--out',
                        help='Path to the invocation file to generate',
                        required=True)
    parser.add_argument('--source-dir',
                        help='Path to test sources',
                        required=True)
    parser.add_argument('--dot-packages',
                        help='Path to the .packages file',
                        required=True)
    parser.add_argument('--test-runner',
                        help='Path to the test runner',
                        required=True)
    parser.add_argument('--flutter-shell',
                        help='Path to the Flutter shell',
                        required=True)
    args = parser.parse_args()

    test_file = args.out
    test_path = os.path.dirname(test_file)
    # dirname() is '' for a bare file name and makedirs('') raises, so only
    # create a directory when there is one to create.  exist_ok avoids the
    # race between checking for the directory and creating it.
    if test_path:
        os.makedirs(test_path, exist_ok=True)

    script_template = string.Template('''#!/bin/sh
$test_runner \\
--packages=$dot_packages \\
--shell=$flutter_shell \\
--test-directory=$source_dir
''')
    with open(test_file, 'w') as script:
        script.write(script_template.substitute(vars(args)))

    # rwx for user and group, read-only for others
    permissions = (stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR |
                   stat.S_IRGRP | stat.S_IWGRP | stat.S_IXGRP |
                   stat.S_IROTH)
    os.chmod(test_file, permissions)
def test_path_no_perms(self):
    """A temporary directory made read-only must not be reported writeable."""
    read_only = stat.S_IREAD | stat.S_IRGRP | stat.S_IROTH
    with utils.NamedTemporaryDirectory() as tmp_d:
        os.chmod(tmp_d, read_only)
        reported_writeable = utils.is_writeable(tmp_d)
        self.assertFalse(reported_writeable)
def _selinuxCreateFauxFilesystems():
    """Create a copy of ``/proc/filesystems`` without any selinuxfs line.

    The copy is written to a new temp file (prefix ``mock-selinux-plugin.``)
    which is then made read-only for user, group and others.

    :return: path of the created temp file.
    """
    handle, faux_path = tempfile.mkstemp(prefix="mock-selinux-plugin.")
    with os.fdopen(handle, 'w') as faux, open("/proc/filesystems") as real:
        # keep every host entry except those mentioning selinuxfs
        faux.writelines(entry for entry in real if "selinuxfs" not in entry)
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(faux_path, read_only)
    return faux_path
def stats_to_str(s):
    """Render the mode of stat result *s* as a four-part string.

    The parts are, in order: ``dbpcs`` of the mode, then ``rwx`` of the mode
    for the user, group and other permission bits.
    """
    mode = s.st_mode
    pieces = (
        dbpcs(mode),
        rwx(mode, stat.S_IRUSR, stat.S_IWUSR, stat.S_IXUSR),
        rwx(mode, stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP),
        rwx(mode, stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH),
    )
    return "".join(map(str, pieces))
def add_exec(file_path):
    """Make the given file executable by user, group and others.

    The mode is set to rwxr-xr-x outright (any previous permission bits are
    replaced), and a short notice is printed first.
    """
    print("change permissions")
    # combine the stat module constants with bitwise OR
    owner_bits = stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR
    group_bits = stat.S_IRGRP | stat.S_IXGRP
    other_bits = stat.S_IROTH | stat.S_IXOTH
    os.chmod(file_path, owner_bits | group_bits | other_bits)
# global vars are very bad, but I think they are the only solution for this
def make_pidfile(self):
    """Write this object's process id to ``<PIDFILE_PATH>/<name>.pid``.

    Before writing, every existing ``*.pid`` file whose name has the form
    ``<pid>_<model>_<ClassName>.pid`` and matches this object's class name
    and ``model_name`` is passed to ``self.kill_process``.  The new pid file
    is made readable and writable by user, group and others.
    """
    if not os.path.isdir(PIDFILE_PATH):
        os.mkdir(PIDFILE_PATH)
    own_pidfile = os.path.join(PIDFILE_PATH, "{}.pid".format(self.name))

    expected_tail = self.__class__.__name__ + '.pid'
    for entry in os.listdir(PIDFILE_PATH):
        # only names shaped like "<pid>_<model>_<ClassName>.pid" qualify
        if entry.endswith('.pid') and entry.count('_') == 2:
            _pid, model, clsname = entry.split('_')
            if clsname == expected_tail and model == self.model_name:
                self.kill_process(entry)

    with open(own_pidfile, 'w+') as handle:
        handle.write(str(self.process.pid))
    world_read_write = (
        stat.S_IRUSR | stat.S_IWUSR
        | stat.S_IRGRP | stat.S_IWGRP
        | stat.S_IROTH | stat.S_IWOTH
    )
    os.chmod(own_pidfile, world_read_write)
def checkFixPermissionsForPath(pth):
    """Make *pth* readable by group and others if it is not already.

    When either read bit is missing, a message naming the missing
    permission(s) is printed and the mode is updated with the missing
    bit(s) added; otherwise nothing is printed or changed.
    """
    current_mode = os.stat(pth).st_mode
    fixed_mode = current_mode
    complaints = []
    required = (
        (stat.S_IRGRP, ' not group readable'),
        (stat.S_IROTH, ' not other readable'),
    )
    for bit, complaint in required:
        if not current_mode & bit:
            complaints.append(complaint)
            fixed_mode |= bit

    msg = ''.join(complaints)
    if msg.strip():
        print("%s, fixing permissions for %s" % (msg, pth))
        os.chmod(pth, fixed_mode)
def test_consoleloggingreporter_dir_not_writeable(consolelogger, serial_driver, tmpdir):
    """Trigger a serial read while ``tmpdir`` is read-only.

    The directory is made read-only, the driver's serial ``read`` is replaced
    by a stub returning ``b"test"`` with four bytes reported as waiting, and
    ``serial_driver.read()`` is called.  The test passes if nothing raises.
    NOTE(review): presumably the console logger writes into ``tmpdir`` -
    confirm against the ``consolelogger`` fixture.
    """
    read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    os.chmod(str(tmpdir), read_only)

    def canned_read(self, size=1, timeout=0.0):
        # stand-in for the serial port's read(): always the same payload
        return b"test"

    serial_driver.serial.in_waiting = 4
    serial_driver.serial.read = canned_read
    serial_driver.read()
def is_executable_file(path):
    """Checks that path is an executable regular file (or a symlink to a file).

    This is roughly ``os.path.isfile(path) and os.access(path, os.X_OK)``, but
    on some platforms :func:`os.access` gives us the wrong answer, so this
    checks permission bits directly.

    :param path: path to examine; symlinks are resolved first.
    :return: True when the resolved path is a regular file whose read and
        execute bits are both set for others, for a group the current
        process belongs to, or for the file's owner when that is the
        effective user.  False otherwise, including when the path cannot
        be stat'ed.
    """
    # follow symlinks,
    fpath = os.path.realpath(path)

    # One stat() call supplies the file type, mode, owner and group, so every
    # check below inspects the same snapshot of the file.
    try:
        info = os.stat(fpath)
    except (OSError, ValueError):
        return False

    # return False for non-files (directories, fifo, etc.)
    if not stat.S_ISREG(info.st_mode):
        return False

    # On Solaris, etc., "If the process has appropriate privileges, an
    # implementation may indicate success for X_OK even if none of the
    # execute file permission bits are set."
    #
    # For this reason, it is necessary to explicitly check st_mode

    # check if `other', that is anybody, may read and execute.
    mode = info.st_mode
    if mode & stat.S_IROTH and mode & stat.S_IXOTH:
        return True

    # get current user's group ids, and check if `group',
    # when matching ours, may read and execute.
    user_gids = os.getgroups() + [os.getgid()]
    if (info.st_gid in user_gids and
            mode & stat.S_IRGRP and mode & stat.S_IXGRP):
        return True

    # finally, if file owner matches our effective userid,
    # check if `user', may read and execute.
    if (info.st_uid == os.geteuid() and
            mode & stat.S_IRUSR and mode & stat.S_IXUSR):
        return True

    return False
def test_set_config_value_file_permissions(self):
    """The config file written by set_global_config_value must be mode rw-------.

    With the global config dir/path patched to the test's own locations, a
    value is set and the resulting file is checked: owner read and write
    bits set, every other permission bit cleared.
    """
    must_be_set = (stat.S_IRUSR, stat.S_IWUSR)
    must_be_clear = (
        stat.S_IXUSR,
        stat.S_IRGRP, stat.S_IWGRP, stat.S_IXGRP,
        stat.S_IROTH, stat.S_IWOTH, stat.S_IXOTH,
    )
    dir_patch = mock.patch('azure.cli.core._config.GLOBAL_CONFIG_DIR', self.config_dir)
    path_patch = mock.patch('azure.cli.core._config.GLOBAL_CONFIG_PATH', self.config_path)
    with dir_patch, path_patch:
        set_global_config_value('test_section', 'test_option', 'a_value')
        file_mode = os.stat(self.config_path).st_mode
        for bit in must_be_set:
            self.assertTrue(bool(file_mode & bit))
        for bit in must_be_clear:
            self.assertFalse(bool(file_mode & bit))
def _rename_file(src, dst):
    """Rename *src* to *dst* and make the result readable by everyone.

    On Windows the destination is removed first.  After the rename, read
    permission is added for user, group and others; execute permission is
    added for all three as well when ``_is_executable(dst)`` is true.
    """
    if os.name == 'nt':
        # TODO: check that fs.rm_safe works on windows, and if not, fix
        #       fs.rm_safe.
        fs.rm_safe(dst)
    os.rename(src, dst)

    read_for_all = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
    exec_for_all = stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

    new_mode = os.stat(dst).st_mode | read_for_all
    if _is_executable(dst):
        new_mode = new_mode | exec_for_all
    os.chmod(dst, new_mode)
def is_executable_file(path):
    """Checks that path is an executable regular file (or a symlink to a file).

    This is roughly ``os.path.isfile(path) and os.access(path, os.X_OK)``, but
    on some platforms :func:`os.access` gives us the wrong answer, so this
    checks permission bits directly.

    :param path: path to examine; symlinks are resolved first.
    :return: True when the resolved path is a regular file whose read and
        execute bits are both set for others, for a group the current
        process belongs to, or for the file's owner when that is the
        effective user.  False otherwise, including when the path cannot
        be stat'ed.
    """
    # follow symlinks,
    fpath = os.path.realpath(path)

    # One stat() call supplies the file type, mode, owner and group, so every
    # check below inspects the same snapshot of the file.
    try:
        info = os.stat(fpath)
    except (OSError, ValueError):
        return False

    # return False for non-files (directories, fifo, etc.)
    if not stat.S_ISREG(info.st_mode):
        return False

    # On Solaris, etc., "If the process has appropriate privileges, an
    # implementation may indicate success for X_OK even if none of the
    # execute file permission bits are set."
    #
    # For this reason, it is necessary to explicitly check st_mode

    # check if `other', that is anybody, may read and execute.
    mode = info.st_mode
    if mode & stat.S_IROTH and mode & stat.S_IXOTH:
        return True

    # get current user's group ids, and check if `group',
    # when matching ours, may read and execute.
    user_gids = os.getgroups() + [os.getgid()]
    if (info.st_gid in user_gids and
            mode & stat.S_IRGRP and mode & stat.S_IXGRP):
        return True

    # finally, if file owner matches our effective userid,
    # check if `user', may read and execute.
    if (info.st_uid == os.geteuid() and
            mode & stat.S_IRUSR and mode & stat.S_IXUSR):
        return True

    return False
def _copy_permissions(mode, filename):
    """
    Apply the archive member's permission bits to the unarchived file.

    The mode is applied only when it grants read access to others; this
    assumes a file won't be writable/executable without being readable.
    Any other mode leaves *filename* untouched.
    """
    if not mode & stat.S_IROTH:
        return
    os.chmod(filename, mode)