def _setup_script_dir(self, email_list):
    """
    Create the shell-script directory under ``self.log_dir`` and, when
    ``self.need_to_write_epilogue`` is set, write the epilogue script.

    The directory and the epilogue file are both restricted to owner-only
    ``rwx`` (``stat.S_IRWXU``).  An ``OSError`` with errno ``EEXIST`` is
    ignored; the statements after the failing call are skipped in that case.

    :param email_list: list of email addresses (usually a single address)
        handed to ``self.generate_epilogue`` when rendering the epilogue
        script from its template
    :raises OSError: any OS error other than ``EEXIST``, re-raised after a
        message is printed to stderr
    """
    script_dir = os.path.join(self.log_dir, _SHELL_SCRIPT_DIR)
    try:
        os.makedirs(script_dir)
        os.chmod(script_dir, stat.S_IRWXU)
        if self.need_to_write_epilogue:
            with open(self.epilogue_filename, "w") as handle:
                handle.write(self.generate_epilogue(email_list))
            os.chmod(self.epilogue_filename, stat.S_IRWXU)
            self.need_to_write_epilogue = False
    except OSError as err:
        # An already-existing path is not treated as an error.
        if err.errno == errno.EEXIST:
            return
        message = 'Error while creating directory ' + script_dir
        print(message, file=sys.stderr)
        raise
# Example source code using Python's stat.S_IRWXU
def get_perf(filename):
    '''Run the conlleval.pl perl script on *filename* and return its scores.

    When ``conlleval.pl`` is not present in the current directory it is
    fetched with ``wget`` and made executable for the owner.  The content
    of *filename* is piped to the script and the first output line that
    contains ``'accuracy'`` is parsed.

    :param filename: path of the file fed to the script's stdin
    :return: dict with keys ``'p'`` (precision), ``'r'`` (recall) and
        ``'f1'`` (F1 score), all floats
    :raises ValueError: if the script output has no ``'accuracy'`` line
    '''
    _conlleval = 'conlleval.pl'
    if not isfile(_conlleval):
        # alternative source: http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl
        os.system('wget https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl')
        chmod(_conlleval, stat.S_IRWXU)  # give the execute permissions

    # Read the input up front inside a context manager so the handle is
    # closed and no perl process is started for an unreadable file.
    with open(filename) as source:
        payload = source.read()

    proc = subprocess.Popen(["perl", _conlleval], stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, universal_newlines=True)
    stdout, _ = proc.communicate(payload)

    for line in stdout.split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    else:
        # Without this the code below would fail with an unbound ``out``.
        raise ValueError('no accuracy line in conlleval output for %s' % filename)

    # out = ['accuracy:', '16.26%;', 'precision:', '0.00%;', 'recall:', '0.00%;', 'FB1:', '0.00']
    precision = float(out[3][:-2])  # strip the trailing '%;'
    recall = float(out[5][:-2])
    f1score = float(out[7])
    return {'p': precision, 'r': recall, 'f1': f1score}
def getattr(self, path, fh=None):
    """Return the attribute dict for *path* built from a remote ``stat`` reply.

    Sends a ``stat`` command through ``self.send_cmd`` and translates the
    reply into ``st_*`` keys.  Every entry is reported with ``rwx`` for
    user, group and other, one link, and the uid/gid of the current process.

    :param path: path to look up
    :param fh: unused
    :raises FuseOSError: with ``ENOENT`` when the reply status is not ``'ok'``
    """
    info = self.send_cmd(cmd='stat', path=path)
    if info['status'] != 'ok':
        raise FuseOSError(ENOENT)

    # File-type bit first; the 777 access bits are OR-ed in below.
    type_bit = S_IFDIR if info['type'] == 'folder' else S_IFREG
    size = info.get('size', 0)
    created = dateparse(info['created']).timestamp()  # TODO change to local timezone
    modified = dateparse(info['updated']).timestamp()
    accessed = dateparse(info['updated']).timestamp()  # TODO not supported ?

    return {
        'st_mode': type_bit | S_IRWXU | S_IRWXG | S_IRWXO,
        'st_size': size,
        'st_ctime': created,
        'st_mtime': modified,
        'st_atime': accessed,
        'st_nlink': 1,
        'st_uid': os.getuid(),
        'st_gid': os.getgid(),
    }
