def test_read_only_bytecode(self):
    # Importing must not fail when stale bytecode exists but its file
    # cannot be written to.
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile once, then overwrite the first four bytes of the
        # bytecode file with zeros so that it needs to be re-created.
        py_compile.compile(source_path)
        bytecode_path = self.util.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as stream:
            stream.seek(0)
            stream.write(b'\x00\x00\x00\x00')
        # Leave only the read bits set on the bytecode file.
        read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        os.chmod(bytecode_path, read_only)
        try:
            # An OSError propagating out of this call is a failure.
            self.import_(source_path, '_temp')
        finally:
            # Give the owner write permission again for later clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
# Example source code using Python's stat.S_IRUSR
def test_mknod(self):
    # mknod() is exercised only for FIFO creation, the single use that
    # POSIX specifies.
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    # errno values that are tolerated when mknod() refuses the request.
    tolerated = (errno.EPERM, errno.EINVAL)
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0)
    except OSError as exc:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(exc.errno, tolerated)
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))

    # Repeat the call, this time passing every argument by keyword.
    support.unlink(support.TESTFN)
    try:
        posix.mknod(path=support.TESTFN, mode=fifo_mode, device=0,
                    dir_fd=None)
    except OSError as exc:
        self.assertIn(exc.errno, tolerated)
def test_mknod_dir_fd(self):
    # Create a FIFO (the only use specified by POSIX) through mknod()
    # with a dir_fd that refers to the current working directory.
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
    except OSError as exc:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        # The directory descriptor is closed on every path.
        posix.close(cwd_fd)
def test_execute_bit_not_copied(self):
    # Issue 6070: under posix .pyc files got their execute bit set if
    # the .py file had the execute bit set, but they aren't executable.
    #
    # The umask is written with the 0o prefix: it is accepted by Python
    # 2.6+ as well as Python 3, whereas a bare leading zero is a syntax
    # error on Python 3.
    oldmask = os.umask(0o22)
    sys.path.insert(0, os.curdir)
    try:
        fname = TESTFN + os.extsep + "py"
        # Create an empty source file.
        open(fname, 'w').close()
        # Source file is readable and executable by everyone.
        os.chmod(fname, (stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH |
                         stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH))
        __import__(TESTFN)
        # Look for the compiled file next to the source: ".pyc" first,
        # then ".pyo".
        fn = fname + 'c'
        if not os.path.exists(fn):
            fn = fname + 'o'
            if not os.path.exists(fn):
                self.fail("__import__ did not result in creation of "
                          "either a .pyc or .pyo file")
        s = os.stat(fn)
        # Only the read bits may be present on the compiled file.
        self.assertEqual(stat.S_IMODE(s.st_mode),
                         stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH)
    finally:
        # Undo every global change made above.
        os.umask(oldmask)
        remove_files(TESTFN)
        unload(TESTFN)
        del sys.path[0]