def get_perf(filename):
    '''Run the conlleval.pl perl script on *filename* and return its scores.

    The script is expected next to this module; it is made executable for
    the owner before being run.  The content of *filename* is piped to the
    script and the first output line containing ``'accuracy'`` is parsed.

    :param filename: path of the file fed to the script's stdin
    :return: dict with keys ``'p'`` (precision), ``'r'`` (recall) and
        ``'f1'`` (F1 score), all floats
    :raises ValueError: if the script output has no ``'accuracy'`` line
    '''
    _conlleval = os.path.join(os.path.dirname(os.path.realpath(__file__)),
                              'conlleval.pl')
    os.chmod(_conlleval, stat.S_IRWXU)  # give the execute permissions

    # Context manager closes the input file; the original leaked the handle.
    with open(filename) as source:
        payload = source.read()

    # Text-mode pipes: lets ``communicate`` take and return ``str``, which
    # the string handling below requires on Python 3.
    proc = subprocess.Popen(["perl", _conlleval],
                            stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE,
                            universal_newlines=True)
    stdout, _ = proc.communicate(payload)

    for line in stdout.split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    else:
        # Without this the code below would fail with an unbound ``out``.
        raise ValueError('no accuracy line in conlleval output for %s' % filename)

    precision = float(out[6][:-2])  # drop the two trailing characters
    recall = float(out[8][:-2])
    f1score = float(out[10])
    return {'p': precision, 'r': recall, 'f1': f1score}
def makeFilesExecutable():
    """Make the RetroArch launcher files in the add-on's ``bin`` directory executable.

    Applies ``rwxrwxr-x`` to ``retroarch.sh``, ``retroarch.start`` and
    ``retroarch`` and reports the outcome in a dialog.  A failing
    ``os.chmod`` is reported to the user instead of being raised.
    """
    addon_path = xbmc.translatePath(xbmcaddon.Addon(id = 'emulator.tools.retroarch').getAddonInfo('path'))
    bin_dir = os.path.join(addon_path, 'bin')
    # rwx for user and group, r-x for others.
    mode = stat.S_IRWXU | stat.S_IRWXG | stat.S_IROTH | stat.S_IXOTH
    try:
        for name in ('retroarch.sh', 'retroarch.start', 'retroarch'):
            os.chmod(os.path.join(bin_dir, name), mode)
    except OSError:
        # Only chmod failures are handled here; a bare ``except`` would also
        # swallow KeyboardInterrupt/SystemExit and unrelated bugs.
        d = xbmcgui.Dialog()
        d.ok('RetroArch', 'Failed to apply permissions', 'Please try again later or do it manualy via ssh')
    else:
        d = xbmcgui.Dialog()
        d.ok('RetroArch', 'File permissions applied', 'scripts should now be executable')
def test_copy_symlinks(self):
    """Check ``shutil.copy`` on a symlink with ``follow_symlinks`` on and off."""
    tmp_dir = self.mkdtemp()
    src, dst, src_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz')
    )
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow_symlinks=True: the link target is copied, dst is a regular file
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # follow_symlinks=False: the link itself is copied, dst is a symlink
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        link_mode = os.lstat(src_link).st_mode
        copy_mode = os.lstat(dst).st_mode
        self.assertEqual(link_mode, copy_mode)