def _get_netrc(cls):
    """Load netrc data through a comment-free temporary copy.

    The file named by ``cls.get_netrc_path()`` is read when it exists;
    otherwise the content is treated as empty. Blank lines and lines whose
    first non-blank character is '#' are removed, and the remainder is
    written to a file named 'netrc' inside a ``tempdir()`` directory, which
    is then restricted to owner read/write.

    Returns:
        The result of ``cls._get_netrc_from_path()`` for the temporary copy.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
        st = os.stat(path)
        if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
            # sys.stderr.write() is used instead of the Python 2 only
            # "print >>" statement so the warning is emitted on both
            # Python 2 and Python 3.
            sys.stderr.write(
                'WARNING: netrc file %s cannot be used because its file '
                'permissions are insecure. netrc file permissions should be '
                '600.\n' % path)
        # NOTE: the file is still read after the warning above.
        with open(path) as fd:
            content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(line for line in content.splitlines()
                        if line.strip() and not line.strip().startswith('#'))
    with tempdir() as tdir:
        netrc_path = os.path.join(tdir, 'netrc')
        with open(netrc_path, 'w') as fd:
            fd.write(content)
        os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
        # Returned from inside the block, i.e. while tdir is still active.
        return cls._get_netrc_from_path(netrc_path)
def create_boilerplate():
    """
    Write sample kibitzr.yml and kibitzr-creds.yml files into the current
    directory, leaving alone whichever of them already exists.

    A newly created kibitzr-creds.yml is restricted to owner read/write.
    """
    if os.path.exists('kibitzr.yml'):
        logger.info("kibitzr.yml already exists. Skipping")
    else:
        with open('kibitzr.yml', 'wt') as config_file:
            logger.info("Saving sample check in kibitzr.yml")
            config_file.write(KIBITZR_YML)

    if os.path.exists('kibitzr-creds.yml'):
        logger.info("kibitzr-creds.yml already exists. Skipping")
    else:
        with open('kibitzr-creds.yml', 'wt') as creds_file:
            logger.info("Creating kibitzr-creds.yml")
            creds_file.write(KIBITZR_CREDS_YML)
        owner_read_write = stat.S_IRUSR | stat.S_IWUSR
        os.chmod('kibitzr-creds.yml', owner_read_write)
def _octal_to_perm(octal):
    """Render the permission bits of *octal* as a nine character string.

    The characters are ordered user, group, other, each as read, write,
    execute. A set bit contributes its letter ("r", "w" or "x"); an unset
    bit contributes "-". Bits other than these nine are ignored.
    """
    bit_letters = (
        (stat.S_IRUSR, "r"), (stat.S_IWUSR, "w"), (stat.S_IXUSR, "x"),
        (stat.S_IRGRP, "r"), (stat.S_IWGRP, "w"), (stat.S_IXGRP, "x"),
        (stat.S_IROTH, "r"), (stat.S_IWOTH, "w"), (stat.S_IXOTH, "x"),
    )
    return "".join(
        letter if octal & bit else "-" for bit, letter in bit_letters
    )
def _get_netrc(cls):
    """Load netrc data through a comment-free temporary copy.

    The file named by ``cls.get_netrc_path()`` is read when it exists;
    otherwise the content is treated as empty. Blank lines and lines whose
    first non-blank character is '#' are removed, and the remainder is
    written to a file named 'netrc' inside a ``tempdir()`` directory, which
    is then restricted to owner read/write.

    Returns:
        The result of ``cls._get_netrc_from_path()`` for the temporary copy.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
        st = os.stat(path)
        if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
            # sys.stderr.write() is used instead of the Python 2 only
            # "print >>" statement so the warning is emitted on both
            # Python 2 and Python 3.
            sys.stderr.write(
                'WARNING: netrc file %s cannot be used because its file '
                'permissions are insecure. netrc file permissions should be '
                '600.\n' % path)
        # NOTE: the file is still read after the warning above.
        with open(path) as fd:
            content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(line for line in content.splitlines()
                        if line.strip() and not line.strip().startswith('#'))
    with tempdir() as tdir:
        netrc_path = os.path.join(tdir, 'netrc')
        with open(netrc_path, 'w') as fd:
            fd.write(content)
        os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
        # Returned from inside the block, i.e. while tdir is still active.
        return cls._get_netrc_from_path(netrc_path)
def init_workspace(cfg, is_inplace=False):
# Prepare the workspace for cfg: either delegate to
# init_workspace_inplace(cfg), or build a fresh directory under BASE_HOME
# and copy cfg.target into it. The non-inplace path records
# cfg.workspace_dir and cfg.target_abs_path.
# NOTE(review): indentation was lost in this copy of the code, so the
# nesting of the statements below (in particular of the final log.info)
# cannot be read off the text -- confirm against the upstream source.
log.info("init workspace is_inplace:%r",is_inplace)
if is_inplace:
init_workspace_inplace(cfg)
else:
# Create a clean workspace.
workspace_dir = os.path.join(BASE_HOME,cfg.exec_home)
if not os.path.exists(workspace_dir):
os.mkdir(workspace_dir)
else:
# An existing workspace directory is removed and created again.
shutil.rmtree(workspace_dir)
os.mkdir(workspace_dir)
cfg.workspace_dir = workspace_dir
# Copy the target file into the workspace.
if os.path.exists(cfg.target):
shutil.copy(cfg.target, workspace_dir)
target_abs_path = os.path.join(workspace_dir, os.path.basename(cfg.target))
cfg.target_abs_path = target_abs_path
# The copied target is left readable by its owner only (no write/execute).
os.chmod(target_abs_path, stat.S_IRUSR)
else:
log.critical("%s dose not exist.", cfg.target)
# os._exit() ends the process immediately with status 1, without
# running cleanup handlers.
os._exit(1)
log.info("Target absolute path: %s", cfg.target_abs_path)
# The working directory is not changed here; per the original author it is
# changed every time the analyzer starts.
def test_read_only_bytecode(self):
    # Importing must not fail when stale bytecode exists but its file
    # cannot be written to.
    with source_util.create_modules('_temp') as mapping:
        source_path = mapping['_temp']
        # Compile once, then overwrite the first four bytes of the
        # bytecode file with zeros so that it needs to be re-created.
        py_compile.compile(source_path)
        bytecode_path = self.util.cache_from_source(source_path)
        with open(bytecode_path, 'r+b') as stream:
            stream.seek(0)
            stream.write(b'\x00\x00\x00\x00')
        # Leave only the read bits set on the bytecode file.
        read_only = stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH
        os.chmod(bytecode_path, read_only)
        try:
            # An OSError propagating out of this call is a failure.
            self.import_(source_path, '_temp')
        finally:
            # Give the owner write permission again for later clean-up.
            os.chmod(bytecode_path, stat.S_IWUSR)
def test_mknod_dir_fd(self):
    # Create a FIFO (the only use specified by POSIX) through mknod()
    # with a dir_fd that refers to the current working directory.
    support.unlink(support.TESTFN)
    fifo_mode = stat.S_IFIFO | stat.S_IRUSR | stat.S_IWUSR
    cwd_fd = posix.open(posix.getcwd(), posix.O_RDONLY)
    try:
        posix.mknod(support.TESTFN, fifo_mode, 0, dir_fd=cwd_fd)
    except OSError as exc:
        # Some old systems don't allow unprivileged users to use
        # mknod(), or only support creating device nodes.
        self.assertIn(exc.errno, (errno.EPERM, errno.EINVAL))
    else:
        created = posix.stat(support.TESTFN)
        self.assertTrue(stat.S_ISFIFO(created.st_mode))
    finally:
        # The directory descriptor is closed on every path.
        posix.close(cwd_fd)
def write(filename, data, undisclosed=False):
    """
    Dump data as JSON into a file, optionally owner-only accessible.

    The parent directory of the file is passed to makedirs() first.

    Args:
        filename (str): file path
        data: object handed to json.dump
        undisclosed (bool): when true, the file mode is set to owner
            read/write before the data is written
    """
    parent_dir = os.path.dirname(filename)
    makedirs(parent_dir)
    with open(filename, 'w') as stream:
        if undisclosed:
            owner_read_write = stat.S_IRUSR | stat.S_IWUSR
            os.chmod(filename, owner_read_write)
        json.dump(data, stream)
def open_for_writing(filename, permissions=None, mode="w", force=False):
    """Yield a file object for a newly created file with exact permissions.

    Args:
        filename: path of the file to create; creation is exclusive, so an
            existing file makes os.open() raise unless *force* is set.
        permissions: mode bits for the new file; owner read/write when None.
        mode: mode string handed to os.fdopen().
        force: when true, an existing *filename* is unlinked first.

    Yields:
        The open file object; it is closed when the generator resumes.
    """
    effective_permissions = permissions
    if effective_permissions is None:
        effective_permissions = stat.S_IRUSR | stat.S_IWUSR

    if force and os.path.exists(filename):
        os.unlink(filename)

    # The umask is cleared only around the creating call so that the new
    # file receives exactly the requested bits, then put back.
    saved_umask = os.umask(0)
    try:
        descriptor = os.open(
            filename,
            os.O_WRONLY | os.O_CREAT | os.O_EXCL,
            effective_permissions,
        )
    finally:
        os.umask(saved_umask)

    # Wrap the raw descriptor in a file object and hand it to the caller.
    with os.fdopen(descriptor, mode) as handle:
        yield handle