def build_unpack_comic(self, comic_path):
    """Empty the ``build`` temp directory and unpack *comic_path* into it.

    Everything below ``gazee.TEMP_DIR/build`` is made fully accessible and
    deleted (files first, then directories, bottom-up).  A ``.cbr`` file is
    then extracted with ``rarfile`` and a ``.cbz`` file with ``zipfile``;
    any other extension is left unextracted.

    :param comic_path: path of the comic archive to unpack
    """
    logging.info("%s unpack requested", comic_path)
    build_dir = os.path.join(gazee.TEMP_DIR, "build")
    all_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    # Pass 1: delete every file (chmod first so read-only files can go).
    for root, dirs, files in os.walk(build_dir, topdown=False):
        for f in files:
            target = os.path.join(root, f)
            os.chmod(target, all_access)
            os.remove(target)
    # Pass 2: delete the now-empty directories, deepest first.
    for root, dirs, files in os.walk(build_dir, topdown=False):
        for d in dirs:
            target = os.path.join(root, d)
            os.chmod(target, all_access)
            os.rmdir(target)

    # Context managers close the archive handles, which were leaked before.
    if comic_path.endswith(".cbr"):
        with rarfile.RarFile(comic_path) as opened_rar:
            opened_rar.extractall(build_dir)
    elif comic_path.endswith(".cbz"):
        with zipfile.ZipFile(comic_path) as opened_zip:
            opened_zip.extractall(build_dir)
def user_unpack_comic(self, comic_path, user):
    """Empty the temp directory of *user* and unpack *comic_path* into it.

    Everything below ``gazee.TEMP_DIR/<user>`` is made fully accessible and
    deleted (files first, then directories, bottom-up).  A ``.cbr`` file is
    then extracted with ``rarfile`` and a ``.cbz`` file with ``zipfile``;
    any other extension is left unextracted.

    :param comic_path: path of the comic archive to unpack
    :param user: name of the sub-directory of ``gazee.TEMP_DIR`` to use
    """
    logging.info("%s unpack requested", comic_path)
    user_dir = os.path.join(gazee.TEMP_DIR, user)
    all_access = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO  # 0777

    # Pass 1: delete every file (chmod first so read-only files can go).
    for root, dirs, files in os.walk(user_dir, topdown=False):
        for f in files:
            target = os.path.join(root, f)
            os.chmod(target, all_access)
            os.remove(target)
    # Pass 2: delete the now-empty directories, deepest first.
    for root, dirs, files in os.walk(user_dir, topdown=False):
        for d in dirs:
            target = os.path.join(root, d)
            os.chmod(target, all_access)
            os.rmdir(target)

    # Context managers close the archive handles, which were leaked before.
    if comic_path.endswith(".cbr"):
        with rarfile.RarFile(comic_path) as opened_rar:
            opened_rar.extractall(user_dir)
    elif comic_path.endswith(".cbz"):
        with zipfile.ZipFile(comic_path) as opened_zip:
            opened_zip.extractall(user_dir)
# This method will return a list of .jpg files in their numerical order to be fed into the reading view.
def test_copy_symlinks(self):
    """Check ``shutil.copy`` on a symlink with ``follow_symlinks`` on and off."""
    tmp_dir = self.mkdtemp()
    src, dst, src_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz')
    )
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow_symlinks=True: the link target is copied, dst is a regular file
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # follow_symlinks=False: the link itself is copied, dst is a symlink
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        link_mode = os.lstat(src_link).st_mode
        copy_mode = os.lstat(dst).st_mode
        self.assertEqual(link_mode, copy_mode)
def env(self):
    """Yield an owner-only temporary environment directory.

    The yielded object is a ``str`` subclass holding the directory path,
    with a ``paths`` dict mapping ``home``, ``lib``, ``scripts``, ``data``
    and ``egg-base`` to sub-directories created inside it.  A
    ``.pydistutils.cfg`` pointing ``egg-base`` at its sub-directory is
    written into ``home``.
    """
    class Environment(str):
        pass

    with contexts.tempdir(prefix='setuptools-test.') as env_dir:
        env = Environment(env_dir)
        os.chmod(env_dir, stat.S_IRWXU)

        env.paths = {}
        for name in ('home', 'lib', 'scripts', 'data', 'egg-base'):
            env.paths[name] = os.path.join(env_dir, name)
        for directory in env.paths.values():
            os.mkdir(directory)

        config = os.path.join(env.paths['home'], '.pydistutils.cfg')
        with open(config, 'w') as f:
            # Template lines are kept flush-left so the text is unchanged.
            f.write(DALS("""
[egg_info]
egg-base = %(egg-base)s
""" % env.paths
            ))
        yield env
def env(self, tmpdir):
    """
    Build a package environment, similar to a virtualenv, rooted at
    *tmpdir*, in which packages are installed.

    *tmpdir* is restricted to owner-only ``rwx`` and the sub-directories
    ``home``, ``lib``, ``scripts``, ``data`` and ``egg-base`` are created
    in it.  The returned object is a ``str`` subclass built from *tmpdir*
    whose ``paths`` attribute maps each name to its directory path.
    """
    class Environment(str):
        pass

    environment = Environment(tmpdir)
    tmpdir.chmod(stat.S_IRWXU)
    environment.paths = {
        name: str(tmpdir / name)
        for name in ('home', 'lib', 'scripts', 'data', 'egg-base')
    }
    for directory in environment.paths.values():
        os.mkdir(directory)
    return environment
def cleanup(self):
    """Delete the ``self.temp`` directory tree if it exists and looks like a temp dir.

    Entries that fail to delete with ``EACCES`` are made fully accessible
    and the failing operation is retried; any other error is re-raised.
    """
    print("Cleaning up")
    if not os.path.exists(self.temp):
        return
    # The check for "tempxxx" is to guard against bugs with path operations
    # and command-line arguments which might lead to the accidental deletion
    # of an important directory.
    if "tempxxx" not in self.temp:
        return

    def handleRemoveReadonly(func, path, exc):
        # ``exc`` is the exc_info triple handed over by shutil.rmtree.
        failure = exc[1]
        if failure.errno != errno.EACCES:
            raise
        os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
        func(path)

    print("rmtree {0}".format(self.temp))
    shutil.rmtree(self.temp, ignore_errors=False, onerror=handleRemoveReadonly)
def env(self, tmpdir):
    """
    Build a package environment, similar to a virtualenv, rooted at
    *tmpdir*, in which packages are installed.

    *tmpdir* is restricted to owner-only ``rwx`` and the sub-directories
    ``home``, ``lib``, ``scripts``, ``data`` and ``egg-base`` are created
    in it.  The returned object is a ``str`` subclass built from *tmpdir*
    whose ``paths`` attribute maps each name to its directory path.
    """
    class Environment(str):
        pass

    environment = Environment(tmpdir)
    tmpdir.chmod(stat.S_IRWXU)
    environment.paths = {
        name: str(tmpdir / name)
        for name in ('home', 'lib', 'scripts', 'data', 'egg-base')
    }
    for directory in environment.paths.values():
        os.mkdir(directory)
    return environment
def env(self):
    """Yield an owner-only temporary environment directory.

    The yielded ``Environment`` wraps the directory path and carries a
    ``paths`` dict mapping ``home``, ``lib``, ``scripts``, ``data`` and
    ``egg-base`` to sub-directories created inside it.  A
    ``.pydistutils.cfg`` pointing ``egg-base`` at its sub-directory is
    created in ``home`` through ``build_files``.
    """
    with contexts.tempdir(prefix='setuptools-test.') as env_dir:
        env = Environment(env_dir)
        os.chmod(env_dir, stat.S_IRWXU)

        env.paths = {}
        for name in ('home', 'lib', 'scripts', 'data', 'egg-base'):
            env.paths[name] = os.path.join(env_dir, name)
        for directory in env.paths.values():
            os.mkdir(directory)

        # Template lines are kept flush-left so the text is unchanged.
        home_files = {
            '.pydistutils.cfg': DALS("""
[egg_info]
egg-base = %(egg-base)s
""" % env.paths)
        }
        build_files({env.paths['home']: home_files})
        yield env