def printlog(message, logpath=False):
# Print message to stdout; when logpath is truthy, also append the message
# followed by a newline to the file at logpath.
# NOTE(review): indentation was lost in this copy, so it cannot be seen
# whether the final print() is inside the "if logpath" branch; presumably
# it runs unconditionally -- confirm against the upstream source.
# I think the logging module is great, but this will be used for the time being
# Eventually, we will want to write out multiple output formats: xml,json, etc
if logpath:
# Next iteration of project will include a more secure logger,
# for now, we will just write results to a file directly.
# The commented-out lines below are the disabled exclusive-create,
# owner-only variant of opening the log file.
# flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
# mode = stat.S_IRUSR | stat.S_IWUSR
# umask_original = os.umask(0)
# try:
# fdesc = os.open(logpath, flags, mode)
# # except(OSError):
# # print("[-] Log file exists. Remove it, or change log filename")
# # raise SystemExit
# finally:
# os.umask(umask_original)
# with os.fdopen(fdesc, 'w') as fout:
# Active code path: open in append mode.
with open(logpath, 'a') as fout:
fout.write("{}\n".format(message))
print("{}".format(message))
def _get_netrc(cls):
    """Load netrc data through a comment-free temporary copy.

    The file named by ``cls.get_netrc_path()`` is read when it exists;
    otherwise the content is treated as empty. Blank lines and lines whose
    first non-blank character is '#' are removed, and the remainder is
    written to a file named 'netrc' inside a ``tempdir()`` directory, which
    is then restricted to owner read/write.

    Returns:
        The result of ``cls._get_netrc_from_path()`` for the temporary copy.
    """
    # Buffer the '.netrc' path. Use an empty file if it doesn't exist.
    path = cls.get_netrc_path()
    content = ''
    if os.path.exists(path):
        st = os.stat(path)
        if st.st_mode & (stat.S_IRWXG | stat.S_IRWXO):
            # sys.stderr.write() is used instead of the Python 2 only
            # "print >>" statement so the warning is emitted on both
            # Python 2 and Python 3.
            sys.stderr.write(
                'WARNING: netrc file %s cannot be used because its file '
                'permissions are insecure. netrc file permissions should be '
                '600.\n' % path)
        # NOTE: the file is still read after the warning above.
        with open(path) as fd:
            content = fd.read()

    # Load the '.netrc' file. We strip comments from it because processing them
    # can trigger a bug in Windows. See crbug.com/664664.
    content = '\n'.join(line for line in content.splitlines()
                        if line.strip() and not line.strip().startswith('#'))
    with tempdir() as tdir:
        netrc_path = os.path.join(tdir, 'netrc')
        with open(netrc_path, 'w') as fd:
            fd.write(content)
        os.chmod(netrc_path, (stat.S_IRUSR | stat.S_IWUSR))
        # Returned from inside the block, i.e. while tdir is still active.
        return cls._get_netrc_from_path(netrc_path)
def is_file_readable(filepath):
    """
    Tell whether the file at *filepath* is readable for the user this
    process is currently running as.

    When the real and effective user and group ids agree, the answer comes
    straight from os.access(). Otherwise the read bit of the matching
    permission class (owner, then group, then other) is inspected, using
    the effective ids.
    """
    effective_uid = os.geteuid()
    effective_gid = os.getegid()

    # This is probably the common case, so defer to os.access() and keep
    # the hand-written checks below for the unusual one.
    if os.getuid() == effective_uid and os.getgid() == effective_gid:
        return os.access(filepath, os.R_OK)

    info = os.stat(filepath)
    if info.st_uid == effective_uid:
        read_bit = stat.S_IRUSR
    elif info.st_gid == effective_gid or info.st_gid in os.getgroups():
        read_bit = stat.S_IRGRP
    else:
        read_bit = stat.S_IROTH
    return (info.st_mode & read_bit) != 0
def test_safe_makedirs(self):
# Exercise safe_makedirs(): creating a nested directory, calling it again
# for a directory that already exists, and an OSError still being raised
# when the parent directory is read-only.
# NOTE(review): indentation was lost in this copy, so the extent of the
# "with patch(...)" body cannot be seen here -- confirm against the
# upstream source.
from softwarecenter.utils import safe_makedirs
from tempfile import mkdtemp
tmp = mkdtemp()
# create base dir
target = os.path.join(tmp, "foo", "bar")
safe_makedirs(target)
# we need the patch to ensure that the code is actually executed
# (os.path.exists is replaced by a mock that always returns False)
with patch("os.path.exists") as mock_:
mock_.return_value = False
self.assertTrue(os.path.isdir(target))
# ensure that creating the base dir again does not crash
safe_makedirs(target)
self.assertTrue(os.path.isdir(target))
# ensure we still get regular errors like permission denied
# (stat.S_IRUSR)
# NOTE: 0400 and 0700 are Python 2 octal literals (0o400 / 0o700 in
# Python 3).
os.chmod(os.path.join(tmp, "foo"), 0400)
self.assertRaises(OSError, safe_makedirs, target)
# set back to stat.(S_IRUSR|S_IWUSR|S_IXUSR) to make rmtree work
os.chmod(os.path.join(tmp, "foo"), 0700)
# cleanup
shutil.rmtree(tmp)
def test_no_write_access_for_cache_dir(self):
    """ test for bug LP: #688682 """
    import softwarecenter.paths
    cache_dir = softwarecenter.paths.SOFTWARE_CENTER_CACHE_DIR
    first_backup = cache_dir + ".0"
    second_backup = cache_dir + ".1"

    # Reduce the cache dir to owner-read only (mode 0400) and make sure
    # it really is no longer writable.
    os.chmod(cache_dir, stat.S_IRUSR)
    self.assertFalse(os.access(cache_dir, os.W_OK))

    # Importing the log module starts up the logger.
    import softwarecenter.log
    softwarecenter.log  # pyflakes

    # The old directory must have been renamed with ".0" appended, and
    # no ".1" sibling may exist.
    self.assertTrue(os.path.exists(first_backup))
    self.assertFalse(os.path.exists(second_backup))

    # A new cache directory must exist again and be writable.
    self.assertTrue(os.path.exists(cache_dir))
    self.assertTrue(os.access(cache_dir, os.W_OK))
def apply_playbook(playbook, tags=None, extra_vars=None):
    """Run *playbook* with ansible-playbook over a local connection.

    Before the run, juju_state_to_yaml() is called for ansible_vars_path
    with an owner read/write file mode.

    Args:
        playbook: playbook argument passed to ansible-playbook.
        tags: optional iterable of tag names; joined with commas and passed
            via --tags when non-empty.
        extra_vars: optional mapping; rendered as space separated
            "key=value" pairs and passed via --extra-vars when non-empty.

    Raises:
        subprocess.CalledProcessError: raised by check_call() when the
            command exits with a non-zero status.
    """
    tags = ",".join(tags or [])
    charmhelpers.contrib.templating.contexts.juju_state_to_yaml(
        ansible_vars_path, namespace_separator='__',
        allow_hyphens_in_keys=False, mode=(stat.S_IRUSR | stat.S_IWUSR))

    # we want ansible's log output to be unbuffered
    child_env = os.environ.copy()
    child_env['PYTHONUNBUFFERED'] = "1"

    command = ['ansible-playbook', '-c', 'local', playbook]
    if tags:
        command += ['--tags', '{}'.format(tags)]
    if extra_vars:
        assignments = " ".join(
            "%s=%s" % pair for pair in extra_vars.items())
        command += ['--extra-vars', assignments]
    subprocess.check_call(command, env=child_env)
def testPermission(self):
    # removePath() is expected to raise BuildError for a tree containing
    # a directory whose owner has no write permission.
    with TemporaryDirectory() as tmp:
        d = os.path.join(tmp, "dir")
        os.mkdir(d)
        with open(os.path.join(d, "file"), "w") as f:
            f.write("data")
        # Owner keeps read and execute on the directory, but not write.
        os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.assertRaises(BuildError, removePath, tmp)
        finally:
            # Restore full owner access even when the assertion fails, so
            # that leaving the TemporaryDirectory block is not hindered by
            # the read-only directory.
            os.chmod(d, stat.S_IRWXU)