def get_perf(filename):
    '''Run the conlleval.pl perl script on *filename* and return its scores.

    When ``PREFIX + 'conlleval.pl'`` does not exist the script is fetched
    with ``wget`` and made executable for the owner.  The bytes of
    *filename* are piped to the script and the first output line that
    contains ``'accuracy'`` is parsed.

    :param filename: path of the file fed to the script's stdin
    :return: dict with keys ``'p'`` (precision), ``'r'`` (recall) and
        ``'f1'`` (F1 score), all floats
    :raises ValueError: if the script output has no ``'accuracy'`` line
    '''
    _conlleval = PREFIX + 'conlleval.pl'
    if not isfile(_conlleval):
        # alternative source: http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl
        # NOTE(review): wget and chmod use the bare name 'conlleval.pl' while
        # the script is run from PREFIX + 'conlleval.pl'; these differ when
        # PREFIX is non-empty -- confirm the intended location.
        os.system('wget https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl')
        chmod('conlleval.pl', stat.S_IRWXU)  # give the execute permissions

    # Context manager closes the input file; the original leaked the handle.
    with open(filename, 'rb') as source:
        payload = source.read()

    proc = subprocess.Popen(["perl", _conlleval], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    stdout, _ = proc.communicate(payload)

    for line in stdout.decode("utf-8").split('\n'):
        if 'accuracy' in line:
            out = line.split()
            break
    else:
        # Without this the code below would fail with an unbound ``out``.
        raise ValueError('no accuracy line in conlleval output for %s' % filename)

    # out = ['accuracy:', '16.26%;', 'precision:', '0.00%;', 'recall:', '0.00%;', 'FB1:', '0.00']
    precision = float(out[3][:-2])  # strip the trailing '%;'
    recall = float(out[5][:-2])
    f1score = float(out[7])
    return {'p':precision, 'r':recall, 'f1':f1score}
def get_perfo(filename):
    '''
    Work around for using a PERL script in python: run conlleval.pl through
    the shell, grep its 'accuracy' line into a scratch file and parse it.

    When ``PREFIX + 'conlleval.pl'`` does not exist the script is fetched
    with ``wget``.  The scratch file is created in the current directory
    under a random numeric name and removed again afterwards.

    :param filename: path of the file redirected to the script's stdin;
        it is interpolated into a shell command unquoted, so it must come
        from a trusted source
    :return: dict with keys ``'p'`` (precision), ``'r'`` (recall) and
        ``'f1'`` (F1 score), all floats
    '''
    scratch = str(random.randint(1,numpy.iinfo('i').max)) + '.txt'
    if not isfile(PREFIX + 'conlleval.pl'):
        os.system('wget https://www.comp.nus.edu.sg/%7Ekanmy/courses/practicalNLP_2008/packages/conlleval.pl')
        # alternative source: http://www-etud.iro.umontreal.ca/~mesnilgr/atis/conlleval.pl
        chmod('conlleval.pl', stat.S_IRWXU)  # give the execute permissions
    if len(PREFIX) > 0:
        chmod(PREFIX + 'conlleval.pl', stat.S_IRWXU)  # give the execute permissions
        cmd = PREFIX + 'conlleval.pl < %s | grep accuracy > %s'%(filename,scratch)
    else:
        cmd = './conlleval.pl < %s | grep accuracy > %s'%(filename,scratch)
    print(cmd)
    os.system(cmd)
    try:
        # Context manager closes the handle, which was leaked before.
        with open(scratch) as result:
            out = result.readlines()[0].split()
    finally:
        # Remove the scratch file even when reading or parsing fails.
        if isfile(scratch):
            os.remove(scratch)
    precision = float(out[6][:-2])  # drop the two trailing characters
    recall = float(out[8][:-2])
    f1score = float(out[10])
    return {'p':precision, 'r':recall, 'f1':f1score}
def setUpClass( cls ):
    """Prepare a clean working area for the test class.

    Creates ``cls.TEST_RUN_DIR`` if needed, makes it the current directory
    and ``$HOME``, then deletes the ``futoin-cid`` cache directory under it
    and ``cls.TEST_DIR`` (entries are made owner-accessible first so they
    can be removed).  When ``cls._create_test_dir`` is true,
    ``cls.TEST_DIR`` is recreated and entered.
    """
    print('Python: ' + sys.executable)

    try:
        os.makedirs( cls.TEST_RUN_DIR )
    except OSError:
        # Best effort: typically the directory already exists.
        pass

    os.chdir( cls.TEST_RUN_DIR )
    os.environ['HOME'] = cls.TEST_RUN_DIR

    cache_dir = os.path.join(os.environ['HOME'], '.cache', 'futoin-cid')

    for cleanup_dir in (cache_dir, cls.TEST_DIR):
        if not os.path.exists( cleanup_dir ):
            continue
        for ( path, dirs, files ) in os.walk( cleanup_dir ):
            for name in dirs + files:
                try:
                    os.chmod( os.path.join( path, name ), stat.S_IRWXU )
                except OSError:
                    # Best effort; rmtree below reports real failures.
                    pass
        shutil.rmtree( cleanup_dir )

    if cls._create_test_dir:
        os.mkdir(cls.TEST_DIR)
        os.chdir(cls.TEST_DIR)
def test_copy_symlinks(self):
    """Check ``shutil.copy`` on a symlink with ``follow_symlinks`` on and off."""
    tmp_dir = self.mkdtemp()
    src, dst, src_link = (
        os.path.join(tmp_dir, name) for name in ('foo', 'bar', 'baz')
    )
    write_file(src, 'foo')
    os.symlink(src, src_link)
    can_lchmod = hasattr(os, 'lchmod')
    if can_lchmod:
        os.lchmod(src_link, stat.S_IRWXU | stat.S_IRWXO)

    # follow_symlinks=True: the link target is copied, dst is a regular file
    shutil.copy(src_link, dst, follow_symlinks=True)
    self.assertFalse(os.path.islink(dst))
    self.assertEqual(read_file(src), read_file(dst))
    os.remove(dst)

    # follow_symlinks=False: the link itself is copied, dst is a symlink
    shutil.copy(src_link, dst, follow_symlinks=False)
    self.assertTrue(os.path.islink(dst))
    self.assertEqual(os.readlink(dst), os.readlink(src_link))
    if can_lchmod:
        link_mode = os.lstat(src_link).st_mode
        copy_mode = os.lstat(dst).st_mode
        self.assertEqual(link_mode, copy_mode)
def _apply_operation_to_mode(user, operator, mode_to_apply, current_mode):
if operator == '=':
if user == 'u':
mask = stat.S_IRWXU | stat.S_ISUID
elif user == 'g':
mask = stat.S_IRWXG | stat.S_ISGID
elif user == 'o':
mask = stat.S_IRWXO | stat.S_ISVTX
# mask out u, g, or o permissions from current_mode and apply new permissions
inverse_mask = mask ^ PERM_BITS
new_mode = (current_mode & inverse_mask) | mode_to_apply
elif operator == '+':
new_mode = current_mode | mode_to_apply
elif operator == '-':
new_mode = current_mode - (current_mode & mode_to_apply)
return new_mode
def testPermission(self):
    """removePath must raise BuildError when a sub-directory is not writable."""
    with TemporaryDirectory() as tmp:
        d = os.path.join(tmp, "dir")
        os.mkdir(d)
        with open(os.path.join(d, "file"), "w") as f:
            f.write("data")

        # Read + search only: entries inside ``d`` cannot be deleted.
        os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.assertRaises(BuildError, removePath, tmp)
        finally:
            # Restore write access even when the assertion fails, so that
            # TemporaryDirectory can clean up and the failure stays visible.
            if os.path.isdir(d):
                os.chmod(d, stat.S_IRWXU)
def testPermission(self):
    """emptyDirectory must raise BuildError when a sub-directory is not writable."""
    with TemporaryDirectory() as tmp:
        d = os.path.join(tmp, "dir")
        os.mkdir(d)
        with open(os.path.join(d, "file"), "w") as f:
            f.write("data")

        # Read + search only: entries inside ``d`` cannot be deleted.
        os.chmod(d, stat.S_IRUSR | stat.S_IXUSR)
        try:
            self.assertRaises(BuildError, emptyDirectory, tmp)
        finally:
            # Restore write access even when the assertion fails, so that
            # TemporaryDirectory can clean up and the failure stays visible.
            if os.path.isdir(d):
                os.chmod(d, stat.S_IRWXU)
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def removeReadOnly(func, path, exc):
    """Error callback for shutil.rmtree: retry a removal that failed on access.

    When *func* is ``os.rmdir`` or ``os.remove`` and the error is
    ``EACCES``, *path* is made fully accessible and *func* is retried.
    Anything else raises ``RuntimeError``.

    :param func: the function that failed
    :param path: the path *func* was called with
    :param exc: ``exc_info``-style triple; item 1 is the exception instance
    """
    error = exc[1]
    retryable = func in (os.rmdir, os.remove) and error.errno == errno.EACCES
    if not retryable:
        raise RuntimeError('Could not remove {0}'.format(path))
    os.chmod(path, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)  # 0777
    func(path)
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def create_tempdir():
    """Recreate the temp directory (module-level ``tempdir``) used for temp files.

    An existing directory is removed first; the new one is made fully
    accessible to everyone.  If a ``PermissionError`` occurs, an error
    message is shown through ``sublime.error_message`` instead of raising.
    The directory path is logged through ``persist.debug`` in either case.
    """
    try:
        if os.path.isdir(tempdir):
            shutil.rmtree(tempdir)

        os.mkdir(tempdir)

        # Make sure the directory can be removed by anyone in case the user
        # runs ST later as another user.
        os.chmod(tempdir, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO)

    except PermissionError:
        if sublime.platform() == 'windows':
            template = (
                'The SublimeLinter temp directory ({}) could not be reset '
                'because it belongs to a different user.'
            )
            message = template.format(tempdir)
        else:
            # Name both the effective user and the directory owner.
            current_user = pwd.getpwuid(os.geteuid())[0]
            owner_uid = os.stat(tempdir).st_uid
            temp_user = pwd.getpwuid(owner_uid)[0]
            template = (
                'The SublimeLinter temp directory:\n\n{0}\n\ncould not be cleared '
                'because it is owned by \'{1}\' and you are logged in as \'{2}\'. '
                'Please use sudo to remove the temp directory from a terminal.'
            )
            message = template.format(tempdir, temp_user, current_user)

        sublime.error_message(message)

    from . import persist
    persist.debug('temp directory:', tempdir)
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def startDB(self):
    """Create the test database file inside ``self.DB_DIR``.

    Sets ``self.DB_NAME`` to the database path, opens ``self.DB_DIR`` up to
    everyone (rwx for user, group and other), issues a ``create database``
    statement through ``kinterbasdb`` and closes the resulting connection.
    """
    import kinterbasdb
    self.DB_NAME = os.path.join(self.DB_DIR, DBTestConnector.DB_NAME)
    # The three masks share no bits, so OR-ing equals the original sum.
    everyone_rwx = stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO
    os.chmod(self.DB_DIR, everyone_rwx)
    statement = 'create database "%s" user "%s" password "%s"' % (
        self.DB_NAME, self.DB_USER, self.DB_PASS)
    connection = kinterbasdb.create_database(statement)
    connection.close()
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir
def _get_default_cache_dir(self):
def _unsafe_dir():
raise RuntimeError('Cannot determine safe temp directory. You '
'need to explicitly provide one.')
tmpdir = tempfile.gettempdir()
# On windows the temporary directory is used specific unless
# explicitly forced otherwise. We can just use that.
if os.name == 'nt':
return tmpdir
if not hasattr(os, 'getuid'):
_unsafe_dir()
dirname = '_jinja2-cache-%d' % os.getuid()
actual_dir = os.path.join(tmpdir, dirname)
try:
os.mkdir(actual_dir, stat.S_IRWXU)
except OSError as e:
if e.errno != errno.EEXIST:
raise
try:
os.chmod(actual_dir, stat.S_IRWXU)
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
except OSError as e:
if e.errno != errno.EEXIST:
raise
actual_dir_stat = os.lstat(actual_dir)
if actual_dir_stat.st_uid != os.getuid() \
or not stat.S_ISDIR(actual_dir_stat.st_mode) \
or stat.S_IMODE(actual_dir_stat.st_mode) != stat.S_IRWXU:
_unsafe_dir()
return actual_dir